# mosaicstack-telemetry

Python client SDK for Mosaic Stack Telemetry. Report AI coding task-completion telemetry and query crowd-sourced predictions for token usage, cost, and quality outcomes.

## Features

- Sync/async `TelemetryClient` with context manager support
- Thread-safe `EventQueue` with bounded deque
- `BatchSubmitter` built on httpx, with exponential backoff and `Retry-After` support
- `PredictionCache` with TTL
- `EventBuilder` convenience class
- All types standalone (no server dependency)
- 55 tests, 90% coverage, mypy strict clean
## Installation
```shell
pip install mosaicstack-telemetry
# or
uv add mosaicstack-telemetry
```
## Quick Start (Sync)
Best for scripts, aider integrations, and non-async contexts:
```python
from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig(
    server_url="https://telemetry.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="your-uuid-instance-id",
)

client = TelemetryClient(config)
client.start()  # Starts background submission thread

# Build and track an event
event = (
    EventBuilder(instance_id=config.instance_id)
    .task_type(TaskType.IMPLEMENTATION)
    .model("claude-sonnet-4-20250514")
    .provider(Provider.ANTHROPIC)
    .harness_type(Harness.AIDER)
    .complexity_level(Complexity.MEDIUM)
    .outcome_value(Outcome.SUCCESS)
    .duration_ms(45000)
    .tokens(estimated_in=5000, estimated_out=2000, actual_in=5200, actual_out=1800)
    .cost(estimated=50000, actual=48000)
    .quality(passed=True, gates_run=[QualityGate.LINT, QualityGate.TEST])
    .context(compactions=0, rotations=0, utilization=0.4)
    .language("python")
    .build()
)

client.track(event)  # Non-blocking, thread-safe

# When done
client.stop()  # Flushes remaining events
```
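`track()` stays non-blocking because events land in a bounded in-memory queue that a background thread drains on the flush interval. A minimal sketch of that pattern (illustrative only — not the SDK's actual internals; the class and method names here are ours):

```python
import threading
from collections import deque


class BoundedEventQueue:
    """Thread-safe bounded queue; the oldest events are dropped when full."""

    def __init__(self, max_size: int = 1000) -> None:
        # deque with maxlen silently evicts from the left on overflow
        self._events: deque = deque(maxlen=max_size)
        self._lock = threading.Lock()

    def put(self, event: dict) -> None:
        """Enqueue without ever blocking the caller."""
        with self._lock:
            self._events.append(event)

    def drain(self, batch_size: int = 100) -> list:
        """Pop up to batch_size events for the background submitter."""
        with self._lock:
            batch = []
            while self._events and len(batch) < batch_size:
                batch.append(self._events.popleft())
            return batch
```

Dropping oldest-first under pressure keeps memory bounded (`max_queue_size` in the config table below) at the cost of losing the stalest telemetry, which is usually the right trade for a reporting client.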
## Async Usage
For asyncio-based applications:
```python
import asyncio

from mosaicstack_telemetry import TelemetryClient, TelemetryConfig

async def main():
    config = TelemetryConfig(
        server_url="https://telemetry.mosaicstack.dev",
        api_key="your-64-char-hex-api-key-here...",
        instance_id="your-uuid-instance-id",
    )
    client = TelemetryClient(config)
    await client.start_async()  # Starts asyncio background task

    # track() is always synchronous
    client.track(event)

    await client.stop_async()  # Flushes remaining events

asyncio.run(main())
```
## Context Manager
Both sync and async context managers are supported:
```python
# Sync
with TelemetryClient(config) as client:
    client.track(event)

# Async
async with TelemetryClient(config) as client:
    client.track(event)
```
## Configuration via Environment Variables
All core settings can be set via environment variables:
```shell
export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://telemetry.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=your-uuid-instance-id
```
Then create a config with defaults:
```python
config = TelemetryConfig()  # Picks up env vars automatically
```
Explicit constructor values take priority over environment variables.
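The precedence rule amounts to a three-step resolution: explicit constructor value, then the `MOSAIC_TELEMETRY_*` environment variable, then the default. A hedged sketch of that logic (the function name is ours; the real `TelemetryConfig` may resolve settings differently):

```python
import os
from typing import Optional


def resolve_setting(
    explicit: Optional[str],
    env_name: str,
    default: Optional[str] = None,
) -> Optional[str]:
    """Resolve one config value: explicit arg > environment variable > default."""
    if explicit is not None:
        return explicit
    return os.environ.get(env_name, default)
```

For example, `resolve_setting(None, "MOSAIC_TELEMETRY_SERVER_URL")` falls back to the environment, while passing a URL explicitly ignores whatever the environment holds.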
## Querying Predictions
Fetch crowd-sourced predictions for token usage, cost, and quality:
```python
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity

query = PredictionQuery(
    task_type=TaskType.IMPLEMENTATION,
    model="claude-sonnet-4-20250514",
    provider=Provider.ANTHROPIC,
    complexity=Complexity.MEDIUM,
)

# Async
await client.refresh_predictions([query])

# Sync
client.refresh_predictions_sync([query])

# Read from cache
prediction = client.get_prediction(query)
if prediction and prediction.prediction:
    print(f"Expected input tokens (median): {prediction.prediction.input_tokens.median}")
    print(f"Expected cost (median): ${prediction.prediction.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Quality gate pass rate: {prediction.prediction.quality.gate_pass_rate:.0%}")
```
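Cost values are expressed in micro-USD (millionths of a dollar), which is why the snippet divides `cost_usd_micros` by 1,000,000. A tiny conversion helper (the helper is ours, not part of the SDK):

```python
def micros_to_usd(micros: int) -> float:
    """Convert a micro-USD amount (1e-6 dollars) to dollars."""
    return micros / 1_000_000
```

Under this convention, the `cost(estimated=50000, actual=48000)` values in the Quick Start correspond to $0.05 estimated and $0.048 actual.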
## Dry-Run Mode
Test your integration without sending data to the server:
```python
config = TelemetryConfig(
    server_url="https://telemetry.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,  # Logs batches but doesn't send
)
```
## Configuration Reference
| Parameter | Default | Description |
|---|---|---|
| `server_url` | (required) | Telemetry server base URL |
| `api_key` | (required) | 64-character hex API key |
| `instance_id` | (required) | UUID identifying this instance |
| `enabled` | `True` | Enable/disable telemetry |
| `submit_interval_seconds` | `300.0` | Background flush interval |
| `max_queue_size` | `1000` | Max events in memory queue |
| `batch_size` | `100` | Events per batch (server max) |
| `request_timeout_seconds` | `10.0` | HTTP request timeout |
| `prediction_cache_ttl_seconds` | `21600.0` | Prediction cache TTL (6h) |
| `dry_run` | `False` | Log but don't send |
| `max_retries` | `3` | Retries on failure |
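Failed batch submissions are retried up to `max_retries` times with exponential backoff, and a server-sent `Retry-After` header takes precedence when present. A sketch of how such a delay schedule can be computed (the base and cap constants here are illustrative, not the SDK's actual values):

```python
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[float] = None,
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Seconds to wait before retry `attempt` (0-based).

    A server-provided Retry-After wins; otherwise the delay doubles
    each attempt (base * 2**attempt), capped to avoid unbounded waits.
    """
    if retry_after is not None:
        return min(retry_after, cap)
    return min(base * (2 ** attempt), cap)
```

With these constants the schedule is 1s, 2s, 4s, 8s, ... up to the 60s cap, unless the server asks for a specific wait.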
## License
MPL-2.0