From 8d8d37dbf9e53559d3c00147e1d27f0ef7cd626d Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Feb 2026 01:33:54 -0600 Subject: [PATCH] feat(#370): install mosaicstack-telemetry in Coordinator - Add mosaicstack-telemetry>=0.1.0 to pyproject.toml dependencies - Configure Gitea PyPI registry via pip.conf (extra-index-url) - Integrate TelemetryClient in FastAPI lifespan (start_async/stop_async) - Store client on app.state.mosaic_telemetry for downstream access - Create mosaic_telemetry.py helper module with: - get_telemetry_client(): retrieve client from app state - build_task_event(): construct TaskCompletionEvent with coordinator defaults - create_telemetry_config(): create config from MOSAIC_TELEMETRY_* env vars - Add 28 unit tests covering config, helpers, disabled mode, and lifespan - New module has 100% test coverage Refs #370 Co-Authored-By: Claude Opus 4.6 --- apps/coordinator/pip.conf | 2 + apps/coordinator/pyproject.toml | 1 + apps/coordinator/src/main.py | 19 + apps/coordinator/src/mosaic_telemetry.py | 157 +++++++ .../tests/test_mosaic_telemetry.py | 426 ++++++++++++++++++ 5 files changed, 605 insertions(+) create mode 100644 apps/coordinator/pip.conf create mode 100644 apps/coordinator/src/mosaic_telemetry.py create mode 100644 apps/coordinator/tests/test_mosaic_telemetry.py diff --git a/apps/coordinator/pip.conf b/apps/coordinator/pip.conf new file mode 100644 index 0000000..a421c56 --- /dev/null +++ b/apps/coordinator/pip.conf @@ -0,0 +1,2 @@ +[global] +extra-index-url = https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/ diff --git a/apps/coordinator/pyproject.toml b/apps/coordinator/pyproject.toml index 62b6704..ca20f48 100644 --- a/apps/coordinator/pyproject.toml +++ b/apps/coordinator/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "opentelemetry-sdk>=1.20.0", "opentelemetry-instrumentation-fastapi>=0.41b0", "opentelemetry-exporter-otlp>=1.20.0", + "mosaicstack-telemetry>=0.1.0", ] [project.optional-dependencies] diff --git 
a/apps/coordinator/src/main.py b/apps/coordinator/src/main.py index f1f378c..8f345b5 100644 --- a/apps/coordinator/src/main.py +++ b/apps/coordinator/src/main.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import Any from fastapi import FastAPI +from mosaicstack_telemetry import TelemetryClient # type: ignore[import-untyped] from pydantic import BaseModel from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded @@ -18,6 +19,7 @@ from starlette.responses import Response from .config import settings from .coordinator import Coordinator +from .mosaic_telemetry import create_telemetry_config from .queue import QueueManager from .telemetry import TelemetryService, shutdown_telemetry from .webhook import router as webhook_router @@ -76,6 +78,18 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]: telemetry_service.initialize() logger.info("OpenTelemetry telemetry initialized") + # Initialize Mosaic telemetry client + mosaic_telemetry_config = create_telemetry_config() + mosaic_telemetry_client: TelemetryClient | None = None + if mosaic_telemetry_config.enabled: + mosaic_telemetry_client = TelemetryClient(mosaic_telemetry_config) + await mosaic_telemetry_client.start_async() + app.state.mosaic_telemetry = mosaic_telemetry_client + logger.info("Mosaic telemetry client started") + else: + app.state.mosaic_telemetry = None + logger.info("Mosaic telemetry disabled via configuration") + # Initialize queue manager queue_file = Path("queue.json") queue_manager = QueueManager(queue_file=queue_file) @@ -115,6 +129,11 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]: pass logger.info("Coordinator stopped") + # Shutdown Mosaic telemetry client + if mosaic_telemetry_client is not None: + await mosaic_telemetry_client.stop_async() + logger.info("Mosaic telemetry client stopped") + # Shutdown OpenTelemetry if telemetry_enabled: shutdown_telemetry() diff --git 
a/apps/coordinator/src/mosaic_telemetry.py b/apps/coordinator/src/mosaic_telemetry.py new file mode 100644 index 0000000..c4793fd --- /dev/null +++ b/apps/coordinator/src/mosaic_telemetry.py @@ -0,0 +1,157 @@ +"""Mosaic Stack telemetry integration for the Coordinator. + +This module provides helpers for tracking task completion events using the +mosaicstack-telemetry SDK. It is separate from the OpenTelemetry distributed +tracing configured in telemetry.py. + +Environment variables (auto-read by the SDK): + MOSAIC_TELEMETRY_ENABLED: Enable/disable telemetry (default: true) + MOSAIC_TELEMETRY_SERVER_URL: Telemetry server endpoint + MOSAIC_TELEMETRY_API_KEY: API key for authentication + MOSAIC_TELEMETRY_INSTANCE_ID: UUID identifying this coordinator instance +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from mosaicstack_telemetry import ( + Complexity, + EventBuilder, + Harness, + Outcome, + Provider, + QualityGate, + TaskType, + TelemetryClient, + TelemetryConfig, +) + +if TYPE_CHECKING: + from fastapi import FastAPI + from mosaicstack_telemetry import TaskCompletionEvent + +logger = logging.getLogger(__name__) + + +def get_telemetry_client(app: FastAPI) -> TelemetryClient | None: + """Retrieve the Mosaic telemetry client from FastAPI app state. + + Args: + app: The FastAPI application instance. + + Returns: + The TelemetryClient if initialised and telemetry is enabled, + or None if telemetry is disabled or not yet initialised. 
+ """ + client: TelemetryClient | None = getattr(app.state, "mosaic_telemetry", None) + return client + + +def build_task_event( + *, + instance_id: str, + task_type: TaskType = TaskType.IMPLEMENTATION, + complexity: Complexity = Complexity.MEDIUM, + outcome: Outcome = Outcome.SUCCESS, + duration_ms: int = 0, + model: str = "claude-sonnet-4-20250514", + provider: Provider = Provider.ANTHROPIC, + harness: Harness = Harness.CLAUDE_CODE, + estimated_input_tokens: int = 0, + estimated_output_tokens: int = 0, + actual_input_tokens: int = 0, + actual_output_tokens: int = 0, + estimated_cost_micros: int = 0, + actual_cost_micros: int = 0, + quality_passed: bool = False, + quality_gates_run: list[QualityGate] | None = None, + quality_gates_failed: list[QualityGate] | None = None, + context_compactions: int = 0, + context_rotations: int = 0, + context_utilization: float = 0.0, + retry_count: int = 0, + language: str | None = "typescript", +) -> TaskCompletionEvent: + """Build a TaskCompletionEvent for a coordinator task. + + Provides sensible defaults for the coordinator context (Claude Code harness, + Anthropic provider, TypeScript language). + + Args: + instance_id: UUID identifying this coordinator instance. + task_type: The kind of task that was performed. + complexity: Complexity level of the task. + outcome: Whether the task succeeded, failed, etc. + duration_ms: Task duration in milliseconds. + model: The AI model used. + provider: The AI model provider. + harness: The coding harness used. + estimated_input_tokens: Estimated input token count. + estimated_output_tokens: Estimated output token count. + actual_input_tokens: Actual input token count. + actual_output_tokens: Actual output token count. + estimated_cost_micros: Estimated cost in USD micros. + actual_cost_micros: Actual cost in USD micros. + quality_passed: Whether all quality gates passed. + quality_gates_run: List of quality gates that were executed. 
+ quality_gates_failed: List of quality gates that failed. + context_compactions: Number of context compactions during the task. + context_rotations: Number of context rotations during the task. + context_utilization: Final context window utilization (0.0-1.0). + retry_count: Number of retries before the task completed. + language: Primary programming language (default: typescript). + + Returns: + A fully populated TaskCompletionEvent ready to be tracked. + """ + builder = ( + EventBuilder(instance_id=instance_id) + .task_type(task_type) + .complexity_level(complexity) + .harness_type(harness) + .model(model) + .provider(provider) + .duration_ms(duration_ms) + .outcome_value(outcome) + .tokens( + estimated_in=estimated_input_tokens, + estimated_out=estimated_output_tokens, + actual_in=actual_input_tokens, + actual_out=actual_output_tokens, + ) + .cost(estimated=estimated_cost_micros, actual=actual_cost_micros) + .quality( + passed=quality_passed, + gates_run=quality_gates_run or [], + gates_failed=quality_gates_failed or [], + ) + .context( + compactions=context_compactions, + rotations=context_rotations, + utilization=context_utilization, + ) + .retry_count(retry_count) + .language(language) + ) + return builder.build() + + +def create_telemetry_config() -> TelemetryConfig: + """Create a TelemetryConfig instance. + + The config reads from MOSAIC_TELEMETRY_* environment variables automatically. + Validation warnings are logged but do not prevent creation. + + Returns: + A TelemetryConfig instance with env-var overrides applied. 
+ """ + config = TelemetryConfig() + errors = config.validate() + if errors and config.enabled: + logger.warning( + "Mosaic telemetry config has validation issues (telemetry may not submit): %s", + "; ".join(errors), + ) + return config diff --git a/apps/coordinator/tests/test_mosaic_telemetry.py b/apps/coordinator/tests/test_mosaic_telemetry.py new file mode 100644 index 0000000..cbd92e6 --- /dev/null +++ b/apps/coordinator/tests/test_mosaic_telemetry.py @@ -0,0 +1,426 @@ +"""Tests for Mosaic Stack telemetry integration (mosaic_telemetry module). + +These tests cover the mosaicstack-telemetry SDK integration, NOT the +OpenTelemetry distributed tracing (which is tested in test_telemetry.py). +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +from fastapi import FastAPI +from mosaicstack_telemetry import ( + Complexity, + Harness, + Outcome, + Provider, + QualityGate, + TaskCompletionEvent, + TaskType, + TelemetryClient, + TelemetryConfig, +) + +from src.mosaic_telemetry import ( + build_task_event, + create_telemetry_config, + get_telemetry_client, +) + +# --------------------------------------------------------------------------- +# TelemetryConfig creation from environment variables +# --------------------------------------------------------------------------- + + +class TestCreateTelemetryConfig: + """Tests for create_telemetry_config helper.""" + + def test_config_reads_enabled_from_env(self) -> None: + """TelemetryConfig should read MOSAIC_TELEMETRY_ENABLED from env.""" + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_ENABLED": "true"}, + clear=False, + ): + config = create_telemetry_config() + assert config.enabled is True + + def test_config_disabled_from_env(self) -> None: + """TelemetryConfig should be disabled when env var is false.""" + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_ENABLED": "false"}, + clear=False, + ): + config = create_telemetry_config() + assert config.enabled is 
False + + def test_config_reads_server_url_from_env(self) -> None: + """TelemetryConfig should read MOSAIC_TELEMETRY_SERVER_URL from env.""" + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_SERVER_URL": "https://telemetry.example.com"}, + clear=False, + ): + config = create_telemetry_config() + assert config.server_url == "https://telemetry.example.com" + + def test_config_reads_api_key_from_env(self) -> None: + """TelemetryConfig should read MOSAIC_TELEMETRY_API_KEY from env.""" + api_key = "a" * 64 # 64-char hex string + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_API_KEY": api_key}, + clear=False, + ): + config = create_telemetry_config() + assert config.api_key == api_key + + def test_config_reads_instance_id_from_env(self) -> None: + """TelemetryConfig should read MOSAIC_TELEMETRY_INSTANCE_ID from env.""" + instance_id = "12345678-1234-1234-1234-123456789abc" + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_INSTANCE_ID": instance_id}, + clear=False, + ): + config = create_telemetry_config() + assert config.instance_id == instance_id + + def test_config_defaults_to_enabled(self) -> None: + """TelemetryConfig should default to enabled when env var is not set.""" + with patch.dict( + "os.environ", + {}, + clear=True, + ): + config = create_telemetry_config() + assert config.enabled is True + + def test_config_logs_validation_warnings_when_enabled(self) -> None: + """Config creation should log warnings for validation errors when enabled.""" + with ( + patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_ENABLED": "true"}, + clear=True, + ), + patch("src.mosaic_telemetry.logger") as mock_logger, + ): + config = create_telemetry_config() + # server_url, api_key, and instance_id are all empty = validation errors + assert config.enabled is True + mock_logger.warning.assert_called_once() + warning_msg = mock_logger.warning.call_args[0][0] + assert "validation issues" in warning_msg + + def test_config_no_warnings_when_disabled(self) -> None: + 
"""Config creation should not log warnings when telemetry is disabled.""" + with ( + patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_ENABLED": "false"}, + clear=True, + ), + patch("src.mosaic_telemetry.logger") as mock_logger, + ): + create_telemetry_config() + mock_logger.warning.assert_not_called() + + def test_config_strips_trailing_slashes(self) -> None: + """TelemetryConfig should strip trailing slashes from server_url.""" + with patch.dict( + "os.environ", + {"MOSAIC_TELEMETRY_SERVER_URL": "https://telemetry.example.com/"}, + clear=False, + ): + config = create_telemetry_config() + assert config.server_url == "https://telemetry.example.com" + + +# --------------------------------------------------------------------------- +# get_telemetry_client from app state +# --------------------------------------------------------------------------- + + +class TestGetTelemetryClient: + """Tests for get_telemetry_client helper.""" + + def test_returns_client_when_set(self) -> None: + """Should return the telemetry client from app state.""" + app = FastAPI() + mock_client = MagicMock(spec=TelemetryClient) + app.state.mosaic_telemetry = mock_client + + result = get_telemetry_client(app) + assert result is mock_client + + def test_returns_none_when_not_set(self) -> None: + """Should return None when mosaic_telemetry is not in app state.""" + app = FastAPI() + # Do not set app.state.mosaic_telemetry + + result = get_telemetry_client(app) + assert result is None + + def test_returns_none_when_explicitly_none(self) -> None: + """Should return None when mosaic_telemetry is explicitly set to None.""" + app = FastAPI() + app.state.mosaic_telemetry = None + + result = get_telemetry_client(app) + assert result is None + + +# --------------------------------------------------------------------------- +# build_task_event helper +# --------------------------------------------------------------------------- + + +class TestBuildTaskEvent: + """Tests for build_task_event helper.""" + + 
VALID_INSTANCE_ID = "12345678-1234-1234-1234-123456789abc" + + def test_builds_event_with_defaults(self) -> None: + """Should build a TaskCompletionEvent with default values.""" + event = build_task_event(instance_id=self.VALID_INSTANCE_ID) + + assert isinstance(event, TaskCompletionEvent) + assert str(event.instance_id) == self.VALID_INSTANCE_ID + assert event.task_type == TaskType.IMPLEMENTATION + assert event.complexity == Complexity.MEDIUM + assert event.outcome == Outcome.SUCCESS + assert event.harness == Harness.CLAUDE_CODE + assert event.provider == Provider.ANTHROPIC + assert event.language == "typescript" + + def test_builds_event_with_custom_task_type(self) -> None: + """Should respect custom task_type parameter.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + task_type=TaskType.TESTING, + ) + assert event.task_type == TaskType.TESTING + + def test_builds_event_with_custom_outcome(self) -> None: + """Should respect custom outcome parameter.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + outcome=Outcome.FAILURE, + ) + assert event.outcome == Outcome.FAILURE + + def test_builds_event_with_duration(self) -> None: + """Should set duration_ms correctly.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + duration_ms=45000, + ) + assert event.task_duration_ms == 45000 + + def test_builds_event_with_token_counts(self) -> None: + """Should set all token counts correctly.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + estimated_input_tokens=1000, + estimated_output_tokens=500, + actual_input_tokens=1100, + actual_output_tokens=480, + ) + assert event.estimated_input_tokens == 1000 + assert event.estimated_output_tokens == 500 + assert event.actual_input_tokens == 1100 + assert event.actual_output_tokens == 480 + + def test_builds_event_with_cost(self) -> None: + """Should set cost fields correctly.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + 
+ estimated_cost_micros=50000, + actual_cost_micros=48000, + ) + assert event.estimated_cost_usd_micros == 50000 + assert event.actual_cost_usd_micros == 48000 + + def test_builds_event_with_quality_gates(self) -> None: + """Should set quality gate information correctly.""" + gates_run = [QualityGate.LINT, QualityGate.TEST, QualityGate.BUILD] + gates_failed = [QualityGate.TEST] + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + quality_passed=False, + quality_gates_run=gates_run, + quality_gates_failed=gates_failed, + ) + assert event.quality_gate_passed is False + assert event.quality_gates_run == gates_run + assert event.quality_gates_failed == gates_failed + + def test_builds_event_with_context_info(self) -> None: + """Should set context compaction/rotation/utilization correctly.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + context_compactions=2, + context_rotations=1, + context_utilization=0.75, + ) + assert event.context_compactions == 2 + assert event.context_rotations == 1 + assert event.context_utilization_final == 0.75 + + def test_builds_event_with_retry_count(self) -> None: + """Should set retry count correctly.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + retry_count=3, + ) + assert event.retry_count == 3 + + def test_builds_event_with_custom_language(self) -> None: + """Should allow overriding the default language.""" + event = build_task_event( + instance_id=self.VALID_INSTANCE_ID, + language="python", + ) + assert event.language == "python" + + +# --------------------------------------------------------------------------- +# TelemetryClient lifecycle (disabled mode) +# --------------------------------------------------------------------------- + + +class TestTelemetryDisabledMode: + """Tests for disabled telemetry mode (no HTTP calls).""" + + def test_disabled_client_does_not_start(self) -> None: + """A client built from a disabled config should not be running.""" + config = 
TelemetryConfig(enabled=False) + client = TelemetryClient(config) + # Should not raise + assert client.is_running is False + + def test_disabled_client_track_is_noop(self) -> None: + """Tracking events when disabled should silently drop them.""" + config = TelemetryConfig(enabled=False) + client = TelemetryClient(config) + + event = build_task_event( + instance_id="12345678-1234-1234-1234-123456789abc", + ) + # Should not raise, should silently drop + client.track(event) + assert client.queue_size == 0 + + @pytest.mark.asyncio + async def test_disabled_client_start_stop_async(self) -> None: + """Async start/stop should be safe when disabled.""" + config = TelemetryConfig(enabled=False) + client = TelemetryClient(config) + + await client.start_async() + assert client.is_running is False + await client.stop_async() + + +# --------------------------------------------------------------------------- +# Lifespan integration +# --------------------------------------------------------------------------- + + +class TestLifespanIntegration: + """Tests for Mosaic telemetry in the FastAPI lifespan.""" + + @pytest.mark.asyncio + async def test_lifespan_sets_mosaic_telemetry_on_app_state(self) -> None: + """Lifespan should store mosaic_telemetry client on app.state.""" + with patch.dict( + "os.environ", + { + "GITEA_WEBHOOK_SECRET": "test-secret", + "GITEA_URL": "https://git.mosaicstack.dev", + "ANTHROPIC_API_KEY": "test-key", + "MOSAIC_TELEMETRY_ENABLED": "true", + "MOSAIC_TELEMETRY_SERVER_URL": "https://telemetry.example.com", + "MOSAIC_TELEMETRY_API_KEY": "a" * 64, + "MOSAIC_TELEMETRY_INSTANCE_ID": "12345678-1234-1234-1234-123456789abc", + "OTEL_ENABLED": "false", + "COORDINATOR_ENABLED": "false", + }, + ): + # Reload config to pick up test env vars + import importlib + + from src import config + importlib.reload(config) + + from src.main import lifespan + + app = FastAPI() + async with lifespan(app) as _state: + client = getattr(app.state, "mosaic_telemetry", None) + assert 
client is not None + assert isinstance(client, TelemetryClient) + + @pytest.mark.asyncio + async def test_lifespan_sets_none_when_disabled(self) -> None: + """Lifespan should set mosaic_telemetry to None when disabled.""" + with patch.dict( + "os.environ", + { + "GITEA_WEBHOOK_SECRET": "test-secret", + "GITEA_URL": "https://git.mosaicstack.dev", + "ANTHROPIC_API_KEY": "test-key", + "MOSAIC_TELEMETRY_ENABLED": "false", + "OTEL_ENABLED": "false", + "COORDINATOR_ENABLED": "false", + }, + ): + import importlib + + from src import config + importlib.reload(config) + + from src.main import lifespan + + app = FastAPI() + async with lifespan(app) as _state: + client = getattr(app.state, "mosaic_telemetry", None) + assert client is None + + @pytest.mark.asyncio + async def test_lifespan_stops_client_on_shutdown(self) -> None: + """Lifespan should call stop_async on shutdown.""" + with patch.dict( + "os.environ", + { + "GITEA_WEBHOOK_SECRET": "test-secret", + "GITEA_URL": "https://git.mosaicstack.dev", + "ANTHROPIC_API_KEY": "test-key", + "MOSAIC_TELEMETRY_ENABLED": "true", + "MOSAIC_TELEMETRY_SERVER_URL": "https://telemetry.example.com", + "MOSAIC_TELEMETRY_API_KEY": "a" * 64, + "MOSAIC_TELEMETRY_INSTANCE_ID": "12345678-1234-1234-1234-123456789abc", + "OTEL_ENABLED": "false", + "COORDINATOR_ENABLED": "false", + }, + ): + import importlib + + from src import config + importlib.reload(config) + + from src.main import lifespan + + app = FastAPI() + async with lifespan(app) as _state: + client = app.state.mosaic_telemetry + assert isinstance(client, TelemetryClient) + # Client was started + # After context manager exits, stop_async should have been called + + # After lifespan exits, client should no longer be running + # (stop_async was called in the shutdown section) + assert not client.is_running