Release: Merge develop to main (111 commits) #302
@@ -2,10 +2,17 @@
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
|
from src.context_monitor import ContextMonitor
|
||||||
|
from src.forced_continuation import ForcedContinuationService
|
||||||
|
from src.models import ContextAction
|
||||||
|
from src.quality_orchestrator import QualityOrchestrator, VerificationResult
|
||||||
from src.queue import QueueItem, QueueManager
|
from src.queue import QueueItem, QueueManager
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
pass
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -179,3 +186,329 @@ class Coordinator:
|
|||||||
logger.info(f"[STUB] Agent completed for issue #{item.issue_number}")
|
logger.info(f"[STUB] Agent completed for issue #{item.issue_number}")
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class OrchestrationLoop:
|
||||||
|
"""Advanced orchestration loop integrating all coordinator components.
|
||||||
|
|
||||||
|
The OrchestrationLoop coordinates:
|
||||||
|
- Issue queue processing with priority sorting
|
||||||
|
- Agent assignment using 50% rule
|
||||||
|
- Quality gate verification on completion claims
|
||||||
|
- Rejection handling with forced continuation prompts
|
||||||
|
- Context monitoring during agent execution
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
queue_manager: QueueManager,
|
||||||
|
quality_orchestrator: QualityOrchestrator,
|
||||||
|
continuation_service: ForcedContinuationService,
|
||||||
|
context_monitor: ContextMonitor,
|
||||||
|
poll_interval: float = 5.0,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the OrchestrationLoop.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queue_manager: QueueManager instance for queue operations
|
||||||
|
quality_orchestrator: QualityOrchestrator for running quality gates
|
||||||
|
continuation_service: ForcedContinuationService for rejection prompts
|
||||||
|
context_monitor: ContextMonitor for tracking agent context usage
|
||||||
|
poll_interval: Seconds between queue polls (default: 5.0)
|
||||||
|
"""
|
||||||
|
self.queue_manager = queue_manager
|
||||||
|
self.quality_orchestrator = quality_orchestrator
|
||||||
|
self.continuation_service = continuation_service
|
||||||
|
self.context_monitor = context_monitor
|
||||||
|
self.poll_interval = poll_interval
|
||||||
|
self._running = False
|
||||||
|
self._stop_event: asyncio.Event | None = None
|
||||||
|
self._active_agents: dict[int, dict[str, Any]] = {}
|
||||||
|
|
||||||
|
# Metrics tracking
|
||||||
|
self._processed_count = 0
|
||||||
|
self._success_count = 0
|
||||||
|
self._rejection_count = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
"""Check if the orchestration loop is currently running.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the orchestration loop is running
|
||||||
|
"""
|
||||||
|
return self._running
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active_agents(self) -> dict[int, dict[str, Any]]:
|
||||||
|
"""Get the dictionary of active agents.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping issue numbers to agent info
|
||||||
|
"""
|
||||||
|
return self._active_agents
|
||||||
|
|
||||||
|
@property
|
||||||
|
def processed_count(self) -> int:
|
||||||
|
"""Get total number of processed issues.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of issues processed
|
||||||
|
"""
|
||||||
|
return self._processed_count
|
||||||
|
|
||||||
|
@property
|
||||||
|
def success_count(self) -> int:
|
||||||
|
"""Get number of successfully completed issues.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of issues that passed quality gates
|
||||||
|
"""
|
||||||
|
return self._success_count
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rejection_count(self) -> int:
|
||||||
|
"""Get number of rejected issues (failed quality gates).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of issues that failed quality gates
|
||||||
|
"""
|
||||||
|
return self._rejection_count
|
||||||
|
|
||||||
|
def get_active_agent_count(self) -> int:
|
||||||
|
"""Get the count of currently active agents.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of active agents
|
||||||
|
"""
|
||||||
|
return len(self._active_agents)
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start the orchestration loop.
|
||||||
|
|
||||||
|
Continuously processes the queue until stop() is called.
|
||||||
|
"""
|
||||||
|
self._running = True
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
logger.info("OrchestrationLoop started - beginning orchestration")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
await self.process_next_issue()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in process_next_issue: {e}")
|
||||||
|
# Continue running despite errors
|
||||||
|
|
||||||
|
# Wait for poll interval or stop signal
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
self._stop_event.wait(),
|
||||||
|
timeout=self.poll_interval,
|
||||||
|
)
|
||||||
|
# If we reach here, stop was requested
|
||||||
|
break
|
||||||
|
except TimeoutError:
|
||||||
|
# Normal timeout, continue polling
|
||||||
|
pass
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self._running = False
|
||||||
|
logger.info("OrchestrationLoop stopped")
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop the orchestration loop gracefully.
|
||||||
|
|
||||||
|
Signals the loop to stop and waits for current processing to complete.
|
||||||
|
This method is idempotent - can be called multiple times safely.
|
||||||
|
"""
|
||||||
|
logger.info("OrchestrationLoop stop requested")
|
||||||
|
self._running = False
|
||||||
|
if self._stop_event is not None:
|
||||||
|
self._stop_event.set()
|
||||||
|
|
||||||
|
async def process_next_issue(self) -> QueueItem | None:
|
||||||
|
"""Process the next ready issue from the queue.
|
||||||
|
|
||||||
|
This method:
|
||||||
|
1. Gets the next ready item (priority sorted)
|
||||||
|
2. Marks it as in progress
|
||||||
|
3. Spawns an agent to process it
|
||||||
|
4. Runs quality gates on completion
|
||||||
|
5. Handles rejection with forced continuation or marks complete
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The QueueItem that was processed, or None if queue is empty
|
||||||
|
"""
|
||||||
|
# Get next ready item
|
||||||
|
item = self.queue_manager.get_next_ready()
|
||||||
|
|
||||||
|
if item is None:
|
||||||
|
logger.debug("No items in queue to process")
|
||||||
|
return None
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Processing issue #{item.issue_number} "
|
||||||
|
f"(agent: {item.metadata.assigned_agent}, "
|
||||||
|
f"difficulty: {item.metadata.difficulty}, "
|
||||||
|
f"context: {item.metadata.estimated_context} tokens)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mark as in progress
|
||||||
|
self.queue_manager.mark_in_progress(item.issue_number)
|
||||||
|
self._processed_count += 1
|
||||||
|
|
||||||
|
# Track the agent
|
||||||
|
agent_id = f"agent-{item.issue_number}"
|
||||||
|
self._active_agents[item.issue_number] = {
|
||||||
|
"agent_type": item.metadata.assigned_agent,
|
||||||
|
"issue_number": item.issue_number,
|
||||||
|
"agent_id": agent_id,
|
||||||
|
"status": "running",
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Spawn agent (stub implementation)
|
||||||
|
agent_success = await self._spawn_agent(item)
|
||||||
|
|
||||||
|
if not agent_success:
|
||||||
|
logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress")
|
||||||
|
return item
|
||||||
|
|
||||||
|
# Check context usage (stub - no real monitoring in Phase 0)
|
||||||
|
await self._check_context(agent_id)
|
||||||
|
|
||||||
|
# Run quality gates on completion
|
||||||
|
verification = await self._verify_quality(item)
|
||||||
|
|
||||||
|
if verification.all_passed:
|
||||||
|
# All gates passed - mark as complete
|
||||||
|
self.queue_manager.mark_complete(item.issue_number)
|
||||||
|
self._success_count += 1
|
||||||
|
logger.info(
|
||||||
|
f"Issue #{item.issue_number} completed successfully - all gates passed"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Gates failed - generate continuation prompt
|
||||||
|
self._rejection_count += 1
|
||||||
|
await self._handle_rejection(item, verification)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing issue #{item.issue_number}: {e}")
|
||||||
|
# Item remains in progress on error
|
||||||
|
|
||||||
|
return item
|
||||||
|
|
||||||
|
async def _spawn_agent(self, item: QueueItem) -> bool:
|
||||||
|
"""Spawn an agent to process the given item.
|
||||||
|
|
||||||
|
This is a stub implementation for Phase 0 that always succeeds.
|
||||||
|
Future phases will implement actual agent spawning.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item: QueueItem containing issue details
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if agent completed successfully, False otherwise
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
f"[STUB] Spawning {item.metadata.assigned_agent} agent "
|
||||||
|
f"for issue #{item.issue_number} "
|
||||||
|
f"(estimated context: {item.metadata.estimated_context} tokens)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Stub implementation: always succeed
|
||||||
|
logger.info(f"[STUB] Agent completed for issue #{item.issue_number}")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def _check_context(self, agent_id: str) -> ContextAction:
|
||||||
|
"""Check context usage and determine action.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent_id: Unique identifier for the agent
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ContextAction based on usage thresholds
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
action = await self.context_monitor.determine_action(agent_id)
|
||||||
|
|
||||||
|
if action == ContextAction.COMPACT:
|
||||||
|
logger.info(f"Agent {agent_id}: Context at 80%, compaction recommended")
|
||||||
|
elif action == ContextAction.ROTATE_SESSION:
|
||||||
|
logger.warning(f"Agent {agent_id}: Context at 95%, session rotation needed")
|
||||||
|
|
||||||
|
return action
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error checking context for {agent_id}: {e}")
|
||||||
|
return ContextAction.CONTINUE
|
||||||
|
|
||||||
|
async def _verify_quality(self, item: QueueItem) -> VerificationResult:
|
||||||
|
"""Run quality gates to verify completion.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item: QueueItem that claims completion
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
VerificationResult from quality orchestrator
|
||||||
|
"""
|
||||||
|
logger.info(f"Running quality gates for issue #{item.issue_number}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await self.quality_orchestrator.verify_completion()
|
||||||
|
|
||||||
|
if result.all_passed:
|
||||||
|
logger.info(f"Issue #{item.issue_number}: All quality gates passed")
|
||||||
|
else:
|
||||||
|
failed_gates = [
|
||||||
|
name for name, r in result.gate_results.items() if not r.passed
|
||||||
|
]
|
||||||
|
logger.warning(
|
||||||
|
f"Issue #{item.issue_number}: Quality gates failed: {failed_gates}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error verifying quality for issue #{item.issue_number}: {e}")
|
||||||
|
# Return a failure result on error
|
||||||
|
from src.gates.quality_gate import GateResult
|
||||||
|
return VerificationResult(
|
||||||
|
all_passed=False,
|
||||||
|
gate_results={
|
||||||
|
"error": GateResult(
|
||||||
|
passed=False,
|
||||||
|
message=f"Quality verification error: {e}",
|
||||||
|
details={"error": str(e)},
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_rejection(
|
||||||
|
self, item: QueueItem, verification: VerificationResult
|
||||||
|
) -> None:
|
||||||
|
"""Handle quality gate rejection by generating continuation prompt.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item: QueueItem that failed quality gates
|
||||||
|
verification: VerificationResult with failure details
|
||||||
|
"""
|
||||||
|
logger.info(f"Generating forced continuation for issue #{item.issue_number}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
prompt = self.continuation_service.generate_prompt(verification)
|
||||||
|
logger.info(
|
||||||
|
f"Issue #{item.issue_number}: Forced continuation generated "
|
||||||
|
f"({len(prompt)} chars)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update agent status
|
||||||
|
if item.issue_number in self._active_agents:
|
||||||
|
self._active_agents[item.issue_number]["status"] = "needs_continuation"
|
||||||
|
self._active_agents[item.issue_number]["continuation_prompt"] = prompt
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"Error generating continuation for issue #{item.issue_number}: {e}"
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user