All checks were successful
ci/woodpecker/push/coordinator Pipeline was successful
The mosaicstack-telemetry package lacks py.typed marker. Add type ignore comment consistent with other import sites. Refs #370 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
"""Mosaic Stack telemetry integration for the Coordinator.
|
|
|
|
This module provides helpers for tracking task completion events using the
|
|
mosaicstack-telemetry SDK. It is separate from the OpenTelemetry distributed
|
|
tracing configured in telemetry.py.
|
|
|
|
Environment variables (auto-read by the SDK):
|
|
MOSAIC_TELEMETRY_ENABLED: Enable/disable telemetry (default: true)
|
|
MOSAIC_TELEMETRY_SERVER_URL: Telemetry server endpoint
|
|
MOSAIC_TELEMETRY_API_KEY: API key for authentication
|
|
MOSAIC_TELEMETRY_INSTANCE_ID: UUID identifying this coordinator instance
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
from mosaicstack_telemetry import ( # type: ignore[import-untyped]
|
|
Complexity,
|
|
EventBuilder,
|
|
Harness,
|
|
Outcome,
|
|
Provider,
|
|
QualityGate,
|
|
TaskType,
|
|
TelemetryClient,
|
|
TelemetryConfig,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import FastAPI
|
|
from mosaicstack_telemetry import TaskCompletionEvent
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_telemetry_client(app: FastAPI) -> TelemetryClient | None:
|
|
"""Retrieve the Mosaic telemetry client from FastAPI app state.
|
|
|
|
Args:
|
|
app: The FastAPI application instance.
|
|
|
|
Returns:
|
|
The TelemetryClient if initialised and telemetry is enabled,
|
|
or None if telemetry is disabled or not yet initialised.
|
|
"""
|
|
client: TelemetryClient | None = getattr(app.state, "mosaic_telemetry", None)
|
|
return client
|
|
|
|
|
|
def build_task_event(
|
|
*,
|
|
instance_id: str,
|
|
task_type: TaskType = TaskType.IMPLEMENTATION,
|
|
complexity: Complexity = Complexity.MEDIUM,
|
|
outcome: Outcome = Outcome.SUCCESS,
|
|
duration_ms: int = 0,
|
|
model: str = "claude-sonnet-4-20250514",
|
|
provider: Provider = Provider.ANTHROPIC,
|
|
harness: Harness = Harness.CLAUDE_CODE,
|
|
estimated_input_tokens: int = 0,
|
|
estimated_output_tokens: int = 0,
|
|
actual_input_tokens: int = 0,
|
|
actual_output_tokens: int = 0,
|
|
estimated_cost_micros: int = 0,
|
|
actual_cost_micros: int = 0,
|
|
quality_passed: bool = False,
|
|
quality_gates_run: list[QualityGate] | None = None,
|
|
quality_gates_failed: list[QualityGate] | None = None,
|
|
context_compactions: int = 0,
|
|
context_rotations: int = 0,
|
|
context_utilization: float = 0.0,
|
|
retry_count: int = 0,
|
|
language: str | None = "typescript",
|
|
) -> TaskCompletionEvent:
|
|
"""Build a TaskCompletionEvent for a coordinator task.
|
|
|
|
Provides sensible defaults for the coordinator context (Claude Code harness,
|
|
Anthropic provider, TypeScript language).
|
|
|
|
Args:
|
|
instance_id: UUID identifying this coordinator instance.
|
|
task_type: The kind of task that was performed.
|
|
complexity: Complexity level of the task.
|
|
outcome: Whether the task succeeded, failed, etc.
|
|
duration_ms: Task duration in milliseconds.
|
|
model: The AI model used.
|
|
provider: The AI model provider.
|
|
harness: The coding harness used.
|
|
estimated_input_tokens: Estimated input token count.
|
|
estimated_output_tokens: Estimated output token count.
|
|
actual_input_tokens: Actual input token count.
|
|
actual_output_tokens: Actual output token count.
|
|
estimated_cost_micros: Estimated cost in USD micros.
|
|
actual_cost_micros: Actual cost in USD micros.
|
|
quality_passed: Whether all quality gates passed.
|
|
quality_gates_run: List of quality gates that were executed.
|
|
quality_gates_failed: List of quality gates that failed.
|
|
context_compactions: Number of context compactions during the task.
|
|
context_rotations: Number of context rotations during the task.
|
|
context_utilization: Final context window utilization (0.0-1.0).
|
|
retry_count: Number of retries before the task completed.
|
|
language: Primary programming language (default: typescript).
|
|
|
|
Returns:
|
|
A fully populated TaskCompletionEvent ready to be tracked.
|
|
"""
|
|
builder = (
|
|
EventBuilder(instance_id=instance_id)
|
|
.task_type(task_type)
|
|
.complexity_level(complexity)
|
|
.harness_type(harness)
|
|
.model(model)
|
|
.provider(provider)
|
|
.duration_ms(duration_ms)
|
|
.outcome_value(outcome)
|
|
.tokens(
|
|
estimated_in=estimated_input_tokens,
|
|
estimated_out=estimated_output_tokens,
|
|
actual_in=actual_input_tokens,
|
|
actual_out=actual_output_tokens,
|
|
)
|
|
.cost(estimated=estimated_cost_micros, actual=actual_cost_micros)
|
|
.quality(
|
|
passed=quality_passed,
|
|
gates_run=quality_gates_run or [],
|
|
gates_failed=quality_gates_failed or [],
|
|
)
|
|
.context(
|
|
compactions=context_compactions,
|
|
rotations=context_rotations,
|
|
utilization=context_utilization,
|
|
)
|
|
.retry_count(retry_count)
|
|
.language(language)
|
|
)
|
|
return builder.build()
|
|
|
|
|
|
def create_telemetry_config() -> TelemetryConfig:
|
|
"""Create a TelemetryConfig instance.
|
|
|
|
The config reads from MOSAIC_TELEMETRY_* environment variables automatically.
|
|
Validation warnings are logged but do not prevent creation.
|
|
|
|
Returns:
|
|
A TelemetryConfig instance with env-var overrides applied.
|
|
"""
|
|
config = TelemetryConfig()
|
|
errors = config.validate()
|
|
if errors and config.enabled:
|
|
logger.warning(
|
|
"Mosaic telemetry config has validation issues (telemetry may not submit): %s",
|
|
"; ".join(errors),
|
|
)
|
|
return config
|