Standalone Python package (mosaicstack-telemetry) for reporting task-completion telemetry and querying predictions from the Mosaic Stack Telemetry server. - Sync/async TelemetryClient with context manager support - Thread-safe EventQueue with bounded deque - BatchSubmitter with httpx, exponential backoff, Retry-After - PredictionCache with TTL - EventBuilder convenience class - All types standalone (no server dependency) - 55 tests, 90% coverage, mypy strict clean Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
183 lines · 5.3 KiB · Python
"""Shared test fixtures."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from uuid import UUID, uuid4
|
|
|
|
import pytest
|
|
|
|
from mosaicstack_telemetry.config import TelemetryConfig
|
|
from mosaicstack_telemetry.types.events import (
|
|
Complexity,
|
|
Harness,
|
|
Outcome,
|
|
Provider,
|
|
TaskCompletionEvent,
|
|
TaskType,
|
|
)
|
|
from mosaicstack_telemetry.types.predictions import (
|
|
CorrectionFactors,
|
|
PredictionData,
|
|
PredictionMetadata,
|
|
PredictionQuery,
|
|
PredictionResponse,
|
|
QualityPrediction,
|
|
TokenDistribution,
|
|
)
|
|
|
|
TEST_API_KEY = "a" * 64
|
|
TEST_INSTANCE_ID = "12345678-1234-1234-1234-123456789abc"
|
|
TEST_SERVER_URL = "https://telemetry.example.com"
|
|
|
|
|
|
@pytest.fixture()
def config() -> TelemetryConfig:
    """Build the standard TelemetryConfig used by most tests.

    Short intervals and small limits keep the test suite fast.
    """
    settings = {
        "server_url": TEST_SERVER_URL,
        "api_key": TEST_API_KEY,
        "instance_id": TEST_INSTANCE_ID,
        "submit_interval_seconds": 1.0,
        "max_queue_size": 100,
        "batch_size": 10,
        "request_timeout_seconds": 5.0,
        "max_retries": 1,
    }
    return TelemetryConfig(**settings)
|
|
|
|
|
|
@pytest.fixture()
def dry_run_config() -> TelemetryConfig:
    """Build a TelemetryConfig identical to ``config`` but with dry_run on."""
    settings = {
        "server_url": TEST_SERVER_URL,
        "api_key": TEST_API_KEY,
        "instance_id": TEST_INSTANCE_ID,
        "submit_interval_seconds": 1.0,
        "max_queue_size": 100,
        "batch_size": 10,
        "request_timeout_seconds": 5.0,
        "dry_run": True,
        "max_retries": 1,
    }
    return TelemetryConfig(**settings)
|
|
|
|
|
|
@pytest.fixture()
def disabled_config() -> TelemetryConfig:
    """Build a TelemetryConfig with telemetry turned off entirely."""
    # Only the required connection fields plus the disable flag; everything
    # else falls back to TelemetryConfig defaults.
    return TelemetryConfig(
        enabled=False,
        server_url=TEST_SERVER_URL,
        api_key=TEST_API_KEY,
        instance_id=TEST_INSTANCE_ID,
    )
|
|
|
|
|
|
@pytest.fixture()
def sample_instance_id() -> UUID:
    """Return the deterministic instance UUID shared across tests."""
    fixed_id = UUID(hex=TEST_INSTANCE_ID)
    return fixed_id
|
|
|
|
|
|
@pytest.fixture()
def sample_event(sample_instance_id: UUID) -> TaskCompletionEvent:
    """Build a fully-populated task completion event for a successful run."""
    fields = {
        # Identity and timing.
        "instance_id": sample_instance_id,
        "event_id": uuid4(),
        "timestamp": datetime.now(timezone.utc),
        "task_duration_ms": 30000,
        # Task classification.
        "task_type": TaskType.IMPLEMENTATION,
        "complexity": Complexity.MEDIUM,
        "harness": Harness.CLAUDE_CODE,
        "model": "claude-sonnet-4-20250514",
        "provider": Provider.ANTHROPIC,
        # Token and cost figures: estimates vs. actuals.
        "estimated_input_tokens": 5000,
        "estimated_output_tokens": 2000,
        "actual_input_tokens": 5200,
        "actual_output_tokens": 1800,
        "estimated_cost_usd_micros": 10000,
        "actual_cost_usd_micros": 9500,
        # Quality-gate results (no gates ran in this sample).
        "quality_gate_passed": True,
        "quality_gates_run": [],
        "quality_gates_failed": [],
        # Context-management counters.
        "context_compactions": 0,
        "context_rotations": 0,
        "context_utilization_final": 0.4,
        # Outcome.
        "outcome": Outcome.SUCCESS,
        "retry_count": 0,
        "language": "python",
        "repo_size_category": None,
    }
    return TaskCompletionEvent(**fields)
|
|
|
|
|
|
def make_event(instance_id: UUID | None = None, **overrides: object) -> TaskCompletionEvent:
    """Factory helper to create a sample event with optional field overrides.

    The previous signature only allowed overriding ``instance_id`` despite
    the docstring promising general overrides; ``**overrides`` now lets a
    test replace any field of the default payload.

    Args:
        instance_id: Instance UUID to stamp on the event; the fixed test
            UUID is used when omitted.
        **overrides: TaskCompletionEvent fields to replace in the default
            payload, e.g. ``outcome=Outcome.FAILURE`` or ``retry_count=2``.

    Returns:
        A fully-populated TaskCompletionEvent.
    """
    payload: dict[str, object] = {
        "instance_id": instance_id or UUID(TEST_INSTANCE_ID),
        "event_id": uuid4(),
        "timestamp": datetime.now(timezone.utc),
        "task_duration_ms": 15000,
        "task_type": TaskType.DEBUGGING,
        "complexity": Complexity.LOW,
        "harness": Harness.AIDER,
        "model": "gpt-4o",
        "provider": Provider.OPENAI,
        "estimated_input_tokens": 1000,
        "estimated_output_tokens": 500,
        "actual_input_tokens": 1100,
        "actual_output_tokens": 480,
        "estimated_cost_usd_micros": 3000,
        "actual_cost_usd_micros": 2800,
        "quality_gate_passed": True,
        "quality_gates_run": [],
        "quality_gates_failed": [],
        "context_compactions": 0,
        "context_rotations": 0,
        "context_utilization_final": 0.2,
        "outcome": Outcome.SUCCESS,
        "retry_count": 0,
    }
    # Apply caller overrides last so they win over the defaults.
    payload.update(overrides)
    return TaskCompletionEvent(**payload)
|
|
|
|
|
|
@pytest.fixture()
def sample_prediction_query() -> PredictionQuery:
    """Build a representative prediction query for a medium implementation task."""
    query = PredictionQuery(
        complexity=Complexity.MEDIUM,
        task_type=TaskType.IMPLEMENTATION,
        provider=Provider.ANTHROPIC,
        model="claude-sonnet-4-20250514",
    )
    return query
|
|
|
|
|
|
@pytest.fixture()
def sample_prediction_response() -> PredictionResponse:
    """Build a complete, high-confidence sample prediction response."""
    # Token distributions as five-point percentile summaries.
    input_dist = TokenDistribution(p10=1000, p25=2000, median=3000, p75=4000, p90=5000)
    output_dist = TokenDistribution(p10=500, p25=1000, median=1500, p75=2000, p90=2500)
    prediction = PredictionData(
        input_tokens=input_dist,
        output_tokens=output_dist,
        # Cost and duration use sparse percentile dicts rather than
        # TokenDistribution objects.
        cost_usd_micros={"p10": 1000, "median": 3000, "p90": 5000},
        duration_ms={"p10": 10000, "median": 30000, "p90": 60000},
        correction_factors=CorrectionFactors(input=1.05, output=0.95),
        quality=QualityPrediction(gate_pass_rate=0.85, success_rate=0.9),
    )
    metadata = PredictionMetadata(sample_size=150, fallback_level=0, confidence="high")
    return PredictionResponse(prediction=prediction, metadata=metadata)
|
|
|
|
|
|
def make_batch_response_json(events: list[TaskCompletionEvent]) -> dict[str, object]:
    """Create a batch response JSON dict marking every event as accepted.

    The bare ``dict`` return annotation is tightened to ``dict[str, object]``
    (implicit-Any generics are rejected under mypy strict settings).

    Args:
        events: Events whose ``event_id`` values populate the per-event
            ``results`` entries.

    Returns:
        A dict shaped like the server's batch-submit response: ``accepted``
        and ``rejected`` counts plus one ``results`` entry per event.
    """
    results = [
        {"event_id": str(event.event_id), "status": "accepted", "error": None}
        for event in events
    ]
    return {
        "accepted": len(events),
        "rejected": 0,
        "results": results,
    }
|