Files
stack/apps/coordinator/src/main.py
Jason Woltje d6c6af10d9 feat(#372): track orchestrator agent task completions via telemetry
- Instrument Coordinator.process_queue() with timing and telemetry events
- Instrument OrchestrationLoop.process_next_issue() with quality gate tracking
- Add agent-to-telemetry mapping (model, provider, harness per agent name)
- Map difficulty levels to Complexity enum and gate names to QualityGate enum
- Track retry counts per issue (increment on failure, clear on success)
- Emit FAILURE outcome on agent spawn failure or quality gate rejection
- Non-blocking: telemetry errors are logged and swallowed, never delay tasks
- Pass telemetry client from FastAPI lifespan to Coordinator constructor
- Add 33 unit tests covering all telemetry scenarios

Refs #372

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 02:10:22 -06:00

224 lines
6.8 KiB
Python

"""FastAPI application for mosaic-coordinator webhook receiver."""
import asyncio
import logging
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from fastapi import FastAPI
from mosaicstack_telemetry import TelemetryClient # type: ignore[import-untyped]
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from starlette.requests import Request
from starlette.responses import Response
from .config import settings
from .coordinator import Coordinator
from .mosaic_telemetry import create_telemetry_config
from .queue import QueueManager
from .telemetry import TelemetryService, shutdown_telemetry
from .webhook import router as webhook_router
# Logging configuration
def setup_logging() -> None:
    """Apply the application-wide logging configuration.

    ``settings.log_level`` is upper-cased and looked up as an attribute of the
    ``logging`` module; names that are not found fall back to
    ``logging.INFO``. The resolved level and a timestamped line format are
    passed to ``logging.basicConfig`` (which, per the stdlib, does nothing if
    the root logger already has handlers).
    """
    requested_name = settings.log_level.upper()
    resolved_level = getattr(logging, requested_name, logging.INFO)
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=resolved_level,
    )
# Configure logging as a side effect of importing this module, so the
# configuration is in place before this module logs anything at import time.
setup_logging()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Global application state, assigned by ``lifespan`` when the coordinator is
# enabled; both remain ``None`` when it is disabled.
# The coordinator instance, read by ``get_coordinator`` and the /health route.
_coordinator: Coordinator | None = None
# Background task running ``Coordinator.start()``; cancelled during shutdown.
_coordinator_task: asyncio.Task[None] | None = None
def get_coordinator() -> Coordinator | None:
    """Return the process-wide coordinator, if one exists.

    Returns:
        The ``Coordinator`` created during the application lifespan, or
        ``None`` when the coordinator is disabled or has not been created yet.
    """
    current = _coordinator
    return current
async def _shutdown_coordinator(
    coordinator: Coordinator, task: asyncio.Task[None] | None
) -> None:
    """Stop ``coordinator`` and wait for its background ``task`` to finish.

    ``Coordinator.stop()`` is awaited first. The task is then cancelled and
    awaited even if ``stop()`` raised. If the task itself ended with an
    exception other than the expected cancellation, that exception is logged
    rather than propagated, so it cannot abort the rest of shutdown.
    """
    logger.info("Stopping coordinator...")
    try:
        await coordinator.stop()
    finally:
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                # A crashed coordinator task re-raises its error here; don't
                # let it prevent telemetry from being shut down.
                logger.exception("Coordinator task exited with an error")
    logger.info("Coordinator stopped")


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
    """Application lifespan manager.

    Startup, in order:
      1. Initialize OpenTelemetry unless ``OTEL_ENABLED`` is ``"false"``
         (case-insensitive; unset counts as enabled).
      2. Start the Mosaic telemetry client when its configuration is
         enabled, and store it (or ``None``) on ``app.state.mosaic_telemetry``.
      3. Create the ``QueueManager`` backed by ``queue.json``.
      4. When ``settings.coordinator_enabled`` is set, create the
         ``Coordinator`` and schedule ``Coordinator.start()`` as a background
         task.

    Shutdown runs in a ``finally`` block, so whatever was started is released
    even if startup fails partway or the application exits with an error.
    Each shutdown step is best-effort: a failure is logged and the remaining
    steps still run.

    Args:
        app: The FastAPI application; its ``state`` receives the Mosaic
            telemetry client.

    Yields:
        State dict with ``queue_manager`` and ``coordinator`` (``None`` when
        the coordinator is disabled).
    """
    global _coordinator, _coordinator_task

    # Startup
    logger.info("Starting mosaic-coordinator webhook receiver")
    logger.info("Gitea URL: %s", settings.gitea_url)
    logger.info("Log level: %s", settings.log_level)
    logger.info("Server: %s:%s", settings.host, settings.port)

    # Drop state left over from a previous lifespan run (e.g. in tests) so a
    # disabled coordinator is never reported through a stale instance.
    _coordinator = None
    _coordinator_task = None

    # Record only what actually started, so shutdown tears down exactly that.
    otel_initialized = False
    mosaic_telemetry_client: TelemetryClient | None = None

    try:
        # Initialize OpenTelemetry if enabled
        if os.getenv("OTEL_ENABLED", "true").lower() != "false":
            telemetry_service = TelemetryService()
            telemetry_service.initialize()
            otel_initialized = True
            logger.info("OpenTelemetry telemetry initialized")

        # Initialize Mosaic telemetry client
        mosaic_telemetry_config = create_telemetry_config()
        if mosaic_telemetry_config.enabled:
            client = TelemetryClient(mosaic_telemetry_config)
            await client.start_async()
            # Record the client only after a successful start, so a failed
            # start is never followed by stop_async() on an unstarted client.
            mosaic_telemetry_client = client
            app.state.mosaic_telemetry = client
            logger.info("Mosaic telemetry client started")
        else:
            app.state.mosaic_telemetry = None
            logger.info("Mosaic telemetry disabled via configuration")

        # Initialize queue manager
        queue_file = Path("queue.json")
        queue_manager = QueueManager(queue_file=queue_file)
        logger.info("Queue manager initialized (file: %s)", queue_file)

        # Initialize and start coordinator if enabled
        if settings.coordinator_enabled:
            _coordinator = Coordinator(
                queue_manager=queue_manager,
                poll_interval=settings.coordinator_poll_interval,
                telemetry_client=mosaic_telemetry_client,
                instance_id=mosaic_telemetry_config.instance_id or "",
            )
            logger.info(
                "Coordinator initialized (poll interval: %ss, max agents: %s)",
                settings.coordinator_poll_interval,
                settings.coordinator_max_concurrent_agents,
            )
            # Start coordinator in background
            _coordinator_task = asyncio.create_task(_coordinator.start())
            logger.info("Coordinator orchestration loop started")
        else:
            logger.info("Coordinator disabled via configuration")

        yield {"queue_manager": queue_manager, "coordinator": _coordinator}
    finally:
        # Shutdown
        logger.info("Shutting down mosaic-coordinator")

        if _coordinator is not None:
            try:
                await _shutdown_coordinator(_coordinator, _coordinator_task)
            except Exception:
                logger.exception("Error while stopping coordinator")
            _coordinator = None
            _coordinator_task = None

        if mosaic_telemetry_client is not None:
            try:
                await mosaic_telemetry_client.stop_async()
                logger.info("Mosaic telemetry client stopped")
            except Exception:
                logger.exception("Error while stopping Mosaic telemetry client")

        if otel_initialized:
            try:
                shutdown_telemetry()
                logger.info("OpenTelemetry telemetry shut down")
            except Exception:
                logger.exception("Error while shutting down OpenTelemetry")

        logger.info("Mosaic-coordinator shutdown complete")
# Rate limiter keyed on the client address returned by slowapi's
# ``get_remote_address``; attached to ``app.state`` at the end of this section.
limiter = Limiter(key_func=get_remote_address)

# Create the FastAPI application; ``lifespan`` runs its startup/shutdown logic.
app = FastAPI(
    title="Mosaic Coordinator",
    description="Webhook receiver for Gitea issue events",
    version="0.0.1",
    lifespan=lifespan,
)

# Instrument FastAPI with OpenTelemetry unless OTEL_ENABLED is "false"
# (case-insensitive; unset counts as enabled). Same check as in ``lifespan``,
# but evaluated once at import time — presumably so the app is instrumented
# before it serves any request.
if os.getenv("OTEL_ENABLED", "true").lower() != "false":
    # Imported here so the instrumentation package is only loaded when
    # OpenTelemetry is enabled.
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

    FastAPIInstrumentor.instrument_app(app)
    logger.info("FastAPI instrumented with OpenTelemetry")

# Make the limiter available on the application state.
app.state.limiter = limiter
def _rate_limit_handler(request: Request, exc: Exception) -> Response:
    """Delegate rate-limit errors to slowapi's stock handler.

    The parameter is typed as a plain ``Exception`` (an Exception-compatible
    signature) so the function can be registered with
    ``app.add_exception_handler``. A ``RateLimitExceeded`` is forwarded to
    slowapi's ``_rate_limit_exceeded_handler``. Any other exception gets a
    bare 429 response; this is not expected, because the handler is
    registered only for ``RateLimitExceeded``.
    """
    if isinstance(exc, RateLimitExceeded):
        return _rate_limit_exceeded_handler(request, exc)
    return Response(content="Rate limit error", status_code=429)


app.add_exception_handler(RateLimitExceeded, _rate_limit_handler)
class HealthResponse(BaseModel):
    """Health check response model returned by the ``GET /health`` route."""

    # Service status string; the /health handler always sets "healthy".
    status: str
    # Service identifier; the /health handler sets "mosaic-coordinator".
    service: str
    # True when a coordinator exists and its ``is_running`` flag is set.
    coordinator_running: bool = False
    # Active agent count reported by the coordinator; 0 when there is none.
    active_agents: int = 0
@app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report service liveness plus a snapshot of the coordinator.

    The status is always ``"healthy"``. When no coordinator has been created,
    the coordinator fields report "not running" and zero agents.

    Returns:
        HealthResponse whose ``coordinator_running`` comes from
        ``Coordinator.is_running`` and whose ``active_agents`` comes from
        ``Coordinator.get_active_agent_count()``.
    """
    coordinator = _coordinator
    if coordinator is None:
        running, agent_count = False, 0
    else:
        running = coordinator.is_running
        agent_count = coordinator.get_active_agent_count()

    return HealthResponse(
        status="healthy",
        service="mosaic-coordinator",
        coordinator_running=running,
        active_agents=agent_count,
    )
# Mount the webhook routes on the application.
app.include_router(webhook_router)

if __name__ == "__main__":
    import uvicorn

    # Development entry point. The app is passed as an import string, which
    # uvicorn requires when reload is enabled.
    bind_host, bind_port = settings.host, settings.port
    uvicorn.run(
        "src.main:app",
        host=bind_host,
        port=bind_port,
        reload=True,
        log_level=settings.log_level.lower(),
    )