feat(#372): track orchestrator agent task completions via telemetry
- Instrument Coordinator.process_queue() with timing and telemetry events - Instrument OrchestrationLoop.process_next_issue() with quality gate tracking - Add agent-to-telemetry mapping (model, provider, harness per agent name) - Map difficulty levels to Complexity enum and gate names to QualityGate enum - Track retry counts per issue (increment on failure, clear on success) - Emit FAILURE outcome on agent spawn failure or quality gate rejection - Non-blocking: telemetry errors are logged and swallowed, never delay tasks - Pass telemetry client from FastAPI lifespan to Coordinator constructor - Add 33 unit tests covering all telemetry scenarios Refs #372 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,12 +2,24 @@
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from mosaicstack_telemetry import ( # type: ignore[import-untyped]
|
||||
Complexity,
|
||||
Harness,
|
||||
Outcome,
|
||||
Provider,
|
||||
QualityGate,
|
||||
TaskType,
|
||||
TelemetryClient,
|
||||
)
|
||||
|
||||
from src.circuit_breaker import CircuitBreaker, CircuitBreakerError
|
||||
from src.context_monitor import ContextMonitor
|
||||
from src.forced_continuation import ForcedContinuationService
|
||||
from src.models import ContextAction
|
||||
from src.mosaic_telemetry import build_task_event
|
||||
from src.quality_orchestrator import QualityOrchestrator, VerificationResult
|
||||
from src.queue import QueueItem, QueueManager
|
||||
from src.tracing_decorators import trace_agent_operation
|
||||
@@ -17,6 +29,49 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent-name → telemetry-field mapping helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Maps assigned_agent strings to (model, Provider, Harness)
|
||||
_AGENT_TELEMETRY_MAP: dict[str, tuple[str, Provider, Harness]] = {
|
||||
"sonnet": ("claude-sonnet-4-20250514", Provider.ANTHROPIC, Harness.CLAUDE_CODE),
|
||||
"opus": ("claude-opus-4-20250514", Provider.ANTHROPIC, Harness.CLAUDE_CODE),
|
||||
"haiku": ("claude-haiku-3.5-20241022", Provider.ANTHROPIC, Harness.CLAUDE_CODE),
|
||||
"glm": ("glm-4", Provider.CUSTOM, Harness.CUSTOM),
|
||||
"minimax": ("minimax", Provider.CUSTOM, Harness.CUSTOM),
|
||||
}
|
||||
|
||||
_DIFFICULTY_TO_COMPLEXITY: dict[str, Complexity] = {
|
||||
"easy": Complexity.LOW,
|
||||
"medium": Complexity.MEDIUM,
|
||||
"hard": Complexity.HIGH,
|
||||
}
|
||||
|
||||
_GATE_NAME_TO_ENUM: dict[str, QualityGate] = {
|
||||
"build": QualityGate.BUILD,
|
||||
"lint": QualityGate.LINT,
|
||||
"test": QualityGate.TEST,
|
||||
"coverage": QualityGate.COVERAGE,
|
||||
}
|
||||
|
||||
|
||||
def _resolve_agent_fields(
|
||||
assigned_agent: str,
|
||||
) -> tuple[str, Provider, Harness]:
|
||||
"""Resolve agent name to (model, provider, harness) for telemetry.
|
||||
|
||||
Args:
|
||||
assigned_agent: The agent name string from issue metadata.
|
||||
|
||||
Returns:
|
||||
Tuple of (model_name, Provider, Harness).
|
||||
"""
|
||||
return _AGENT_TELEMETRY_MAP.get(
|
||||
assigned_agent,
|
||||
("unknown", Provider.UNKNOWN, Harness.UNKNOWN),
|
||||
)
|
||||
|
||||
|
||||
class Coordinator:
|
||||
"""Main orchestration loop for processing the issue queue.
|
||||
@@ -41,6 +96,8 @@ class Coordinator:
|
||||
poll_interval: float = 5.0,
|
||||
circuit_breaker_threshold: int = 5,
|
||||
circuit_breaker_cooldown: float = 30.0,
|
||||
telemetry_client: TelemetryClient | None = None,
|
||||
instance_id: str = "",
|
||||
) -> None:
|
||||
"""Initialize the Coordinator.
|
||||
|
||||
@@ -49,12 +106,16 @@ class Coordinator:
|
||||
poll_interval: Seconds between queue polls (default: 5.0)
|
||||
circuit_breaker_threshold: Consecutive failures before opening circuit (default: 5)
|
||||
circuit_breaker_cooldown: Seconds to wait before retry after circuit opens (default: 30)
|
||||
telemetry_client: Optional Mosaic telemetry client for tracking task events
|
||||
instance_id: UUID identifying this coordinator instance for telemetry
|
||||
"""
|
||||
self.queue_manager = queue_manager
|
||||
self.poll_interval = poll_interval
|
||||
self._running = False
|
||||
self._stop_event: asyncio.Event | None = None
|
||||
self._active_agents: dict[int, dict[str, Any]] = {}
|
||||
self._telemetry_client = telemetry_client
|
||||
self._instance_id = instance_id
|
||||
|
||||
# Circuit breaker for preventing infinite retry loops (SEC-ORCH-7)
|
||||
self._circuit_breaker = CircuitBreaker(
|
||||
@@ -197,7 +258,8 @@ class Coordinator:
|
||||
"""Process the next ready item from the queue.
|
||||
|
||||
Gets the next ready item, spawns an agent to process it,
|
||||
and marks it complete on success.
|
||||
and marks it complete on success. Emits a Mosaic telemetry
|
||||
TaskCompletionEvent after each task attempt.
|
||||
|
||||
Returns:
|
||||
The QueueItem that was processed, or None if queue is empty
|
||||
@@ -218,6 +280,10 @@ class Coordinator:
|
||||
# Mark as in progress
|
||||
self.queue_manager.mark_in_progress(item.issue_number)
|
||||
|
||||
# Track timing for telemetry
|
||||
start_mono = time.monotonic()
|
||||
outcome = Outcome.FAILURE
|
||||
|
||||
# Spawn agent (stub implementation)
|
||||
try:
|
||||
success = await self.spawn_agent(item)
|
||||
@@ -225,6 +291,7 @@ class Coordinator:
|
||||
if success:
|
||||
# Mark as complete
|
||||
self.queue_manager.mark_complete(item.issue_number)
|
||||
outcome = Outcome.SUCCESS
|
||||
logger.info(f"Issue #{item.issue_number} completed successfully")
|
||||
else:
|
||||
logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress")
|
||||
@@ -233,8 +300,81 @@ class Coordinator:
|
||||
logger.error(f"Error spawning agent for issue #{item.issue_number}: {e}")
|
||||
# Item remains in progress on error
|
||||
|
||||
finally:
|
||||
elapsed_ms = int((time.monotonic() - start_mono) * 1000)
|
||||
self._emit_task_telemetry(item, outcome=outcome, duration_ms=elapsed_ms)
|
||||
|
||||
return item
|
||||
|
||||
def _emit_task_telemetry(
|
||||
self,
|
||||
item: QueueItem,
|
||||
*,
|
||||
outcome: Outcome,
|
||||
duration_ms: int,
|
||||
retry_count: int = 0,
|
||||
actual_input_tokens: int = 0,
|
||||
actual_output_tokens: int = 0,
|
||||
quality_passed: bool = False,
|
||||
quality_gates_run: list[QualityGate] | None = None,
|
||||
quality_gates_failed: list[QualityGate] | None = None,
|
||||
) -> None:
|
||||
"""Emit a Mosaic telemetry TaskCompletionEvent (non-blocking).
|
||||
|
||||
This method never raises; any telemetry errors are logged and swallowed
|
||||
so they do not interfere with task processing.
|
||||
|
||||
Args:
|
||||
item: The QueueItem that was processed.
|
||||
outcome: Task outcome (SUCCESS, FAILURE, TIMEOUT, etc.).
|
||||
duration_ms: Wall-clock duration in milliseconds.
|
||||
retry_count: Number of retries before this attempt.
|
||||
actual_input_tokens: Actual input tokens consumed by the harness.
|
||||
actual_output_tokens: Actual output tokens consumed by the harness.
|
||||
quality_passed: Whether all quality gates passed.
|
||||
quality_gates_run: Quality gates that were executed.
|
||||
quality_gates_failed: Quality gates that failed.
|
||||
"""
|
||||
if self._telemetry_client is None or not self._instance_id:
|
||||
return
|
||||
|
||||
try:
|
||||
model, provider, harness = _resolve_agent_fields(
|
||||
item.metadata.assigned_agent,
|
||||
)
|
||||
complexity = _DIFFICULTY_TO_COMPLEXITY.get(
|
||||
item.metadata.difficulty, Complexity.MEDIUM
|
||||
)
|
||||
|
||||
event = build_task_event(
|
||||
instance_id=self._instance_id,
|
||||
task_type=TaskType.IMPLEMENTATION,
|
||||
complexity=complexity,
|
||||
outcome=outcome,
|
||||
duration_ms=duration_ms,
|
||||
model=model,
|
||||
provider=provider,
|
||||
harness=harness,
|
||||
actual_input_tokens=actual_input_tokens,
|
||||
actual_output_tokens=actual_output_tokens,
|
||||
estimated_input_tokens=item.metadata.estimated_context,
|
||||
quality_passed=quality_passed,
|
||||
quality_gates_run=quality_gates_run,
|
||||
quality_gates_failed=quality_gates_failed,
|
||||
retry_count=retry_count,
|
||||
)
|
||||
self._telemetry_client.track(event)
|
||||
logger.debug(
|
||||
"Telemetry event emitted for issue #%d (outcome=%s)",
|
||||
item.issue_number,
|
||||
outcome.value,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to emit telemetry for issue #%d (non-fatal)",
|
||||
item.issue_number,
|
||||
)
|
||||
|
||||
@trace_agent_operation(operation_name="spawn_agent")
|
||||
async def spawn_agent(self, item: QueueItem) -> bool:
|
||||
"""Spawn an agent to process the given item.
|
||||
@@ -294,6 +434,8 @@ class OrchestrationLoop:
|
||||
poll_interval: float = 5.0,
|
||||
circuit_breaker_threshold: int = 5,
|
||||
circuit_breaker_cooldown: float = 30.0,
|
||||
telemetry_client: TelemetryClient | None = None,
|
||||
instance_id: str = "",
|
||||
) -> None:
|
||||
"""Initialize the OrchestrationLoop.
|
||||
|
||||
@@ -305,6 +447,8 @@ class OrchestrationLoop:
|
||||
poll_interval: Seconds between queue polls (default: 5.0)
|
||||
circuit_breaker_threshold: Consecutive failures before opening circuit (default: 5)
|
||||
circuit_breaker_cooldown: Seconds to wait before retry after circuit opens (default: 30)
|
||||
telemetry_client: Optional Mosaic telemetry client for tracking task events
|
||||
instance_id: UUID identifying this coordinator instance for telemetry
|
||||
"""
|
||||
self.queue_manager = queue_manager
|
||||
self.quality_orchestrator = quality_orchestrator
|
||||
@@ -314,6 +458,11 @@ class OrchestrationLoop:
|
||||
self._running = False
|
||||
self._stop_event: asyncio.Event | None = None
|
||||
self._active_agents: dict[int, dict[str, Any]] = {}
|
||||
self._telemetry_client = telemetry_client
|
||||
self._instance_id = instance_id
|
||||
|
||||
# Per-issue retry tracking
|
||||
self._retry_counts: dict[int, int] = {}
|
||||
|
||||
# Metrics tracking
|
||||
self._processed_count = 0
|
||||
@@ -493,6 +642,7 @@ class OrchestrationLoop:
|
||||
3. Spawns an agent to process it
|
||||
4. Runs quality gates on completion
|
||||
5. Handles rejection with forced continuation or marks complete
|
||||
6. Emits a Mosaic telemetry TaskCompletionEvent
|
||||
|
||||
Returns:
|
||||
The QueueItem that was processed, or None if queue is empty
|
||||
@@ -524,12 +674,21 @@ class OrchestrationLoop:
|
||||
"status": "running",
|
||||
}
|
||||
|
||||
# Track timing for telemetry
|
||||
start_mono = time.monotonic()
|
||||
outcome = Outcome.FAILURE
|
||||
quality_passed = False
|
||||
gates_run: list[QualityGate] = []
|
||||
gates_failed: list[QualityGate] = []
|
||||
retry_count = self._retry_counts.get(item.issue_number, 0)
|
||||
|
||||
try:
|
||||
# Spawn agent (stub implementation)
|
||||
agent_success = await self._spawn_agent(item)
|
||||
|
||||
if not agent_success:
|
||||
logger.warning(f"Issue #{item.issue_number} agent failed - remains in progress")
|
||||
self._retry_counts[item.issue_number] = retry_count + 1
|
||||
return item
|
||||
|
||||
# Check context usage (stub - no real monitoring in Phase 0)
|
||||
@@ -538,24 +697,123 @@ class OrchestrationLoop:
|
||||
# Run quality gates on completion
|
||||
verification = await self._verify_quality(item)
|
||||
|
||||
# Map gate results for telemetry
|
||||
gates_run = [
|
||||
_GATE_NAME_TO_ENUM[name]
|
||||
for name in verification.gate_results
|
||||
if name in _GATE_NAME_TO_ENUM
|
||||
]
|
||||
gates_failed = [
|
||||
_GATE_NAME_TO_ENUM[name]
|
||||
for name, result in verification.gate_results.items()
|
||||
if name in _GATE_NAME_TO_ENUM and not result.passed
|
||||
]
|
||||
quality_passed = verification.all_passed
|
||||
|
||||
if verification.all_passed:
|
||||
# All gates passed - mark as complete
|
||||
self.queue_manager.mark_complete(item.issue_number)
|
||||
self._success_count += 1
|
||||
outcome = Outcome.SUCCESS
|
||||
# Clear retry counter on success
|
||||
self._retry_counts.pop(item.issue_number, None)
|
||||
logger.info(
|
||||
f"Issue #{item.issue_number} completed successfully - all gates passed"
|
||||
)
|
||||
else:
|
||||
# Gates failed - generate continuation prompt
|
||||
self._rejection_count += 1
|
||||
outcome = Outcome.FAILURE
|
||||
self._retry_counts[item.issue_number] = retry_count + 1
|
||||
await self._handle_rejection(item, verification)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing issue #{item.issue_number}: {e}")
|
||||
# Item remains in progress on error
|
||||
|
||||
finally:
|
||||
elapsed_ms = int((time.monotonic() - start_mono) * 1000)
|
||||
self._emit_task_telemetry(
|
||||
item,
|
||||
outcome=outcome,
|
||||
duration_ms=elapsed_ms,
|
||||
retry_count=retry_count,
|
||||
quality_passed=quality_passed,
|
||||
quality_gates_run=gates_run,
|
||||
quality_gates_failed=gates_failed,
|
||||
)
|
||||
|
||||
return item
|
||||
|
||||
def _emit_task_telemetry(
|
||||
self,
|
||||
item: QueueItem,
|
||||
*,
|
||||
outcome: Outcome,
|
||||
duration_ms: int,
|
||||
retry_count: int = 0,
|
||||
actual_input_tokens: int = 0,
|
||||
actual_output_tokens: int = 0,
|
||||
quality_passed: bool = False,
|
||||
quality_gates_run: list[QualityGate] | None = None,
|
||||
quality_gates_failed: list[QualityGate] | None = None,
|
||||
) -> None:
|
||||
"""Emit a Mosaic telemetry TaskCompletionEvent (non-blocking).
|
||||
|
||||
This method never raises; any telemetry errors are logged and swallowed
|
||||
so they do not interfere with task processing.
|
||||
|
||||
Args:
|
||||
item: The QueueItem that was processed.
|
||||
outcome: Task outcome (SUCCESS, FAILURE, TIMEOUT, etc.).
|
||||
duration_ms: Wall-clock duration in milliseconds.
|
||||
retry_count: Number of retries before this attempt.
|
||||
actual_input_tokens: Actual input tokens consumed by the harness.
|
||||
actual_output_tokens: Actual output tokens consumed by the harness.
|
||||
quality_passed: Whether all quality gates passed.
|
||||
quality_gates_run: Quality gates that were executed.
|
||||
quality_gates_failed: Quality gates that failed.
|
||||
"""
|
||||
if self._telemetry_client is None or not self._instance_id:
|
||||
return
|
||||
|
||||
try:
|
||||
model, provider, harness = _resolve_agent_fields(
|
||||
item.metadata.assigned_agent,
|
||||
)
|
||||
complexity = _DIFFICULTY_TO_COMPLEXITY.get(
|
||||
item.metadata.difficulty, Complexity.MEDIUM
|
||||
)
|
||||
|
||||
event = build_task_event(
|
||||
instance_id=self._instance_id,
|
||||
task_type=TaskType.IMPLEMENTATION,
|
||||
complexity=complexity,
|
||||
outcome=outcome,
|
||||
duration_ms=duration_ms,
|
||||
model=model,
|
||||
provider=provider,
|
||||
harness=harness,
|
||||
actual_input_tokens=actual_input_tokens,
|
||||
actual_output_tokens=actual_output_tokens,
|
||||
estimated_input_tokens=item.metadata.estimated_context,
|
||||
quality_passed=quality_passed,
|
||||
quality_gates_run=quality_gates_run,
|
||||
quality_gates_failed=quality_gates_failed,
|
||||
retry_count=retry_count,
|
||||
)
|
||||
self._telemetry_client.track(event)
|
||||
logger.debug(
|
||||
"Telemetry event emitted for issue #%d (outcome=%s)",
|
||||
item.issue_number,
|
||||
outcome.value,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to emit telemetry for issue #%d (non-fatal)",
|
||||
item.issue_number,
|
||||
)
|
||||
|
||||
async def _spawn_agent(self, item: QueueItem) -> bool:
|
||||
"""Spawn an agent to process the given item.
|
||||
|
||||
|
||||
@@ -100,6 +100,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
|
||||
_coordinator = Coordinator(
|
||||
queue_manager=queue_manager,
|
||||
poll_interval=settings.coordinator_poll_interval,
|
||||
telemetry_client=mosaic_telemetry_client,
|
||||
instance_id=mosaic_telemetry_config.instance_id or "",
|
||||
)
|
||||
logger.info(
|
||||
f"Coordinator initialized (poll interval: {settings.coordinator_poll_interval}s, "
|
||||
|
||||
Reference in New Issue
Block a user