# mosaicstack-telemetry
Python client SDK for Mosaic Stack Telemetry. Report AI coding task-completion telemetry and query crowd-sourced predictions for token usage, cost, and quality outcomes.
**Targets:** Telemetry API v1 | Event schema 1.0 | Python 3.10+
## Installation

```bash
pip install mosaicstack-telemetry
# or
uv add mosaicstack-telemetry
```

Runtime dependencies: `httpx` and `pydantic`.
## Quick Start

### 1. Configure

Set environment variables (or pass values to the constructor):

```bash
export MOSAIC_TELEMETRY_SERVER_URL=https://tel-api.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d
```
### 2. Track Events

```python
from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig()  # Reads from MOSAIC_TELEMETRY_* env vars

with TelemetryClient(config) as client:
    event = (
        EventBuilder(instance_id=config.instance_id)
        .task_type(TaskType.IMPLEMENTATION)
        .model("claude-sonnet-4-5-20250929")
        .provider(Provider.ANTHROPIC)
        .harness_type(Harness.CLAUDE_CODE)
        .complexity_level(Complexity.MEDIUM)
        .outcome_value(Outcome.SUCCESS)
        .duration_ms(45000)
        .tokens(estimated_in=105000, estimated_out=45000, actual_in=112340, actual_out=38760)
        .cost(estimated=630000, actual=919200)  # Microdollars (1 USD = 1,000,000)
        .quality(passed=True, gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST])
        .context(compactions=2, rotations=0, utilization=0.72)
        .language("typescript")
        .build()
    )

    client.track(event)  # Non-blocking, thread-safe, never throws
```
`track()` queues the event in memory. A background thread flushes batches to the server every 5 minutes (configurable via `submit_interval_seconds`). The context manager ensures all queued events are flushed on exit.
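The queue-and-flush behavior above can be sketched with the standard library. This is an illustration of the pattern, not the SDK's internals; `BatchFlusher` and its parameters are hypothetical stand-ins for the real `TelemetryConfig` settings (`submit_interval_seconds`, `batch_size`, `max_queue_size`):

```python
import queue
import threading


class BatchFlusher:
    """Sketch of the queue-and-flush pattern (not the SDK's actual code)."""

    def __init__(self, send, interval_seconds=300.0, batch_size=100, max_queue_size=1000):
        self._queue = queue.Queue(maxsize=max_queue_size)
        self._send = send  # callable that ships a list of events to the server
        self._interval = interval_seconds
        self._batch_size = batch_size
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def track(self, event):
        """Enqueue without blocking; evict the oldest event when the queue is full."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            try:
                self._queue.get_nowait()  # drop oldest
            except queue.Empty:
                pass
            self._queue.put_nowait(event)

    def _drain_batch(self):
        batch = []
        while len(batch) < self._batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def _run(self):
        # Wake every `interval_seconds` and flush whatever has accumulated.
        while not self._stop.wait(self._interval):
            batch = self._drain_batch()
            if batch:
                self._send(batch)

    def close(self):
        """Stop the background thread and flush everything left in the queue,
        mirroring the context-manager exit behavior."""
        self._stop.set()
        self._thread.join()
        while True:
            batch = self._drain_batch()
            if not batch:
                break
            self._send(batch)
```

The SDK's actual client wires the same lifecycle into `__enter__`/`__exit__`, so `with TelemetryClient(config) as client:` gives you the final flush for free.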
## Async Usage

For asyncio applications (FastAPI, aiohttp, etc.):

```python
async with TelemetryClient(config) as client:
    client.track(event)  # track() is always synchronous
```

Or manage the lifecycle manually:

```python
client = TelemetryClient(config)
await client.start_async()

client.track(event)

await client.stop_async()
```
## FastAPI Integration

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from mosaicstack_telemetry import TelemetryClient, TelemetryConfig

config = TelemetryConfig()  # From env vars
telemetry = TelemetryClient(config)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await telemetry.start_async()
    yield
    await telemetry.stop_async()


app = FastAPI(lifespan=lifespan)


@app.post("/tasks/complete")
async def complete_task():
    # ... build event from request data ...
    telemetry.track(event)
    return {"status": "tracked"}
```
See the [Integration Guide](docs/integration-guide.md) for the full FastAPI example.
## Querying Predictions

Fetch crowd-sourced predictions for token usage, cost, and quality:

```python
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity

query = PredictionQuery(
    task_type=TaskType.IMPLEMENTATION,
    model="claude-sonnet-4-5-20250929",
    provider=Provider.ANTHROPIC,
    complexity=Complexity.MEDIUM,
)

# Sync
client.refresh_predictions_sync([query])

# Async
await client.refresh_predictions([query])

# Read from cache
prediction = client.get_prediction(query)
if prediction and prediction.prediction:
    print(f"Median input tokens: {prediction.prediction.input_tokens.median}")
    print(f"Median cost: ${prediction.prediction.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Quality gate pass rate: {prediction.prediction.quality.gate_pass_rate:.0%}")
```
## Configuration Reference

| Parameter | Default | Env Var | Description |
|---|---|---|---|
| `server_url` | (required) | `MOSAIC_TELEMETRY_SERVER_URL` | Telemetry API base URL |
| `api_key` | (required) | `MOSAIC_TELEMETRY_API_KEY` | 64-character hex API key |
| `instance_id` | (required) | `MOSAIC_TELEMETRY_INSTANCE_ID` | UUID identifying this instance |
| `enabled` | `True` | `MOSAIC_TELEMETRY_ENABLED` | Enable/disable telemetry |
| `submit_interval_seconds` | `300.0` | -- | Background flush interval (seconds) |
| `max_queue_size` | `1000` | -- | Max events in memory queue |
| `batch_size` | `100` | -- | Events per batch (server max: 100) |
| `request_timeout_seconds` | `10.0` | -- | HTTP request timeout |
| `prediction_cache_ttl_seconds` | `21600.0` | -- | Prediction cache TTL (6 hours) |
| `dry_run` | `False` | -- | Log batches but don't send to server |
| `max_retries` | `3` | -- | Retries on transient failures |
Constructor values take priority over environment variables.
## Error Handling

- `track()` never throws and never blocks. Errors are logged, not raised.
- When the queue is full, the oldest events are evicted.
- Failed submissions are retried with exponential backoff (honors `Retry-After` on 429).
- All logging uses the `mosaicstack_telemetry` logger.
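Since errors are logged rather than raised, you can surface or silence the SDK's diagnostics through the standard `logging` module. A minimal sketch; the handler, level, and format choices are up to you:

```python
import logging

# Attach a handler to the SDK's named logger to see retries and dropped batches.
logger = logging.getLogger("mosaicstack_telemetry")
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)

# Or silence telemetry logging entirely:
# logger.addHandler(logging.NullHandler())
# logger.propagate = False
```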
## Dry-Run Mode

Test your integration without sending data:

```python
config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,  # Logs batches but doesn't send
)
```
## Documentation

- [Integration Guide](docs/integration-guide.md) -- Installation, configuration, FastAPI and generic Python examples, async vs sync patterns, prediction queries, error handling
- [API Reference](docs/api-reference.md) -- All exported classes, methods, types, and enums
## License

MPL-2.0