"""Main TelemetryClient — the public entry point for the SDK.""" from __future__ import annotations import logging from typing import Any import httpx from mosaicstack_telemetry._async import AsyncSubmitter from mosaicstack_telemetry._sync import SyncSubmitter from mosaicstack_telemetry.config import TelemetryConfig from mosaicstack_telemetry.prediction_cache import PredictionCache from mosaicstack_telemetry.queue import EventQueue from mosaicstack_telemetry.types.events import TaskCompletionEvent from mosaicstack_telemetry.types.predictions import ( PredictionQuery, PredictionResponse, ) logger = logging.getLogger("mosaicstack_telemetry") class TelemetryClient: """Main client for Mosaic Stack Telemetry. Supports both sync and async usage patterns: **Sync (threading-based):** client = TelemetryClient(config) client.start() client.track(event) client.stop() **Async (asyncio-based):** client = TelemetryClient(config) await client.start_async() client.track(event) await client.stop_async() **Context managers:** with TelemetryClient(config) as client: client.track(event) async with TelemetryClient(config) as client: client.track(event) """ def __init__(self, config: TelemetryConfig) -> None: errors = config.validate() if errors and config.enabled: logger.warning("Telemetry config validation errors: %s", "; ".join(errors)) self._config = config self._queue = EventQueue(max_size=config.max_queue_size) self._prediction_cache = PredictionCache(ttl_seconds=config.prediction_cache_ttl_seconds) self._sync_submitter: SyncSubmitter | None = None self._async_submitter: AsyncSubmitter | None = None def start(self) -> None: """Start background submission using threading.Timer loop.""" if not self._config.enabled: logger.info("Telemetry disabled, skipping start") return self._sync_submitter = SyncSubmitter(self._config, self._queue) self._sync_submitter.start() async def start_async(self) -> None: """Start with asyncio.Task for async contexts.""" if not self._config.enabled: logger.info("Telemetry disabled, skipping async start") return self._async_submitter = AsyncSubmitter(self._config, self._queue) await self._async_submitter.start() def stop(self) -> None: """Stop background submission, flush remaining events synchronously.""" if self._sync_submitter is not None: self._sync_submitter.stop() self._sync_submitter = None async def stop_async(self) -> None: """Async stop and flush.""" if self._async_submitter is not None: await self._async_submitter.stop() self._async_submitter = None def track(self, event: TaskCompletionEvent) -> None: """Queue an event for submission. Always synchronous. Never blocks or throws. If telemetry is disabled, the event is silently dropped. """ try: if not self._config.enabled: return self._queue.put(event) logger.debug("Event queued: %s", event.event_id) except Exception: logger.exception("Unexpected error in track()") def get_prediction(self, query: PredictionQuery) -> PredictionResponse | None: """Get a cached prediction. Returns None if not cached or expired.""" return self._prediction_cache.get(query) def refresh_predictions_sync(self, queries: list[PredictionQuery]) -> None: """Fetch fresh predictions from server synchronously.""" if not queries: return url = f"{self._config.server_url}/v1/predictions/batch" body = {"queries": [q.model_dump(mode="json") for q in queries]} try: with httpx.Client() as client: response = client.post( url, json=body, headers={"User-Agent": self._config.user_agent}, timeout=self._config.request_timeout_seconds, ) if response.status_code == 200: data = response.json() results = data.get("results", []) for query, result_data in zip(queries, results): pred = PredictionResponse.model_validate(result_data) self._prediction_cache.put(query, pred) logger.debug("Refreshed %d predictions", len(results)) else: logger.warning( "Prediction refresh failed with status %d", response.status_code, ) except Exception: logger.exception("Error refreshing predictions") async def refresh_predictions(self, queries: list[PredictionQuery]) -> None: """Fetch fresh predictions from server asynchronously.""" if not queries: return url = f"{self._config.server_url}/v1/predictions/batch" body = {"queries": [q.model_dump(mode="json") for q in queries]} try: async with httpx.AsyncClient() as client: response = await client.post( url, json=body, headers={"User-Agent": self._config.user_agent}, timeout=self._config.request_timeout_seconds, ) if response.status_code == 200: data = response.json() results = data.get("results", []) for query, result_data in zip(queries, results): pred = PredictionResponse.model_validate(result_data) self._prediction_cache.put(query, pred) logger.debug("Refreshed %d predictions", len(results)) else: logger.warning( "Prediction refresh failed with status %d", response.status_code, ) except Exception: logger.exception("Error refreshing predictions") @property def queue_size(self) -> int: """Number of events currently in the queue.""" return self._queue.size @property def is_running(self) -> bool: """Whether background submission is active.""" if self._sync_submitter is not None: return self._sync_submitter.is_running if self._async_submitter is not None: return self._async_submitter.is_running return False # Sync context manager def __enter__(self) -> TelemetryClient: self.start() return self def __exit__(self, *exc: Any) -> None: self.stop() # Async context manager async def __aenter__(self) -> TelemetryClient: await self.start_async() return self async def __aexit__(self, *exc: Any) -> None: await self.stop_async()