From 36e6cdd9f9186fac93c6e207912a675a3981c997 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Feb 2026 01:52:54 -0600 Subject: [PATCH] feat(#372): track orchestrator agent task completions via telemetry - Instrument Coordinator.process_queue() with timing and telemetry events - Instrument OrchestrationLoop.process_next_issue() with quality gate tracking - Add agent-to-telemetry mapping (model, provider, harness per agent name) - Map difficulty levels to Complexity enum and gate names to QualityGate enum - Track retry counts per issue (increment on failure, clear on success) - Emit FAILURE outcome on agent spawn failure or quality gate rejection - Non-blocking: telemetry errors are logged and swallowed, never delay tasks - Pass telemetry client from FastAPI lifespan to Coordinator constructor - Add 33 unit tests covering all telemetry scenarios Refs #372 Co-Authored-By: Claude Opus 4.6 --- apps/coordinator/src/coordinator.py | 260 +++++- apps/coordinator/src/main.py | 2 + apps/coordinator/tests/test_task_telemetry.py | 796 ++++++++++++++++++ 3 files changed, 1057 insertions(+), 1 deletion(-) create mode 100644 apps/coordinator/tests/test_task_telemetry.py diff --git a/apps/coordinator/src/coordinator.py b/apps/coordinator/src/coordinator.py index aee45c2..090302b 100644 --- a/apps/coordinator/src/coordinator.py +++ b/apps/coordinator/src/coordinator.py @@ -2,12 +2,24 @@ import asyncio import logging +import time from typing import TYPE_CHECKING, Any +from mosaicstack_telemetry import ( # type: ignore[import-untyped] + Complexity, + Harness, + Outcome, + Provider, + QualityGate, + TaskType, + TelemetryClient, +) + from src.circuit_breaker import CircuitBreaker, CircuitBreakerError from src.context_monitor import ContextMonitor from src.forced_continuation import ForcedContinuationService from src.models import ContextAction +from src.mosaic_telemetry import build_task_event from src.quality_orchestrator import QualityOrchestrator, VerificationResult from 
src.queue import QueueItem, QueueManager from src.tracing_decorators import trace_agent_operation @@ -17,6 +29,49 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Agent-name → telemetry-field mapping helpers +# --------------------------------------------------------------------------- + +# Maps assigned_agent strings to (model, Provider, Harness) +_AGENT_TELEMETRY_MAP: dict[str, tuple[str, Provider, Harness]] = { + "sonnet": ("claude-sonnet-4-20250514", Provider.ANTHROPIC, Harness.CLAUDE_CODE), + "opus": ("claude-opus-4-20250514", Provider.ANTHROPIC, Harness.CLAUDE_CODE), + "haiku": ("claude-haiku-3.5-20241022", Provider.ANTHROPIC, Harness.CLAUDE_CODE), + "glm": ("glm-4", Provider.CUSTOM, Harness.CUSTOM), + "minimax": ("minimax", Provider.CUSTOM, Harness.CUSTOM), +} + +_DIFFICULTY_TO_COMPLEXITY: dict[str, Complexity] = { + "easy": Complexity.LOW, + "medium": Complexity.MEDIUM, + "hard": Complexity.HIGH, +} + +_GATE_NAME_TO_ENUM: dict[str, QualityGate] = { + "build": QualityGate.BUILD, + "lint": QualityGate.LINT, + "test": QualityGate.TEST, + "coverage": QualityGate.COVERAGE, +} + + +def _resolve_agent_fields( + assigned_agent: str, +) -> tuple[str, Provider, Harness]: + """Resolve agent name to (model, provider, harness) for telemetry. + + Args: + assigned_agent: The agent name string from issue metadata. + + Returns: + Tuple of (model_name, Provider, Harness). + """ + return _AGENT_TELEMETRY_MAP.get( + assigned_agent, + ("unknown", Provider.UNKNOWN, Harness.UNKNOWN), + ) + class Coordinator: """Main orchestration loop for processing the issue queue. @@ -41,6 +96,8 @@ class Coordinator: poll_interval: float = 5.0, circuit_breaker_threshold: int = 5, circuit_breaker_cooldown: float = 30.0, + telemetry_client: TelemetryClient | None = None, + instance_id: str = "", ) -> None: """Initialize the Coordinator. 
@@ -49,12 +106,16 @@ class Coordinator: poll_interval: Seconds between queue polls (default: 5.0) circuit_breaker_threshold: Consecutive failures before opening circuit (default: 5) circuit_breaker_cooldown: Seconds to wait before retry after circuit opens (default: 30) + telemetry_client: Optional Mosaic telemetry client for tracking task events + instance_id: UUID identifying this coordinator instance for telemetry """ self.queue_manager = queue_manager self.poll_interval = poll_interval self._running = False self._stop_event: asyncio.Event | None = None self._active_agents: dict[int, dict[str, Any]] = {} + self._telemetry_client = telemetry_client + self._instance_id = instance_id # Circuit breaker for preventing infinite retry loops (SEC-ORCH-7) self._circuit_breaker = CircuitBreaker( @@ -197,7 +258,8 @@ class Coordinator: """Process the next ready item from the queue. Gets the next ready item, spawns an agent to process it, - and marks it complete on success. + and marks it complete on success. Emits a Mosaic telemetry + TaskCompletionEvent after each task attempt. 
Returns: The QueueItem that was processed, or None if queue is empty @@ -218,6 +280,10 @@ class Coordinator: # Mark as in progress self.queue_manager.mark_in_progress(item.issue_number) + # Track timing for telemetry + start_mono = time.monotonic() + outcome = Outcome.FAILURE + # Spawn agent (stub implementation) try: success = await self.spawn_agent(item) @@ -225,6 +291,7 @@ class Coordinator: if success: # Mark as complete self.queue_manager.mark_complete(item.issue_number) + outcome = Outcome.SUCCESS logger.info(f"Issue #{item.issue_number} completed successfully") else: logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress") @@ -233,8 +300,81 @@ class Coordinator: logger.error(f"Error spawning agent for issue #{item.issue_number}: {e}") # Item remains in progress on error + finally: + elapsed_ms = int((time.monotonic() - start_mono) * 1000) + self._emit_task_telemetry(item, outcome=outcome, duration_ms=elapsed_ms) + return item + def _emit_task_telemetry( + self, + item: QueueItem, + *, + outcome: Outcome, + duration_ms: int, + retry_count: int = 0, + actual_input_tokens: int = 0, + actual_output_tokens: int = 0, + quality_passed: bool = False, + quality_gates_run: list[QualityGate] | None = None, + quality_gates_failed: list[QualityGate] | None = None, + ) -> None: + """Emit a Mosaic telemetry TaskCompletionEvent (non-blocking). + + This method never raises; any telemetry errors are logged and swallowed + so they do not interfere with task processing. + + Args: + item: The QueueItem that was processed. + outcome: Task outcome (SUCCESS, FAILURE, TIMEOUT, etc.). + duration_ms: Wall-clock duration in milliseconds. + retry_count: Number of retries before this attempt. + actual_input_tokens: Actual input tokens consumed by the harness. + actual_output_tokens: Actual output tokens consumed by the harness. + quality_passed: Whether all quality gates passed. + quality_gates_run: Quality gates that were executed. 
+ quality_gates_failed: Quality gates that failed. + """ + if self._telemetry_client is None or not self._instance_id: + return + + try: + model, provider, harness = _resolve_agent_fields( + item.metadata.assigned_agent, + ) + complexity = _DIFFICULTY_TO_COMPLEXITY.get( + item.metadata.difficulty, Complexity.MEDIUM + ) + + event = build_task_event( + instance_id=self._instance_id, + task_type=TaskType.IMPLEMENTATION, + complexity=complexity, + outcome=outcome, + duration_ms=duration_ms, + model=model, + provider=provider, + harness=harness, + actual_input_tokens=actual_input_tokens, + actual_output_tokens=actual_output_tokens, + estimated_input_tokens=item.metadata.estimated_context, + quality_passed=quality_passed, + quality_gates_run=quality_gates_run, + quality_gates_failed=quality_gates_failed, + retry_count=retry_count, + ) + self._telemetry_client.track(event) + logger.debug( + "Telemetry event emitted for issue #%d (outcome=%s)", + item.issue_number, + outcome.value, + ) + except Exception: + logger.exception( + "Failed to emit telemetry for issue #%d (non-fatal)", + item.issue_number, + ) + @trace_agent_operation(operation_name="spawn_agent") async def spawn_agent(self, item: QueueItem) -> bool: """Spawn an agent to process the given item. @@ -294,6 +434,8 @@ class OrchestrationLoop: poll_interval: float = 5.0, circuit_breaker_threshold: int = 5, circuit_breaker_cooldown: float = 30.0, + telemetry_client: TelemetryClient | None = None, + instance_id: str = "", ) -> None: """Initialize the OrchestrationLoop. 
@@ -305,6 +447,8 @@ class OrchestrationLoop: poll_interval: Seconds between queue polls (default: 5.0) circuit_breaker_threshold: Consecutive failures before opening circuit (default: 5) circuit_breaker_cooldown: Seconds to wait before retry after circuit opens (default: 30) + telemetry_client: Optional Mosaic telemetry client for tracking task events + instance_id: UUID identifying this coordinator instance for telemetry """ self.queue_manager = queue_manager self.quality_orchestrator = quality_orchestrator @@ -314,6 +458,11 @@ class OrchestrationLoop: self._running = False self._stop_event: asyncio.Event | None = None self._active_agents: dict[int, dict[str, Any]] = {} + self._telemetry_client = telemetry_client + self._instance_id = instance_id + + # Per-issue retry tracking + self._retry_counts: dict[int, int] = {} # Metrics tracking self._processed_count = 0 @@ -493,6 +642,7 @@ class OrchestrationLoop: 3. Spawns an agent to process it 4. Runs quality gates on completion 5. Handles rejection with forced continuation or marks complete + 6. 
Emits a Mosaic telemetry TaskCompletionEvent Returns: The QueueItem that was processed, or None if queue is empty @@ -524,12 +674,21 @@ class OrchestrationLoop: "status": "running", } + # Track timing for telemetry + start_mono = time.monotonic() + outcome = Outcome.FAILURE + quality_passed = False + gates_run: list[QualityGate] = [] + gates_failed: list[QualityGate] = [] + retry_count = self._retry_counts.get(item.issue_number, 0) + try: # Spawn agent (stub implementation) agent_success = await self._spawn_agent(item) if not agent_success: logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress") + self._retry_counts[item.issue_number] = retry_count + 1 return item # Check context usage (stub - no real monitoring in Phase 0) @@ -538,24 +697,123 @@ class OrchestrationLoop: # Run quality gates on completion verification = await self._verify_quality(item) + # Map gate results for telemetry + gates_run = [ + _GATE_NAME_TO_ENUM[name] + for name in verification.gate_results + if name in _GATE_NAME_TO_ENUM + ] + gates_failed = [ + _GATE_NAME_TO_ENUM[name] + for name, result in verification.gate_results.items() + if name in _GATE_NAME_TO_ENUM and not result.passed + ] + quality_passed = verification.all_passed + if verification.all_passed: # All gates passed - mark as complete self.queue_manager.mark_complete(item.issue_number) self._success_count += 1 + outcome = Outcome.SUCCESS + # Clear retry counter on success + self._retry_counts.pop(item.issue_number, None) logger.info( f"Issue #{item.issue_number} completed successfully - all gates passed" ) else: # Gates failed - generate continuation prompt self._rejection_count += 1 + outcome = Outcome.FAILURE + self._retry_counts[item.issue_number] = retry_count + 1 await self._handle_rejection(item, verification) except Exception as e: logger.error(f"Error processing issue #{item.issue_number}: {e}") # Item remains in progress on error + finally: + elapsed_ms = int((time.monotonic() - start_mono) * 1000) 
+ self._emit_task_telemetry( + item, + outcome=outcome, + duration_ms=elapsed_ms, + retry_count=retry_count, + quality_passed=quality_passed, + quality_gates_run=gates_run, + quality_gates_failed=gates_failed, + ) + return item + def _emit_task_telemetry( + self, + item: QueueItem, + *, + outcome: Outcome, + duration_ms: int, + retry_count: int = 0, + actual_input_tokens: int = 0, + actual_output_tokens: int = 0, + quality_passed: bool = False, + quality_gates_run: list[QualityGate] | None = None, + quality_gates_failed: list[QualityGate] | None = None, + ) -> None: + """Emit a Mosaic telemetry TaskCompletionEvent (non-blocking). + + This method never raises; any telemetry errors are logged and swallowed + so they do not interfere with task processing. + + Args: + item: The QueueItem that was processed. + outcome: Task outcome (SUCCESS, FAILURE, TIMEOUT, etc.). + duration_ms: Wall-clock duration in milliseconds. + retry_count: Number of retries before this attempt. + actual_input_tokens: Actual input tokens consumed by the harness. + actual_output_tokens: Actual output tokens consumed by the harness. + quality_passed: Whether all quality gates passed. + quality_gates_run: Quality gates that were executed. + quality_gates_failed: Quality gates that failed. 
+ """ + if self._telemetry_client is None or not self._instance_id: + return + + try: + model, provider, harness = _resolve_agent_fields( + item.metadata.assigned_agent, + ) + complexity = _DIFFICULTY_TO_COMPLEXITY.get( + item.metadata.difficulty, Complexity.MEDIUM + ) + + event = build_task_event( + instance_id=self._instance_id, + task_type=TaskType.IMPLEMENTATION, + complexity=complexity, + outcome=outcome, + duration_ms=duration_ms, + model=model, + provider=provider, + harness=harness, + actual_input_tokens=actual_input_tokens, + actual_output_tokens=actual_output_tokens, + estimated_input_tokens=item.metadata.estimated_context, + quality_passed=quality_passed, + quality_gates_run=quality_gates_run, + quality_gates_failed=quality_gates_failed, + retry_count=retry_count, + ) + self._telemetry_client.track(event) + logger.debug( + "Telemetry event emitted for issue #%d (outcome=%s)", + item.issue_number, + outcome.value, + ) + except Exception: + logger.exception( + "Failed to emit telemetry for issue #%d (non-fatal)", + item.issue_number, + ) + async def _spawn_agent(self, item: QueueItem) -> bool: """Spawn an agent to process the given item. 
diff --git a/apps/coordinator/src/main.py b/apps/coordinator/src/main.py index 8f345b5..9c37f91 100644 --- a/apps/coordinator/src/main.py +++ b/apps/coordinator/src/main.py @@ -100,6 +100,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]: _coordinator = Coordinator( queue_manager=queue_manager, poll_interval=settings.coordinator_poll_interval, + telemetry_client=mosaic_telemetry_client, + instance_id=mosaic_telemetry_config.instance_id or "", ) logger.info( f"Coordinator initialized (poll interval: {settings.coordinator_poll_interval}s, " diff --git a/apps/coordinator/tests/test_task_telemetry.py b/apps/coordinator/tests/test_task_telemetry.py new file mode 100644 index 0000000..ddcc2c4 --- /dev/null +++ b/apps/coordinator/tests/test_task_telemetry.py @@ -0,0 +1,796 @@ +"""Tests for task completion telemetry instrumentation in the coordinator. + +These tests verify that the Coordinator and OrchestrationLoop correctly +emit TaskCompletionEvents via the Mosaic telemetry SDK after each task +dispatch attempt. 
+""" + +from __future__ import annotations + +import tempfile +from collections.abc import Generator +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest +from mosaicstack_telemetry import ( # type: ignore[import-untyped] + Complexity, + Harness, + Outcome, + Provider, + QualityGate, + TaskCompletionEvent, + TaskType, + TelemetryClient, +) + +from src.coordinator import ( + _AGENT_TELEMETRY_MAP, + _DIFFICULTY_TO_COMPLEXITY, + _GATE_NAME_TO_ENUM, + Coordinator, + OrchestrationLoop, + _resolve_agent_fields, +) +from src.gates.quality_gate import GateResult +from src.models import IssueMetadata +from src.quality_orchestrator import QualityOrchestrator, VerificationResult +from src.queue import QueueManager + +VALID_INSTANCE_ID = "12345678-1234-1234-1234-123456789abc" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def temp_queue_file() -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + if temp_path.exists(): + temp_path.unlink() + + +@pytest.fixture +def queue_manager(temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + +@pytest.fixture +def mock_telemetry_client() -> MagicMock: + """Create a mock TelemetryClient.""" + client = MagicMock(spec=TelemetryClient) + client.track = MagicMock() + return client + + +@pytest.fixture +def sonnet_metadata() -> IssueMetadata: + """Metadata for a sonnet agent task.""" + return IssueMetadata( + assigned_agent="sonnet", + difficulty="medium", + estimated_context=50000, + ) + + +@pytest.fixture +def opus_metadata() -> IssueMetadata: + """Metadata for an opus agent task (hard difficulty).""" + 
return IssueMetadata( + assigned_agent="opus", + difficulty="hard", + estimated_context=120000, + ) + + +# --------------------------------------------------------------------------- +# _resolve_agent_fields tests +# --------------------------------------------------------------------------- + + +class TestResolveAgentFields: + """Tests for the _resolve_agent_fields helper.""" + + def test_known_agent_sonnet(self) -> None: + """Should return correct fields for sonnet agent.""" + model, provider, harness = _resolve_agent_fields("sonnet") + assert model == "claude-sonnet-4-20250514" + assert provider == Provider.ANTHROPIC + assert harness == Harness.CLAUDE_CODE + + def test_known_agent_opus(self) -> None: + """Should return correct fields for opus agent.""" + model, provider, harness = _resolve_agent_fields("opus") + assert model == "claude-opus-4-20250514" + assert provider == Provider.ANTHROPIC + assert harness == Harness.CLAUDE_CODE + + def test_known_agent_haiku(self) -> None: + """Should return correct fields for haiku agent.""" + model, provider, harness = _resolve_agent_fields("haiku") + assert model == "claude-haiku-3.5-20241022" + assert provider == Provider.ANTHROPIC + assert harness == Harness.CLAUDE_CODE + + def test_known_agent_glm(self) -> None: + """Should return correct fields for glm (self-hosted) agent.""" + model, provider, harness = _resolve_agent_fields("glm") + assert model == "glm-4" + assert provider == Provider.CUSTOM + assert harness == Harness.CUSTOM + + def test_known_agent_minimax(self) -> None: + """Should return correct fields for minimax (self-hosted) agent.""" + model, provider, harness = _resolve_agent_fields("minimax") + assert model == "minimax" + assert provider == Provider.CUSTOM + assert harness == Harness.CUSTOM + + def test_unknown_agent_returns_defaults(self) -> None: + """Should return unknown values for unrecognised agent names.""" + model, provider, harness = _resolve_agent_fields("nonexistent") + assert model == "unknown" 
+ assert provider == Provider.UNKNOWN + assert harness == Harness.UNKNOWN + + def test_all_map_entries_covered(self) -> None: + """Ensure every entry in _AGENT_TELEMETRY_MAP is resolvable.""" + for agent_name in _AGENT_TELEMETRY_MAP: + model, provider, harness = _resolve_agent_fields(agent_name) + assert model != "unknown" + + +# --------------------------------------------------------------------------- +# Coordinator telemetry emission tests +# --------------------------------------------------------------------------- + + +class TestCoordinatorTelemetry: + """Tests for telemetry emission in the Coordinator class.""" + + @pytest.mark.asyncio + async def test_emits_success_event_on_completion( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should emit a SUCCESS event when task completes successfully.""" + queue_manager.enqueue(100, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + + await coordinator.process_queue() + + mock_telemetry_client.track.assert_called_once() + event = mock_telemetry_client.track.call_args[0][0] + assert isinstance(event, TaskCompletionEvent) + assert event.outcome == Outcome.SUCCESS + assert event.task_type == TaskType.IMPLEMENTATION + assert event.complexity == Complexity.MEDIUM + assert event.provider == Provider.ANTHROPIC + assert event.harness == Harness.CLAUDE_CODE + assert str(event.instance_id) == VALID_INSTANCE_ID + assert event.task_duration_ms >= 0 + + @pytest.mark.asyncio + async def test_emits_failure_event_when_agent_fails( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should emit a FAILURE event when spawn_agent returns False.""" + queue_manager.enqueue(101, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + 
telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + # Override spawn_agent to fail + coordinator.spawn_agent = AsyncMock(return_value=False) # type: ignore[method-assign] + + await coordinator.process_queue() + + mock_telemetry_client.track.assert_called_once() + event = mock_telemetry_client.track.call_args[0][0] + assert event.outcome == Outcome.FAILURE + + @pytest.mark.asyncio + async def test_emits_failure_event_on_exception( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should emit a FAILURE event when spawn_agent raises an exception.""" + queue_manager.enqueue(102, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + coordinator.spawn_agent = AsyncMock(side_effect=RuntimeError("agent crashed")) # type: ignore[method-assign] + + await coordinator.process_queue() + + mock_telemetry_client.track.assert_called_once() + event = mock_telemetry_client.track.call_args[0][0] + assert event.outcome == Outcome.FAILURE + + @pytest.mark.asyncio + async def test_maps_difficulty_to_complexity( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + opus_metadata: IssueMetadata, + ) -> None: + """Should map difficulty='hard' to Complexity.HIGH in the event.""" + queue_manager.enqueue(103, opus_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + + await coordinator.process_queue() + + event = mock_telemetry_client.track.call_args[0][0] + assert event.complexity == Complexity.HIGH + + @pytest.mark.asyncio + async def test_maps_agent_to_model_and_provider( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + opus_metadata: IssueMetadata, + ) -> None: + """Should map 'opus' agent to opus model and ANTHROPIC provider.""" + 
queue_manager.enqueue(104, opus_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + + await coordinator.process_queue() + + event = mock_telemetry_client.track.call_args[0][0] + assert "opus" in event.model + assert event.provider == Provider.ANTHROPIC + assert event.harness == Harness.CLAUDE_CODE + + @pytest.mark.asyncio + async def test_no_event_when_telemetry_disabled( + self, + queue_manager: QueueManager, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should not call track when telemetry_client is None.""" + queue_manager.enqueue(105, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=None, + instance_id=VALID_INSTANCE_ID, + ) + + # Should not raise + await coordinator.process_queue() + + @pytest.mark.asyncio + async def test_no_event_when_instance_id_empty( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should not call track when instance_id is empty.""" + queue_manager.enqueue(106, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id="", + ) + + await coordinator.process_queue() + mock_telemetry_client.track.assert_not_called() + + @pytest.mark.asyncio + async def test_telemetry_exception_does_not_propagate( + self, + queue_manager: QueueManager, + sonnet_metadata: IssueMetadata, + ) -> None: + """Telemetry failures must never break task processing.""" + queue_manager.enqueue(107, sonnet_metadata) + + bad_client = MagicMock(spec=TelemetryClient) + bad_client.track = MagicMock(side_effect=RuntimeError("telemetry down")) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=bad_client, + instance_id=VALID_INSTANCE_ID, + ) + + # Should complete without raising, despite telemetry failure + result = await 
coordinator.process_queue() + assert result is not None + assert result.issue_number == 107 + + @pytest.mark.asyncio + async def test_no_event_when_queue_empty( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + ) -> None: + """Should not emit any event when the queue is empty.""" + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + + result = await coordinator.process_queue() + assert result is None + mock_telemetry_client.track.assert_not_called() + + @pytest.mark.asyncio + async def test_estimated_input_tokens_from_metadata( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should set estimated_input_tokens from issue metadata.""" + queue_manager.enqueue(108, sonnet_metadata) + + coordinator = Coordinator( + queue_manager=queue_manager, + telemetry_client=mock_telemetry_client, + instance_id=VALID_INSTANCE_ID, + ) + + await coordinator.process_queue() + + event = mock_telemetry_client.track.call_args[0][0] + assert event.estimated_input_tokens == 50000 + + +# --------------------------------------------------------------------------- +# OrchestrationLoop telemetry emission tests +# --------------------------------------------------------------------------- + + +def _make_orchestration_loop( + queue_manager: QueueManager, + telemetry_client: TelemetryClient | None = None, + instance_id: str = VALID_INSTANCE_ID, + quality_result: VerificationResult | None = None, +) -> OrchestrationLoop: + """Create an OrchestrationLoop with mocked dependencies. + + Args: + queue_manager: Queue manager instance. + telemetry_client: Optional telemetry client. + instance_id: Coordinator instance ID. + quality_result: Override quality verification result. + + Returns: + Configured OrchestrationLoop. 
+ """ + # Create quality orchestrator mock + qo = MagicMock(spec=QualityOrchestrator) + default_result = quality_result or VerificationResult( + all_passed=True, + gate_results={ + "build": GateResult(passed=True, message="Build OK"), + "lint": GateResult(passed=True, message="Lint OK"), + "test": GateResult(passed=True, message="Test OK"), + "coverage": GateResult(passed=True, message="Coverage OK"), + }, + ) + qo.verify_completion = AsyncMock(return_value=default_result) + + # Continuation service mock + from src.forced_continuation import ForcedContinuationService + + cs = MagicMock(spec=ForcedContinuationService) + cs.generate_prompt = MagicMock(return_value="Fix: build failed") + + # Context monitor mock + from src.context_monitor import ContextMonitor + + cm = MagicMock(spec=ContextMonitor) + cm.determine_action = AsyncMock(return_value="continue") + + return OrchestrationLoop( + queue_manager=queue_manager, + quality_orchestrator=qo, + continuation_service=cs, + context_monitor=cm, + telemetry_client=telemetry_client, + instance_id=instance_id, + ) + + +class TestOrchestrationLoopTelemetry: + """Tests for telemetry emission in the OrchestrationLoop class.""" + + @pytest.mark.asyncio + async def test_emits_success_with_quality_gates( + self, + queue_manager: QueueManager, + mock_telemetry_client: MagicMock, + sonnet_metadata: IssueMetadata, + ) -> None: + """Should emit SUCCESS event with quality gate details.""" + queue_manager.enqueue(200, sonnet_metadata) + + loop = _make_orchestration_loop( + queue_manager, telemetry_client=mock_telemetry_client + ) + + await loop.process_next_issue() + + mock_telemetry_client.track.assert_called_once() + event = mock_telemetry_client.track.call_args[0][0] + assert event.outcome == Outcome.SUCCESS + assert event.quality_gate_passed is True + assert set(event.quality_gates_run) == { + QualityGate.BUILD, + QualityGate.LINT, + QualityGate.TEST, + QualityGate.COVERAGE, + } + assert event.quality_gates_failed == [] + + 
    @pytest.mark.asyncio
    async def test_emits_failure_with_failed_gates(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Should emit FAILURE event with failed gate details."""
        queue_manager.enqueue(201, sonnet_metadata)

        failed_result = VerificationResult(
            all_passed=False,
            gate_results={
                "build": GateResult(passed=True, message="Build OK"),
                "lint": GateResult(passed=True, message="Lint OK"),
                "test": GateResult(passed=False, message="3 tests failed"),
                "coverage": GateResult(passed=False, message="Coverage 70% < 85%"),
            },
        )

        loop = _make_orchestration_loop(
            queue_manager,
            telemetry_client=mock_telemetry_client,
            quality_result=failed_result,
        )

        await loop.process_next_issue()

        mock_telemetry_client.track.assert_called_once()
        event = mock_telemetry_client.track.call_args[0][0]
        assert event.outcome == Outcome.FAILURE
        assert event.quality_gate_passed is False
        # Only the failing gates appear in quality_gates_failed;
        # quality_gates_run still lists every gate that executed.
        assert set(event.quality_gates_failed) == {
            QualityGate.TEST,
            QualityGate.COVERAGE,
        }
        assert set(event.quality_gates_run) == {
            QualityGate.BUILD,
            QualityGate.LINT,
            QualityGate.TEST,
            QualityGate.COVERAGE,
        }

    @pytest.mark.asyncio
    async def test_retry_count_starts_at_zero(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """First attempt should report retry_count=0."""
        queue_manager.enqueue(202, sonnet_metadata)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )

        await loop.process_next_issue()

        event = mock_telemetry_client.track.call_args[0][0]
        assert event.retry_count == 0

    @pytest.mark.asyncio
    async def test_retry_count_increments_on_failure(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Retry count should increment after a quality gate failure."""
        queue_manager.enqueue(203, sonnet_metadata)

        failed_result = VerificationResult(
            all_passed=False,
            gate_results={
                "build": GateResult(passed=False, message="Build failed"),
            },
        )

        loop = _make_orchestration_loop(
            queue_manager,
            telemetry_client=mock_telemetry_client,
            quality_result=failed_result,
        )

        # First attempt
        await loop.process_next_issue()
        event1 = mock_telemetry_client.track.call_args[0][0]
        assert event1.retry_count == 0

        # Re-enqueue and process again (simulates retry)
        queue_manager.enqueue(203, sonnet_metadata)
        mock_telemetry_client.track.reset_mock()

        await loop.process_next_issue()
        event2 = mock_telemetry_client.track.call_args[0][0]
        assert event2.retry_count == 1

    @pytest.mark.asyncio
    async def test_retry_count_clears_on_success(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Retry count should be cleared after a successful completion."""
        queue_manager.enqueue(204, sonnet_metadata)

        # First: fail
        failed_result = VerificationResult(
            all_passed=False,
            gate_results={
                "build": GateResult(passed=False, message="Build failed"),
            },
        )
        loop = _make_orchestration_loop(
            queue_manager,
            telemetry_client=mock_telemetry_client,
            quality_result=failed_result,
        )

        await loop.process_next_issue()
        # Failure bumps the per-issue retry counter kept on the loop.
        assert loop._retry_counts.get(204) == 1

        # Now succeed
        success_result = VerificationResult(
            all_passed=True,
            gate_results={
                "build": GateResult(passed=True, message="Build OK"),
            },
        )
        loop.quality_orchestrator.verify_completion = AsyncMock(return_value=success_result)  # type: ignore[method-assign]
        queue_manager.enqueue(204, sonnet_metadata)
        mock_telemetry_client.track.reset_mock()

        await loop.process_next_issue()
        assert 204 not in loop._retry_counts

    @pytest.mark.asyncio
    async def test_emits_failure_when_agent_spawn_fails(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Should emit FAILURE when _spawn_agent returns False."""
        queue_manager.enqueue(205, sonnet_metadata)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )
        loop._spawn_agent = AsyncMock(return_value=False)  # type: ignore[method-assign]

        await loop.process_next_issue()

        mock_telemetry_client.track.assert_called_once()
        event = mock_telemetry_client.track.call_args[0][0]
        assert event.outcome == Outcome.FAILURE

    @pytest.mark.asyncio
    async def test_no_event_when_telemetry_disabled(
        self,
        queue_manager: QueueManager,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Should not call track when telemetry_client is None."""
        queue_manager.enqueue(206, sonnet_metadata)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=None
        )

        # Should not raise
        result = await loop.process_next_issue()
        assert result is not None

    @pytest.mark.asyncio
    async def test_telemetry_exception_does_not_propagate(
        self,
        queue_manager: QueueManager,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Telemetry failures must never disrupt task processing."""
        queue_manager.enqueue(207, sonnet_metadata)

        bad_client = MagicMock(spec=TelemetryClient)
        bad_client.track = MagicMock(side_effect=RuntimeError("telemetry down"))

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=bad_client
        )

        # Processing still completes even though track() raised.
        result = await loop.process_next_issue()
        assert result is not None
        assert result.issue_number == 207

    @pytest.mark.asyncio
    async def test_duration_is_positive(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Duration should be a non-negative integer."""
        queue_manager.enqueue(208, sonnet_metadata)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )

        await loop.process_next_issue()

        event = mock_telemetry_client.track.call_args[0][0]
        assert event.task_duration_ms >= 0

    @pytest.mark.asyncio
    async def test_maps_glm_agent_correctly(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
    ) -> None:
        """Should map GLM (self-hosted) agent to CUSTOM provider/harness."""
        glm_meta = IssueMetadata(
            assigned_agent="glm",
            difficulty="medium",
            estimated_context=30000,
        )
        queue_manager.enqueue(209, glm_meta)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )

        await loop.process_next_issue()

        event = mock_telemetry_client.track.call_args[0][0]
        assert event.model == "glm-4"
        assert event.provider == Provider.CUSTOM
        assert event.harness == Harness.CUSTOM

    @pytest.mark.asyncio
    async def test_maps_easy_difficulty_to_low_complexity(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
    ) -> None:
        """Should map difficulty='easy' to Complexity.LOW."""
        easy_meta = IssueMetadata(
            assigned_agent="haiku",
            difficulty="easy",
            estimated_context=10000,
        )
        queue_manager.enqueue(210, easy_meta)

        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )

        await loop.process_next_issue()

        event = mock_telemetry_client.track.call_args[0][0]
        assert event.complexity == Complexity.LOW

    @pytest.mark.asyncio
    async def test_no_event_when_queue_empty(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
    ) -> None:
        """Should not emit an event when queue is empty."""
        loop = _make_orchestration_loop(
            queue_manager, telemetry_client=mock_telemetry_client
        )

        result = await loop.process_next_issue()
        assert result is None
        mock_telemetry_client.track.assert_not_called()

    @pytest.mark.asyncio
    async def test_unknown_gate_names_excluded(
        self,
        queue_manager: QueueManager,
        mock_telemetry_client: MagicMock,
        sonnet_metadata: IssueMetadata,
    ) -> None:
        """Gate names not in _GATE_NAME_TO_ENUM should be excluded from telemetry."""
        queue_manager.enqueue(211, sonnet_metadata)

        result_with_unknown = VerificationResult(
            all_passed=False,
            gate_results={
                "build": GateResult(passed=True, message="Build OK"),
                "unknown_gate": GateResult(passed=False, message="Unknown gate"),
            },
        )

        loop = _make_orchestration_loop(
            queue_manager,
            telemetry_client=mock_telemetry_client,
            quality_result=result_with_unknown,
        )

        await loop.process_next_issue()

        event = mock_telemetry_client.track.call_args[0][0]
        assert QualityGate.BUILD in event.quality_gates_run
        # unknown_gate should not appear
        assert len(event.quality_gates_run) == 1
        assert len(event.quality_gates_failed) == 0


# ---------------------------------------------------------------------------
# Mapping dict completeness tests
# ---------------------------------------------------------------------------


class TestMappingCompleteness:
    """Tests to verify mapping dicts cover expected values."""

    def test_difficulty_map_covers_all_metadata_values(self) -> None:
        """All valid difficulty levels should have Complexity mappings."""
        expected_difficulties = {"easy", "medium", "hard"}
        assert expected_difficulties == set(_DIFFICULTY_TO_COMPLEXITY.keys())

    def test_gate_name_map_covers_all_orchestrator_gates(self) -> None:
        """All gate names used by QualityOrchestrator should be mappable."""
        expected_gates = {"build", "lint", "test", "coverage"}
        assert expected_gates == set(_GATE_NAME_TO_ENUM.keys())

    def test_agent_map_covers_all_configured_agents(self) -> None:
        """All agents used by the coordinator should have telemetry mappings."""
        expected_agents = {"sonnet", "opus", "haiku", "glm", "minimax"}
        assert expected_agents == set(_AGENT_TELEMETRY_MAP.keys())