- Fix 20 ruff errors: UP035 (Callable import), UP042 (StrEnum), E501 (line length), F401 (unused imports), UP045 (Optional -> X | None), I001 (import sorting) - Fix mypy error: wrap slowapi rate limit handler with Exception-compatible signature for add_exception_handler - Pin pip >= 25.3 in Dockerfile (CVE-2025-8869, CVE-2026-1703) - Add nosec B104 to config.py (container-bound 0.0.0.0 is acceptable) - Add nosec B101 to telemetry.py (assert for type narrowing) - Create bandit.yaml to suppress B404/B607/B603 in gates/ tooling Fixes #365 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
186 lines
7.2 KiB
Python
186 lines
7.2 KiB
Python
"""Tests for OpenTelemetry telemetry initialization."""
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.telemetry import TelemetryService, get_tracer
|
|
|
|
|
|
@pytest.fixture
|
|
def reset_telemetry():
|
|
"""Fixture to preserve and restore global telemetry state."""
|
|
import src.telemetry
|
|
original = src.telemetry._telemetry_service
|
|
yield
|
|
src.telemetry._telemetry_service = original
|
|
|
|
|
|
class TestTelemetryService:
|
|
"""Test suite for TelemetryService."""
|
|
|
|
def test_telemetry_service_init_enabled(self) -> None:
|
|
"""Test TelemetryService initialization when enabled."""
|
|
with patch.dict("os.environ", {"OTEL_ENABLED": "true"}):
|
|
service = TelemetryService()
|
|
assert service.enabled is True
|
|
assert service.service_name == "mosaic-coordinator"
|
|
|
|
def test_telemetry_service_init_disabled(self) -> None:
|
|
"""Test TelemetryService initialization when disabled."""
|
|
with patch.dict("os.environ", {"OTEL_ENABLED": "false"}):
|
|
service = TelemetryService()
|
|
assert service.enabled is False
|
|
|
|
def test_telemetry_service_custom_service_name(self) -> None:
|
|
"""Test TelemetryService with custom service name."""
|
|
with patch.dict("os.environ", {"OTEL_SERVICE_NAME": "custom-service"}):
|
|
service = TelemetryService()
|
|
assert service.service_name == "custom-service"
|
|
|
|
@patch("src.telemetry.TracerProvider")
|
|
@patch("src.telemetry.Resource.create")
|
|
@patch("src.telemetry.OTLPSpanExporter")
|
|
def test_telemetry_service_initialize(
|
|
self,
|
|
mock_exporter: MagicMock,
|
|
mock_resource_create: MagicMock,
|
|
mock_provider: MagicMock,
|
|
) -> None:
|
|
"""Test TelemetryService initialization with SDK setup."""
|
|
with patch.dict(
|
|
"os.environ",
|
|
{
|
|
"OTEL_ENABLED": "true",
|
|
"OTEL_SERVICE_NAME": "test-service",
|
|
"OTEL_DEPLOYMENT_ENVIRONMENT": "test",
|
|
},
|
|
):
|
|
service = TelemetryService()
|
|
service.initialize()
|
|
|
|
# Verify Resource was created with correct attributes
|
|
mock_resource_create.assert_called_once()
|
|
call_kwargs = mock_resource_create.call_args[1]
|
|
assert call_kwargs["attributes"]["service.name"] == "test-service"
|
|
assert call_kwargs["attributes"]["service.version"] == "0.0.1"
|
|
assert call_kwargs["attributes"]["deployment.environment"] == "test"
|
|
|
|
# Verify exporter was created
|
|
mock_exporter.assert_called_once()
|
|
|
|
# Verify TracerProvider was created
|
|
mock_provider.assert_called_once()
|
|
|
|
def test_telemetry_service_get_tracer(self) -> None:
|
|
"""Test getting tracer instance."""
|
|
with patch.dict("os.environ", {"OTEL_ENABLED": "false"}):
|
|
service = TelemetryService()
|
|
tracer = service.get_tracer()
|
|
assert tracer is not None
|
|
|
|
@patch("src.telemetry.TracerProvider")
|
|
def test_telemetry_service_shutdown(self, mock_provider: MagicMock) -> None:
|
|
"""Test TelemetryService shutdown."""
|
|
with patch.dict("os.environ", {"OTEL_ENABLED": "true"}):
|
|
service = TelemetryService()
|
|
service.provider = mock_provider.return_value
|
|
service.shutdown()
|
|
mock_provider.return_value.shutdown.assert_called_once()
|
|
|
|
def test_telemetry_service_shutdown_when_disabled(self) -> None:
|
|
"""Test shutdown when telemetry is disabled."""
|
|
with patch.dict("os.environ", {"OTEL_ENABLED": "false"}):
|
|
service = TelemetryService()
|
|
# Should not raise exception
|
|
service.shutdown()
|
|
|
|
def test_get_sampling_ratio_default(self) -> None:
|
|
"""Test default sampling ratio."""
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
service = TelemetryService()
|
|
ratio = service._get_sampling_ratio()
|
|
assert ratio == 1.0
|
|
|
|
def test_get_sampling_ratio_custom(self) -> None:
|
|
"""Test custom sampling ratio."""
|
|
with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "0.5"}):
|
|
service = TelemetryService()
|
|
ratio = service._get_sampling_ratio()
|
|
assert ratio == 0.5
|
|
|
|
def test_get_sampling_ratio_invalid(self) -> None:
|
|
"""Test invalid sampling ratio falls back to default."""
|
|
with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "invalid"}):
|
|
service = TelemetryService()
|
|
ratio = service._get_sampling_ratio()
|
|
assert ratio == 1.0
|
|
|
|
def test_get_sampling_ratio_out_of_range(self) -> None:
|
|
"""Test sampling ratio is clamped to valid range."""
|
|
with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "1.5"}):
|
|
service = TelemetryService()
|
|
ratio = service._get_sampling_ratio()
|
|
assert ratio == 1.0
|
|
|
|
with patch.dict("os.environ", {"OTEL_TRACES_SAMPLER_ARG": "-0.5"}):
|
|
service = TelemetryService()
|
|
ratio = service._get_sampling_ratio()
|
|
assert ratio == 0.0
|
|
|
|
def test_get_deployment_environment_default(self) -> None:
|
|
"""Test default deployment environment."""
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
service = TelemetryService()
|
|
env = service._get_deployment_environment()
|
|
assert env == "development"
|
|
|
|
def test_get_deployment_environment_custom(self) -> None:
|
|
"""Test custom deployment environment."""
|
|
with patch.dict("os.environ", {"OTEL_DEPLOYMENT_ENVIRONMENT": "production"}):
|
|
service = TelemetryService()
|
|
env = service._get_deployment_environment()
|
|
assert env == "production"
|
|
|
|
def test_get_otlp_endpoint_default(self) -> None:
|
|
"""Test default OTLP endpoint."""
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
service = TelemetryService()
|
|
endpoint = service._get_otlp_endpoint()
|
|
assert endpoint == "http://localhost:4318/v1/traces"
|
|
|
|
def test_get_otlp_endpoint_custom(self) -> None:
|
|
"""Test custom OTLP endpoint."""
|
|
with patch.dict(
|
|
"os.environ", {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://jaeger:4318/v1/traces"}
|
|
):
|
|
service = TelemetryService()
|
|
endpoint = service._get_otlp_endpoint()
|
|
assert endpoint == "http://jaeger:4318/v1/traces"
|
|
|
|
|
|
class TestGetTracer:
|
|
"""Test suite for get_tracer helper function."""
|
|
|
|
def test_get_tracer_returns_tracer(self) -> None:
|
|
"""Test that get_tracer returns a tracer instance."""
|
|
tracer = get_tracer()
|
|
assert tracer is not None
|
|
|
|
@patch("src.telemetry.trace.get_tracer")
|
|
@patch("src.telemetry.trace.set_tracer_provider")
|
|
def test_get_tracer_uses_service_name(
|
|
self, mock_set_provider: MagicMock, mock_get_tracer_func: MagicMock, reset_telemetry
|
|
) -> None:
|
|
"""Test that get_tracer uses the correct service name."""
|
|
with patch.dict(
|
|
"os.environ",
|
|
{"OTEL_SERVICE_NAME": "test-service", "OTEL_ENABLED": "true"},
|
|
):
|
|
# Reset global state
|
|
import src.telemetry
|
|
src.telemetry._telemetry_service = None
|
|
|
|
get_tracer()
|
|
mock_get_tracer_func.assert_called_with("test-service")
|