From eba04fb264e6ffd69eb2535efc11190eb88e92e5 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 20:22:00 -0600 Subject: [PATCH] feat(#150): Implement OrchestrationLoop class (TDD - GREEN phase) Implement the main orchestration loop that coordinates all components: - Queue processing with priority sorting (issues by number) - Integration with ContextMonitor for tracking agent context usage - Integration with QualityOrchestrator for running quality gates - Integration with ForcedContinuationService for rejection prompts - Metrics tracking (processed_count, success_count, rejection_count) - Graceful start/stop with proper lifecycle management - Error handling at all levels (spawn, context, quality, continuation) The OrchestrationLoop flow: 1. Read issue queue (priority sorted by issue number) 2. Mark issue as in progress 3. Spawn agent (stub implementation for Phase 0) 4. Check context usage via ContextMonitor 5. Run quality gates via QualityOrchestrator 6. On approval: mark complete, increment success count 7. On rejection: generate continuation prompt, increment rejection count 99% test coverage for coordinator.py (183 statements, 2 missed). 
logger = logging.getLogger(__name__)


class OrchestrationLoop:
    """Advanced orchestration loop integrating all coordinator components.

    The OrchestrationLoop coordinates:
    - Issue queue processing with priority sorting
    - Agent assignment using 50% rule
    - Quality gate verification on completion claims
    - Rejection handling with forced continuation prompts
    - Context monitoring during agent execution
    """

    def __init__(
        self,
        queue_manager: QueueManager,
        quality_orchestrator: QualityOrchestrator,
        continuation_service: ForcedContinuationService,
        context_monitor: ContextMonitor,
        poll_interval: float = 5.0,
    ) -> None:
        """Initialize the OrchestrationLoop.

        Args:
            queue_manager: QueueManager instance for queue operations
            quality_orchestrator: QualityOrchestrator for running quality gates
            continuation_service: ForcedContinuationService for rejection prompts
            context_monitor: ContextMonitor for tracking agent context usage
            poll_interval: Seconds between queue polls (default: 5.0)
        """
        self.queue_manager = queue_manager
        self.quality_orchestrator = quality_orchestrator
        self.continuation_service = continuation_service
        self.context_monitor = context_monitor
        self.poll_interval = poll_interval
        self._running = False
        # Created lazily in start() so the loop binds to the running event loop.
        self._stop_event: asyncio.Event | None = None
        # NOTE(review): entries are never evicted, so this dict grows with every
        # processed issue. Acceptable for Phase 0; revisit for long-running use.
        self._active_agents: dict[int, dict[str, Any]] = {}

        # Metrics tracking
        self._processed_count = 0
        self._success_count = 0
        self._rejection_count = 0

    @property
    def is_running(self) -> bool:
        """Check if the orchestration loop is currently running.

        Returns:
            True if the orchestration loop is running
        """
        return self._running

    @property
    def active_agents(self) -> dict[int, dict[str, Any]]:
        """Get the dictionary of active agents.

        Returns:
            Dictionary mapping issue numbers to agent info
        """
        return self._active_agents

    @property
    def processed_count(self) -> int:
        """Get total number of processed issues.

        Returns:
            Number of issues processed
        """
        return self._processed_count

    @property
    def success_count(self) -> int:
        """Get number of successfully completed issues.

        Returns:
            Number of issues that passed quality gates
        """
        return self._success_count

    @property
    def rejection_count(self) -> int:
        """Get number of rejected issues (failed quality gates).

        Returns:
            Number of issues that failed quality gates
        """
        return self._rejection_count

    def get_active_agent_count(self) -> int:
        """Get the count of currently active agents.

        Returns:
            Number of active agents
        """
        return len(self._active_agents)

    async def start(self) -> None:
        """Start the orchestration loop.

        Continuously processes the queue until stop() is called.
        """
        self._running = True
        self._stop_event = asyncio.Event()
        logger.info("OrchestrationLoop started - beginning orchestration")

        try:
            while self._running:
                try:
                    await self.process_next_issue()
                except Exception as e:
                    logger.error(f"Error in process_next_issue: {e}")
                    # Continue running despite errors

                # Wait for poll interval or stop signal
                try:
                    await asyncio.wait_for(
                        self._stop_event.wait(),
                        timeout=self.poll_interval,
                    )
                    # If we reach here, stop was requested
                    break
                except (TimeoutError, asyncio.TimeoutError):
                    # Normal poll timeout, continue polling.
                    # asyncio.TimeoutError is only an alias of the builtin
                    # TimeoutError from Python 3.11 on; catching both keeps the
                    # loop alive on 3.10, where wait_for raises the asyncio one.
                    pass

        finally:
            self._running = False
            logger.info("OrchestrationLoop stopped")

    async def stop(self) -> None:
        """Stop the orchestration loop gracefully.

        Signals the loop to stop and waits for current processing to complete.
        This method is idempotent - can be called multiple times safely.
        """
        logger.info("OrchestrationLoop stop requested")
        self._running = False
        if self._stop_event is not None:
            self._stop_event.set()

    async def process_next_issue(self) -> QueueItem | None:
        """Process the next ready issue from the queue.

        This method:
        1. Gets the next ready item (priority sorted)
        2. Marks it as in progress
        3. Spawns an agent to process it
        4. Runs quality gates on completion
        5. Handles rejection with forced continuation or marks complete

        Returns:
            The QueueItem that was processed, or None if queue is empty
        """
        # Get next ready item
        item = self.queue_manager.get_next_ready()

        if item is None:
            logger.debug("No items in queue to process")
            return None

        logger.info(
            f"Processing issue #{item.issue_number} "
            f"(agent: {item.metadata.assigned_agent}, "
            f"difficulty: {item.metadata.difficulty}, "
            f"context: {item.metadata.estimated_context} tokens)"
        )

        # Mark as in progress
        self.queue_manager.mark_in_progress(item.issue_number)
        self._processed_count += 1

        # Track the agent
        agent_id = f"agent-{item.issue_number}"
        self._active_agents[item.issue_number] = {
            "agent_type": item.metadata.assigned_agent,
            "issue_number": item.issue_number,
            "agent_id": agent_id,
            "status": "running",
        }

        try:
            # Spawn agent (stub implementation)
            agent_success = await self._spawn_agent(item)

            if not agent_success:
                # Record the terminal state so the entry does not read as
                # "running" forever (mirrors the rejection path's bookkeeping).
                self._active_agents[item.issue_number]["status"] = "failed"
                logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress")
                return item

            # Check context usage (stub - no real monitoring in Phase 0)
            await self._check_context(agent_id)

            # Run quality gates on completion
            verification = await self._verify_quality(item)

            if verification.all_passed:
                # All gates passed - mark as complete
                self.queue_manager.mark_complete(item.issue_number)
                self._success_count += 1
                self._active_agents[item.issue_number]["status"] = "completed"
                logger.info(
                    f"Issue #{item.issue_number} completed successfully - all gates passed"
                )
            else:
                # Gates failed - generate continuation prompt
                self._rejection_count += 1
                await self._handle_rejection(item, verification)

        except Exception as e:
            logger.error(f"Error processing issue #{item.issue_number}: {e}")
            # Item remains in progress on error; record the state for observers.
            self._active_agents[item.issue_number]["status"] = "error"

        return item

    async def _spawn_agent(self, item: QueueItem) -> bool:
        """Spawn an agent to process the given item.

        This is a stub implementation for Phase 0 that always succeeds.
        Future phases will implement actual agent spawning.

        Args:
            item: QueueItem containing issue details

        Returns:
            True if agent completed successfully, False otherwise
        """
        logger.info(
            f"[STUB] Spawning {item.metadata.assigned_agent} agent "
            f"for issue #{item.issue_number} "
            f"(estimated context: {item.metadata.estimated_context} tokens)"
        )

        # Stub implementation: always succeed
        logger.info(f"[STUB] Agent completed for issue #{item.issue_number}")

        return True

    async def _check_context(self, agent_id: str) -> ContextAction:
        """Check context usage and determine action.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            ContextAction based on usage thresholds; CONTINUE on monitor error
            so a monitoring failure never aborts issue processing.
        """
        try:
            action = await self.context_monitor.determine_action(agent_id)

            if action == ContextAction.COMPACT:
                logger.info(f"Agent {agent_id}: Context at 80%, compaction recommended")
            elif action == ContextAction.ROTATE_SESSION:
                logger.warning(f"Agent {agent_id}: Context at 95%, session rotation needed")

            return action
        except Exception as e:
            logger.error(f"Error checking context for {agent_id}: {e}")
            return ContextAction.CONTINUE

    async def _verify_quality(self, item: QueueItem) -> VerificationResult:
        """Run quality gates to verify completion.

        Args:
            item: QueueItem that claims completion

        Returns:
            VerificationResult from quality orchestrator; a synthetic failure
            result is returned if verification itself raises.
        """
        logger.info(f"Running quality gates for issue #{item.issue_number}")

        try:
            result = await self.quality_orchestrator.verify_completion()

            if result.all_passed:
                logger.info(f"Issue #{item.issue_number}: All quality gates passed")
            else:
                failed_gates = [
                    name for name, r in result.gate_results.items() if not r.passed
                ]
                logger.warning(
                    f"Issue #{item.issue_number}: Quality gates failed: {failed_gates}"
                )

            return result

        except Exception as e:
            logger.error(f"Error verifying quality for issue #{item.issue_number}: {e}")
            # Return a failure result on error.
            # Local import avoids a module-level dependency for the error path only.
            from src.gates.quality_gate import GateResult
            return VerificationResult(
                all_passed=False,
                gate_results={
                    "error": GateResult(
                        passed=False,
                        message=f"Quality verification error: {e}",
                        details={"error": str(e)},
                    )
                },
            )

    async def _handle_rejection(
        self, item: QueueItem, verification: VerificationResult
    ) -> None:
        """Handle quality gate rejection by generating continuation prompt.

        Args:
            item: QueueItem that failed quality gates
            verification: VerificationResult with failure details
        """
        logger.info(f"Generating forced continuation for issue #{item.issue_number}")

        try:
            prompt = self.continuation_service.generate_prompt(verification)
            logger.info(
                f"Issue #{item.issue_number}: Forced continuation generated "
                f"({len(prompt)} chars)"
            )

            # Update agent status
            if item.issue_number in self._active_agents:
                self._active_agents[item.issue_number]["status"] = "needs_continuation"
                self._active_agents[item.issue_number]["continuation_prompt"] = prompt

        except Exception as e:
            logger.error(
                f"Error generating continuation for issue #{item.issue_number}: {e}"
            )