
Integration Guide

This guide covers installing and integrating mosaicstack-telemetry into Python applications. The SDK reports AI coding task-completion telemetry to a Mosaic Stack Telemetry server and queries crowd-sourced predictions.

Telemetry API version: This SDK targets the Mosaic Telemetry API v1 with event schema version 1.0.

Installation

Install from the Mosaic Stack package registry:

pip install mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/

Or with uv:

uv add mosaicstack-telemetry --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/

To avoid repeating the index URL, configure it in your project's pyproject.toml:

[[tool.uv.index]]
name = "mosaic"
url = "https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/"

Or in pip.conf (~/.config/pip/pip.conf on Linux):

[global]
extra-index-url = https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/

Requirements: Python 3.10+. Runtime dependencies: httpx and pydantic.

Configuration

Constructor Parameters

from mosaicstack_telemetry import TelemetryConfig

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)

All three fields (server_url, api_key, instance_id) are required when telemetry is enabled. The api_key must be a 64-character hexadecimal string issued by a Mosaic Telemetry administrator. The instance_id is a UUID that identifies your Mosaic Stack installation and must match the instance associated with your API key on the server.

Environment Variables

Instead of passing values to the constructor, set environment variables:

export MOSAIC_TELEMETRY_ENABLED=true
export MOSAIC_TELEMETRY_SERVER_URL=https://tel-api.mosaicstack.dev
export MOSAIC_TELEMETRY_API_KEY=your-64-char-hex-api-key
export MOSAIC_TELEMETRY_INSTANCE_ID=a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d

Then create the config with no arguments:

config = TelemetryConfig()  # Reads from environment

Constructor values take priority over environment variables.

Full Configuration Reference

| Parameter | Type | Default | Env Var | Description |
|---|---|---|---|---|
| server_url | str | "" (required) | MOSAIC_TELEMETRY_SERVER_URL | Telemetry API base URL (no trailing slash) |
| api_key | str | "" (required) | MOSAIC_TELEMETRY_API_KEY | 64-character hex API key |
| instance_id | str | "" (required) | MOSAIC_TELEMETRY_INSTANCE_ID | UUID identifying this Mosaic Stack instance |
| enabled | bool | True | MOSAIC_TELEMETRY_ENABLED | Enable/disable telemetry entirely |
| submit_interval_seconds | float | 300.0 | -- | How often the background submitter flushes queued events (seconds) |
| max_queue_size | int | 1000 | -- | Maximum events held in the in-memory queue |
| batch_size | int | 100 | -- | Events per batch (server maximum is 100) |
| request_timeout_seconds | float | 10.0 | -- | HTTP request timeout for API calls |
| prediction_cache_ttl_seconds | float | 21600.0 | -- | Prediction cache TTL (default 6 hours) |
| dry_run | bool | False | -- | Log batches but don't send to server |
| max_retries | int | 3 | -- | Retries on transient failures (429, timeouts, network errors) |

Environment-Specific Configuration

Development:

config = TelemetryConfig(
    server_url="http://localhost:8000",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,                    # Log but don't send
    submit_interval_seconds=10.0,    # Flush quickly for testing
)

Production:

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
    submit_interval_seconds=300.0,   # Default: flush every 5 minutes
    max_retries=3,                   # Retry transient failures
)

Sync Usage (Threading)

Best for scripts, CLI tools, aider integrations, and non-async contexts. The SDK spawns a daemon thread that periodically flushes queued events.

from mosaicstack_telemetry import (
    TelemetryClient,
    TelemetryConfig,
    EventBuilder,
    TaskType,
    Provider,
    Harness,
    Complexity,
    Outcome,
    QualityGate,
)

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="your-64-char-hex-api-key-here...",
    instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
)

client = TelemetryClient(config)
client.start()  # Starts background daemon thread

event = (
    EventBuilder(instance_id=config.instance_id)
    .task_type(TaskType.IMPLEMENTATION)
    .model("claude-sonnet-4-5-20250929")
    .provider(Provider.ANTHROPIC)
    .harness_type(Harness.CLAUDE_CODE)
    .complexity_level(Complexity.MEDIUM)
    .outcome_value(Outcome.SUCCESS)
    .duration_ms(45000)
    .tokens(estimated_in=105000, estimated_out=45000, actual_in=112340, actual_out=38760)
    .cost(estimated=630000, actual=919200)
    .quality(
        passed=True,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.TYPECHECK],
    )
    .context(compactions=2, rotations=0, utilization=0.72)
    .language("typescript")
    .build()
)

client.track(event)  # Non-blocking, thread-safe

client.stop()  # Flushes remaining events and stops the thread

Sync Context Manager

The context manager calls start() on entry and stop() (with flush) on exit:

with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here

Async Usage (asyncio)

For asyncio-based applications (FastAPI, aiohttp, etc.). The SDK creates an asyncio task that periodically flushes events.

import asyncio
from mosaicstack_telemetry import TelemetryClient, TelemetryConfig

async def main():
    config = TelemetryConfig(
        server_url="https://tel-api.mosaicstack.dev",
        api_key="your-64-char-hex-api-key-here...",
        instance_id="a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d",
    )

    client = TelemetryClient(config)
    await client.start_async()  # Starts asyncio background task

    # track() is always synchronous and non-blocking
    client.track(event)

    await client.stop_async()  # Flushes remaining events

asyncio.run(main())

Async Context Manager

async with TelemetryClient(config) as client:
    client.track(event)
# Automatically flushed and stopped here

Key Differences: Sync vs Async

| Aspect | Sync | Async |
|---|---|---|
| Start | client.start() | await client.start_async() |
| Stop | client.stop() | await client.stop_async() |
| Context manager | with TelemetryClient(config) | async with TelemetryClient(config) |
| Background mechanism | threading.Timer (daemon thread) | asyncio.Task |
| track() | Always synchronous | Always synchronous |
| refresh_predictions | refresh_predictions_sync(queries) | await refresh_predictions(queries) |

The track() method is always synchronous regardless of which mode you use. It simply appends to a thread-safe in-memory queue and returns immediately. The background submitter handles batching and sending.


Integration Examples

Example 1: Instrumenting a FastAPI Service

import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from mosaicstack_telemetry import (
    Complexity,
    EventBuilder,
    Harness,
    Outcome,
    Provider,
    QualityGate,
    TaskType,
    TelemetryClient,
    TelemetryConfig,
)

# Initialize telemetry once at startup
config = TelemetryConfig(
    server_url=os.environ.get("MOSAIC_TELEMETRY_SERVER_URL", "https://tel-api.mosaicstack.dev"),
    api_key=os.environ["MOSAIC_TELEMETRY_API_KEY"],
    instance_id=os.environ["MOSAIC_TELEMETRY_INSTANCE_ID"],
)

telemetry = TelemetryClient(config)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start telemetry on app startup, flush on shutdown."""
    await telemetry.start_async()
    yield
    await telemetry.stop_async()


app = FastAPI(lifespan=lifespan)


@app.post("/tasks/complete")
async def complete_task(
    task_type: str,
    model: str,
    provider: str,
    complexity: str,
    actual_input_tokens: int,
    actual_output_tokens: int,
    actual_cost_usd_micros: int,
    duration_ms: int,
    outcome: str,
):
    """Record a completed AI coding task."""
    event = (
        EventBuilder(instance_id=config.instance_id)
        .task_type(TaskType(task_type))
        .model(model)
        .provider(Provider(provider))
        .harness_type(Harness.CLAUDE_CODE)
        .complexity_level(Complexity(complexity))
        .outcome_value(Outcome(outcome))
        .duration_ms(duration_ms)
        .tokens(
            estimated_in=0,
            estimated_out=0,
            actual_in=actual_input_tokens,
            actual_out=actual_output_tokens,
        )
        .cost(estimated=0, actual=actual_cost_usd_micros)
        .quality(passed=outcome == "success", gates_run=[QualityGate.BUILD, QualityGate.TEST])
        .context(compactions=0, rotations=0, utilization=0.0)
        .build()
    )

    telemetry.track(event)  # Non-blocking, never throws
    return {"status": "tracked"}

Example 2: Instrumenting a Generic Python App

"""
Generic Python script that tracks AI coding tasks.
Suitable for CLI tools, batch processors, or any non-async application.
"""

import time

from mosaicstack_telemetry import (
    Complexity,
    EventBuilder,
    Harness,
    Outcome,
    Provider,
    QualityGate,
    RepoSizeCategory,
    TaskType,
    TelemetryClient,
    TelemetryConfig,
)


def run_coding_task() -> dict:
    """Simulate an AI coding task. Returns task metrics."""
    start = time.monotonic()

    # ... your AI coding logic here ...

    elapsed_ms = int((time.monotonic() - start) * 1000)
    return {
        "duration_ms": elapsed_ms,
        "actual_input_tokens": 4500,
        "actual_output_tokens": 1800,
        "actual_cost_usd_micros": 48000,
        "outcome": "success",
        "quality_gates_passed": True,
    }


def main():
    config = TelemetryConfig()  # Reads from MOSAIC_TELEMETRY_* env vars

    with TelemetryClient(config) as client:
        result = run_coding_task()

        event = (
            EventBuilder(instance_id=config.instance_id)
            .task_type(TaskType.IMPLEMENTATION)
            .model("claude-sonnet-4-5-20250929")
            .provider(Provider.ANTHROPIC)
            .harness_type(Harness.AIDER)
            .complexity_level(Complexity.MEDIUM)
            .outcome_value(Outcome(result["outcome"]))
            .duration_ms(result["duration_ms"])
            .tokens(
                estimated_in=5000,
                estimated_out=2000,
                actual_in=result["actual_input_tokens"],
                actual_out=result["actual_output_tokens"],
            )
            .cost(estimated=50000, actual=result["actual_cost_usd_micros"])
            .quality(
                passed=result["quality_gates_passed"],
                gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST],
            )
            .context(compactions=0, rotations=0, utilization=0.35)
            .language("python")
            .repo_size(RepoSizeCategory.MEDIUM)
            .build()
        )

        client.track(event)
        print(f"Tracked task: {event.event_id}")

    # Client is automatically flushed and stopped after the `with` block


if __name__ == "__main__":
    main()

Building Events

The EventBuilder provides a fluent API for constructing TaskCompletionEvent objects with sensible defaults. All setter methods return the builder instance for chaining.

Required Fields

These fields are always present on a built event. Set them via the builder methods; any you omit fall back to the defaults below:

| Builder Method | Sets Field | Default |
|---|---|---|
| EventBuilder(instance_id=...) | instance_id | (required) |
| .task_type(TaskType.X) | task_type | unknown |
| .model("model-name") | model | "unknown" |
| .provider(Provider.X) | provider | unknown |
| .harness_type(Harness.X) | harness | unknown |
| .complexity_level(Complexity.X) | complexity | medium |
| .outcome_value(Outcome.X) | outcome | failure |
| .duration_ms(N) | task_duration_ms | 0 |
| .tokens(...) | token fields | all 0 |
| .cost(...) | cost fields | all 0 |
| .quality(...) | quality fields | passed=False, gates_run=[], gates_failed=[] |
| .context(...) | context fields | all 0 / 0.0 |

Auto-Generated Fields

| Field | Auto-generated Value |
|---|---|
| event_id | Random UUID (override with .event_id(uuid)) |
| timestamp | Current UTC time (override with .timestamp(dt)) |
| schema_version | "1.0" (set automatically by TaskCompletionEvent) |

Optional Fields

| Builder Method | Sets Field | Default |
|---|---|---|
| .language("python") | language | None |
| .repo_size(RepoSizeCategory.MEDIUM) | repo_size_category | None |
| .retry_count(N) | retry_count | 0 |

Token and Cost Values

Costs are expressed in microdollars (1 USD = 1,000,000 microdollars). This avoids floating-point precision issues.
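A pair of small conversion helpers (illustrative, not part of the SDK) makes the unit explicit:

```python
MICRODOLLARS_PER_USD = 1_000_000

def usd_to_micros(usd: float) -> int:
    """Convert a dollar amount to integer microdollars, rounding to the nearest unit."""
    return round(usd * MICRODOLLARS_PER_USD)

def micros_to_usd(micros: int) -> float:
    """Convert microdollars back to dollars for display."""
    return micros / MICRODOLLARS_PER_USD
```

For example, usd_to_micros(0.63) yields 630000, the estimated cost used below.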

event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .tokens(
        estimated_in=105000,    # Pre-task estimate: input tokens
        estimated_out=45000,    # Pre-task estimate: output tokens
        actual_in=112340,       # Actual input tokens consumed
        actual_out=38760,       # Actual output tokens generated
    )
    .cost(
        estimated=630000,       # $0.63 in microdollars
        actual=919200,          # $0.92 in microdollars
    )
    .build()
)

Quality Gates

Record which quality gates were executed and their results:

event = (
    EventBuilder(instance_id=config.instance_id)
    # ... other fields ...
    .quality(
        passed=False,
        gates_run=[QualityGate.BUILD, QualityGate.LINT, QualityGate.TEST, QualityGate.COVERAGE],
        gates_failed=[QualityGate.COVERAGE],
    )
    .build()
)

Available gates: BUILD, LINT, TEST, COVERAGE, TYPECHECK, SECURITY.
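If your harness reports per-gate results as a mapping, a small helper (hypothetical, not an SDK API) can derive the three .quality() arguments. The sketch uses strings as keys; real code would use QualityGate members:

```python
def split_gate_results(results: dict[str, bool]) -> tuple[bool, list[str], list[str]]:
    """Derive (passed, gates_run, gates_failed) from per-gate pass/fail results."""
    gates_run = list(results)
    gates_failed = [gate for gate, ok in results.items() if not ok]
    return (not gates_failed, gates_run, gates_failed)
```

The event passes quality only when no executed gate failed.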


Querying Predictions

The SDK can query crowd-sourced predictions from the telemetry server. Predictions provide percentile distributions for token usage, cost, duration, and quality metrics based on aggregated data from all participating instances.

Fetching Predictions

from mosaicstack_telemetry import PredictionQuery, TaskType, Provider, Complexity

queries = [
    PredictionQuery(
        task_type=TaskType.IMPLEMENTATION,
        model="claude-sonnet-4-5-20250929",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.MEDIUM,
    ),
    PredictionQuery(
        task_type=TaskType.TESTING,
        model="claude-haiku-4-5-20251001",
        provider=Provider.ANTHROPIC,
        complexity=Complexity.LOW,
    ),
]

# Async
await client.refresh_predictions(queries)

# Sync
client.refresh_predictions_sync(queries)

Reading from Cache

Predictions are stored in a TTL-based in-memory cache (default: 6 hours):

prediction = client.get_prediction(queries[0])

if prediction and prediction.prediction:
    data = prediction.prediction
    print(f"Input tokens (median): {data.input_tokens.median}")
    print(f"Input tokens (p90): {data.input_tokens.p90}")
    print(f"Output tokens (median): {data.output_tokens.median}")
    print(f"Cost (median): ${data.cost_usd_micros['median'] / 1_000_000:.4f}")
    print(f"Duration (median): {data.duration_ms['median'] / 1000:.1f}s")
    print(f"Correction factor (input): {data.correction_factors.input:.2f}")
    print(f"Quality gate pass rate: {data.quality.gate_pass_rate:.0%}")
    print(f"Success rate: {data.quality.success_rate:.0%}")

    meta = prediction.metadata
    print(f"Sample size: {meta.sample_size}")
    print(f"Confidence: {meta.confidence}")
    if meta.fallback_note:
        print(f"Note: {meta.fallback_note}")
else:
    print("No prediction data available for this combination")
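Predictions lend themselves to pre-task budgeting. For instance, a caller might refuse to start a task whose p90 predicted cost exceeds a dollar budget (within_budget is illustrative; the percentile value would come from data.cost_usd_micros as shown above):

```python
def within_budget(p90_cost_micros: int, budget_usd: float) -> bool:
    """True if the p90 predicted cost fits inside a dollar budget."""
    return p90_cost_micros <= round(budget_usd * 1_000_000)
```

Using p90 rather than the median builds in headroom for above-typical runs.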

Prediction Confidence Levels

| Level | Meaning |
|---|---|
| high | 100+ samples, exact dimension match |
| medium | 30-99 samples, exact dimension match |
| low | <30 samples or fallback was used |
| none | No data available; prediction is None |

Error Handling

The track() Contract

track() never throws and never blocks the caller. If anything goes wrong (queue full, telemetry disabled, unexpected error), the event is silently dropped and the error is logged. This ensures telemetry instrumentation never affects your application's behavior.

# This is always safe, even if telemetry is misconfigured
client.track(event)

Queue Overflow

When the in-memory queue reaches max_queue_size (default 1000), the oldest events are evicted to make room for new ones. A warning is logged when this happens.
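The eviction behavior matches a bounded deque; a minimal sketch, assuming drop-oldest semantics:

```python
from collections import deque

# A deque with maxlen drops items from the opposite end when full,
# which matches "evict oldest to make room for newest".
queue: deque[str] = deque(maxlen=3)
for event in ["e1", "e2", "e3", "e4", "e5"]:
    queue.append(event)

print(list(queue))  # ['e3', 'e4', 'e5'] -- e1 and e2 were evicted
```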

Submission Retries

The background submitter retries transient failures with exponential backoff and jitter:

  • 429 Too Many Requests: Honors the server's Retry-After header
  • Timeouts: Retried with backoff
  • Network errors: Retried with backoff
  • 403 Forbidden: Not retried (configuration error)

Failed batches are re-queued for the next submission cycle (up to queue capacity).
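The SDK's exact schedule is internal; a typical exponential-backoff-with-full-jitter scheme looks like this sketch (backoff_delay is illustrative, not an SDK function):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay before retry `attempt` (0-based): uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))
```

Attempts 0, 1, and 2 draw from [0, 1], [0, 2], and [0, 4] seconds respectively; a Retry-After header on a 429 response would take precedence over the computed delay.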

Logging

All SDK logging uses the mosaicstack_telemetry logger. Enable it to see submission activity:

import logging

logging.basicConfig(level=logging.DEBUG)
# Or target the SDK logger specifically:
logging.getLogger("mosaicstack_telemetry").setLevel(logging.DEBUG)

Dry-Run Mode

Test your integration without sending data to the server:

config = TelemetryConfig(
    server_url="https://tel-api.mosaicstack.dev",
    api_key="a" * 64,
    instance_id="12345678-1234-1234-1234-123456789abc",
    dry_run=True,
)

with TelemetryClient(config) as client:
    client.track(event)
    # Logs: "[DRY RUN] Would submit batch of 1 events to ..."

Disabling Telemetry

Set enabled=False or the environment variable MOSAIC_TELEMETRY_ENABLED=false:

config = TelemetryConfig(enabled=False)

with TelemetryClient(config) as client:
    client.track(event)  # Silently dropped, no background thread started

Versioning

This SDK publishes two types of versions to the Mosaic Stack PyPI registry at git.mosaicstack.dev:

| Channel | Version Format | Example | Branch | Install |
|---|---|---|---|---|
| Stable | X.Y.Z | 0.1.0 | main / tags | pip install mosaicstack-telemetry |
| Dev | X.Y.Z.devYYYYMMDDHHMMSS | 0.1.0.dev20260215045901 | develop | pip install mosaicstack-telemetry --pre |

Dev versions use a UTC timestamp suffix following PEP 440. This convention is shared across Mosaic Stack SDKs:

| SDK | Dev Version Format | Example |
|---|---|---|
| Python (this SDK) | X.Y.Z.devYYYYMMDDHHMMSS | 0.1.0.dev20260215045901 |
| JavaScript | X.Y.Z-dev.YYYYMMDDHHmmss | 0.1.0-dev.20260215045901 |

By default, pip install resolves only stable releases. To install the latest dev build:

pip install mosaicstack-telemetry --pre --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/

To pin to a specific dev version:

pip install mosaicstack-telemetry==0.1.0.dev20260215045901 --index-url https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/
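The same pin can live in pyproject.toml so every collaborator resolves the identical build (a sketch assuming the uv index configuration shown earlier; the project name and version are placeholders):

```toml
[project]
name = "my-app"
version = "0.0.1"
dependencies = ["mosaicstack-telemetry==0.1.0.dev20260215045901"]

[[tool.uv.index]]
name = "mosaic"
url = "https://git.mosaicstack.dev/api/packages/mosaic/pypi/simple/"
```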

API Compatibility

| SDK Version | Telemetry API | Event Schema | Notes |
|---|---|---|---|
| 0.1.x | v1 (/v1/*) | 1.0 | Current release |

The SDK submits events to POST /v1/events/batch and queries predictions from POST /v1/predictions/batch. These are the only two server endpoints the SDK communicates with.

For the full server API documentation, see the Mosaic Telemetry API Reference.