Integration Guide
This guide covers installing and integrating mosaicstack-telemetry into Python applications. The SDK reports AI coding task-completion telemetry to a Mosaic Stack Telemetry server and queries crowd-sourced predictions.
Telemetry API version: This SDK targets the Mosaic Telemetry API v1 with event schema version 1.0.
Installation
Install from the Mosaic Stack package registry:
pip install mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
Or with uv:
uv add mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
To avoid repeating the index URL, configure it in your project's pyproject.toml:
[[tool.uv.index]]
name = "mosaic"
url = "https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/"
Or in pip.conf (~/.config/pip/pip.conf on Linux):
[global]
extra-index-url = https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
Requirements: Python 3.10+. Runtime dependencies: httpx and pydantic.
Configuration
Constructor Parameters
from mosaicstack_telemetry import TelemetryConfig
config = TelemetryConfig(
server_url="https://tel-api.mosaicstack.dev",
api_key="your-64-char-hex-api-key-here...",
instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)
All three fields (server_url, api_key, instance_id) are required when telemetry is enabled. The api_key must be a 64-character hexadecimal string issued by a Mosaic Telemetry administrator. The instance_id is a UUID that identifies your Mosaic Stack installation and must match the instance associated with your API key on the server.
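If you want to fail fast before constructing the config, the stated formats can be checked with standard-library tools. This is a hedged sketch: the SDK performs its own validation, and the helper names here are illustrative, not part of the package.

```python
import re
import uuid

def looks_like_api_key(value: str) -> bool:
    """True if value is a 64-character hexadecimal string."""
    return re.fullmatch(r"[0-9a-fA-F]{64}", value) is not None

def looks_like_instance_id(value: str) -> bool:
    """True if value parses as a UUID."""
    try:
        uuid.UUID(value)
        return True
    except ValueError:
        return False
```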
Environment Variables
Instead of passing values to the constructor, set environment variables:
export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://tel-api.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d
Then create the config with no arguments:
config = TelemetryConfig() # Reads from environment
Constructor values take priority over environment variables.
Full Configuration Reference
| Parameter | Type | Default | Env Var | Description |
|---|---|---|---|---|
| `server_url` | `str` | `""` (required) | `MOSAIC_TELEMETRY_SERVER_URL` | Telemetry API base URL (no trailing slash) |
| `api_key` | `str` | `""` (required) | `MOSAIC_TELEMETRY_API_KEY` | 64-character hex API key |
| `instance_id` | `str` | `""` (required) | `MOSAIC_TELEMETRY_INSTANCE_ID` | UUID identifying this Mosaic Stack instance |
| `enabled` | `bool` | `True` | `MOSAIC_TELEMETRY_ENABLED` | Enable/disable telemetry entirely |
| `submit_interval_seconds` | `float` | `300.0` | -- | How often the background submitter flushes queued events (seconds) |
| `max_queue_size` | `int` | `1000` | -- | Maximum events held in the in-memory queue |
| `batch_size` | `int` | `100` | -- | Events per batch (server maximum is 100) |
| `request_timeout_seconds` | `float` | `10.0` | -- | HTTP request timeout for API calls |
| `prediction_cache_ttl_seconds` | `float` | `21600.0` | -- | Prediction cache TTL (default 6 hours) |
| `dry_run` | `bool` | `False` | -- | Log batches but don't send to server |
| `max_retries` | `int` | `3` | -- | Retries on transient failures (429, timeouts, network errors) |
Environment-Specific Configuration
Development:
config = TelemetryConfig(
server_url="http://localhost:8000",
api_key="a" * 64,
instance_id="12345678-1234-1234-1234-123456789abc",
dry_run=True, # Log but don't send
submit_interval_seconds=10.0, # Flush quickly for testing
)
Production:
config = TelemetryConfig(
server_url="https://tel-api.mosaicstack.dev",
api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
submit_interval_seconds=300.0, # Default: flush every 5 minutes
max_retries=3, # Retry transient failures
)
Sync Usage (Threading)
Best for scripts, CLI tools, aider integrations, and non-async contexts. The SDK spawns a daemon thread that periodically flushes queued events.
from mosaicstack_telemetry import (
TelemetryClient,
TelemetryConfig,
EventBuilder,
TaskType,
Provider,
Harness,
Complexity,
Outcome,
QualityGate,
)
config = TelemetryConfig(
server_url="https://tel-api.mosaicstack.dev",
api_key="your-64-char-hex-api-key-here...",
instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)
client = TelemetryClient(config)
client.start() # Starts background daemon thread
event = (
EventBuilder(instance_id=config.instance_id)
.task_type(TaskType.IMPLEMENTATION)
.model("claude-sonnet-4-5-20250929")
.provider(Provider.ANTHROPIC)
.harness_type(Harness.CLAUDE_CODE)
.complexity_level(Complexity.MEDIUM)
.outcome_value(Outcome.SUCCESS)
.duration_ms(45000)
.tokens(estimated_in=105000, estimated_out=45000, actual_in=112340, actual_out=38760)
.cost(estimated=630000, actual=919200)
.quality(
passed=True,
gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.TYPECHECK],
)
.context(compactions=2, rotations=0, utilization=0.72)
.language("typescript")
.build()
)
client.track(event) # Non-blocking, thread-safe
client.stop() # Flushes remaining events and stops the thread
Sync Context Manager
The context manager calls start() on entry and stop() (with flush) on exit:
with TelemetryClient(config) as client:
client.track(event)
# Automatically flushed and stopped here
Async Usage (asyncio)
For asyncio-based applications (FastAPI, aiohttp, etc.). The SDK creates an asyncio task that periodically flushes events.
import asyncio
from mosaicstack_telemetry import TelemetryClient, TelemetryConfig
async def main():
config = TelemetryConfig(
server_url="https://tel-api.mosaicstack.dev",
api_key="your-64-char-hex-api-key-here...",
instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)
client = TelemetryClient(config)
await client.start_async() # Starts asyncio background task
# track() is always synchronous and non-blocking
client.track(event)
await client.stop_async() # Flushes remaining events
asyncio.run(main())
Async Context Manager
async with TelemetryClient(config) as client:
client.track(event)
# Automatically flushed and stopped here
Key Difference: Sync vs Async
| Aspect | Sync | Async |
|---|---|---|
| Start | `client.start()` | `await client.start_async()` |
| Stop | `client.stop()` | `await client.stop_async()` |
| Context manager | `with TelemetryClient(config)` | `async with TelemetryClient(config)` |
| Background mechanism | `threading.Timer` (daemon thread) | `asyncio.Task` |
| `track()` | Always synchronous | Always synchronous |
| Refresh predictions | `refresh_predictions_sync(queries)` | `await refresh_predictions(queries)` |
The track() method is always synchronous regardless of which mode you use. It simply appends to a thread-safe in-memory queue and returns immediately. The background submitter handles batching and sending.
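The contract described above can be sketched as a lock-protected queue whose enqueue never raises. This is illustrative only; the class and method names are hypothetical, not the SDK's internals.

```python
import threading

class EventQueue:
    """Thread-safe, non-blocking enqueue in the spirit of track() (hypothetical class)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: list[object] = []

    def track(self, event: object) -> None:
        """Append and return immediately; never raises to the caller."""
        try:
            with self._lock:
                self._events.append(event)
        except Exception:
            pass  # telemetry must never break the host application

    def drain(self) -> list[object]:
        """Hand all queued events to the background submitter."""
        with self._lock:
            events, self._events = self._events, []
            return events
```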
Integration Examples
Example 1: Instrumenting a FastAPI Service
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from mosaicstack_telemetry import (
Complexity,
EventBuilder,
Harness,
Outcome,
Provider,
QualityGate,
TaskType,
TelemetryClient,
TelemetryConfig,
)
# Initialize telemetry once at startup
config = TelemetryConfig(
server_url=os.environ.get("MOSAIC_TELEMETRY_SERVER_URL", "https://tel-api.mosaicstack.dev"),
api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
)
telemetry = TelemetryClient(config)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start telemetry on app startup, flush on shutdown."""
await telemetry.start_async()
yield
await telemetry.stop_async()
app = FastAPI(lifespan=lifespan)
@app.post("/tasks/complete")
async def complete_task(
task_type: str,
model: str,
provider: str,
complexity: str,
actual_input_tokens: int,
actual_output_tokens: int,
actual_cost_usd_micros: int,
duration_ms: int,
outcome: str,
):
"""Record a completed AI coding task."""
event = (
EventBuilder(instance_id=config.instance_id)
.task_type(TaskType(task_type))
.model(model)
.provider(Provider(provider))
.harness_type(Harness.CLAUDE_CODE)
.complexity_level(Complexity(complexity))
.outcome_value(Outcome(outcome))
.duration_ms(duration_ms)
.tokens(
estimated_in=0,
estimated_out=0,
actual_in=actual_input_tokens,
actual_out=actual_output_tokens,
)
.cost(estimated=0, actual=actual_cost_usd_micros)
.quality(passed=outcome == "success", gates_run=[QualityGate.BUILD, QualityGate.TEST])
.context(compactions=0, rotations=0, utilization=0.0)
.build()
)
telemetry.track(event) # Non-blocking, never throws
return {"status": "tracked"}
Example 2: Instrumenting a Generic Python App
"""
Generic Python script that tracks AI coding tasks.
Suitable for CLI tools, batch processors, or any non-async application.
"""
import time
from mosaicstack_telemetry import (
Complexity,
EventBuilder,
Harness,
Outcome,
Provider,
QualityGate,
RepoSizeCategory,
TaskType,
TelemetryClient,
TelemetryConfig,
)
def run_coding_task() -> dict:
"""Simulate an AI coding task. Returns task metrics."""
start = time.monotonic()
# ... your AI coding logic here ...
elapsed_ms = int((time.monotonic() - start) * 1000)
return {
"duration_ms": elapsed_ms,
"actual_input_tokens": 4500,
"actual_output_tokens": 1800,
"actual_cost_usd_micros": 48000,
"outcome": "success",
"quality_gates_passed": True,
}
def main():
config = TelemetryConfig() # Reads from MOSAIC_TELEMETRY_* env vars
with TelemetryClient(config) as client:
result = run_coding_task()
event = (
EventBuilder(instance_id=config.instance_id)
.task_type(TaskType.IMPLEMENTATION)
.model("claude-sonnet-4-5-20250929")
.provider(Provider.ANTHROPIC)
.harness_type(Harness.AIDER)
.complexity_level(Complexity.MEDIUM)
.outcome_value(Outcome(result["outcome"]))
.duration_ms(result["duration_ms"])
.tokens(
estimated_in=5000,
estimated_out=2000,
actual_in=result["actual_input_tokens"],
actual_out=result["actual_output_tokens"],
)
.cost(estimated=50000, actual=result["actual_cost_usd_micros"])
.quality(
passed=result["quality_gates_passed"],
gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST],
)
.context(compactions=0, rotations=0, utilization=0.35)
.language("python")
.repo_size(RepoSizeCategory.MEDIUM)
.build()
)
client.track(event)
print(f"Tracked task: {event.event_id}")
# Client is automatically flushed and stopped after the `with` block
if __name__ == "__main__":
main()
Building Events
The EventBuilder provides a fluent API for constructing TaskCompletionEvent objects with sensible defaults. All setter methods return the builder instance for chaining.
Required Fields
Every event requires these fields to be set (either via builder methods or from defaults):
| Builder Method | Sets Field | Default |
|---|---|---|
| `EventBuilder(instance_id=...)` | `instance_id` | (required) |
| `.task_type(TaskType.X)` | `task_type` | `unknown` |
| `.model("model-name")` | `model` | `"unknown"` |
| `.provider(Provider.X)` | `provider` | `unknown` |
| `.harness_type(Harness.X)` | `harness` | `unknown` |
| `.complexity_level(Complexity.X)` | `complexity` | `medium` |
| `.outcome_value(Outcome.X)` | `outcome` | `failure` |
| `.duration_ms(N)` | `task_duration_ms` | `0` |
| `.tokens(...)` | token fields | all `0` |
| `.cost(...)` | cost fields | all `0` |
| `.quality(...)` | quality fields | `passed=False`, `gates_run=[]`, `gates_failed=[]` |
| `.context(...)` | context fields | all `0` / `0.0` |
Auto-Generated Fields
| Field | Auto-generated Value |
|---|---|
| `event_id` | Random UUID (override with `.event_id(uuid)`) |
| `timestamp` | Current UTC time (override with `.timestamp(dt)`) |
| `schema_version` | `"1.0"` (set automatically by `TaskCompletionEvent`) |
Optional Fields
| Builder Method | Sets Field | Default |
|---|---|---|
| `.language("python")` | `language` | `None` |
| `.repo_size(RepoSizeCategory.MEDIUM)` | `repo_size_category` | `None` |
| `.retry_count(N)` | `retry_count` | `0` |
Token and Cost Values
Costs are expressed in microdollars (1 USD = 1,000,000 microdollars). This avoids floating-point precision issues.
event = (
EventBuilder(instance_id=config.instance_id)
# ... other fields ...
.tokens(
estimated_in=105000, # Pre-task estimate: input tokens
estimated_out=45000, # Pre-task estimate: output tokens
actual_in=112340, # Actual input tokens consumed
actual_out=38760, # Actual output tokens generated
)
.cost(
estimated=630000, # $0.63 in microdollars
actual=919200, # $0.92 in microdollars
)
.build()
)
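A pair of conversion helpers makes the microdollar convention concrete. The helper names here are hypothetical, not part of the SDK:

```python
MICROS_PER_USD = 1_000_000  # 1 USD = 1,000,000 microdollars

def usd_to_micros(usd: float) -> int:
    """Convert dollars to integer microdollars, rounding to the nearest micro."""
    return round(usd * MICROS_PER_USD)

def micros_to_usd(micros: int) -> float:
    """Convert integer microdollars back to dollars (for display only)."""
    return micros / MICROS_PER_USD
```

Storing costs as integers means two events' costs can be summed and compared exactly; convert back to dollars only at display time.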
Quality Gates
Record which quality gates were executed and their results:
event = (
EventBuilder(instance_id=config.instance_id)
# ... other fields ...
.quality(
passed=False,
gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.COVERAGE],
gates_failed=[QualityGate.COVERAGE],
)
.build()
)
Available gates: BUILD, LINT, TEST, COVERAGE, TYPECHECK, SECURITY.
Querying Predictions
The SDK can query crowd-sourced predictions from the telemetry server. Predictions provide percentile distributions for token usage, cost, duration, and quality metrics based on aggregated data from all participating instances.
Fetching Predictions
from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity
queries = [
PredictionQuery(
task_type=TaskType.IMPLEMENTATION,
model="claude-sonnet-4-5-20250929",
provider=Provider.ANTHROPIC,
complexity=Complexity.MEDIUM,
),
PredictionQuery(
task_type=TaskType.TESTING,
model="claude-haiku-4-5-20251001",
provider=Provider.ANTHROPIC,
complexity=Complexity.LOW,
),
]
# Async
await client.refresh_predictions(queries)
# Sync
client.refresh_predictions_sync(queries)
Reading from Cache
Predictions are stored in a TTL-based in-memory cache (default: 6 hours):
prediction = client.get_prediction(queries[0])
if prediction and prediction.prediction:
data = prediction.prediction
print(f"Input tokens (median): {data.input_tokens.median}")
print(f"Input tokens (p90): {data.input_tokens.p90}")
print(f"Output tokens (median): {data.output_tokens.median}")
print(f"Cost (median): ${data.cost_usd_micros['median'] / 1_000_000:.4f}")
print(f"Duration (median): {data.duration_ms['median'] / 1000:.1f}s")
print(f"Correction factor (input): {data.correction_factors.input:.2f}")
print(f"Quality gate pass rate: {data.quality.gate_pass_rate:.0%}")
print(f"Success rate: {data.quality.success_rate:.0%}")
meta = prediction.metadata
print(f"Sample size: {meta.sample_size}")
print(f"Confidence: {meta.confidence}")
if meta.fallback_note:
print(f"Note: {meta.fallback_note}")
else:
print("No prediction data available for this combination")
Prediction Confidence Levels
| Level | Meaning |
|---|---|
| `high` | 100+ samples, exact dimension match |
| `medium` | 30-99 samples, exact dimension match |
| `low` | <30 samples, or fallback was used |
| `none` | No data available; prediction is `None` |
Error Handling
The track() Contract
track() never throws and never blocks the caller. If anything goes wrong (queue full, telemetry disabled, unexpected error), the event is silently dropped and the error is logged. This ensures telemetry instrumentation never affects your application's behavior.
# This is always safe, even if telemetry is misconfigured
client.track(event)
Queue Overflow
When the in-memory queue reaches max_queue_size (default 1000), the oldest events are evicted to make room for new ones. A warning is logged when this happens.
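This oldest-first eviction policy matches what `collections.deque` with `maxlen` does, which makes it easy to demonstrate (a sketch of the policy, not the SDK's queue):

```python
from collections import deque

# A bounded queue that silently evicts the oldest entry when full,
# mirroring the max_queue_size behavior described above.
queue: deque[int] = deque(maxlen=3)
for event_num in range(5):
    queue.append(event_num)

# Events 0 and 1 were evicted to make room for 2, 3, 4.
assert list(queue) == [2, 3, 4]
```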
Submission Retries
The background submitter retries transient failures with exponential backoff and jitter:
- 429 Too Many Requests: Honors the server's `Retry-After` header
- Timeouts: Retried with backoff
- Network errors: Retried with backoff
- 403 Forbidden: Not retried (configuration error)
Failed batches are re-queued for the next submission cycle (up to queue capacity).
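Exponential backoff with jitter is typically computed as below. This is a sketch of the general technique; the SDK's exact base, cap, and jitter strategy are not documented here:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay in seconds before retry `attempt` (0-based): exponential growth,
    capped, with full jitter so concurrent clients don't retry in lockstep."""
    exponential = min(cap, base * (2 ** attempt))
    return random.uniform(0, exponential)
```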
Logging
All SDK logging uses the mosaicstack_telemetry logger. Enable it to see submission activity:
import logging
logging.basicConfig(level=logging.DEBUG)
# Or target the SDK logger specifically:
logging.getLogger("mosaicstack_telemetry").setLevel(logging.DEBUG)
Dry-Run Mode
Test your integration without sending data to the server:
config = TelemetryConfig(
server_url="https://tel-api.mosaicstack.dev",
api_key="a" * 64,
instance_id="12345678-1234-1234-1234-123456789abc",
dry_run=True,
)
with TelemetryClient(config) as client:
client.track(event)
# Logs: "[DRY RUN] Would submit batch of 1 events to ..."
Disabling Telemetry
Set enabled=False or the environment variable MOSAIC_TELEMETRY_ENABLED=false:
config = TelemetryConfig(enabled=False)
with TelemetryClient(config) as client:
client.track(event) # Silently dropped, no background thread started
API Compatibility
| SDK Version | Telemetry API | Event Schema | Notes |
|---|---|---|---|
| 0.1.x | v1 (`/v1/*`) | 1.0 | Current release |
The SDK submits events to POST /v1/events/batch and queries predictions from POST /v1/predictions/batch. These are the only two server endpoints the SDK communicates with.
For the full server API documentation, see the Mosaic Telemetry API Reference.