# mosaicstack-telemetry

Python client SDK for [Mosaic Stack Telemetry](https://github.com/mosaicstack/telemetry). Report AI coding task-completion telemetry and query crowd-sourced predictions for token usage, cost, and quality outcomes.

## Installation

```bash
pip install mosaicstack-telemetry
# or
uv add mosaicstack-telemetry
```

## Quick Start (Sync)

Best for scripts, aider integrations, and non-async contexts:

```python
from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig(
    server_url="https://telemetry.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="your-uuid-instance-id",
)

client = TelemetryClient(config)
client.start()  # Starts background submission thread

# Build and track an event
event = (
    EventBuilder(instance_id=config.instance_id)
    .task_type(TaskType.IMPLEMENTATION)
    .model("claude-sonnet-4-20250514")
    .provider(Provider.ANTHROPIC)
    .harness_type(Harness.AIDER)
    .complexity_level(Complexity.MEDIUM)
    .outcome_value(Outcome.SUCCESS)
    .duration_ms(45000)
    .tokens(estimated_in=5000, estimated_out=2000, actual_in=5200, actual_out=1800)
    .cost(estimated=50000, actual=48000)
    .quality(passed=True, gates_run=[QualityGate.LINT, QualityGate.TEST])
    .context(compactions=0, rotations=0, utilization=0.4)
    .language("python")
    .build()
)

client.track(event)  # Non-blocking, thread-safe

# When done
client.stop()  # Flushes remaining events
```

## Async Usage

For asyncio-based applications:

```python
import asyncio

from mosaicstack_telemetry import TelemetryClient, TelemetryConfig


async def main():
    config = TelemetryConfig(
        server_url="https://telemetry.mosaicstack.dev",
        api_key="your-64-char-hex-api-key-here...",
        instance_id="your-uuid-instance-id",
    )

    client = TelemetryClient(config)
    await client.start_async()  # Starts asyncio background task

    # track() is always synchronous.
    # `event` is built with EventBuilder, as in the Quick Start above.
    client.track(event)

    await client.stop_async()  # Flushes remaining events


asyncio.run(main())
```

## Context Manager

Both sync and async context managers are supported:

```python
# Sync
with TelemetryClient(config) as client:
    client.track(event)

# Async (inside a coroutine)
async with TelemetryClient(config) as client:
    client.track(event)
```

## Configuration via Environment Variables

All core settings can be set via environment variables:

```bash
export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://telemetry.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=your-uuid-instance-id
```

Then create a config with no arguments:

```python
config = TelemetryConfig()  # Picks up env vars automatically
```

Explicit constructor values take priority over environment variables.

## Querying Predictions

Fetch crowd-sourced predictions for token usage, cost, and quality:

```python
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity

query = PredictionQuery(
    task_type=TaskType.IMPLEMENTATION,
    model="claude-sonnet-4-20250514",
    provider=Provider.ANTHROPIC,
    complexity=Complexity.MEDIUM,
)

# Async (inside a coroutine)
await client.refresh_predictions([query])

# Sync
client.refresh_predictions_sync([query])

# Read from cache
prediction = client.get_prediction(query)
if prediction and prediction.prediction:
    print(f"Expected input tokens (median): {prediction.prediction.input_tokens.median}")
    print(f"Expected cost (median): ${prediction.prediction.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Quality gate pass rate: {prediction.prediction.quality.gate_pass_rate:.0%}")
```

## Dry-Run Mode

Test your integration without sending data to the server:

```python
config = TelemetryConfig(
    server_url="https://telemetry.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,  # Logs batches but doesn't send
)
```

## Configuration Reference

| Parameter | Default | Description |
|-----------|---------|-------------|
| `server_url` | (required) | Telemetry server base URL |
| `api_key` | (required) | 64-character hex API key |
| `instance_id` | (required) | UUID identifying this instance |
| `enabled` | `True` | Enable/disable telemetry |
| `submit_interval_seconds` | `300.0` | Background flush interval |
| `max_queue_size` | `1000` | Max events in memory queue |
| `batch_size` | `100` | Events per batch (server max) |
| `request_timeout_seconds` | `10.0` | HTTP request timeout |
| `prediction_cache_ttl_seconds` | `21600.0` | Prediction cache TTL (6h) |
| `dry_run` | `False` | Log but don't send |
| `max_retries` | `3` | Retries on failure |

## License

MPL-2.0
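
## Appendix: Working with Micro-Dollar Costs

The prediction example above divides `cost_usd_micros` by `1_000_000` to print dollars, which implies that costs are stored as integer millionths of a US dollar. The sketch below shows one way to convert in both directions without float rounding. The helpers `micros_to_usd` and `usd_to_micros` are hypothetical and not part of the SDK, and it is an assumption that the values passed to `.cost(estimated=..., actual=...)` use the same unit.

```python
from decimal import Decimal

# Assumption: one micro-dollar is a millionth of a US dollar,
# matching the `/ 1_000_000` in the prediction example.
MICROS_PER_USD = 1_000_000


def micros_to_usd(micros: int) -> Decimal:
    """Convert an integer micro-dollar amount to dollars (hypothetical helper)."""
    return Decimal(micros) / MICROS_PER_USD


def usd_to_micros(usd: str) -> int:
    """Convert a decimal dollar string such as "0.05" to integer micro-dollars."""
    return int(Decimal(usd) * MICROS_PER_USD)


print(micros_to_usd(48_000))  # 0.048
print(usd_to_micros("0.05"))  # 50000
```

Under that assumption, the Quick Start values `estimated=50000` and `actual=48000` would correspond to $0.05 and $0.048.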