# Integration Guide
This guide covers installing and integrating `mosaicstack-telemetry` into Python applications. The SDK reports AI coding task-completion telemetry to a [Mosaic Stack Telemetry](https://github.com/mosaicstack/telemetry) server and queries crowd-sourced predictions.
**Telemetry API version:** This SDK targets the Mosaic Telemetry API **v1** with event schema version **1.0**.
## Installation
Install from the Mosaic Stack package registry:
```bash
pip install mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```
Or with [uv](https://docs.astral.sh/uv/):
```bash
uv add mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```
To avoid repeating the index URL, configure it in your project's `pyproject.toml`:
```toml
[[tool.uv.index]]
name = "mosaic"
url = "https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/"
```
Or in `pip.conf` (`~/.config/pip/pip.conf` on Linux):
```ini
[global]
extra-index-url = https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```
**Requirements:** Python 3.10+. Runtime dependencies: `httpx` and `pydantic`.
## Configuration
### Constructor Parameters
```python
from mosaicstack_telemetry import TelemetryConfig
config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)
```
All three fields (`server_url`, `api_key`, `instance_id`) are required when telemetry is enabled. The `api_key` must be a 64-character hexadecimal string issued by a Mosaic Telemetry administrator. The `instance_id` is a UUID that identifies your Mosaic Stack installation and must match the instance associated with your API key on the server.
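
The format rules above can be checked before the config is constructed. The following is a minimal sketch, not part of the SDK: the helper names `is_valid_api_key` and `is_valid_instance_id` are hypothetical, accepting uppercase hex digits is an assumption, and the SDK or server may apply stricter validation of their own.

```python
import re
import uuid

# Hypothetical helpers for illustration; not part of mosaicstack_telemetry.
_HEX_64 = re.compile(r"[0-9a-fA-F]{64}")


def is_valid_api_key(value: str) -> bool:
    """Return True if value is exactly 64 hexadecimal characters."""
    return _HEX_64.fullmatch(value) is not None


def is_valid_instance_id(value: str) -> bool:
    """Return True if value parses as a UUID."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True
```

Note that `uuid.UUID` is lenient about formatting (it also accepts strings without hyphens), so this check only catches clearly malformed IDs. Whether the ID matches the instance tied to your API key can only be verified by the server.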
### Environment Variables
Instead of passing values to the constructor, set environment variables:
```bash
export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://tel-api.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d
```
Then create the config with no arguments:
```python
config = TelemetryConfig() # Reads from environment
```
Constructor values take priority over environment variables.
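
The precedence rule can be illustrated with a small stand-alone sketch. This is not the SDK's implementation: `resolve_setting` is a hypothetical helper, and treating `None` as "not passed to the constructor" is an assumption made for the example.

```python
import os


def resolve_setting(explicit, env_var, default=""):
    """Return the explicit value if given, else the environment variable, else the default."""
    if explicit is not None:
        return explicit  # Constructor argument wins
    return os.environ.get(env_var, default)


os.environ["MOSAIC_TELEMETRY_SERVER_URL"] = "https://env.example"

# No constructor value: the environment variable is used.
from_env = resolve_setting(None, "MOSAIC_TELEMETRY_SERVER_URL")

# Constructor value present: it overrides the environment variable.
from_ctor = resolve_setting("https://ctor.example", "MOSAIC_TELEMETRY_SERVER_URL")
```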
### Full Configuration Reference
| Parameter | Type | Default | Env Var | Description |
|-----------|------|---------|---------|-------------|
| `server_url` | `str` | `""` (required) | `MOSAIC_TELEMETRY_SERVER_URL` | Telemetry API base URL (no trailing slash) |
| `api_key` | `str` | `""` (required) | `MOSAIC_TELEMETRY_API_KEY` | 64-character hex API key |
| `instance_id` | `str` | `""` (required) | `MOSAIC_TELEMETRY_INSTANCE_ID` | UUID identifying this Mosaic Stack instance |
| `enabled` | `bool` | `True` | `MOSAIC_TELEMETRY_ENABLED` | Enable/disable telemetry entirely |
| `submit_interval_seconds` | `float` | `300.0` | -- | How often the background submitter flushes queued events (seconds) |
| `max_queue_size` | `int` | `1000` | -- | Maximum events held in the in-memory queue |
| `batch_size` | `int` | `100` | -- | Events per batch (server maximum is 100) |
| `request_timeout_seconds` | `float` | `10.0` | -- | HTTP request timeout for API calls |
| `prediction_cache_ttl_seconds` | `float` | `21600.0` | -- | Prediction cache TTL (default 6 hours) |
| `dry_run` | `bool` | `False` | -- | Log batches but don't send to server |
| `max_retries` | `int` | `3` | -- | Retries on transient failures (429, timeouts, network errors) |
### Environment-Specific Configuration
**Development:**
```python
config = TelemetryConfig(
    server_url="http://localhost:8000",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,  # Log but don't send
    submit_interval_seconds=10.0,  # Flush quickly for testing
)
```
**Production:**
```python
import os

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
    submit_interval_seconds=300.0,  # Default: flush every 5 minutes
    max_retries=3,  # Retry transient failures
)
```
---
## Sync Usage (Threading)
Best for scripts, CLI tools, aider integrations, and non-async contexts. The SDK spawns a daemon thread that periodically flushes queued events.
```python
from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)

client = TelemetryClient(config)
client.start()  # Starts background daemon thread

event = (
    EventBuilder(instance_id=config.instance_id)
    .task_type(TaskType.IMPLEMENTATION)
    .model("claude-sonnet-4-5-20250929")
    .provider(Provider.ANTHROPIC)
    .harness_type(Harness.CLAUDE_CODE)
    .complexity_level(Complexity.MEDIUM)
    .outcome_value(Outcome.SUCCESS)
    .duration_ms(45000)
    .tokens(estimated_in=105000, estimated_out=45000, actual_in=112340, actual_out=38760)
    .cost(estimated=630000, actual=919200)
    .quality(
        passed=True,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.TYPECHECK],
    )
    .context(compactions=2, rotations=0, utilization=0.72)
    .language("typescript")
    .build()
)

client.track(event)  # Non-blocking, thread-safe
client.stop()  # Flushes remaining events and stops the thread
```
### Sync Context Manager
The context manager calls `start()` on entry and `stop()` (with flush) on exit:
```python
with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here
```
---
## Async Usage (asyncio)
For asyncio-based applications (FastAPI, aiohttp, etc.). The SDK creates an asyncio task that periodically flushes events.
```python
import asyncio

from mosaicstack_telemetry import TelemetryClient, TelemetryConfig


async def main():
    config = TelemetryConfig(
        server_url="https://tel-api.mosaicstack.dev",
        api_key="your-64-char-hex-api-key-here...",
        instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
    )
    client = TelemetryClient(config)
    await client.start_async()  # Starts asyncio background task

    # track() is always synchronous and non-blocking.
    # `event` is built with EventBuilder, as in the sync example above.
    client.track(event)

    await client.stop_async()  # Flushes remaining events


asyncio.run(main())
```
### Async Context Manager
```python
async with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here
```
### Key Difference: Sync vs Async
| Aspect | Sync | Async |
|--------|------|-------|
| Start | `client.start()` | `await client.start_async()` |
| Stop | `client.stop()` | `await client.stop_async()` |
| Context manager | `with TelemetryClient(config)` | `async with TelemetryClient(config)` |
| Background mechanism | `threading.Timer` (daemon thread) | `asyncio.Task` |
| `track()` | Always synchronous | Always synchronous |
| `refresh_predictions` | `refresh_predictions_sync(queries)` | `await refresh_predictions(queries)` |
The `track()` method is always synchronous regardless of which mode you use. It simply appends to a thread-safe in-memory queue and returns immediately. The background submitter handles batching and sending.
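
To make the queue-then-batch pattern concrete, here is a minimal sketch of how a submitter could drain a thread-safe queue into batches of at most `batch_size` events. It is an illustration only: `drain_batches` is a hypothetical function, and the SDK's internal queue type and flushing logic are not documented in this guide.

```python
import queue


def drain_batches(pending: queue.Queue, batch_size: int = 100) -> list[list]:
    """Remove everything currently queued and group it into batches."""
    batches = []
    batch = []
    while True:
        try:
            item = pending.get_nowait()  # Never blocks; raises queue.Empty when drained
        except queue.Empty:
            break
        batch.append(item)
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)  # Final partial batch
    return batches


pending_events = queue.Queue()
for event_number in range(250):
    pending_events.put_nowait(event_number)  # Stand-in for track(event)

batch_sizes = [len(b) for b in drain_batches(pending_events, batch_size=100)]
print(batch_sizes)  # [100, 100, 50]
```

Because producers only ever append and the submitter only ever drains, the caller of `track()` never waits on network I/O.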
---
## Integration Examples
### Example 1: Instrumenting a FastAPI Service
```python
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from mosaicstack_telemetry import (
    Complexity,
    EventBuilder,
    Harness,
    Outcome,
    Provider,
    QualityGate,
    TaskType,
    TelemetryClient,
    TelemetryConfig,
)

# Initialize telemetry once at startup
config = TelemetryConfig(
    server_url=os.environ.get("MOSAIC_TELEMETRY_SERVER_URL", "https://tel-api.mosaicstack.dev"),
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
)
telemetry = TelemetryClient(config)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start telemetry on app startup, flush on shutdown."""
    await telemetry.start_async()
    yield
    await telemetry.stop_async()


app = FastAPI(lifespan=lifespan)


@app.post("/tasks/complete")
async def complete_task(
    task_type: str,
    model: str,
    provider: str,
    complexity: str,
    actual_input_tokens: int,
    actual_output_tokens: int,
    actual_cost_usd_micros: int,
    duration_ms: int,
    outcome: str,
):
    """Record a completed AI coding task."""
    event = (
        EventBuilder(instance_id=config.instance_id)
        .task_type(TaskType(task_type))
        .model(model)
        .provider(Provider(provider))
        .harness_type(Harness.CLAUDE_CODE)
        .complexity_level(Complexity(complexity))
        .outcome_value(Outcome(outcome))
        .duration_ms(duration_ms)
        .tokens(
            estimated_in=0,
            estimated_out=0,
            actual_in=actual_input_tokens,
            actual_out=actual_output_tokens,
        )
        .cost(estimated=0, actual=actual_cost_usd_micros)
        .quality(passed=outcome == "success", gates_run=[QualityGate.BUILD, QualityGate.TEST])
        .context(compactions=0, rotations=0, utilization=0.0)
        .build()
    )
    telemetry.track(event)  # Non-blocking, never raises
    return {"status": "tracked"}
```
### Example 2: Instrumenting a Generic Python App
```python
"""
Generic Python script that tracks AI coding tasks.
Suitable for CLI tools, batch processors, or any non-async application.
"""
import os
import time

from mosaicstack_telemetry import (
    Complexity,
    EventBuilder,
    Harness,
    Outcome,
    Provider,
    QualityGate,
    RepoSizeCategory,
    TaskType,
    TelemetryClient,
    TelemetryConfig,
)


def run_coding_task() -> dict:
    """Simulate an AI coding task. Returns task metrics."""
    start = time.monotonic()
    # ... your AI coding logic here ...
    elapsed_ms = int((time.monotonic() - start) * 1000)
    return {
        "duration_ms": elapsed_ms,
        "actual_input_tokens": 4500,
        "actual_output_tokens": 1800,
        "actual_cost_usd_micros": 48000,
        "outcome": "success",
        "quality_gates_passed": True,
    }


def main():
    config = TelemetryConfig()  # Reads from MOSAIC_TELEMETRY_* env vars
    with TelemetryClient(config) as client:
        result = run_coding_task()
        event = (
            EventBuilder(instance_id=config.instance_id)
            .task_type(TaskType.IMPLEMENTATION)
            .model("claude-sonnet-4-5-20250929")
            .provider(Provider.ANTHROPIC)
            .harness_type(Harness.AIDER)
            .complexity_level(Complexity.MEDIUM)
            .outcome_value(Outcome(result["outcome"]))
            .duration_ms(result["duration_ms"])
            .tokens(
                estimated_in=5000,
                estimated_out=2000,
                actual_in=result["actual_input_tokens"],
                actual_out=result["actual_output_tokens"],
            )
            .cost(estimated=50000, actual=result["actual_cost_usd_micros"])
            .quality(
                passed=result["quality_gates_passed"],
                gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST],
            )
            .context(compactions=0, rotations=0, utilization=0.35)
            .language("python")
            .repo_size(RepoSizeCategory.MEDIUM)
            .build()
        )
        client.track(event)
        print(f"Tracked task: {event.event_id}")
    # Client is automatically flushed and stopped after the `with` block


if __name__ == "__main__":
    main()
```
---
## Building Events
The `EventBuilder` provides a fluent API for constructing `TaskCompletionEvent` objects with sensible defaults. All setter methods return the builder instance for chaining.
### Required Fields
Every event requires these fields to be set (either via builder methods or from defaults):
| Builder Method | Sets Field | Default |
|----------------|-----------|---------|
| `EventBuilder(instance_id=...)` | `instance_id` | (required) |
| `.task_type(TaskType.X)` | `task_type` | `unknown` |
| `.model("model-name")` | `model` | `"unknown"` |
| `.provider(Provider.X)` | `provider` | `unknown` |
| `.harness_type(Harness.X)` | `harness` | `unknown` |
| `.complexity_level(Complexity.X)` | `complexity` | `medium` |
| `.outcome_value(Outcome.X)` | `outcome` | `failure` |
| `.duration_ms(N)` | `task_duration_ms` | `0` |
| `.tokens(...)` | token fields | all `0` |
| `.cost(...)` | cost fields | all `0` |
| `.quality(...)` | quality fields | `passed=False, gates_run=[], gates_failed=[]` |
| `.context(...)` | context fields | all `0` / `0.0` |
### Auto-Generated Fields
| Field | Auto-generated Value |
|-------|---------------------|
| `event_id` | Random UUID (override with `.event_id(uuid)`) |
| `timestamp` | Current UTC time (override with `.timestamp(dt)`) |
| `schema_version` | `"1.0"` (set automatically by `TaskCompletionEvent`) |
### Optional Fields
| Builder Method | Sets Field | Default |
|----------------|-----------|---------|
| `.language("python")` | `language` | `None` |
| `.repo_size(RepoSizeCategory.MEDIUM)` | `repo_size_category` | `None` |
| `.retry_count(N)` | `retry_count` | `0` |
### Token and Cost Values
Costs are expressed in **microdollars** (1 USD = 1,000,000 microdollars). This avoids floating-point precision issues.
```python
event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .tokens(
        estimated_in=105000,  # Pre-task estimate: input tokens
        estimated_out=45000,  # Pre-task estimate: output tokens
        actual_in=112340,  # Actual input tokens consumed
        actual_out=38760,  # Actual output tokens generated
    )
    .cost(
        estimated=630000,  # $0.63 in microdollars
        actual=919200,  # $0.92 in microdollars
    )
    .build()
)
```
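
If your application tracks costs in dollars, converting to and from microdollars with `decimal.Decimal` keeps the arithmetic exact. The helpers below are a sketch and not part of the SDK; the names `usd_to_micros` and `micros_to_usd` are hypothetical, and half-up rounding of sub-microdollar amounts is an assumption.

```python
from decimal import ROUND_HALF_UP, Decimal

MICROS_PER_USD = 1_000_000


def usd_to_micros(usd: str) -> int:
    """Convert a decimal USD string such as "0.63" to integer microdollars."""
    micros = Decimal(usd) * MICROS_PER_USD
    return int(micros.to_integral_value(rounding=ROUND_HALF_UP))


def micros_to_usd(micros: int) -> Decimal:
    """Convert integer microdollars back to an exact Decimal USD amount."""
    return Decimal(micros) / MICROS_PER_USD


print(usd_to_micros("0.63"))  # 630000
print(micros_to_usd(919200))  # 0.9192
```

Passing the dollar amount as a string (rather than a `float`) avoids introducing binary floating-point error before the conversion happens.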
### Quality Gates
Record which quality gates were executed and their results:
```python
event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .quality(
        passed=False,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.COVERAGE],
        gates_failed=[QualityGate.COVERAGE],
    )
    .build()
)
```
Available gates: `BUILD`, `LINT`, `TEST`, `COVERAGE`, `TYPECHECK`, `SECURITY`.
---
## Querying Predictions
The SDK can query crowd-sourced predictions from the telemetry server. Predictions provide percentile distributions for token usage, cost, duration, and quality metrics based on aggregated data from all participating instances.
### Fetching Predictions
```python
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity
queries = [
    PredictionQuery(
        task_type=TaskType.IMPLEMENTATION,
        model="claude-sonnet-4-5-20250929",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.MEDIUM,
    ),
    PredictionQuery(
        task_type=TaskType.TESTING,
        model="claude-haiku-4-5-20251001",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.LOW,
    ),
]
# Async
await client.refresh_predictions(queries)
# Sync
client.refresh_predictions_sync(queries)
```
### Reading from Cache
Predictions are stored in a TTL-based in-memory cache (default: 6 hours):
```python
prediction = client.get_prediction(queries[0])
if prediction and prediction.prediction:
    data = prediction.prediction
    print(f"Input tokens (median): {data.input_tokens.median}")
    print(f"Input tokens (p90): {data.input_tokens.p90}")
    print(f"Output tokens (median): {data.output_tokens.median}")
    print(f"Cost (median): ${data.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Duration (median): {data.duration_ms['median'] / 1000:.1f}s")
    print(f"Correction factor (input): {data.correction_factors.input:.2f}")
    print(f"Quality gate pass rate: {data.quality.gate_pass_rate:.0%}")
    print(f"Success rate: {data.quality.success_rate:.0%}")
    meta = prediction.metadata
    print(f"Sample size: {meta.sample_size}")
    print(f"Confidence: {meta.confidence}")
    if meta.fallback_note:
        print(f"Note: {meta.fallback_note}")
else:
    print("No prediction data available for this combination")
```
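
For readers unfamiliar with TTL caches, the sketch below shows the general technique: each entry stores an expiry time and is treated as missing once that time has passed. This is an illustration under stated assumptions, not the SDK's cache; the class name `TTLCache` and the injectable `clock` parameter are hypothetical.

```python
import time


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float = 21600.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # Injectable so expiry can be tested without sleeping
        self._entries = {}

    def put(self, key, value) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # Expired: drop it and report a miss
            return None
        return value
```

With the default TTL of 21600 seconds, a prediction fetched at startup would be served from memory for six hours, after which a fresh `refresh_predictions` call is needed to repopulate it.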
### Prediction Confidence Levels
| Level | Meaning |
|-------|---------|
| `high` | 100+ samples, exact dimension match |
| `medium` | 30-99 samples, exact dimension match |
| `low` | <30 samples or fallback was used |
| `none` | No data available; `prediction` is `None` |
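
The table can be read as a simple decision rule. The function below mirrors it for illustration only: the confidence value is computed by the server and returned in `metadata.confidence`, and `confidence_level` is a hypothetical name that does not exist in the SDK.

```python
def confidence_level(sample_size: int, used_fallback: bool = False) -> str:
    """Map sample size and fallback usage to the confidence labels in the table."""
    if sample_size <= 0:
        return "none"  # No data available
    if used_fallback or sample_size < 30:
        return "low"
    if sample_size < 100:
        return "medium"  # 30-99 samples, exact dimension match
    return "high"  # 100+ samples, exact dimension match


print(confidence_level(45))  # medium
print(confidence_level(500, used_fallback=True))  # low
```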
---
## Error Handling
### The `track()` Contract
**`track()` never raises and never blocks the caller.** If anything goes wrong (queue full, telemetry disabled, unexpected error), the event is silently dropped and the error is logged. This ensures telemetry instrumentation never affects your application's behavior.
```python
# This is always safe, even if telemetry is misconfigured
client.track(event)
```
### Queue Overflow
When the in-memory queue reaches `max_queue_size` (default 1000), the oldest events are evicted to make room for new ones. A warning is logged when this happens.
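
The eviction behavior matches what Python's `collections.deque` does when given a `maxlen`. The sketch below demonstrates the policy only; whether the SDK uses a `deque` internally is an assumption, and this example does not log the warning the SDK emits.

```python
from collections import deque

# A bounded queue with room for three events (the SDK default is 1000).
pending = deque(maxlen=3)

for event_number in range(5):
    # Appending to a full deque silently discards the oldest (leftmost) item.
    pending.append(event_number)

print(list(pending))  # [2, 3, 4]
```

Events 0 and 1 were evicted to make room for 3 and 4, so the newest data always survives an overflow.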
### Submission Retries
The background submitter retries transient failures with exponential backoff and jitter:
- **429 Too Many Requests**: Honors the server's `Retry-After` header
- **Timeouts**: Retried with backoff
- **Network errors**: Retried with backoff
- **403 Forbidden**: Not retried (configuration error)
Failed batches are re-queued for the next submission cycle (up to queue capacity).
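
As a rough illustration of the retry policy above, the sketch below computes the wait before a retry. The base delay, the cap, and the "full jitter" strategy are all assumptions chosen for the example; the guide does not document the SDK's actual values, and `retry_delay` is a hypothetical function.

```python
import random


def retry_delay(attempt, retry_after=None, base=1.0, cap=60.0, rng=random.random):
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # 429 responses: honor the server's Retry-After value.
        return max(0.0, retry_after)
    # Exponential growth (base, 2*base, 4*base, ...) limited by the cap.
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: pick a random point between 0 and the ceiling.
    return ceiling * rng()


for attempt in range(3):
    print(f"attempt {attempt}: wait up to {min(60.0, 2 ** attempt):.0f}s")
```

Jitter spreads retries from many clients over time, so a server recovering from an outage is not hit by synchronized bursts.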
### Logging
All SDK logging uses the `mosaicstack_telemetry` logger. Enable it to see submission activity:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Or target the SDK logger specifically:
logging.getLogger("mosaicstack_telemetry").setLevel(logging.DEBUG)
```
---
## Dry-Run Mode
Test your integration without sending data to the server:
```python
config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,
)
with TelemetryClient(config) as client:
    client.track(event)
    # Logs: "[DRY RUN] Would submit batch of 1 events to ..."
```
## Disabling Telemetry
Set `enabled=False` or the environment variable `MOSAIC_TELEMETRY_ENABLED=false`:
```python
config = TelemetryConfig(enabled=False)
with TelemetryClient(config) as client:
    client.track(event)  # Silently dropped, no background thread started
```
---
## Versioning
This SDK publishes two types of versions to the Mosaic Stack PyPI registry at `git.mosaicstack.dev`:
| Channel | Version Format | Example | Branch | Install |
|---------|---------------|---------|--------|---------|
| **Stable** | `X.Y.Z` | `0.1.0` | `main` / tags | `pip install mosaicstack-telemetry` |
| **Dev** | `X.Y.Z.devYYYYMMDDHHMMSS` | `0.1.0.dev20260215045901` | `develop` | `pip install mosaicstack-telemetry --pre` |
Dev versions use a UTC timestamp suffix following [PEP 440](https://peps.python.org/pep-0440/#developmental-releases). This convention is shared across Mosaic Stack SDKs:
| SDK | Dev Version Format | Example |
|-----|-------------------|---------|
| Python (this SDK) | `X.Y.Z.devYYYYMMDDHHMMSS` | `0.1.0.dev20260215045901` |
| JavaScript | `X.Y.Z-dev.YYYYMMDDHHmmss` | `0.1.0-dev.20260215045901` |
By default, `pip install` resolves only stable releases. To install the latest dev build:
```bash
pip install mosaicstack-telemetry --pre --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```
To pin to a specific dev version:
```bash
pip install mosaicstack-telemetry==0.1.0.dev20260215045901 --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
```
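
To show how the two dev version formats relate, the sketch below derives both from the same UTC build timestamp. It is illustrative only; the function names are hypothetical and the release pipeline's real tooling is not described in this guide.

```python
from datetime import datetime, timezone


def python_dev_version(base: str, built_at: datetime) -> str:
    """Build a PEP 440 dev version such as 0.1.0.dev20260215045901."""
    stamp = built_at.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
    return f"{base}.dev{stamp}"


def js_dev_version(base: str, built_at: datetime) -> str:
    """Build the semver-style equivalent such as 0.1.0-dev.20260215045901."""
    stamp = built_at.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
    return f"{base}-dev.{stamp}"


built = datetime(2026, 2, 15, 4, 59, 1, tzinfo=timezone.utc)
print(python_dev_version("0.1.0", built))  # 0.1.0.dev20260215045901
print(js_dev_version("0.1.0", built))  # 0.1.0-dev.20260215045901
```

Pass a timezone-aware `datetime`; a naive one would be interpreted as local time by `astimezone` and could produce a shifted timestamp.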
---
## API Compatibility
| SDK Version | Telemetry API | Event Schema | Notes |
|-------------|---------------|--------------|-------|
| 0.1.x | v1 (`/v1/*`) | 1.0 | Current release |
The SDK submits events to `POST /v1/events/batch` and queries predictions from `POST /v1/predictions/batch`. These are the only two server endpoints the SDK communicates with.
For the full server API documentation, see the [Mosaic Telemetry API Reference](https://github.com/mosaicstack/telemetry).