From 88953fc998897d7c97a658482262fb5319404d14 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 18:03:12 -0600 Subject: [PATCH] feat(#160): Implement basic orchestration loop Implements the Coordinator class with main orchestration loop: - Async loop architecture with configurable poll interval - process_queue() method gets next ready issue and spawns agent (stub) - Graceful shutdown handling with stop() method - Error handling that allows loop to continue after failures - Logging for all actions (start, stop, processing, errors) - Integration with QueueManager from #159 - Active agent tracking for future agent management Configuration settings added: - COORDINATOR_POLL_INTERVAL (default: 5.0s) - COORDINATOR_MAX_CONCURRENT_AGENTS (default: 10) - COORDINATOR_ENABLED (default: true) Tests: 27 new tests covering all acceptance criteria Coverage: 92% overall (100% for coordinator.py) Co-Authored-By: Claude Opus 4.5 --- apps/coordinator/.env.example | 5 + apps/coordinator/src/config.py | 5 + apps/coordinator/src/coordinator.py | 181 +++++ apps/coordinator/src/main.py | 91 ++- apps/coordinator/tests/conftest.py | 3 +- .../coordinator/tests/test_context_monitor.py | 7 +- apps/coordinator/tests/test_coordinator.py | 746 ++++++++++++++++++ apps/coordinator/tests/test_parser.py | 27 +- apps/coordinator/tests/test_security.py | 2 - 9 files changed, 1043 insertions(+), 24 deletions(-) create mode 100644 apps/coordinator/src/coordinator.py create mode 100644 apps/coordinator/tests/test_coordinator.py diff --git a/apps/coordinator/.env.example b/apps/coordinator/.env.example index 76637ee..a84a440 100644 --- a/apps/coordinator/.env.example +++ b/apps/coordinator/.env.example @@ -11,3 +11,8 @@ PORT=8000 # Logging LOG_LEVEL=info + +# Coordinator Configuration +COORDINATOR_POLL_INTERVAL=5.0 +COORDINATOR_MAX_CONCURRENT_AGENTS=10 +COORDINATOR_ENABLED=true diff --git a/apps/coordinator/src/config.py b/apps/coordinator/src/config.py index 0869e1e..dd47001 100644 --- a/apps/coordinator/src/config.py +++ b/apps/coordinator/src/config.py @@ -27,6 +27,11 @@ class Settings(BaseSettings): # Logging log_level: str = "info" + # Coordinator Configuration + coordinator_poll_interval: float = 5.0 + coordinator_max_concurrent_agents: int = 10 + coordinator_enabled: bool = True + def get_settings() -> Settings: """Get settings instance (lazy loaded).""" diff --git a/apps/coordinator/src/coordinator.py b/apps/coordinator/src/coordinator.py new file mode 100644 index 0000000..cd0d774 --- /dev/null +++ b/apps/coordinator/src/coordinator.py @@ -0,0 +1,181 @@ +"""Coordinator orchestration loop for processing issue queue.""" + +import asyncio +import logging +from typing import Any + +from src.queue import QueueItem, QueueManager + +logger = logging.getLogger(__name__) + + +class Coordinator: + """Main orchestration loop for processing the issue queue. + + The Coordinator is responsible for: + - Monitoring the queue for ready items + - Spawning agents to process issues (stub implementation for Phase 0) + - Marking items as complete when processing finishes + - Handling errors gracefully + - Supporting graceful shutdown + """ + + def __init__( + self, + queue_manager: QueueManager, + poll_interval: float = 5.0, + ) -> None: + """Initialize the Coordinator. + + Args: + queue_manager: QueueManager instance for queue operations + poll_interval: Seconds between queue polls (default: 5.0) + """ + self.queue_manager = queue_manager + self.poll_interval = poll_interval + self._running = False + self._stop_event: asyncio.Event | None = None + self._active_agents: dict[int, dict[str, Any]] = {} + + @property + def is_running(self) -> bool: + """Check if the coordinator is currently running. + + Returns: + True if the orchestration loop is running + """ + return self._running + + @property + def active_agents(self) -> dict[int, dict[str, Any]]: + """Get the dictionary of active agents. + + Returns: + Dictionary mapping issue numbers to agent info + """ + return self._active_agents + + def get_active_agent_count(self) -> int: + """Get the count of currently active agents. + + Returns: + Number of active agents + """ + return len(self._active_agents) + + async def start(self) -> None: + """Start the orchestration loop. + + Continuously processes the queue until stop() is called. + """ + self._running = True + self._stop_event = asyncio.Event() + logger.info("Coordinator started - beginning orchestration loop") + + try: + while self._running: + try: + await self.process_queue() + except Exception as e: + logger.error(f"Error in process_queue: {e}") + # Continue running despite errors + + # Wait for poll interval or stop signal + try: + await asyncio.wait_for( + self._stop_event.wait(), + timeout=self.poll_interval, + ) + # If we reach here, stop was requested + break + except TimeoutError: + # Normal timeout, continue polling + pass + + finally: + self._running = False + logger.info("Coordinator stopped") + + async def stop(self) -> None: + """Stop the orchestration loop gracefully. + + Signals the loop to stop and waits for current processing to complete. + This method is idempotent - can be called multiple times safely. + """ + logger.info("Coordinator stop requested") + self._running = False + if self._stop_event is not None: + self._stop_event.set() + + async def process_queue(self) -> QueueItem | None: + """Process the next ready item from the queue. + + Gets the next ready item, spawns an agent to process it, + and marks it complete on success. + + Returns: + The QueueItem that was processed, or None if queue is empty + """ + # Get next ready item + item = self.queue_manager.get_next_ready() + + if item is None: + logger.debug("No items in queue to process") + return None + + logger.info( + f"Processing issue #{item.issue_number} " + f"(agent: {item.metadata.assigned_agent}, " + f"difficulty: {item.metadata.difficulty})" + ) + + # Mark as in progress + self.queue_manager.mark_in_progress(item.issue_number) + + # Spawn agent (stub implementation) + try: + success = await self.spawn_agent(item) + + if success: + # Mark as complete + self.queue_manager.mark_complete(item.issue_number) + logger.info(f"Issue #{item.issue_number} completed successfully") + else: + logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress") + + except Exception as e: + logger.error(f"Error spawning agent for issue #{item.issue_number}: {e}") + # Item remains in progress on error + + return item + + async def spawn_agent(self, item: QueueItem) -> bool: + """Spawn an agent to process the given item. + + This is a stub implementation for Phase 0 that always succeeds. + Future phases will implement actual agent spawning. + + Args: + item: QueueItem containing issue details + + Returns: + True if agent completed successfully, False otherwise + """ + logger.info( + f"[STUB] Spawning {item.metadata.assigned_agent} agent " + f"for issue #{item.issue_number} " + f"(estimated context: {item.metadata.estimated_context} tokens)" + ) + + # Track the agent + self._active_agents[item.issue_number] = { + "agent_type": item.metadata.assigned_agent, + "issue_number": item.issue_number, + "status": "running", + } + + # Stub implementation: always succeed + # In future phases, this will actually spawn a Claude agent process + logger.info(f"[STUB] Agent completed for issue #{item.issue_number}") + + return True diff --git a/apps/coordinator/src/main.py b/apps/coordinator/src/main.py index ad0f6ac..75da040 100644 --- a/apps/coordinator/src/main.py +++ b/apps/coordinator/src/main.py @@ -1,13 +1,18 @@ """FastAPI application for mosaic-coordinator webhook receiver.""" +import asyncio import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any from fastapi import FastAPI from pydantic import BaseModel from .config import settings +from .coordinator import Coordinator +from .queue import QueueManager from .webhook import router as webhook_router @@ -26,24 +31,77 @@ def setup_logging() -> None: setup_logging() logger = logging.getLogger(__name__) +# Global instances for application state +_coordinator: Coordinator | None = None +_coordinator_task: asyncio.Task[None] | None = None + + +def get_coordinator() -> Coordinator | None: + """Get the global coordinator instance. + + Returns: + The Coordinator instance if initialized, None otherwise + """ + return _coordinator + @asynccontextmanager -async def lifespan(app: FastAPI) -> AsyncIterator[None]: - """ - Application lifespan manager. +async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]: + """Application lifespan manager. - Handles startup and shutdown logic. + Handles startup and shutdown logic including coordinator lifecycle. + + Yields: + State dict with shared resources """ + global _coordinator, _coordinator_task + # Startup logger.info("Starting mosaic-coordinator webhook receiver") logger.info(f"Gitea URL: {settings.gitea_url}") logger.info(f"Log level: {settings.log_level}") logger.info(f"Server: {settings.host}:{settings.port}") - yield + # Initialize queue manager + queue_file = Path("queue.json") + queue_manager = QueueManager(queue_file=queue_file) + logger.info(f"Queue manager initialized (file: {queue_file})") + + # Initialize and start coordinator if enabled + if settings.coordinator_enabled: + _coordinator = Coordinator( + queue_manager=queue_manager, + poll_interval=settings.coordinator_poll_interval, + ) + logger.info( + f"Coordinator initialized (poll interval: {settings.coordinator_poll_interval}s, " + f"max agents: {settings.coordinator_max_concurrent_agents})" + ) + + # Start coordinator in background + _coordinator_task = asyncio.create_task(_coordinator.start()) + logger.info("Coordinator orchestration loop started") + else: + logger.info("Coordinator disabled via configuration") + + yield {"queue_manager": queue_manager, "coordinator": _coordinator} # Shutdown - logger.info("Shutting down mosaic-coordinator webhook receiver") + logger.info("Shutting down mosaic-coordinator") + + # Stop coordinator gracefully + if _coordinator is not None: + logger.info("Stopping coordinator...") + await _coordinator.stop() + if _coordinator_task is not None: + _coordinator_task.cancel() + try: + await _coordinator_task + except asyncio.CancelledError: + pass + logger.info("Coordinator stopped") + + logger.info("Mosaic-coordinator shutdown complete") # Create FastAPI application @@ -60,17 +118,30 @@ class HealthResponse(BaseModel): status: str service: str + coordinator_running: bool = False + active_agents: int = 0 @app.get("/health", response_model=HealthResponse) async def health_check() -> HealthResponse: - """ - Health check endpoint. + """Health check endpoint. Returns: - HealthResponse indicating service is healthy + HealthResponse indicating service is healthy with coordinator status """ - return HealthResponse(status="healthy", service="mosaic-coordinator") + coordinator_running = False + active_agents = 0 + + if _coordinator is not None: + coordinator_running = _coordinator.is_running + active_agents = _coordinator.get_active_agent_count() + + return HealthResponse( + status="healthy", + service="mosaic-coordinator", + coordinator_running=coordinator_running, + active_agents=active_agents, + ) # Include webhook router diff --git a/apps/coordinator/tests/conftest.py b/apps/coordinator/tests/conftest.py index f357f4d..897bce5 100644 --- a/apps/coordinator/tests/conftest.py +++ b/apps/coordinator/tests/conftest.py @@ -112,8 +112,9 @@ def client(webhook_secret: str, gitea_url: str, monkeypatch: pytest.MonkeyPatch) monkeypatch.setenv("LOG_LEVEL", "debug") # Force reload of settings - from src import config import importlib + + from src import config importlib.reload(config) # Import app after settings are configured diff --git a/apps/coordinator/tests/test_context_monitor.py b/apps/coordinator/tests/test_context_monitor.py index 38b9a32..b7e6f55 100644 --- a/apps/coordinator/tests/test_context_monitor.py +++ b/apps/coordinator/tests/test_context_monitor.py @@ -1,8 +1,7 @@ """Tests for context monitoring.""" import asyncio -from typing import Any -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock import pytest @@ -60,7 +59,9 @@ class TestContextMonitor: assert monitor.ROTATE_THRESHOLD == 0.95 @pytest.mark.asyncio - async def test_get_context_usage_api_call(self, monitor: ContextMonitor, mock_claude_api: AsyncMock) -> None: + async def test_get_context_usage_api_call( + self, monitor: ContextMonitor, mock_claude_api: AsyncMock + ) -> None: """Should call Claude API to get context usage.""" # Mock API response mock_claude_api.get_context_usage.return_value = { diff --git a/apps/coordinator/tests/test_coordinator.py b/apps/coordinator/tests/test_coordinator.py new file mode 100644 index 0000000..8c4de4d --- /dev/null +++ b/apps/coordinator/tests/test_coordinator.py @@ -0,0 +1,746 @@ +"""Tests for the Coordinator orchestration loop.""" + +import asyncio +import tempfile +from collections.abc import Generator +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from src.models import IssueMetadata +from src.queue import QueueItem, QueueItemStatus, QueueManager + + +class TestCoordinator: + """Tests for the Coordinator class.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + def test_coordinator_initialization(self, queue_manager: QueueManager) -> None: + """Test creating a Coordinator with required dependencies.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager) + + assert coordinator.queue_manager is queue_manager + assert coordinator.is_running is False + assert coordinator.poll_interval == 5.0 # Default poll interval + + def test_coordinator_custom_poll_interval(self, queue_manager: QueueManager) -> None: + """Test creating a Coordinator with custom poll interval.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=2.0) + + assert coordinator.poll_interval == 2.0 + + @pytest.mark.asyncio + async def test_process_queue_no_items(self, queue_manager: QueueManager) -> None: + """Test process_queue when queue is empty.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager) + + result = await coordinator.process_queue() + + # Should return None when no items to process + assert result is None + + @pytest.mark.asyncio + async def test_process_queue_gets_next_ready(self, queue_manager: QueueManager) -> None: + """Test process_queue gets the next ready item from queue.""" + from src.coordinator import Coordinator + + # Add items to queue + meta1 = IssueMetadata(assigned_agent="sonnet") + meta2 = IssueMetadata(assigned_agent="haiku") + queue_manager.enqueue(159, meta1) + queue_manager.enqueue(160, meta2) + + coordinator = Coordinator(queue_manager=queue_manager) + + result = await coordinator.process_queue() + + # Should return the first ready item (159) + assert result is not None + assert result.issue_number == 159 + + @pytest.mark.asyncio + async def test_process_queue_marks_item_in_progress( + self, queue_manager: QueueManager + ) -> None: + """Test process_queue marks the item as in_progress before spawning agent.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + status_during_spawn: QueueItemStatus | None = None + + original_spawn_agent = coordinator.spawn_agent + + async def capturing_spawn_agent(item: QueueItem) -> bool: + nonlocal status_during_spawn + # Capture status while agent is "running" + queue_item = queue_manager.get_item(159) + if queue_item: + status_during_spawn = queue_item.status + return await original_spawn_agent(item) + + coordinator.spawn_agent = capturing_spawn_agent # type: ignore[method-assign] + + await coordinator.process_queue() + + # Status during spawn should have been IN_PROGRESS + assert status_during_spawn == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_process_queue_spawns_agent_stub(self, queue_manager: QueueManager) -> None: + """Test process_queue calls spawn_agent (stub implementation).""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn: + mock_spawn.return_value = True + await coordinator.process_queue() + + mock_spawn.assert_called_once() + # Verify it was called with the correct item + call_args = mock_spawn.call_args[0] + assert call_args[0].issue_number == 159 + + @pytest.mark.asyncio + async def test_process_queue_marks_complete_on_success( + self, queue_manager: QueueManager + ) -> None: + """Test process_queue marks item complete after successful agent spawn.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn: + mock_spawn.return_value = True + await coordinator.process_queue() + + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED + + @pytest.mark.asyncio + async def test_process_queue_handles_agent_failure( + self, queue_manager: QueueManager + ) -> None: + """Test process_queue handles agent spawn failure gracefully.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn: + mock_spawn.return_value = False # Agent failed + await coordinator.process_queue() + + # Item should remain in progress (not completed) on failure + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_spawn_agent_stub_returns_true(self, queue_manager: QueueManager) -> None: + """Test spawn_agent stub implementation returns True.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + item = QueueItem(issue_number=159, metadata=meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + result = await coordinator.spawn_agent(item) + + # Stub always returns True + assert result is True + + @pytest.mark.asyncio + async def test_spawn_agent_logs_agent_type(self, queue_manager: QueueManager) -> None: + """Test spawn_agent logs the agent type being spawned.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="opus") + item = QueueItem(issue_number=159, metadata=meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + with patch("src.coordinator.logger") as mock_logger: + await coordinator.spawn_agent(item) + + # Should log that we're spawning an agent + mock_logger.info.assert_called() + call_str = str(mock_logger.info.call_args) + assert "159" in call_str or "opus" in call_str + + +class TestCoordinatorLoop: + """Tests for the Coordinator orchestration loop.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.mark.asyncio + async def test_start_begins_running(self, queue_manager: QueueManager) -> None: + """Test that start() sets is_running to True.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1) + + # Start in background + task = asyncio.create_task(coordinator.start()) + + # Give it a moment to start + await asyncio.sleep(0.05) + + assert coordinator.is_running is True + + # Cleanup + await coordinator.stop() + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_stop_halts_loop(self, queue_manager: QueueManager) -> None: + """Test that stop() halts the orchestration loop.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1) + + # Start and then stop + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.05) + + await coordinator.stop() + await asyncio.sleep(0.15) + + assert coordinator.is_running is False + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_loop_processes_queue_repeatedly(self, queue_manager: QueueManager) -> None: + """Test that the loop calls process_queue repeatedly.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + call_count = 0 + + original_process_queue = coordinator.process_queue + + async def counting_process_queue() -> QueueItem | None: + nonlocal call_count + call_count += 1 + return await original_process_queue() + + coordinator.process_queue = counting_process_queue # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.2) # Allow time for multiple iterations + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should have been called multiple times + assert call_count >= 2 + + @pytest.mark.asyncio + async def test_loop_respects_poll_interval(self, queue_manager: QueueManager) -> None: + """Test that the loop waits for poll_interval between iterations.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1) + timestamps: list[float] = [] + + original_process_queue = coordinator.process_queue + + async def tracking_process_queue() -> QueueItem | None: + timestamps.append(asyncio.get_event_loop().time()) + return await original_process_queue() + + coordinator.process_queue = tracking_process_queue # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.35) # Allow time for 3-4 iterations + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Check intervals between calls + if len(timestamps) >= 2: + for i in range(1, len(timestamps)): + interval = timestamps[i] - timestamps[i - 1] + # Should be approximately poll_interval (with some tolerance) + assert interval >= 0.08, f"Interval {interval} is too short" + assert interval <= 0.15, f"Interval {interval} is too long" + + +class TestCoordinatorErrorHandling: + """Tests for Coordinator error handling.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.mark.asyncio + async def test_loop_continues_after_process_queue_error( + self, queue_manager: QueueManager + ) -> None: + """Test that the loop continues running after process_queue raises an error.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + call_count = 0 + error_raised = False + + async def failing_process_queue() -> QueueItem | None: + nonlocal call_count, error_raised + call_count += 1 + if call_count == 1: + error_raised = True + raise RuntimeError("Simulated error") + return None + + coordinator.process_queue = failing_process_queue # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.2) + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should have continued after the error + assert error_raised is True + assert call_count >= 2 + + @pytest.mark.asyncio + async def test_error_is_logged(self, queue_manager: QueueManager) -> None: + """Test that errors are logged properly.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + + async def failing_process_queue() -> QueueItem | None: + raise RuntimeError("Test error message") + + coordinator.process_queue = failing_process_queue # type: ignore[method-assign] + + with patch("src.coordinator.logger") as mock_logger: + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.1) + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should have logged the error + mock_logger.error.assert_called() + + @pytest.mark.asyncio + async def test_spawn_agent_exception_handled(self, queue_manager: QueueManager) -> None: + """Test that exceptions in spawn_agent are handled gracefully.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn: + mock_spawn.side_effect = RuntimeError("Agent spawn failed") + + # Should not raise - error handled internally + await coordinator.process_queue() + + # Item should remain in progress + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + +class TestCoordinatorGracefulShutdown: + """Tests for Coordinator graceful shutdown.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.mark.asyncio + async def test_stop_is_idempotent(self, queue_manager: QueueManager) -> None: + """Test that stop() can be called multiple times safely.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1) + + # Call stop multiple times without starting + await coordinator.stop() + await coordinator.stop() + await coordinator.stop() + + # Should not raise any errors + assert coordinator.is_running is False + + @pytest.mark.asyncio + async def test_stop_waits_for_current_process(self, queue_manager: QueueManager) -> None: + """Test that stop() waits for current process_queue to complete.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.5) + processing_started = asyncio.Event() + processing_done = asyncio.Event() + + original_process_queue = coordinator.process_queue + + async def slow_process_queue() -> QueueItem | None: + processing_started.set() + await asyncio.sleep(0.2) # Simulate slow processing + result = await original_process_queue() + processing_done.set() + return result + + coordinator.process_queue = slow_process_queue # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + + # Wait for processing to start + await processing_started.wait() + + # Request stop while processing + stop_task = asyncio.create_task(coordinator.stop()) + + # Wait for both to complete + await asyncio.wait_for(processing_done.wait(), timeout=1.0) + await stop_task + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert coordinator.is_running is False + + @pytest.mark.asyncio + async def test_shutdown_logs_message(self, queue_manager: QueueManager) -> None: + """Test that shutdown logs appropriate messages.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1) + + with patch("src.coordinator.logger") as mock_logger: + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.05) + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should log startup and shutdown + info_calls = [str(call) for call in mock_logger.info.call_args_list] + assert any("start" in call.lower() or "stop" in call.lower() for call in info_calls) + + +class TestCoordinatorIntegration: + """Integration tests for Coordinator with QueueManager.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.mark.asyncio + async def test_processes_multiple_items_in_order( + self, queue_manager: QueueManager + ) -> None: + """Test that coordinator processes items in dependency order.""" + from src.coordinator import Coordinator + + # 158 blocks 159 + meta_158 = IssueMetadata(blocks=[159], blocked_by=[], assigned_agent="sonnet") + meta_159 = IssueMetadata(blocks=[], blocked_by=[158], assigned_agent="haiku") + + queue_manager.enqueue(158, meta_158) + queue_manager.enqueue(159, meta_159) + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + processed_items: list[int] = [] + + original_spawn_agent = coordinator.spawn_agent + + async def tracking_spawn_agent(item: QueueItem) -> bool: + processed_items.append(item.issue_number) + return await original_spawn_agent(item) + + coordinator.spawn_agent = tracking_spawn_agent # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.3) # Allow time for processing + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # 158 should be processed before 159 (dependency order) + assert 158 in processed_items + assert 159 in processed_items + assert processed_items.index(158) < processed_items.index(159) + + @pytest.mark.asyncio + async def test_completes_all_items_in_queue(self, queue_manager: QueueManager) -> None: + """Test that coordinator eventually completes all items.""" + from src.coordinator import Coordinator + + # Add multiple items without dependencies + for i in range(157, 162): + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(i, meta) + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.02) + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.5) # Allow time for processing + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # All items should be completed + for i in range(157, 162): + item = queue_manager.get_item(i) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED + + @pytest.mark.asyncio + async def test_skips_already_completed_items(self, queue_manager: QueueManager) -> None: + """Test that coordinator skips items already marked as completed.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + queue_manager.mark_complete(159) # Pre-complete it + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + spawn_count = 0 + + original_spawn_agent = coordinator.spawn_agent + + async def counting_spawn_agent(item: QueueItem) -> bool: + nonlocal spawn_count + spawn_count += 1 + return await original_spawn_agent(item) + + coordinator.spawn_agent = counting_spawn_agent # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.2) + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should not have spawned any agents (item already completed) + assert spawn_count == 0 + + @pytest.mark.asyncio + async def test_skips_in_progress_items(self, queue_manager: QueueManager) -> None: + """Test that coordinator skips items already in progress.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + queue_manager.mark_in_progress(159) # Pre-mark as in progress + + coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05) + spawn_count = 0 + + original_spawn_agent = coordinator.spawn_agent + + async def counting_spawn_agent(item: QueueItem) -> bool: + nonlocal spawn_count + spawn_count += 1 + return await original_spawn_agent(item) + + coordinator.spawn_agent = counting_spawn_agent # type: ignore[method-assign] + + task = asyncio.create_task(coordinator.start()) + await asyncio.sleep(0.2) + await coordinator.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should not have spawned any agents (item already in progress) + assert spawn_count == 0 + + +class TestCoordinatorActiveAgents: + """Tests for tracking active agents.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + def test_active_agents_initially_empty(self, queue_manager: QueueManager) -> None: + """Test that active_agents is empty on initialization.""" + from src.coordinator import Coordinator + + coordinator = Coordinator(queue_manager=queue_manager) + + assert coordinator.active_agents == {} + + @pytest.mark.asyncio + async def test_active_agents_tracks_spawned_agents( + self, queue_manager: QueueManager + ) -> None: + """Test that active_agents tracks agents as they are spawned.""" + from src.coordinator import Coordinator + + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(159, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + await coordinator.process_queue() + + # Agent should be tracked (stub stores issue number) + assert 159 in coordinator.active_agents + + @pytest.mark.asyncio + async def test_get_active_agent_count(self, queue_manager: QueueManager) -> None: + """Test getting count of active agents.""" + from src.coordinator import Coordinator + + for i in range(157, 160): + meta = IssueMetadata(assigned_agent="sonnet") + queue_manager.enqueue(i, meta) + + coordinator = Coordinator(queue_manager=queue_manager) + + # Process all items + await coordinator.process_queue() + await coordinator.process_queue() + await coordinator.process_queue() + + assert coordinator.get_active_agent_count() == 3 diff --git a/apps/coordinator/tests/test_parser.py b/apps/coordinator/tests/test_parser.py index 21634cf..32e76b8 100644 --- a/apps/coordinator/tests/test_parser.py +++ b/apps/coordinator/tests/test_parser.py @@ -1,13 +1,12 @@ """Tests for issue parser agent.""" -import os +from unittest.mock import Mock, patch + import pytest -from unittest.mock import Mock, patch, AsyncMock from anthropic import Anthropic from anthropic.types import Message, TextBlock, Usage -from src.parser import parse_issue_metadata, clear_cache -from src.models import IssueMetadata +from src.parser import clear_cache, parse_issue_metadata @pytest.fixture(autouse=True) @@ -88,7 +87,10 @@ def mock_anthropic_response() -> Message: content=[ TextBlock( type="text", - text='{"estimated_context": 46800, "difficulty": "medium", "assigned_agent": "sonnet", "blocks": [159], "blocked_by": [157]}' + text=( + '{"estimated_context": 46800, "difficulty": "medium", ' + '"assigned_agent": "sonnet", "blocks": [159], "blocked_by": [157]}' + ), ) ], model="claude-sonnet-4.5-20250929", @@ -107,7 +109,10 @@ def mock_anthropic_minimal_response() -> Message: content=[ TextBlock( type="text", - text='{"estimated_context": 50000, "difficulty": "medium", "assigned_agent": "sonnet", "blocks": [], "blocked_by": []}' + text=( + '{"estimated_context": 50000, "difficulty": "medium", ' + '"assigned_agent": "sonnet", "blocks": [], "blocked_by": []}' + ), ) ], model="claude-sonnet-4.5-20250929", @@ -306,7 +311,10 @@ class TestParseIssueMetadata: content=[ TextBlock( type="text", - text='{"estimated_context": 10000, "difficulty": "invalid", "assigned_agent": "sonnet", "blocks": [], "blocked_by": []}' + text=( + '{"estimated_context": 10000, "difficulty": "invalid", ' + '"assigned_agent": "sonnet", "blocks": [], "blocked_by": []}' + ), ) ], model="claude-sonnet-4.5-20250929", @@ -341,7 +349,10 @@ class TestParseIssueMetadata: content=[ TextBlock( type="text", - text='{"estimated_context": 10000, "difficulty": "medium", "assigned_agent": "invalid_agent", "blocks": [], "blocked_by": []}' + text=( + '{"estimated_context": 10000, "difficulty": "medium", ' + '"assigned_agent": "invalid_agent", "blocks": [], "blocked_by": []}' + ), ) ], model="claude-sonnet-4.5-20250929", diff --git a/apps/coordinator/tests/test_security.py b/apps/coordinator/tests/test_security.py index 664e52d..054fdc3 100644 --- a/apps/coordinator/tests/test_security.py +++ b/apps/coordinator/tests/test_security.py @@ -3,8 +3,6 @@ import hmac import json -import pytest - class TestSignatureVerification: """Test suite for HMAC SHA256 signature verification."""