- Instrument Coordinator.process_queue() with timing and telemetry events - Instrument OrchestrationLoop.process_next_issue() with quality gate tracking - Add agent-to-telemetry mapping (model, provider, harness per agent name) - Map difficulty levels to Complexity enum and gate names to QualityGate enum - Track retry counts per issue (increment on failure, clear on success) - Emit FAILURE outcome on agent spawn failure or quality gate rejection - Non-blocking: telemetry errors are logged and swallowed, never delay tasks - Pass telemetry client from FastAPI lifespan to Coordinator constructor - Add 33 unit tests covering all telemetry scenarios Refs #372 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
224 lines
6.8 KiB
Python
224 lines
6.8 KiB
Python
"""FastAPI application for mosaic-coordinator webhook receiver."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from collections.abc import AsyncIterator
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import FastAPI
|
|
from mosaicstack_telemetry import TelemetryClient # type: ignore[import-untyped]
|
|
from pydantic import BaseModel
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler
|
|
from slowapi.errors import RateLimitExceeded
|
|
from slowapi.util import get_remote_address
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response
|
|
|
|
from .config import settings
|
|
from .coordinator import Coordinator
|
|
from .mosaic_telemetry import create_telemetry_config
|
|
from .queue import QueueManager
|
|
from .telemetry import TelemetryService, shutdown_telemetry
|
|
from .webhook import router as webhook_router
|
|
|
|
|
|
# Configure logging
|
|
def setup_logging() -> None:
|
|
"""Configure logging for the application."""
|
|
log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
|
|
# Logging is configured as an import-time side effect so the module logger
# created right after it picks up the configured level and format.
setup_logging()
logger: logging.Logger = logging.getLogger(__name__)

# Process-wide coordinator state. ``lifespan`` fills these in at startup when
# the coordinator is enabled; ``get_coordinator`` and ``health_check`` read them.
_coordinator: Coordinator | None = None
_coordinator_task: asyncio.Task[None] | None = None
|
|
|
|
|
|
def get_coordinator() -> Coordinator | None:
    """Return the process-wide coordinator, if one has been created.

    The instance is assigned by the application lifespan at startup when the
    coordinator is enabled via configuration.

    Returns:
        The active ``Coordinator``, or ``None`` when none was created.
    """
    current = _coordinator
    return current
|
|
|
|
|
|
async def _shutdown_coordinator() -> None:
    """Stop the global coordinator, await its background task, and reset globals.

    Errors raised while stopping are logged instead of propagated, so the
    remaining shutdown steps (telemetry flush/shutdown) always get to run.
    """
    global _coordinator, _coordinator_task

    coordinator, task = _coordinator, _coordinator_task
    # Reset the globals so a later lifespan (e.g. in tests) or health check
    # never observes a coordinator that has already been stopped.
    _coordinator = None
    _coordinator_task = None

    if coordinator is None:
        return

    logger.info("Stopping coordinator...")
    try:
        await coordinator.stop()
    except Exception:
        logger.exception("Error while stopping coordinator")

    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The loop may already have died with an error; awaiting it
            # re-raises that error, which must not abort the rest of shutdown.
            logger.exception("Coordinator task exited with an error")

    logger.info("Coordinator stopped")


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
    """Application lifespan manager.

    Startup initializes OpenTelemetry (unless ``OTEL_ENABLED`` is "false"),
    starts the Mosaic telemetry client when its config is enabled, creates the
    queue manager and, if ``settings.coordinator_enabled``, starts the
    coordinator loop as a background task.

    Shutdown runs in a ``finally`` block so that anything acquired during a
    partially failed startup is still released. The order is: coordinator,
    then the Mosaic telemetry client, then OpenTelemetry.

    Args:
        app: The FastAPI application; ``app.state.mosaic_telemetry`` is set to
            the started telemetry client, or ``None``.

    Yields:
        State dict with ``queue_manager`` and ``coordinator``.
    """
    global _coordinator, _coordinator_task

    # Startup
    logger.info("Starting mosaic-coordinator webhook receiver")
    logger.info("Gitea URL: %s", settings.gitea_url)
    logger.info("Log level: %s", settings.log_level)
    logger.info("Server: %s:%s", settings.host, settings.port)

    # Initialize OpenTelemetry if enabled. This happens before the try block:
    # if it fails, nothing has been acquired that needs cleaning up.
    telemetry_enabled = os.getenv("OTEL_ENABLED", "true").lower() != "false"
    if telemetry_enabled:
        TelemetryService().initialize()
        logger.info("OpenTelemetry telemetry initialized")

    mosaic_telemetry_client: TelemetryClient | None = None
    app.state.mosaic_telemetry = None
    try:
        # Initialize Mosaic telemetry client
        mosaic_telemetry_config = create_telemetry_config()
        if mosaic_telemetry_config.enabled:
            client = TelemetryClient(mosaic_telemetry_config)
            await client.start_async()
            # Record the client only once it has started, so shutdown never
            # tries to stop a client whose start failed.
            mosaic_telemetry_client = client
            app.state.mosaic_telemetry = client
            logger.info("Mosaic telemetry client started")
        else:
            logger.info("Mosaic telemetry disabled via configuration")

        # Initialize queue manager
        queue_file = Path("queue.json")
        queue_manager = QueueManager(queue_file=queue_file)
        logger.info("Queue manager initialized (file: %s)", queue_file)

        # Initialize and start coordinator if enabled
        if settings.coordinator_enabled:
            _coordinator = Coordinator(
                queue_manager=queue_manager,
                poll_interval=settings.coordinator_poll_interval,
                telemetry_client=mosaic_telemetry_client,
                instance_id=mosaic_telemetry_config.instance_id or "",
            )
            logger.info(
                "Coordinator initialized (poll interval: %ss, max agents: %s)",
                settings.coordinator_poll_interval,
                settings.coordinator_max_concurrent_agents,
            )

            # Start coordinator in background
            _coordinator_task = asyncio.create_task(_coordinator.start())
            logger.info("Coordinator orchestration loop started")
        else:
            logger.info("Coordinator disabled via configuration")

        yield {"queue_manager": queue_manager, "coordinator": _coordinator}
    finally:
        # Shutdown
        logger.info("Shutting down mosaic-coordinator")

        # Stop coordinator gracefully
        await _shutdown_coordinator()

        # Shutdown Mosaic telemetry client; telemetry errors are logged and
        # swallowed so they never block the rest of shutdown.
        if mosaic_telemetry_client is not None:
            try:
                await mosaic_telemetry_client.stop_async()
            except Exception:
                logger.exception("Error while stopping Mosaic telemetry client")
            else:
                logger.info("Mosaic telemetry client stopped")

        # Shutdown OpenTelemetry
        if telemetry_enabled:
            shutdown_telemetry()
            logger.info("OpenTelemetry telemetry shut down")

        logger.info("Mosaic-coordinator shutdown complete")
|
|
|
|
|
|
# Rate limiter keyed on the client's remote address.
limiter = Limiter(key_func=get_remote_address)

# The FastAPI application; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="Mosaic Coordinator",
    description="Webhook receiver for Gitea issue events",
    version="0.0.1",
)

# OpenTelemetry request instrumentation stays on unless OTEL_ENABLED is set to
# "false" (case-insensitive), the same test the lifespan applies.
if os.getenv("OTEL_ENABLED", "true").lower() != "false":
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

    FastAPIInstrumentor.instrument_app(app)
    logger.info("FastAPI instrumented with OpenTelemetry")

# Expose the limiter on application state for slowapi.
app.state.limiter = limiter
|
|
|
|
|
|
def _rate_limit_handler(request: Request, exc: Exception) -> Response:
    """Adapt slowapi's handler to the broad ``(Request, Exception)`` signature.

    Args:
        request: The incoming request that tripped the limiter.
        exc: The raised exception; normally a ``RateLimitExceeded``.

    Returns:
        slowapi's rate-limit response for ``RateLimitExceeded``; otherwise a
        plain 429 response.
    """
    if isinstance(exc, RateLimitExceeded):
        return _rate_limit_exceeded_handler(request, exc)
    # The handler is registered for RateLimitExceeded only, but the signature
    # accepts any Exception, so keep a generic 429 fallback.
    return Response(content="Rate limit error", status_code=429)
|
|
|
|
|
|
# Route slowapi's RateLimitExceeded through the signature-adapting wrapper.
app.add_exception_handler(RateLimitExceeded, _rate_limit_handler)
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Health check response model returned by the ``/health`` endpoint."""

    # Overall service status; the health endpoint always reports "healthy".
    status: str
    # Service identifier; the health endpoint sends "mosaic-coordinator".
    service: str
    # Coordinator's ``is_running`` flag; False when no coordinator exists.
    coordinator_running: bool = False
    # Coordinator's active agent count; 0 when no coordinator exists.
    active_agents: int = 0
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report service liveness together with coordinator status.

    When no coordinator has been created (for example, because it is disabled
    via configuration), ``coordinator_running`` is False and
    ``active_agents`` is 0.

    Returns:
        HealthResponse whose ``status`` is always "healthy".
    """
    coordinator = _coordinator
    if coordinator is None:
        running, agent_count = False, 0
    else:
        running = coordinator.is_running
        agent_count = coordinator.get_active_agent_count()

    return HealthResponse(
        status="healthy",
        service="mosaic-coordinator",
        coordinator_running=running,
        active_agents=agent_count,
    )
|
|
|
|
|
|
# Mount the webhook routes exported by the ``.webhook`` module.
app.include_router(webhook_router)


if __name__ == "__main__":
    # Direct-execution entry point: serve via uvicorn with auto-reload enabled.
    import uvicorn

    run_options: dict[str, Any] = {
        "host": settings.host,
        "port": settings.port,
        "reload": True,
        "log_level": settings.log_level.lower(),
    }
    uvicorn.run("src.main:app", **run_options)
|