"""OpenTelemetry telemetry initialization and configuration.""" import logging import os from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased logger = logging.getLogger(__name__) class TelemetryService: """Service responsible for OpenTelemetry distributed tracing. Initializes the OTEL SDK with OTLP exporters and provides tracing utilities for coordinator operations. Example: >>> service = TelemetryService() >>> service.initialize() >>> tracer = service.get_tracer() """ def __init__(self) -> None: """Initialize the TelemetryService.""" self.enabled = os.getenv("OTEL_ENABLED", "true").lower() != "false" self.service_name = os.getenv("OTEL_SERVICE_NAME", "mosaic-coordinator") self.provider: TracerProvider | None = None self._tracer: trace.Tracer | None = None def _get_sampling_ratio(self) -> float: """Get the trace sampling ratio from environment variable. Returns 1.0 (sample all traces) by default. Clamps value between 0.0 and 1.0. Returns: The sampling ratio between 0.0 and 1.0 """ env_value = os.getenv("OTEL_TRACES_SAMPLER_ARG") if not env_value: return 1.0 try: parsed = float(env_value) except ValueError: logger.warning( f"Invalid OTEL_TRACES_SAMPLER_ARG value: {env_value}, using default 1.0" ) return 1.0 # Clamp to valid range clamped = max(0.0, min(1.0, parsed)) if clamped != parsed: logger.warning(f"OTEL_TRACES_SAMPLER_ARG clamped from {parsed} to {clamped}") return clamped def _get_deployment_environment(self) -> str: """Get the deployment environment from environment variables. Defaults to 'development' if not set. Returns: The deployment environment string """ return os.getenv("OTEL_DEPLOYMENT_ENVIRONMENT", "development") def _get_otlp_endpoint(self) -> str: """Get the OTLP endpoint from environment variable. Returns: The OTLP endpoint URL """ return os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318/v1/traces") def initialize(self) -> None: """Initialize the OpenTelemetry SDK with configured exporters. This should be called during application startup. """ if not self.enabled: logger.info("OpenTelemetry tracing is disabled") self._tracer = trace.get_tracer("noop") return try: # Create resource with service metadata resource = Resource.create( attributes={ "service.name": self.service_name, "service.version": "0.0.1", "deployment.environment": self._get_deployment_environment(), } ) # Create sampler with parent-based strategy sampling_ratio = self._get_sampling_ratio() sampler = ParentBased(root=TraceIdRatioBased(sampling_ratio)) # Create tracer provider self.provider = TracerProvider(resource=resource, sampler=sampler) # Create OTLP exporter otlp_endpoint = self._get_otlp_endpoint() exporter = OTLPSpanExporter(endpoint=otlp_endpoint) # Add span processor processor = BatchSpanProcessor(exporter) self.provider.add_span_processor(processor) # Set global tracer provider trace.set_tracer_provider(self.provider) # Get tracer instance self._tracer = trace.get_tracer(self.service_name) logger.info( f"OpenTelemetry SDK started for service: {self.service_name} " f"(environment: {self._get_deployment_environment()}, " f"sampling: {sampling_ratio}, endpoint: {otlp_endpoint})" ) except Exception as e: logger.error(f"Failed to initialize OpenTelemetry SDK: {e}") # Fallback to noop tracer to prevent application failures self._tracer = trace.get_tracer("noop") def get_tracer(self) -> trace.Tracer: """Get the tracer instance for creating spans. Returns: The configured tracer instance """ if self._tracer is None: # Initialize if not already done self.initialize() # Type narrowing after None guard assert self._tracer is not None # nosec B101 return self._tracer def shutdown(self) -> None: """Shutdown the OpenTelemetry SDK gracefully. This should be called during application shutdown. """ if self.provider is not None: try: self.provider.shutdown() logger.info("OpenTelemetry SDK shut down successfully") except Exception as e: logger.error(f"Error shutting down OpenTelemetry SDK: {e}") # Global telemetry service instance _telemetry_service: TelemetryService | None = None def get_tracer() -> trace.Tracer: """Get the global tracer instance. Returns: The configured tracer instance """ global _telemetry_service if _telemetry_service is None: _telemetry_service = TelemetryService() _telemetry_service.initialize() return _telemetry_service.get_tracer() def shutdown_telemetry() -> None: """Shutdown the global telemetry service. This should be called during application shutdown. """ global _telemetry_service if _telemetry_service is not None: _telemetry_service.shutdown()