From 5cd2ff6c1319ed3fda23ff8e91cad952c3a61ea5 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 20:21:51 -0600 Subject: [PATCH] test(#150): Add tests for orchestration loop (TDD - RED phase) Add comprehensive test suite for OrchestrationLoop class that integrates: - Queue processing with priority sorting - Agent assignment (50% rule) - Quality gate verification on completion claims - Rejection handling with forced continuation prompts - Context monitoring during agent execution - Lifecycle management (start/stop) - Error handling for all edge cases - Metrics tracking (processed, success, rejection counts) 33 new tests covering all acceptance criteria. Co-Authored-By: Claude Opus 4.5 --- .../tests/test_orchestration_loop.py | 1543 +++++++++++++++++ 1 file changed, 1543 insertions(+) create mode 100644 apps/coordinator/tests/test_orchestration_loop.py diff --git a/apps/coordinator/tests/test_orchestration_loop.py b/apps/coordinator/tests/test_orchestration_loop.py new file mode 100644 index 0000000..56b8e55 --- /dev/null +++ b/apps/coordinator/tests/test_orchestration_loop.py @@ -0,0 +1,1543 @@ +"""Tests for the orchestration loop (issue #150). + +These tests verify the complete orchestration loop that integrates: +- Queue processing with priority sorting +- Agent assignment (50% rule) +- Quality gate verification +- Rejection handling (forced continuation) +- Approval and completion flow +- Context monitoring during execution +""" + +import asyncio +import tempfile +from collections.abc import Generator +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.gates.quality_gate import GateResult +from src.models import ContextAction, ContextUsage, IssueMetadata +from src.quality_orchestrator import VerificationResult +from src.queue import QueueItem, QueueItemStatus, QueueManager + + +class TestOrchestrationLoopInitialization: + """Tests for OrchestrationLoop initialization.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator.""" + orchestrator = MagicMock() + orchestrator.verify_completion = AsyncMock() + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock() + monitor.determine_action = AsyncMock() + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + def test_orchestration_loop_initialization( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test OrchestrationLoop initializes with all required components.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + assert loop.queue_manager is queue_manager + assert loop.quality_orchestrator is mock_quality_orchestrator + assert loop.continuation_service is mock_continuation_service + assert loop.context_monitor is mock_context_monitor + assert loop.is_running is False + + def test_orchestration_loop_default_poll_interval( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test OrchestrationLoop has default poll interval.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + assert loop.poll_interval == 5.0 + + def test_orchestration_loop_custom_poll_interval( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test OrchestrationLoop with custom poll interval.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=2.0, + ) + + assert loop.poll_interval == 2.0 + + +class TestOrchestrationLoopQueueProcessing: + """Tests for queue processing with priority sorting.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + service = MagicMock() + service.generate_prompt = MagicMock(return_value="Fix the issues") + return service + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.fixture + def orchestration_loop( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> Any: + """Create an orchestration loop for testing.""" + from src.coordinator import OrchestrationLoop + + return OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + @pytest.mark.asyncio + async def test_process_empty_queue( + self, + orchestration_loop: Any, + ) -> None: + """Test processing an empty queue returns None.""" + result = await orchestration_loop.process_next_issue() + assert result is None + + @pytest.mark.asyncio + async def test_process_single_issue( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test processing a single issue from queue.""" + meta = IssueMetadata( + estimated_context=50000, + difficulty="medium", + assigned_agent="sonnet", + ) + queue_manager.enqueue(150, meta) + + result = await orchestration_loop.process_next_issue() + + assert result is not None + assert result.issue_number == 150 + + @pytest.mark.asyncio + async def test_process_issues_in_priority_order( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test issues are processed in priority order (lower number first).""" + meta1 = IssueMetadata(estimated_context=50000, difficulty="easy") + meta2 = IssueMetadata(estimated_context=50000, difficulty="easy") + + queue_manager.enqueue(152, meta1) # Higher number + queue_manager.enqueue(150, meta2) # Lower number + + result1 = await orchestration_loop.process_next_issue() + result2 = await orchestration_loop.process_next_issue() + + assert result1 is not None + assert result1.issue_number == 150 # Lower number processed first + assert result2 is not None + assert result2.issue_number == 152 + + @pytest.mark.asyncio + async def test_respects_dependency_order( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test blocked issues are not processed until dependencies complete.""" + # 150 blocks 151 + meta150 = IssueMetadata( + estimated_context=50000, difficulty="easy", blocks=[151], blocked_by=[] + ) + meta151 = IssueMetadata( + estimated_context=50000, difficulty="easy", blocks=[], blocked_by=[150] + ) + + queue_manager.enqueue(150, meta150) + queue_manager.enqueue(151, meta151) + + # Verify 151 is blocked + item151 = queue_manager.get_item(151) + assert item151 is not None + assert item151.ready is False + + # Process 150 first + result = await orchestration_loop.process_next_issue() + assert result is not None + assert result.issue_number == 150 + + # Now 151 should be ready + item151 = queue_manager.get_item(151) + assert item151 is not None + assert item151.ready is True + + +class TestOrchestrationLoopAgentAssignment: + """Tests for agent assignment integration.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.fixture + def orchestration_loop( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> Any: + """Create an orchestration loop for testing.""" + from src.coordinator import OrchestrationLoop + + return OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + @pytest.mark.asyncio + async def test_assigns_cheapest_capable_agent( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test that cheapest capable agent is assigned (50% rule).""" + # Small context, easy difficulty - should get cheapest agent + meta = IssueMetadata( + estimated_context=20000, # Small context + difficulty="easy", + assigned_agent="sonnet", # May be overridden + ) + queue_manager.enqueue(150, meta) + + result = await orchestration_loop.process_next_issue() + + assert result is not None + # The orchestration loop should have attempted to assign an agent + # Agent assignment is done during spawn_agent + + @pytest.mark.asyncio + async def test_validates_50_percent_rule( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test that 50% rule is validated during agent assignment.""" + # Large context that violates 50% rule for some agents + meta = IssueMetadata( + estimated_context=90000, # This exceeds 50% of haiku's context + difficulty="easy", + assigned_agent="haiku", + ) + queue_manager.enqueue(150, meta) + + # Process should still work - will assign a capable agent + result = await orchestration_loop.process_next_issue() + assert result is not None + + +class TestOrchestrationLoopQualityVerification: + """Tests for quality gate verification integration.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.mark.asyncio + async def test_quality_gates_called_on_completion( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test quality gates are called when agent claims completion.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=passing_result) + + mock_continuation = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + # Verify quality orchestrator was called + mock_orchestrator.verify_completion.assert_called_once() + + @pytest.mark.asyncio + async def test_issue_completed_when_all_gates_pass( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test issue is marked completed when all quality gates pass.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=passing_result) + + mock_continuation = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + # Verify issue is completed + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED + + +class TestOrchestrationLoopRejectionHandling: + """Tests for handling quality gate rejections.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.mark.asyncio + async def test_forced_continuation_on_gate_failure( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test forced continuation prompt is generated on gate failure.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + failing_result = VerificationResult( + all_passed=False, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=False, message="Lint failed", details={"errors": 5}), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=failing_result) + + mock_continuation = MagicMock() + mock_continuation.generate_prompt = MagicMock( + return_value="QUALITY GATES FAILED - Fix lint issues" + ) + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + result = await loop.process_next_issue() + + # Verify continuation prompt was generated + mock_continuation.generate_prompt.assert_called_once_with(failing_result) + assert result is not None + + @pytest.mark.asyncio + async def test_issue_remains_in_progress_on_rejection( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test issue remains in progress when quality gates fail.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + failing_result = VerificationResult( + all_passed=False, + gate_results={ + "build": GateResult(passed=False, message="Build failed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=failing_result) + + mock_continuation = MagicMock() + mock_continuation.generate_prompt = MagicMock(return_value="Fix build errors") + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + # Issue should remain in progress (not completed) + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_continuation_prompt_contains_failure_details( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test continuation prompt includes specific failure details.""" + from src.coordinator import OrchestrationLoop + from src.forced_continuation import ForcedContinuationService + + mock_orchestrator = MagicMock() + failing_result = VerificationResult( + all_passed=False, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=False, message="Tests failed: 3 failures"), + "coverage": GateResult( + passed=False, + message="Coverage below threshold", + details={"coverage_percent": 70.0, "minimum_coverage": 85.0}, + ), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=failing_result) + + # Use real continuation service to verify prompt format + real_continuation = ForcedContinuationService() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=real_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + # Issue should remain in progress + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + +class TestOrchestrationLoopContextMonitoring: + """Tests for context monitoring during execution.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.mark.asyncio + async def test_context_monitor_tracks_agent( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + ) -> None: + """Test context monitor tracks agent during execution.""" + from src.coordinator import OrchestrationLoop + + mock_monitor = MagicMock() + mock_monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test-agent", used_tokens=50000, total_tokens=200000) + ) + mock_monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + mock_monitor.start_monitoring = AsyncMock() + mock_monitor.stop_monitoring = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + # Context monitor should have been used + # The exact behavior depends on implementation + + @pytest.mark.asyncio + async def test_handles_context_compact_action( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + ) -> None: + """Test handling COMPACT action from context monitor.""" + from src.coordinator import OrchestrationLoop + + mock_monitor = MagicMock() + mock_monitor.get_context_usage = AsyncMock( + return_value=ContextUsage( + agent_id="test-agent", used_tokens=160000, total_tokens=200000 + ) # 80% + ) + mock_monitor.determine_action = AsyncMock(return_value=ContextAction.COMPACT) + mock_monitor.start_monitoring = AsyncMock() + mock_monitor.stop_monitoring = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should complete successfully even with COMPACT action + result = await loop.process_next_issue() + assert result is not None + + @pytest.mark.asyncio + async def test_handles_context_rotate_action( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + ) -> None: + """Test handling ROTATE_SESSION action from context monitor.""" + from src.coordinator import OrchestrationLoop + + mock_monitor = MagicMock() + mock_monitor.get_context_usage = AsyncMock( + return_value=ContextUsage( + agent_id="test-agent", used_tokens=190000, total_tokens=200000 + ) # 95% + ) + mock_monitor.determine_action = AsyncMock(return_value=ContextAction.ROTATE_SESSION) + mock_monitor.start_monitoring = AsyncMock() + mock_monitor.stop_monitoring = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should complete (in stub implementation) + result = await loop.process_next_issue() + assert result is not None + + +class TestOrchestrationLoopLifecycle: + """Tests for orchestration loop lifecycle (start/stop).""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.fixture + def orchestration_loop( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> Any: + """Create an orchestration loop for testing.""" + from src.coordinator import OrchestrationLoop + + return OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + @pytest.mark.asyncio + async def test_start_sets_running_flag( + self, + orchestration_loop: Any, + ) -> None: + """Test start() sets is_running to True.""" + task = asyncio.create_task(orchestration_loop.start()) + + await asyncio.sleep(0.05) + assert orchestration_loop.is_running is True + + await orchestration_loop.stop() + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_stop_clears_running_flag( + self, + orchestration_loop: Any, + ) -> None: + """Test stop() clears is_running flag.""" + task = asyncio.create_task(orchestration_loop.start()) + + await asyncio.sleep(0.05) + await orchestration_loop.stop() + await asyncio.sleep(0.1) + + assert orchestration_loop.is_running is False + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_loop_processes_continuously( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test loop processes queue items continuously.""" + process_count = 0 + + original_process = orchestration_loop.process_next_issue + + async def counting_process() -> QueueItem | None: + nonlocal process_count + process_count += 1 + result: QueueItem | None = await original_process() + return result + + orchestration_loop.process_next_issue = counting_process + + task = asyncio.create_task(orchestration_loop.start()) + + await asyncio.sleep(0.2) + await orchestration_loop.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should have processed multiple times + assert process_count >= 2 + + @pytest.mark.asyncio + async def test_graceful_shutdown( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test graceful shutdown waits for current processing.""" + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + processing_started = asyncio.Event() + original_process = orchestration_loop.process_next_issue + + async def slow_process() -> QueueItem | None: + processing_started.set() + await asyncio.sleep(0.1) + result: QueueItem | None = await original_process() + return result + + orchestration_loop.process_next_issue = slow_process + + task = asyncio.create_task(orchestration_loop.start()) + + await processing_started.wait() + await orchestration_loop.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert orchestration_loop.is_running is False + + +class TestOrchestrationLoopErrorHandling: + """Tests for error handling in orchestration loop.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.mark.asyncio + async def test_handles_quality_orchestrator_error( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test loop handles quality orchestrator errors gracefully.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + mock_orchestrator.verify_completion = AsyncMock(side_effect=RuntimeError("API error")) + + mock_continuation = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should not raise, just log error + result = await loop.process_next_issue() + assert result is not None + + # Issue should remain in progress due to error + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_loop_continues_after_error( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test loop continues running after encountering an error.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=passing_result) + + mock_continuation = MagicMock() + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + call_count = 0 + error_raised = False + + async def failing_process() -> QueueItem | None: + nonlocal call_count, error_raised + call_count += 1 + if call_count == 1: + error_raised = True + raise RuntimeError("Simulated error") + return None + + loop.process_next_issue = failing_process # type: ignore[method-assign] + + task = asyncio.create_task(loop.start()) + await asyncio.sleep(0.2) + await loop.stop() + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Should have continued after the error + assert error_raised is True + assert call_count >= 2 + + @pytest.mark.asyncio + async def test_handles_continuation_service_error( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test loop handles continuation service errors gracefully.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + failing_result = VerificationResult( + all_passed=False, + gate_results={ + "build": GateResult(passed=False, message="Build failed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=failing_result) + + mock_continuation = MagicMock() + mock_continuation.generate_prompt = MagicMock(side_effect=ValueError("Prompt error")) + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should not raise + result = await loop.process_next_issue() + assert result is not None + + +class TestOrchestrationLoopEdgeCases: + """Tests for edge cases and additional coverage.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.mark.asyncio + async def test_active_agents_property( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test active_agents property returns agent dictionary.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + # Initially empty + assert loop.active_agents == {} + + # Process an issue + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + await loop.process_next_issue() + + # Now has active agent + assert 150 in loop.active_agents + + @pytest.mark.asyncio + async def test_get_active_agent_count_method( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test get_active_agent_count returns correct count.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + assert loop.get_active_agent_count() == 0 + + # Process issues + meta1 = IssueMetadata(estimated_context=50000, difficulty="easy") + meta2 = IssueMetadata(estimated_context=50000, difficulty="easy") + queue_manager.enqueue(150, meta1) + queue_manager.enqueue(151, meta2) + + await loop.process_next_issue() + assert loop.get_active_agent_count() == 1 + + await loop.process_next_issue() + assert loop.get_active_agent_count() == 2 + + @pytest.mark.asyncio + async def test_agent_spawn_failure( + self, + queue_manager: QueueManager, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test handling when agent spawn fails.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=passing_result) + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + # Override spawn to return False + async def failing_spawn(item: QueueItem) -> bool: + return False + + loop._spawn_agent = failing_spawn # type: ignore[method-assign] + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + result = await loop.process_next_issue() + + assert result is not None + # Issue remains in progress due to spawn failure + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_context_monitor_exception( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + ) -> None: + """Test handling when context monitor raises exception.""" + from src.coordinator import OrchestrationLoop + + mock_monitor = MagicMock() + mock_monitor.determine_action = AsyncMock(side_effect=RuntimeError("Monitor error")) + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_monitor, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should complete despite monitor error + result = await loop.process_next_issue() + assert result is not None + + @pytest.mark.asyncio + async def test_process_next_issue_exception_handling( + self, + queue_manager: QueueManager, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test exception handling in process_next_issue main try block.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + # Make verify_completion raise to trigger exception handling + mock_orchestrator.verify_completion = AsyncMock( + side_effect=RuntimeError("Verification failed catastrophically") + ) + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="medium") + queue_manager.enqueue(150, meta) + + # Should not raise, returns item despite error + result = await loop.process_next_issue() + assert result is not None + assert result.issue_number == 150 + + # Item should remain in progress due to error + item = queue_manager.get_item(150) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + @pytest.mark.asyncio + async def test_stop_signal_breaks_loop( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> None: + """Test that stop signal properly breaks the loop.""" + from src.coordinator import OrchestrationLoop + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=1.0, # Longer interval + ) + + task = asyncio.create_task(loop.start()) + + # Wait briefly for loop to start + await asyncio.sleep(0.05) + assert loop.is_running is True + + # Stop immediately + await loop.stop() + + # Wait for task to complete (should be quick due to stop signal) + try: + await asyncio.wait_for(task, timeout=0.5) + except TimeoutError: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert loop.is_running is False + + +class TestOrchestrationLoopMetrics: + """Tests for orchestration loop metrics and tracking.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + @pytest.fixture + def mock_quality_orchestrator(self) -> MagicMock: + """Create a mock quality orchestrator that passes all gates.""" + orchestrator = MagicMock() + passing_result = VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build passed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + orchestrator.verify_completion = AsyncMock(return_value=passing_result) + return orchestrator + + @pytest.fixture + def mock_continuation_service(self) -> MagicMock: + """Create a mock continuation service.""" + return MagicMock() + + @pytest.fixture + def mock_context_monitor(self) -> MagicMock: + """Create a mock context monitor.""" + monitor = MagicMock() + monitor.get_context_usage = AsyncMock( + return_value=ContextUsage(agent_id="test", used_tokens=50000, total_tokens=200000) + ) + monitor.determine_action = AsyncMock(return_value=ContextAction.CONTINUE) + monitor.start_monitoring = AsyncMock() + monitor.stop_monitoring = MagicMock() + return monitor + + @pytest.fixture + def orchestration_loop( + self, + queue_manager: QueueManager, + mock_quality_orchestrator: MagicMock, + mock_continuation_service: MagicMock, + mock_context_monitor: MagicMock, + ) -> Any: + """Create an orchestration loop for testing.""" + from src.coordinator import OrchestrationLoop + + return OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_quality_orchestrator, + continuation_service=mock_continuation_service, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + @pytest.mark.asyncio + async def test_tracks_processed_issues( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test loop tracks number of processed issues.""" + meta1 = IssueMetadata(estimated_context=50000, difficulty="easy") + meta2 = IssueMetadata(estimated_context=50000, difficulty="medium") + + queue_manager.enqueue(150, meta1) + queue_manager.enqueue(151, meta2) + + await orchestration_loop.process_next_issue() + await orchestration_loop.process_next_issue() + + assert orchestration_loop.processed_count == 2 + + @pytest.mark.asyncio + async def test_tracks_successful_completions( + self, + orchestration_loop: Any, + queue_manager: QueueManager, + ) -> None: + """Test loop tracks successful completions.""" + meta = IssueMetadata(estimated_context=50000, difficulty="easy") + queue_manager.enqueue(150, meta) + + await orchestration_loop.process_next_issue() + + assert orchestration_loop.success_count == 1 + + @pytest.mark.asyncio + async def test_tracks_rejections( + self, + queue_manager: QueueManager, + mock_context_monitor: MagicMock, + ) -> None: + """Test loop tracks quality gate rejections.""" + from src.coordinator import OrchestrationLoop + + mock_orchestrator = MagicMock() + failing_result = VerificationResult( + all_passed=False, + gate_results={ + "build": GateResult(passed=False, message="Build failed"), + "lint": GateResult(passed=True, message="Lint passed"), + "test": GateResult(passed=True, message="Tests passed"), + "coverage": GateResult(passed=True, message="Coverage passed"), + }, + ) + mock_orchestrator.verify_completion = AsyncMock(return_value=failing_result) + + mock_continuation = MagicMock() + mock_continuation.generate_prompt = MagicMock(return_value="Fix issues") + + loop = OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=mock_orchestrator, + continuation_service=mock_continuation, + context_monitor=mock_context_monitor, + poll_interval=0.05, + ) + + meta = IssueMetadata(estimated_context=50000, difficulty="easy") + queue_manager.enqueue(150, meta) + + await loop.process_next_issue() + + assert loop.rejection_count == 1