feat(#370): install mosaicstack-telemetry in Coordinator
- Add mosaicstack-telemetry>=0.1.0 to pyproject.toml dependencies - Configure Gitea PyPI registry via pip.conf (extra-index-url) - Integrate TelemetryClient in FastAPI lifespan (start_async/stop_async) - Store client on app.state.mosaic_telemetry for downstream access - Create mosaic_telemetry.py helper module with: - get_telemetry_client(): retrieve client from app state - build_task_event(): construct TaskCompletionEvent with coordinator defaults - create_telemetry_config(): create config from MOSAIC_TELEMETRY_* env vars - Add 28 unit tests covering config, helpers, disabled mode, and lifespan - New module has 100% test coverage Refs #370 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI
|
||||
from mosaicstack_telemetry import TelemetryClient # type: ignore[import-untyped]
|
||||
from pydantic import BaseModel
|
||||
from slowapi import Limiter, _rate_limit_exceeded_handler
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
@@ -18,6 +19,7 @@ from starlette.responses import Response
|
||||
|
||||
from .config import settings
|
||||
from .coordinator import Coordinator
|
||||
from .mosaic_telemetry import create_telemetry_config
|
||||
from .queue import QueueManager
|
||||
from .telemetry import TelemetryService, shutdown_telemetry
|
||||
from .webhook import router as webhook_router
|
||||
@@ -76,6 +78,18 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
|
||||
telemetry_service.initialize()
|
||||
logger.info("OpenTelemetry telemetry initialized")
|
||||
|
||||
# Initialize Mosaic telemetry client
|
||||
mosaic_telemetry_config = create_telemetry_config()
|
||||
mosaic_telemetry_client: TelemetryClient | None = None
|
||||
if mosaic_telemetry_config.enabled:
|
||||
mosaic_telemetry_client = TelemetryClient(mosaic_telemetry_config)
|
||||
await mosaic_telemetry_client.start_async()
|
||||
app.state.mosaic_telemetry = mosaic_telemetry_client
|
||||
logger.info("Mosaic telemetry client started")
|
||||
else:
|
||||
app.state.mosaic_telemetry = None
|
||||
logger.info("Mosaic telemetry disabled via configuration")
|
||||
|
||||
# Initialize queue manager
|
||||
queue_file = Path("queue.json")
|
||||
queue_manager = QueueManager(queue_file=queue_file)
|
||||
@@ -115,6 +129,11 @@ async def lifespan(app: FastAPI) -> AsyncIterator[dict[str, Any]]:
|
||||
pass
|
||||
logger.info("Coordinator stopped")
|
||||
|
||||
# Shutdown Mosaic telemetry client
|
||||
if mosaic_telemetry_client is not None:
|
||||
await mosaic_telemetry_client.stop_async()
|
||||
logger.info("Mosaic telemetry client stopped")
|
||||
|
||||
# Shutdown OpenTelemetry
|
||||
if telemetry_enabled:
|
||||
shutdown_telemetry()
|
||||
|
||||
157
apps/coordinator/src/mosaic_telemetry.py
Normal file
157
apps/coordinator/src/mosaic_telemetry.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""Mosaic Stack telemetry integration for the Coordinator.
|
||||
|
||||
This module provides helpers for tracking task completion events using the
|
||||
mosaicstack-telemetry SDK. It is separate from the OpenTelemetry distributed
|
||||
tracing configured in telemetry.py.
|
||||
|
||||
Environment variables (auto-read by the SDK):
|
||||
MOSAIC_TELEMETRY_ENABLED: Enable/disable telemetry (default: true)
|
||||
MOSAIC_TELEMETRY_SERVER_URL: Telemetry server endpoint
|
||||
MOSAIC_TELEMETRY_API_KEY: API key for authentication
|
||||
MOSAIC_TELEMETRY_INSTANCE_ID: UUID identifying this coordinator instance
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from mosaicstack_telemetry import (
|
||||
Complexity,
|
||||
EventBuilder,
|
||||
Harness,
|
||||
Outcome,
|
||||
Provider,
|
||||
QualityGate,
|
||||
TaskType,
|
||||
TelemetryClient,
|
||||
TelemetryConfig,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import FastAPI
|
||||
from mosaicstack_telemetry import TaskCompletionEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_telemetry_client(app: FastAPI) -> TelemetryClient | None:
|
||||
"""Retrieve the Mosaic telemetry client from FastAPI app state.
|
||||
|
||||
Args:
|
||||
app: The FastAPI application instance.
|
||||
|
||||
Returns:
|
||||
The TelemetryClient if initialised and telemetry is enabled,
|
||||
or None if telemetry is disabled or not yet initialised.
|
||||
"""
|
||||
client: TelemetryClient | None = getattr(app.state, "mosaic_telemetry", None)
|
||||
return client
|
||||
|
||||
|
||||
def build_task_event(
|
||||
*,
|
||||
instance_id: str,
|
||||
task_type: TaskType = TaskType.IMPLEMENTATION,
|
||||
complexity: Complexity = Complexity.MEDIUM,
|
||||
outcome: Outcome = Outcome.SUCCESS,
|
||||
duration_ms: int = 0,
|
||||
model: str = "claude-sonnet-4-20250514",
|
||||
provider: Provider = Provider.ANTHROPIC,
|
||||
harness: Harness = Harness.CLAUDE_CODE,
|
||||
estimated_input_tokens: int = 0,
|
||||
estimated_output_tokens: int = 0,
|
||||
actual_input_tokens: int = 0,
|
||||
actual_output_tokens: int = 0,
|
||||
estimated_cost_micros: int = 0,
|
||||
actual_cost_micros: int = 0,
|
||||
quality_passed: bool = False,
|
||||
quality_gates_run: list[QualityGate] | None = None,
|
||||
quality_gates_failed: list[QualityGate] | None = None,
|
||||
context_compactions: int = 0,
|
||||
context_rotations: int = 0,
|
||||
context_utilization: float = 0.0,
|
||||
retry_count: int = 0,
|
||||
language: str | None = "typescript",
|
||||
) -> TaskCompletionEvent:
|
||||
"""Build a TaskCompletionEvent for a coordinator task.
|
||||
|
||||
Provides sensible defaults for the coordinator context (Claude Code harness,
|
||||
Anthropic provider, TypeScript language).
|
||||
|
||||
Args:
|
||||
instance_id: UUID identifying this coordinator instance.
|
||||
task_type: The kind of task that was performed.
|
||||
complexity: Complexity level of the task.
|
||||
outcome: Whether the task succeeded, failed, etc.
|
||||
duration_ms: Task duration in milliseconds.
|
||||
model: The AI model used.
|
||||
provider: The AI model provider.
|
||||
harness: The coding harness used.
|
||||
estimated_input_tokens: Estimated input token count.
|
||||
estimated_output_tokens: Estimated output token count.
|
||||
actual_input_tokens: Actual input token count.
|
||||
actual_output_tokens: Actual output token count.
|
||||
estimated_cost_micros: Estimated cost in USD micros.
|
||||
actual_cost_micros: Actual cost in USD micros.
|
||||
quality_passed: Whether all quality gates passed.
|
||||
quality_gates_run: List of quality gates that were executed.
|
||||
quality_gates_failed: List of quality gates that failed.
|
||||
context_compactions: Number of context compactions during the task.
|
||||
context_rotations: Number of context rotations during the task.
|
||||
context_utilization: Final context window utilization (0.0-1.0).
|
||||
retry_count: Number of retries before the task completed.
|
||||
language: Primary programming language (default: typescript).
|
||||
|
||||
Returns:
|
||||
A fully populated TaskCompletionEvent ready to be tracked.
|
||||
"""
|
||||
builder = (
|
||||
EventBuilder(instance_id=instance_id)
|
||||
.task_type(task_type)
|
||||
.complexity_level(complexity)
|
||||
.harness_type(harness)
|
||||
.model(model)
|
||||
.provider(provider)
|
||||
.duration_ms(duration_ms)
|
||||
.outcome_value(outcome)
|
||||
.tokens(
|
||||
estimated_in=estimated_input_tokens,
|
||||
estimated_out=estimated_output_tokens,
|
||||
actual_in=actual_input_tokens,
|
||||
actual_out=actual_output_tokens,
|
||||
)
|
||||
.cost(estimated=estimated_cost_micros, actual=actual_cost_micros)
|
||||
.quality(
|
||||
passed=quality_passed,
|
||||
gates_run=quality_gates_run or [],
|
||||
gates_failed=quality_gates_failed or [],
|
||||
)
|
||||
.context(
|
||||
compactions=context_compactions,
|
||||
rotations=context_rotations,
|
||||
utilization=context_utilization,
|
||||
)
|
||||
.retry_count(retry_count)
|
||||
.language(language)
|
||||
)
|
||||
return builder.build()
|
||||
|
||||
|
||||
def create_telemetry_config() -> TelemetryConfig:
|
||||
"""Create a TelemetryConfig instance.
|
||||
|
||||
The config reads from MOSAIC_TELEMETRY_* environment variables automatically.
|
||||
Validation warnings are logged but do not prevent creation.
|
||||
|
||||
Returns:
|
||||
A TelemetryConfig instance with env-var overrides applied.
|
||||
"""
|
||||
config = TelemetryConfig()
|
||||
errors = config.validate()
|
||||
if errors and config.enabled:
|
||||
logger.warning(
|
||||
"Mosaic telemetry config has validation issues (telemetry may not submit): %s",
|
||||
"; ".join(errors),
|
||||
)
|
||||
return config
|
||||
Reference in New Issue
Block a user