"""Mosaic Stack telemetry integration for the Coordinator. This module provides helpers for tracking task completion events using the mosaicstack-telemetry SDK. It is separate from the OpenTelemetry distributed tracing configured in telemetry.py. Environment variables (auto-read by the SDK): MOSAIC_TELEMETRY_ENABLED: Enable/disable telemetry (default: true) MOSAIC_TELEMETRY_SERVER_URL: Telemetry server endpoint MOSAIC_TELEMETRY_API_KEY: API key for authentication MOSAIC_TELEMETRY_INSTANCE_ID: UUID identifying this coordinator instance """ from __future__ import annotations import logging from typing import TYPE_CHECKING from mosaicstack_telemetry import ( # type: ignore[import-untyped] Complexity, EventBuilder, Harness, Outcome, Provider, QualityGate, TaskType, TelemetryClient, TelemetryConfig, ) if TYPE_CHECKING: from fastapi import FastAPI from mosaicstack_telemetry import TaskCompletionEvent logger = logging.getLogger(__name__) def get_telemetry_client(app: FastAPI) -> TelemetryClient | None: """Retrieve the Mosaic telemetry client from FastAPI app state. Args: app: The FastAPI application instance. Returns: The TelemetryClient if initialised and telemetry is enabled, or None if telemetry is disabled or not yet initialised. """ client: TelemetryClient | None = getattr(app.state, "mosaic_telemetry", None) return client def build_task_event( *, instance_id: str, task_type: TaskType = TaskType.IMPLEMENTATION, complexity: Complexity = Complexity.MEDIUM, outcome: Outcome = Outcome.SUCCESS, duration_ms: int = 0, model: str = "claude-sonnet-4-20250514", provider: Provider = Provider.ANTHROPIC, harness: Harness = Harness.CLAUDE_CODE, estimated_input_tokens: int = 0, estimated_output_tokens: int = 0, actual_input_tokens: int = 0, actual_output_tokens: int = 0, estimated_cost_micros: int = 0, actual_cost_micros: int = 0, quality_passed: bool = False, quality_gates_run: list[QualityGate] | None = None, quality_gates_failed: list[QualityGate] | None = None, context_compactions: int = 0, context_rotations: int = 0, context_utilization: float = 0.0, retry_count: int = 0, language: str | None = "typescript", ) -> TaskCompletionEvent: """Build a TaskCompletionEvent for a coordinator task. Provides sensible defaults for the coordinator context (Claude Code harness, Anthropic provider, TypeScript language). Args: instance_id: UUID identifying this coordinator instance. task_type: The kind of task that was performed. complexity: Complexity level of the task. outcome: Whether the task succeeded, failed, etc. duration_ms: Task duration in milliseconds. model: The AI model used. provider: The AI model provider. harness: The coding harness used. estimated_input_tokens: Estimated input token count. estimated_output_tokens: Estimated output token count. actual_input_tokens: Actual input token count. actual_output_tokens: Actual output token count. estimated_cost_micros: Estimated cost in USD micros. actual_cost_micros: Actual cost in USD micros. quality_passed: Whether all quality gates passed. quality_gates_run: List of quality gates that were executed. quality_gates_failed: List of quality gates that failed. context_compactions: Number of context compactions during the task. context_rotations: Number of context rotations during the task. context_utilization: Final context window utilization (0.0-1.0). retry_count: Number of retries before the task completed. language: Primary programming language (default: typescript). Returns: A fully populated TaskCompletionEvent ready to be tracked. """ builder = ( EventBuilder(instance_id=instance_id) .task_type(task_type) .complexity_level(complexity) .harness_type(harness) .model(model) .provider(provider) .duration_ms(duration_ms) .outcome_value(outcome) .tokens( estimated_in=estimated_input_tokens, estimated_out=estimated_output_tokens, actual_in=actual_input_tokens, actual_out=actual_output_tokens, ) .cost(estimated=estimated_cost_micros, actual=actual_cost_micros) .quality( passed=quality_passed, gates_run=quality_gates_run or [], gates_failed=quality_gates_failed or [], ) .context( compactions=context_compactions, rotations=context_rotations, utilization=context_utilization, ) .retry_count(retry_count) .language(language) ) return builder.build() def create_telemetry_config() -> TelemetryConfig: """Create a TelemetryConfig instance. The config reads from MOSAIC_TELEMETRY_* environment variables automatically. Validation warnings are logged but do not prevent creation. Returns: A TelemetryConfig instance with env-var overrides applied. """ config = TelemetryConfig() errors = config.validate() if errors and config.enabled: logger.warning( "Mosaic telemetry config has validation issues (telemetry may not submit): %s", "; ".join(errors), ) return config