Files
stack/apps/coordinator/src/main.py
Jason Woltje 88953fc998 feat(#160): Implement basic orchestration loop
Implements the Coordinator class with main orchestration loop:
- Async loop architecture with configurable poll interval
- process_queue() method gets next ready issue and spawns agent (stub)
- Graceful shutdown handling with stop() method
- Error handling that allows loop to continue after failures
- Logging for all actions (start, stop, processing, errors)
- Integration with QueueManager from #159
- Active agent tracking for future agent management

Configuration settings added:
- COORDINATOR_POLL_INTERVAL (default: 5.0s)
- COORDINATOR_MAX_CONCURRENT_AGENTS (default: 10)
- COORDINATOR_ENABLED (default: true)

Tests: 27 new tests covering all acceptance criteria
Coverage: 92% overall (100% for coordinator.py)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 18:03:12 -06:00

161 lines
4.4 KiB
Python

"""FastAPI application for mosaic-coordinator webhook receiver."""
import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
from .config import settings
from .coordinator import Coordinator
from .queue import QueueManager
from .webhook import router as webhook_router
# Configure logging
def setup_logging() -> None:
    """Configure root logging from the application settings.

    The level name is read from ``settings.log_level`` and upper-cased
    before being looked up on the ``logging`` module; a name that module
    does not define falls back to ``logging.INFO``.
    """
    level_name = settings.log_level.upper()
    logging.basicConfig(
        level=getattr(logging, level_name, logging.INFO),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
# Configure logging as a side effect of importing this module, before the
# module-level logger below is created and used.
setup_logging()
logger = logging.getLogger(__name__)
# Module-level application state. Both names are assigned by lifespan() and
# stay None when the coordinator has not been created.
# _coordinator is read by get_coordinator() and the /health endpoint.
_coordinator: Coordinator | None = None
# Background asyncio task running the coordinator's start() coroutine.
_coordinator_task: asyncio.Task[None] | None = None
def get_coordinator() -> Coordinator | None:
    """Return the module-level coordinator.

    Returns:
        The Coordinator currently stored in the module state, or None when
        no coordinator has been created.
    """
    return _coordinator
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
    """Application lifespan manager.

    Startup creates the queue manager and, when
    ``settings.coordinator_enabled`` is true, a Coordinator whose ``start()``
    coroutine is scheduled as a background task. Shutdown stops the
    coordinator and cancels that task; it runs in a ``finally`` block so the
    cleanup also happens when an exception or cancellation is thrown into
    the context manager while the application is running.

    Args:
        app: The FastAPI application this lifespan is attached to. It is not
            used by the function body.

    Yields:
        State dict with the shared ``queue_manager`` and ``coordinator``
        (``None`` when the coordinator is disabled).
    """
    global _coordinator, _coordinator_task

    # Start from a clean slate: without this, a run with the coordinator
    # disabled would yield (and /health would report) whatever coordinator an
    # earlier lifespan run in the same process left in the module globals.
    _coordinator = None
    _coordinator_task = None

    # Startup
    logger.info("Starting mosaic-coordinator webhook receiver")
    logger.info(f"Gitea URL: {settings.gitea_url}")
    logger.info(f"Log level: {settings.log_level}")
    logger.info(f"Server: {settings.host}:{settings.port}")

    # Initialize queue manager (path is relative to the working directory)
    queue_file = Path("queue.json")
    queue_manager = QueueManager(queue_file=queue_file)
    logger.info(f"Queue manager initialized (file: {queue_file})")

    # Initialize and start coordinator if enabled
    if settings.coordinator_enabled:
        # NOTE(review): coordinator_max_concurrent_agents is only logged
        # below; it is not passed to Coordinator here -- confirm that is
        # intended.
        _coordinator = Coordinator(
            queue_manager=queue_manager,
            poll_interval=settings.coordinator_poll_interval,
        )
        logger.info(
            f"Coordinator initialized (poll interval: {settings.coordinator_poll_interval}s, "
            f"max agents: {settings.coordinator_max_concurrent_agents})"
        )
        # Start coordinator in background
        _coordinator_task = asyncio.create_task(_coordinator.start())
        logger.info("Coordinator orchestration loop started")
    else:
        logger.info("Coordinator disabled via configuration")

    try:
        yield {"queue_manager": queue_manager, "coordinator": _coordinator}
    finally:
        # Shutdown
        logger.info("Shutting down mosaic-coordinator")

        # Stop coordinator gracefully
        if _coordinator is not None:
            logger.info("Stopping coordinator...")
            await _coordinator.stop()

        if _coordinator_task is not None:
            # cancel() is a no-op if the task already finished after stop().
            _coordinator_task.cancel()
            try:
                await _coordinator_task
            except asyncio.CancelledError:
                pass
            except Exception:
                # The background task died with its own error; record it
                # instead of letting it abort the rest of the shutdown.
                logger.exception("Coordinator task exited with an error")
            logger.info("Coordinator stopped")

        logger.info("Mosaic-coordinator shutdown complete")
# ASGI application object; startup and shutdown are delegated to lifespan().
app = FastAPI(
    title="Mosaic Coordinator",
    description="Webhook receiver for Gitea issue events",
    version="0.0.1",
    lifespan=lifespan,
)
class HealthResponse(BaseModel):
    """Response body returned by the ``/health`` endpoint."""

    # Overall service status string (the endpoint reports "healthy").
    status: str
    # Name of the service that produced the response.
    service: str
    # Whether the coordinator reports itself as running; False by default.
    coordinator_running: bool = False
    # Number of agents the coordinator reports as active; 0 by default.
    active_agents: int = 0
@app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report service health together with the coordinator's status.

    Returns:
        HealthResponse with status "healthy". The coordinator fields are
        False / 0 when no coordinator exists; otherwise they are read from
        the coordinator instance.
    """
    coordinator = _coordinator

    # No coordinator: report the not-running values explicitly.
    if coordinator is None:
        return HealthResponse(
            status="healthy",
            service="mosaic-coordinator",
            coordinator_running=False,
            active_agents=0,
        )

    return HealthResponse(
        status="healthy",
        service="mosaic-coordinator",
        coordinator_running=coordinator.is_running,
        active_agents=coordinator.get_active_agent_count(),
    )
# Register the routes defined in the webhook module on the application.
app.include_router(webhook_router)
if __name__ == "__main__":
    # Script entry point: uvicorn is imported here so it is only needed
    # when the module is executed directly.
    import uvicorn

    # The app is passed as an import string, and auto-reload is switched on
    # for this entry point.
    uvicorn.run(
        "src.main:app",
        host=settings.host,
        port=settings.port,
        reload=True,
        log_level=settings.log_level.lower(),
    )