# Package notes (from the original commit message): standalone Python package
# (mosaicstack-telemetry) for reporting task-completion telemetry and querying
# predictions from the Mosaic Stack Telemetry server. Provides a sync/async
# TelemetryClient with context-manager support, a thread-safe EventQueue with
# bounded deque, a BatchSubmitter (httpx, exponential backoff, Retry-After),
# a PredictionCache with TTL, and an EventBuilder convenience class.
"""Tests for TelemetryClient."""

from __future__ import annotations

import httpx
import pytest
import respx

from mosaicstack_telemetry.client import TelemetryClient
from mosaicstack_telemetry.config import TelemetryConfig
from mosaicstack_telemetry.types.events import (
    Complexity,
    Provider,
    TaskType,
)
from mosaicstack_telemetry.types.predictions import (
    PredictionMetadata,
    PredictionQuery,
    PredictionResponse,
)
from tests.conftest import (
    TEST_API_KEY,
    TEST_INSTANCE_ID,
    TEST_SERVER_URL,
    make_event,
)
class TestTelemetryClientLifecycle:
    """Lifecycle tests: starting and stopping the client in both modes."""

    def test_start_stop_sync(self, config: TelemetryConfig) -> None:
        """A synchronous start/stop cycle toggles is_running."""
        telemetry = TelemetryClient(config)
        telemetry.start()
        assert telemetry.is_running is True
        telemetry.stop()
        assert telemetry.is_running is False

    async def test_start_stop_async(self, config: TelemetryConfig) -> None:
        """An asynchronous start/stop cycle toggles is_running."""
        telemetry = TelemetryClient(config)
        await telemetry.start_async()
        assert telemetry.is_running is True
        await telemetry.stop_async()
        assert telemetry.is_running is False

    def test_start_disabled(self, disabled_config: TelemetryConfig) -> None:
        """With telemetry disabled, start() does nothing and stop() is safe."""
        telemetry = TelemetryClient(disabled_config)
        telemetry.start()
        assert telemetry.is_running is False
        # stop() on a never-started client must not raise.
        telemetry.stop()

    async def test_start_async_disabled(self, disabled_config: TelemetryConfig) -> None:
        """With telemetry disabled, start_async() does nothing and stop_async() is safe."""
        telemetry = TelemetryClient(disabled_config)
        await telemetry.start_async()
        assert telemetry.is_running is False
        # stop_async() on a never-started client must not raise.
        await telemetry.stop_async()
class TestTelemetryClientTrack:
    """Tests covering the fire-and-forget track() API."""

    def test_track_queues_event(self, config: TelemetryConfig) -> None:
        """A tracked event lands in the internal queue."""
        telemetry = TelemetryClient(config)
        telemetry.track(make_event())
        assert telemetry.queue_size == 1

    def test_track_multiple_events(self, config: TelemetryConfig) -> None:
        """Every tracked event is queued, in order of arrival."""
        telemetry = TelemetryClient(config)
        events = [make_event() for _ in range(5)]
        for event in events:
            telemetry.track(event)
        assert telemetry.queue_size == 5

    def test_track_disabled_drops_event(self, disabled_config: TelemetryConfig) -> None:
        """When telemetry is disabled, track() drops events without error."""
        telemetry = TelemetryClient(disabled_config)
        telemetry.track(make_event())
        assert telemetry.queue_size == 0

    def test_track_never_throws(self, config: TelemetryConfig) -> None:
        """track() is contractually non-throwing."""
        telemetry = TelemetryClient(config)
        # Even imperfect usage must not raise back into the caller.
        telemetry.track(make_event())
        assert telemetry.queue_size == 1
class TestTelemetryClientContextManager:
    """Tests for sync and async context-manager support."""

    @respx.mock
    def test_sync_context_manager(self, config: TelemetryConfig) -> None:
        """`with TelemetryClient(...)` starts on entry and stops on exit."""
        # Stub the batch endpoint in case the client flushes during shutdown.
        batch_route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        batch_route.mock(
            return_value=httpx.Response(
                202,
                json={"accepted": 0, "rejected": 0, "results": []},
            )
        )

        with TelemetryClient(config) as client:
            assert client.is_running is True
            client.track(make_event())

        assert client.is_running is False

    @respx.mock
    async def test_async_context_manager(self, config: TelemetryConfig) -> None:
        """`async with TelemetryClient(...)` starts on entry and stops on exit."""
        batch_route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        batch_route.mock(
            return_value=httpx.Response(
                202,
                json={"accepted": 0, "rejected": 0, "results": []},
            )
        )

        async with TelemetryClient(config) as client:
            assert client.is_running is True
            client.track(make_event())

        assert client.is_running is False
class TestTelemetryClientPredictions:
    """Tests for prediction caching and retrieval.

    The identical query/payload literals were previously duplicated across
    every test; they are factored into the two private helpers below.
    """

    @staticmethod
    def _query() -> PredictionQuery:
        """Build the canonical query used by every test in this class."""
        return PredictionQuery(
            task_type=TaskType.IMPLEMENTATION,
            model="test-model",
            provider=Provider.ANTHROPIC,
            complexity=Complexity.MEDIUM,
        )

    @staticmethod
    def _batch_payload(sample_size: int, fallback_level: int, confidence: str) -> dict:
        """Build a /v1/predictions/batch response body with a single result."""
        return {
            "results": [
                {
                    "prediction": None,
                    "metadata": {
                        "sample_size": sample_size,
                        "fallback_level": fallback_level,
                        "confidence": confidence,
                    },
                }
            ]
        }

    def test_get_prediction_miss(self, config: TelemetryConfig) -> None:
        """get_prediction returns None on cache miss."""
        client = TelemetryClient(config)
        assert client.get_prediction(self._query()) is None

    def test_get_prediction_after_cache_populated(self, config: TelemetryConfig) -> None:
        """get_prediction returns a previously cached value."""
        client = TelemetryClient(config)
        query = self._query()
        response = PredictionResponse(
            prediction=None,
            metadata=PredictionMetadata(
                sample_size=50,
                fallback_level=0,
                confidence="medium",
            ),
        )
        # White-box: populate the cache directly, bypassing the network layer.
        client._prediction_cache.put(query, response)

        result = client.get_prediction(query)
        assert result is not None
        assert result.metadata.sample_size == 50

    @respx.mock
    async def test_refresh_predictions_async(self, config: TelemetryConfig) -> None:
        """refresh_predictions fetches from the server and caches the result."""
        query = self._query()
        respx.post(f"{TEST_SERVER_URL}/v1/predictions/batch").mock(
            return_value=httpx.Response(
                200, json=self._batch_payload(75, 1, "medium")
            )
        )

        client = TelemetryClient(config)
        await client.refresh_predictions([query])

        result = client.get_prediction(query)
        assert result is not None
        assert result.metadata.sample_size == 75

    @respx.mock
    def test_refresh_predictions_sync(self, config: TelemetryConfig) -> None:
        """refresh_predictions_sync fetches and caches predictions."""
        query = self._query()
        respx.post(f"{TEST_SERVER_URL}/v1/predictions/batch").mock(
            return_value=httpx.Response(
                200, json=self._batch_payload(60, 0, "low")
            )
        )

        client = TelemetryClient(config)
        client.refresh_predictions_sync([query])

        result = client.get_prediction(query)
        assert result is not None
        assert result.metadata.sample_size == 60

    @respx.mock
    async def test_refresh_predictions_server_error(self, config: TelemetryConfig) -> None:
        """A 5xx from the server is handled gracefully and caches nothing."""
        query = self._query()
        respx.post(f"{TEST_SERVER_URL}/v1/predictions/batch").mock(
            return_value=httpx.Response(500, text="Internal Server Error")
        )

        client = TelemetryClient(config)
        # Must not raise despite the server error.
        await client.refresh_predictions([query])
        # Cache should remain empty.
        assert client.get_prediction(query) is None

    async def test_refresh_predictions_empty_list(self, config: TelemetryConfig) -> None:
        """refresh_predictions with an empty list is a no-op."""
        client = TelemetryClient(config)
        await client.refresh_predictions([])
class TestTelemetryClientConfig:
    """Tests for configuration handling and validation."""

    def test_config_env_vars(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Environment variables override the built-in defaults."""
        env = {
            "MOSAIC_TELEMETRY_ENABLED": "false",
            "MOSAIC_TELEMETRY_SERVER_URL": "https://env-server.com",
            "MOSAIC_TELEMETRY_API_KEY": "b" * 64,
            "MOSAIC_TELEMETRY_INSTANCE_ID": TEST_INSTANCE_ID,
        }
        for name, value in env.items():
            monkeypatch.setenv(name, value)

        cfg = TelemetryConfig()
        assert cfg.enabled is False
        assert cfg.server_url == "https://env-server.com"
        assert cfg.api_key == "b" * 64
        assert cfg.instance_id == TEST_INSTANCE_ID

    def test_config_validation_errors(self) -> None:
        """Each invalid field contributes at least one validation error."""
        cfg = TelemetryConfig(
            server_url="",
            api_key="short",
            instance_id="not-a-uuid",
        )
        assert len(cfg.validate()) >= 3

    def test_config_validation_success(self, config: TelemetryConfig) -> None:
        """A valid config validates cleanly."""
        assert config.validate() == []

    def test_config_strips_trailing_slash(self) -> None:
        """Trailing slashes on server_url are normalized away."""
        cfg = TelemetryConfig(
            server_url="https://example.com/",
            api_key=TEST_API_KEY,
            instance_id=TEST_INSTANCE_ID,
        )
        assert cfg.server_url == "https://example.com"

    def test_explicit_values_override_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Constructor arguments win over environment variables."""
        monkeypatch.setenv("MOSAIC_TELEMETRY_SERVER_URL", "https://env-server.com")
        cfg = TelemetryConfig(
            server_url="https://explicit-server.com",
            api_key=TEST_API_KEY,
            instance_id=TEST_INSTANCE_ID,
        )
        assert cfg.server_url == "https://explicit-server.com"