# Integration Guide

This guide covers installing and integrating `mosaicstack-telemetry` into Python applications. The SDK reports AI coding task-completion telemetry to a [Mosaic Stack Telemetry](https://github.com/mosaicstack/telemetry) server and queries crowd-sourced predictions.

**Telemetry API version:** This SDK targets the Mosaic Telemetry API **v1** with event schema version **1.0**.

## Installation

Install from the Mosaic Stack package registry:

```bash
pip install mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```

Or with [uv](https://docs.astral.sh/uv/):

```bash
uv add mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```

To avoid repeating the index URL, configure it in your project's `pyproject.toml`:

```toml
[[tool.uv.index]]
name = "mosaic"
url = "https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/"
```

Or in `pip.conf` (`~/.config/pip/pip.conf` on Linux):

```ini
[global]
extra-index-url = https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```

**Requirements:** Python 3.10+. Runtime dependencies: `httpx` and `pydantic`.

## Configuration

### Constructor Parameters

```python
from mosaicstack_telemetry import TelemetryConfig

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)
```

All three fields (`server_url`, `api_key`, `instance_id`) are required when telemetry is enabled. The `api_key` must be a 64-character hexadecimal string issued by a Mosaic Telemetry administrator. The `instance_id` is a UUID that identifies your Mosaic Stack installation and must match the instance associated with your API key on the server.
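If you want to catch malformed settings before constructing a client, the format rules above can be checked up front. The following is a minimal sketch, not part of the SDK: `check_telemetry_settings` is a hypothetical helper that only tests the documented formats (64 hex characters for `api_key`, a UUID for `instance_id`, no trailing slash on `server_url`). It cannot tell whether the server will actually accept the key.

```python
import re
import uuid

# Hypothetical helper (not part of mosaicstack-telemetry): it checks only the
# documented formats, not whether the server accepts the credentials.
_HEX64 = re.compile(r"[0-9a-fA-F]{64}")


def check_telemetry_settings(server_url: str, api_key: str, instance_id: str) -> list[str]:
    """Return human-readable problems; an empty list means the formats look right."""
    problems: list[str] = []
    if not server_url:
        problems.append("server_url is empty")
    elif server_url.endswith("/"):
        problems.append("server_url must not end with a trailing slash")
    if not _HEX64.fullmatch(api_key):
        problems.append("api_key must be exactly 64 hexadecimal characters")
    try:
        uuid.UUID(instance_id)  # raises ValueError for malformed UUID strings
    except ValueError:
        problems.append("instance_id is not a valid UUID")
    return problems


issues = check_telemetry_settings(
    "https://tel-api.mosaicstack.dev/", "a" * 64, "a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d"
)
print(issues)  # ['server_url must not end with a trailing slash']
```

Returning a list instead of raising on the first problem lets a startup check report every misconfiguration at once.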
### Environment Variables

Instead of passing values to the constructor, set environment variables:

```bash
export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://tel-api.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d
```

Then create the config with no arguments:

```python
config = TelemetryConfig()  # Reads from environment
```

Constructor values take priority over environment variables.

### Full Configuration Reference

| Parameter | Type | Default | Env Var | Description |
|-----------|------|---------|---------|-------------|
| `server_url` | `str` | `""` (required) | `MOSAIC_TELEMETRY_SERVER_URL` | Telemetry API base URL (no trailing slash) |
| `api_key` | `str` | `""` (required) | `MOSAIC_TELEMETRY_API_KEY` | 64-character hex API key |
| `instance_id` | `str` | `""` (required) | `MOSAIC_TELEMETRY_INSTANCE_ID` | UUID identifying this Mosaic Stack instance |
| `enabled` | `bool` | `True` | `MOSAIC_TELEMETRY_ENABLED` | Enable/disable telemetry entirely |
| `submit_interval_seconds` | `float` | `300.0` | -- | How often the background submitter flushes queued events (seconds) |
| `max_queue_size` | `int` | `1000` | -- | Maximum events held in the in-memory queue |
| `batch_size` | `int` | `100` | -- | Events per batch (server maximum is 100) |
| `request_timeout_seconds` | `float` | `10.0` | -- | HTTP request timeout for API calls |
| `prediction_cache_ttl_seconds` | `float` | `21600.0` | -- | Prediction cache TTL (default 6 hours) |
| `dry_run` | `bool` | `False` | -- | Log batches but don't send to server |
| `max_retries` | `int` | `3` | -- | Retries on transient failures (429, timeouts, network errors) |

### Environment-Specific Configuration

**Development:**

```python
config = TelemetryConfig(
    server_url="http://localhost:8000",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,  # Log but don't send
    submit_interval_seconds=10.0,  # Flush quickly for testing
)
```

**Production:**

```python
import os

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
    submit_interval_seconds=300.0,  # Default: flush every 5 minutes
    max_retries=3,  # Retry transient failures
)
```

---

## Sync Usage (Threading)

Best for scripts, CLI tools, Aider integrations, and other non-async contexts. The SDK spawns a daemon thread that periodically flushes queued events.

```python
from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)

client = TelemetryClient(config)
client.start()  # Starts background daemon thread

event = (
    EventBuilder(instance_id=config.instance_id)
    .task_type(TaskType.IMPLEMENTATION)
    .model("claude-sonnet-4-5-20250929")
    .provider(Provider.ANTHROPIC)
    .harness_type(Harness.CLAUDE_CODE)
    .complexity_level(Complexity.MEDIUM)
    .outcome_value(Outcome.SUCCESS)
    .duration_ms(45000)
    .tokens(estimated_in=105000, estimated_out=45000, actual_in=112340, actual_out=38760)
    .cost(estimated=630000, actual=919200)
    .quality(
        passed=True,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.TYPECHECK],
    )
    .context(compactions=2, rotations=0, utilization=0.72)
    .language("typescript")
    .build()
)

client.track(event)  # Non-blocking, thread-safe

client.stop()  # Flushes remaining events and stops the thread
```

### Sync Context Manager

The context manager calls `start()` on entry and `stop()` (with flush) on exit:

```python
with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here
```

---

## Async Usage (asyncio)

For asyncio-based applications (FastAPI, aiohttp, etc.).
The SDK creates an asyncio task that periodically flushes events.

```python
import asyncio

from mosaicstack_telemetry import TelemetryClient, TelemetryConfig


async def main():
    config = TelemetryConfig(
        server_url="https://tel-api.mosaicstack.dev",
        api_key="your-64-char-hex-api-key-here...",
        instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
    )
    client = TelemetryClient(config)
    await client.start_async()  # Starts asyncio background task

    # track() is always synchronous and non-blocking.
    # `event` is a TaskCompletionEvent built with EventBuilder, as in the sync example.
    client.track(event)

    await client.stop_async()  # Flushes remaining events


asyncio.run(main())
```

### Async Context Manager

```python
async with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here
```

### Key Difference: Sync vs Async

| Aspect | Sync | Async |
|--------|------|-------|
| Start | `client.start()` | `await client.start_async()` |
| Stop | `client.stop()` | `await client.stop_async()` |
| Context manager | `with TelemetryClient(config)` | `async with TelemetryClient(config)` |
| Background mechanism | `threading.Timer` (daemon thread) | `asyncio.Task` |
| `track()` | Always synchronous | Always synchronous |
| `refresh_predictions` | `refresh_predictions_sync(queries)` | `await refresh_predictions(queries)` |

The `track()` method is always synchronous regardless of which mode you use. It appends to a thread-safe in-memory queue and returns immediately; the background submitter handles batching and sending.
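The queueing contract described above can be illustrated with the standard library alone. This is a minimal sketch, not the SDK's implementation: `EventQueue` is a hypothetical class whose `track()` only appends under a lock, and whose `drain()` is what a background submitter would call to pull one batch. The oldest-first eviction mirrors the `max_queue_size` behavior documented under Error Handling, and the default batch limit of 100 mirrors `batch_size`.

```python
import threading
from collections import deque
from typing import Any


class EventQueue:
    """Hypothetical sketch of a thread-safe, bounded telemetry queue (not the SDK's code)."""

    def __init__(self, max_size: int = 1000) -> None:
        # deque(maxlen=...) discards from the opposite end when full, so
        # appending on the right silently evicts the oldest event on the left.
        self._items: deque[Any] = deque(maxlen=max_size)
        self._lock = threading.Lock()

    def track(self, event: Any) -> None:
        """Append and return immediately; the caller never waits on network I/O."""
        with self._lock:
            self._items.append(event)

    def drain(self, batch_size: int = 100) -> list[Any]:
        """Remove and return up to batch_size of the oldest queued events."""
        with self._lock:
            count = min(batch_size, len(self._items))
            return [self._items.popleft() for _ in range(count)]

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


q = EventQueue(max_size=3)
for i in range(5):
    q.track(i)  # events 0 and 1 are evicted once the queue is full
print(q.drain(batch_size=2))  # [2, 3]
print(len(q))  # 1
```

Keeping the lock scope to a single append is what lets `track()` stay cheap enough to call from request handlers; all slow work (serialization, HTTP, retries) belongs to whatever thread or task calls `drain()`.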
---

## Integration Examples

### Example 1: Instrumenting a FastAPI Service

```python
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from mosaicstack_telemetry import (
    Complexity,
    EventBuilder,
    Harness,
    Outcome,
    Provider,
    QualityGate,
    TaskType,
    TelemetryClient,
    TelemetryConfig,
)

# Initialize telemetry once at startup
config = TelemetryConfig(
    server_url=os.environ.get("MOSAIC_TELEMETRY_SERVER_URL", "https://tel-api.mosaicstack.dev"),
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
)
telemetry = TelemetryClient(config)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start telemetry on app startup, flush on shutdown."""
    await telemetry.start_async()
    yield
    await telemetry.stop_async()


app = FastAPI(lifespan=lifespan)


@app.post("/tasks/complete")
async def complete_task(
    task_type: str,
    model: str,
    provider: str,
    complexity: str,
    actual_input_tokens: int,
    actual_output_tokens: int,
    actual_cost_usd_micros: int,
    duration_ms: int,
    outcome: str,
):
    """Record a completed AI coding task."""
    event = (
        EventBuilder(instance_id=config.instance_id)
        .task_type(TaskType(task_type))
        .model(model)
        .provider(Provider(provider))
        .harness_type(Harness.CLAUDE_CODE)
        .complexity_level(Complexity(complexity))
        .outcome_value(Outcome(outcome))
        .duration_ms(duration_ms)
        .tokens(
            estimated_in=0,
            estimated_out=0,
            actual_in=actual_input_tokens,
            actual_out=actual_output_tokens,
        )
        .cost(estimated=0, actual=actual_cost_usd_micros)
        .quality(passed=outcome == "success", gates_run=[QualityGate.BUILD, QualityGate.TEST])
        .context(compactions=0, rotations=0, utilization=0.0)
        .build()
    )

    telemetry.track(event)  # Non-blocking, never throws
    return {"status": "tracked"}
```

### Example 2: Instrumenting a Generic Python App

```python
"""
Generic Python script that tracks AI coding tasks.
Suitable for CLI tools, batch processors, or any non-async application.
""" import os import time from mosaicstack_telemetry import ( Complexity, EventBuilder, Harness, Outcome, Provider, QualityGate, RepoSizeCategory, TaskType, TelemetryClient, TelemetryConfig, ) def run_coding_task() -> dict: """Simulate an AI coding task. Returns task metrics.""" start = time.monotonic() # ... your AI coding logic here ... elapsed_ms = int((time.monotonic() - start) * 1000) return { "duration_ms": elapsed_ms, "actual_input_tokens": 4500, "actual_output_tokens": 1800, "actual_cost_usd_micros": 48000, "outcome": "success", "quality_gates_passed": True, } def main(): config = TelemetryConfig() # Reads from MOSAIC_TELEMETRY_* env vars with TelemetryClient(config) as client: result = run_coding_task() event = ( EventBuilder(instance_id=config.instance_id) .task_type(TaskType.IMPLEMENTATION) .model("claude-sonnet-4-5-20250929") .provider(Provider.ANTHROPIC) .harness_type(Harness.AIDER) .complexity_level(Complexity.MEDIUM) .outcome_value(Outcome(result["outcome"])) .duration_ms(result["duration_ms"]) .tokens( estimated_in=5000, estimated_out=2000, actual_in=result["actual_input_tokens"], actual_out=result["actual_output_tokens"], ) .cost(estimated=50000, actual=result["actual_cost_usd_micros"]) .quality( passed=result["quality_gates_passed"], gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST], ) .context(compactions=0, rotations=0, utilization=0.35) .language("python") .repo_size(RepoSizeCategory.MEDIUM) .build() ) client.track(event) print(f"Tracked task: {event.event_id}") # Client is automatically flushed and stopped after the `with` block if __name__ == "__main__": main() ``` --- ## Building Events The `EventBuilder` provides a fluent API for constructing `TaskCompletionEvent` objects with sensible defaults. All setter methods return the builder instance for chaining. 
### Required Fields

Every event requires the following fields. Any field you do not set through a builder method falls back to the default shown:

| Builder Method | Sets Field | Default |
|----------------|-----------|---------|
| `EventBuilder(instance_id=...)` | `instance_id` | (required) |
| `.task_type(TaskType.X)` | `task_type` | `unknown` |
| `.model("model-name")` | `model` | `"unknown"` |
| `.provider(Provider.X)` | `provider` | `unknown` |
| `.harness_type(Harness.X)` | `harness` | `unknown` |
| `.complexity_level(Complexity.X)` | `complexity` | `medium` |
| `.outcome_value(Outcome.X)` | `outcome` | `failure` |
| `.duration_ms(N)` | `task_duration_ms` | `0` |
| `.tokens(...)` | token fields | all `0` |
| `.cost(...)` | cost fields | all `0` |
| `.quality(...)` | quality fields | `passed=False, gates_run=[], gates_failed=[]` |
| `.context(...)` | context fields | all `0` / `0.0` |

### Auto-Generated Fields

| Field | Auto-generated Value |
|-------|---------------------|
| `event_id` | Random UUID (override with `.event_id(uuid)`) |
| `timestamp` | Current UTC time (override with `.timestamp(dt)`) |
| `schema_version` | `"1.0"` (set automatically by `TaskCompletionEvent`) |

### Optional Fields

| Builder Method | Sets Field | Default |
|----------------|-----------|---------|
| `.language("python")` | `language` | `None` |
| `.repo_size(RepoSizeCategory.MEDIUM)` | `repo_size_category` | `None` |
| `.retry_count(N)` | `retry_count` | `0` |

### Token and Cost Values

Costs are expressed in **microdollars** (1 USD = 1,000,000 microdollars), so they are stored as integers and avoid floating-point precision issues.

```python
event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .tokens(
        estimated_in=105000,  # Pre-task estimate: input tokens
        estimated_out=45000,  # Pre-task estimate: output tokens
        actual_in=112340,  # Actual input tokens consumed
        actual_out=38760,  # Actual output tokens generated
    )
    .cost(
        estimated=630000,  # $0.63 in microdollars
        actual=919200,  # $0.9192 in microdollars
    )
    .build()
)
```

### Quality Gates

Record which quality gates were executed and which of them failed:

```python
event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .quality(
        passed=False,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.COVERAGE],
        gates_failed=[QualityGate.COVERAGE],
    )
    .build()
)
```

Available gates: `BUILD`, `LINT`, `TEST`, `COVERAGE`, `TYPECHECK`, `SECURITY`.

---

## Querying Predictions

The SDK can query crowd-sourced predictions from the telemetry server. Predictions provide percentile distributions for token usage, cost, duration, and quality metrics, based on aggregated data from all participating instances.

### Fetching Predictions

```python
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity

queries = [
    PredictionQuery(
        task_type=TaskType.IMPLEMENTATION,
        model="claude-sonnet-4-5-20250929",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.MEDIUM,
    ),
    PredictionQuery(
        task_type=TaskType.TESTING,
        model="claude-haiku-4-5-20251001",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.LOW,
    ),
]

# Async (inside a coroutine)
await client.refresh_predictions(queries)

# Sync
client.refresh_predictions_sync(queries)
```

### Reading from Cache

Predictions are stored in a TTL-based in-memory cache (default: 6 hours):

```python
prediction = client.get_prediction(queries[0])

if prediction and prediction.prediction:
    data = prediction.prediction
    print(f"Input tokens (median): {data.input_tokens.median}")
    print(f"Input tokens (p90): {data.input_tokens.p90}")
    print(f"Output tokens (median): {data.output_tokens.median}")
    print(f"Cost (median): ${data.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Duration (median): {data.duration_ms['median'] / 1000:.1f}s")
    print(f"Correction factor (input): {data.correction_factors.input:.2f}")
    print(f"Quality gate pass rate: {data.quality.gate_pass_rate:.0%}")
    print(f"Success rate: {data.quality.success_rate:.0%}")

    meta = prediction.metadata
    print(f"Sample size: {meta.sample_size}")
    print(f"Confidence: {meta.confidence}")
    if meta.fallback_note:
        print(f"Note: {meta.fallback_note}")
else:
    print("No prediction data available for this combination")
```

### Prediction Confidence Levels

| Level | Meaning |
|-------|---------|
| `high` | 100+ samples, exact dimension match |
| `medium` | 30-99 samples, exact dimension match |
| `low` | Fewer than 30 samples, or a fallback was used |
| `none` | No data available; `prediction` is `None` |

---

## Error Handling

### The `track()` Contract

**`track()` never throws and never blocks the caller.** If anything goes wrong (queue full, telemetry disabled, unexpected error), the event is dropped without raising an exception, and the error is logged. This ensures telemetry instrumentation never affects your application's behavior.

```python
# This is always safe, even if telemetry is misconfigured
client.track(event)
```

### Queue Overflow

When the in-memory queue reaches `max_queue_size` (default 1000), the oldest events are evicted to make room for new ones. A warning is logged when this happens.

### Submission Retries

The background submitter retries transient failures with exponential backoff and jitter:

- **429 Too Many Requests**: Honors the server's `Retry-After` header
- **Timeouts**: Retried with backoff
- **Network errors**: Retried with backoff
- **403 Forbidden**: Not retried (configuration error)

Failed batches are re-queued for the next submission cycle (up to queue capacity).

### Logging

All SDK logging uses the `mosaicstack_telemetry` logger.
Enable it to see submission activity:

```python
import logging

logging.basicConfig(level=logging.DEBUG)

# Or target the SDK logger specifically. basicConfig() with no level still
# installs the root handler that prints the SDK's DEBUG records:
# logging.basicConfig()
# logging.getLogger("mosaicstack_telemetry").setLevel(logging.DEBUG)
```

---

## Dry-Run Mode

Test your integration without sending data to the server:

```python
config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,
)

with TelemetryClient(config) as client:
    client.track(event)
    # Logs: "[DRY RUN] Would submit batch of 1 events to ..."
```

## Disabling Telemetry

Set `enabled=False` or the environment variable `MOSAIC_TELEMETRY_ENABLED=false`:

```python
config = TelemetryConfig(enabled=False)

with TelemetryClient(config) as client:
    client.track(event)  # Silently dropped, no background thread started
```

---

## API Compatibility

| SDK Version | Telemetry API | Event Schema | Notes |
|-------------|---------------|--------------|-------|
| 0.1.x | v1 (`/v1/*`) | 1.0 | Current release |

The SDK submits events to `POST /v1/events/batch` and queries predictions from `POST /v1/predictions/batch`. These are the only two server endpoints the SDK communicates with.

For the full server API documentation, see the [Mosaic Telemetry API Reference](https://github.com/mosaicstack/telemetry).
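Because one `POST /v1/events/batch` request carries at most 100 events (the server maximum noted for `batch_size`), a backlog larger than that has to be split across several requests. A minimal sketch of that chunking step follows, using a hypothetical `chunk_events` helper that is not part of the SDK. The request body the SDK actually sends is not documented in this guide, so the sketch stops at producing the batches.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")

SERVER_MAX_BATCH = 100  # server maximum per request, per the configuration reference


def chunk_events(events: Sequence[T], batch_size: int = 100) -> Iterator[list[T]]:
    """Yield consecutive batches of at most batch_size events (hypothetical helper)."""
    if not 1 <= batch_size <= SERVER_MAX_BATCH:
        raise ValueError(f"batch_size must be between 1 and {SERVER_MAX_BATCH}")
    for start in range(0, len(events), batch_size):
        yield list(events[start:start + batch_size])


sizes = [len(batch) for batch in chunk_events(list(range(250)))]
print(sizes)  # [100, 100, 50]
```

Validating `batch_size` against the server limit up front turns a misconfiguration into an immediate local error instead of a rejected request.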