class OrchestrationLoop:
    """Advanced orchestration loop integrating all coordinator components.

    The OrchestrationLoop coordinates:
    - Issue queue processing with priority sorting
    - Agent assignment using 50% rule
    - Quality gate verification on completion claims
    - Rejection handling with forced continuation prompts
    - Context monitoring during agent execution
    """

    def __init__(
        self,
        queue_manager: QueueManager,
        quality_orchestrator: QualityOrchestrator,
        continuation_service: ForcedContinuationService,
        context_monitor: ContextMonitor,
        poll_interval: float = 5.0,
    ) -> None:
        """Initialize the OrchestrationLoop.

        Args:
            queue_manager: QueueManager instance for queue operations
            quality_orchestrator: QualityOrchestrator for running quality gates
            continuation_service: ForcedContinuationService for rejection prompts
            context_monitor: ContextMonitor for tracking agent context usage
            poll_interval: Seconds between queue polls (default: 5.0)
        """
        self.queue_manager = queue_manager
        self.quality_orchestrator = quality_orchestrator
        self.continuation_service = continuation_service
        self.context_monitor = context_monitor
        self.poll_interval = poll_interval
        self._running = False
        self._stop_event: asyncio.Event | None = None
        # Maps issue number -> agent info dict (agent_type, agent_id, status, ...).
        # Entries are removed on successful completion and status-tagged on
        # failure so the mapping reflects only agents that still need attention.
        self._active_agents: dict[int, dict[str, Any]] = {}

        # Metrics tracking
        self._processed_count = 0
        self._success_count = 0
        self._rejection_count = 0

    @property
    def is_running(self) -> bool:
        """Check if the orchestration loop is currently running.

        Returns:
            True if the orchestration loop is running
        """
        return self._running

    @property
    def active_agents(self) -> dict[int, dict[str, Any]]:
        """Get the dictionary of active agents.

        Returns:
            Dictionary mapping issue numbers to agent info
        """
        return self._active_agents

    @property
    def processed_count(self) -> int:
        """Get total number of processed issues.

        Returns:
            Number of issues processed
        """
        return self._processed_count

    @property
    def success_count(self) -> int:
        """Get number of successfully completed issues.

        Returns:
            Number of issues that passed quality gates
        """
        return self._success_count

    @property
    def rejection_count(self) -> int:
        """Get number of rejected issues (failed quality gates).

        Returns:
            Number of issues that failed quality gates
        """
        return self._rejection_count

    def get_active_agent_count(self) -> int:
        """Get the count of currently active agents.

        Returns:
            Number of active agents
        """
        return len(self._active_agents)

    async def start(self) -> None:
        """Start the orchestration loop.

        Continuously processes the queue until stop() is called.
        """
        self._running = True
        self._stop_event = asyncio.Event()
        logger.info("OrchestrationLoop started - beginning orchestration")

        try:
            while self._running:
                try:
                    await self.process_next_issue()
                except Exception as e:
                    logger.error(f"Error in process_next_issue: {e}")
                    # Continue running despite errors

                # Wait for poll interval or stop signal
                try:
                    await asyncio.wait_for(
                        self._stop_event.wait(),
                        timeout=self.poll_interval,
                    )
                    # If we reach here, stop was requested
                    break
                except (TimeoutError, asyncio.TimeoutError):
                    # Normal timeout, continue polling.
                    # asyncio.TimeoutError is only an alias of the builtin
                    # TimeoutError from Python 3.11; catching both keeps the
                    # loop alive on 3.10 where they are distinct classes.
                    pass

        finally:
            self._running = False
            logger.info("OrchestrationLoop stopped")

    async def stop(self) -> None:
        """Stop the orchestration loop gracefully.

        Signals the loop to stop and waits for current processing to complete.
        This method is idempotent - can be called multiple times safely.
        """
        logger.info("OrchestrationLoop stop requested")
        self._running = False
        if self._stop_event is not None:
            self._stop_event.set()

    async def process_next_issue(self) -> QueueItem | None:
        """Process the next ready issue from the queue.

        This method:
        1. Gets the next ready item (priority sorted)
        2. Marks it as in progress
        3. Spawns an agent to process it
        4. Runs quality gates on completion
        5. Handles rejection with forced continuation or marks complete

        Returns:
            The QueueItem that was processed, or None if queue is empty
        """
        # Get next ready item
        item = self.queue_manager.get_next_ready()

        if item is None:
            logger.debug("No items in queue to process")
            return None

        logger.info(
            f"Processing issue #{item.issue_number} "
            f"(agent: {item.metadata.assigned_agent}, "
            f"difficulty: {item.metadata.difficulty}, "
            f"context: {item.metadata.estimated_context} tokens)"
        )

        # Mark as in progress
        self.queue_manager.mark_in_progress(item.issue_number)
        self._processed_count += 1

        # Track the agent
        agent_id = f"agent-{item.issue_number}"
        self._active_agents[item.issue_number] = {
            "agent_type": item.metadata.assigned_agent,
            "issue_number": item.issue_number,
            "agent_id": agent_id,
            "status": "running",
        }

        try:
            # Spawn agent (stub implementation)
            agent_success = await self._spawn_agent(item)

            if not agent_success:
                logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress")
                # Tag the entry so active_agents does not report a stale
                # "running" status for an agent that already failed.
                self._active_agents[item.issue_number]["status"] = "failed"
                return item

            # Check context usage (stub - no real monitoring in Phase 0)
            await self._check_context(agent_id)

            # Run quality gates on completion
            verification = await self._verify_quality(item)

            if verification.all_passed:
                # All gates passed - mark as complete
                self.queue_manager.mark_complete(item.issue_number)
                self._success_count += 1
                # Completed agents are no longer active; drop the entry so
                # get_active_agent_count() stays accurate over time.
                self._active_agents.pop(item.issue_number, None)
                logger.info(
                    f"Issue #{item.issue_number} completed successfully - all gates passed"
                )
            else:
                # Gates failed - generate continuation prompt
                self._rejection_count += 1
                await self._handle_rejection(item, verification)

        except Exception as e:
            logger.error(f"Error processing issue #{item.issue_number}: {e}")
            # Item remains in progress on error; reflect that in agent status.
            if item.issue_number in self._active_agents:
                self._active_agents[item.issue_number]["status"] = "error"

        return item

    async def _spawn_agent(self, item: QueueItem) -> bool:
        """Spawn an agent to process the given item.

        This is a stub implementation for Phase 0 that always succeeds.
        Future phases will implement actual agent spawning.

        Args:
            item: QueueItem containing issue details

        Returns:
            True if agent completed successfully, False otherwise
        """
        logger.info(
            f"[STUB] Spawning {item.metadata.assigned_agent} agent "
            f"for issue #{item.issue_number} "
            f"(estimated context: {item.metadata.estimated_context} tokens)"
        )

        # Stub implementation: always succeed
        logger.info(f"[STUB] Agent completed for issue #{item.issue_number}")

        return True

    async def _check_context(self, agent_id: str) -> ContextAction:
        """Check context usage and determine action.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            ContextAction based on usage thresholds; CONTINUE on monitor error
            (best-effort — a monitoring failure must not halt orchestration).
        """
        try:
            action = await self.context_monitor.determine_action(agent_id)

            if action == ContextAction.COMPACT:
                logger.info(f"Agent {agent_id}: Context at 80%, compaction recommended")
            elif action == ContextAction.ROTATE_SESSION:
                logger.warning(f"Agent {agent_id}: Context at 95%, session rotation needed")

            return action
        except Exception as e:
            logger.error(f"Error checking context for {agent_id}: {e}")
            return ContextAction.CONTINUE

    async def _verify_quality(self, item: QueueItem) -> VerificationResult:
        """Run quality gates to verify completion.

        Args:
            item: QueueItem that claims completion

        Returns:
            VerificationResult from quality orchestrator; a synthetic failing
            result (single "error" gate) if verification itself raises.
        """
        logger.info(f"Running quality gates for issue #{item.issue_number}")

        try:
            result = await self.quality_orchestrator.verify_completion()

            if result.all_passed:
                logger.info(f"Issue #{item.issue_number}: All quality gates passed")
            else:
                failed_gates = [
                    name for name, r in result.gate_results.items() if not r.passed
                ]
                logger.warning(
                    f"Issue #{item.issue_number}: Quality gates failed: {failed_gates}"
                )

            return result

        except Exception as e:
            logger.error(f"Error verifying quality for issue #{item.issue_number}: {e}")
            # Return a failure result on error.
            # Local import avoids a module-level dependency for this
            # error-only path.
            from src.gates.quality_gate import GateResult
            return VerificationResult(
                all_passed=False,
                gate_results={
                    "error": GateResult(
                        passed=False,
                        message=f"Quality verification error: {e}",
                        details={"error": str(e)},
                    )
                },
            )

    async def _handle_rejection(
        self, item: QueueItem, verification: VerificationResult
    ) -> None:
        """Handle quality gate rejection by generating continuation prompt.

        Args:
            item: QueueItem that failed quality gates
            verification: VerificationResult with failure details
        """
        logger.info(f"Generating forced continuation for issue #{item.issue_number}")

        try:
            prompt = self.continuation_service.generate_prompt(verification)
            logger.info(
                f"Issue #{item.issue_number}: Forced continuation generated "
                f"({len(prompt)} chars)"
            )

            # Update agent status
            if item.issue_number in self._active_agents:
                self._active_agents[item.issue_number]["status"] = "needs_continuation"
                self._active_agents[item.issue_number]["continuation_prompt"] = prompt

        except Exception as e:
            logger.error(
                f"Error generating continuation for issue #{item.issue_number}: {e}"
            )