Files
telemetry-client-py/tests/test_submitter.py

208 lines
6.8 KiB
Python

"""Tests for batch submission logic."""
from __future__ import annotations
import httpx
import pytest
import respx
from mosaicstack_telemetry.config import TelemetryConfig
from mosaicstack_telemetry.submitter import submit_batch_async, submit_batch_sync
from tests.conftest import (
TEST_API_KEY,
TEST_INSTANCE_ID,
TEST_SERVER_URL,
make_batch_response_json,
make_event,
)
@pytest.fixture()
def fast_config() -> TelemetryConfig:
"""Config with minimal retries and timeouts for fast tests."""
return TelemetryConfig(
server_url=TEST_SERVER_URL,
api_key=TEST_API_KEY,
instance_id=TEST_INSTANCE_ID,
max_retries=1,
request_timeout_seconds=2.0,
)
class TestSubmitBatchSync:
"""Tests for synchronous batch submission."""
@respx.mock
def test_successful_submission(self, fast_config: TelemetryConfig) -> None:
"""Successful 202 response returns BatchEventResponse."""
events = [make_event() for _ in range(3)]
response_json = make_batch_response_json(events)
respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
return_value=httpx.Response(202, json=response_json)
)
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is not None
assert result.accepted == 3
assert result.rejected == 0
@respx.mock
def test_429_with_retry_after(self, fast_config: TelemetryConfig) -> None:
"""429 response respects Retry-After header and retries."""
events = [make_event()]
response_json = make_batch_response_json(events)
route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
route.side_effect = [
httpx.Response(429, headers={"Retry-After": "0.1"}),
httpx.Response(202, json=response_json),
]
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is not None
assert result.accepted == 1
@respx.mock
def test_403_returns_none(self, fast_config: TelemetryConfig) -> None:
"""403 response returns None immediately."""
events = [make_event()]
respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
return_value=httpx.Response(403, json={"error": "Forbidden"})
)
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is None
@respx.mock
def test_network_error_retries(self, fast_config: TelemetryConfig) -> None:
"""Network errors trigger retry with backoff."""
events = [make_event()]
response_json = make_batch_response_json(events)
route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
route.side_effect = [
httpx.ConnectError("Connection refused"),
httpx.Response(202, json=response_json),
]
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is not None
assert result.accepted == 1
@respx.mock
def test_all_retries_exhausted(self, fast_config: TelemetryConfig) -> None:
"""When all retries fail, returns None."""
events = [make_event()]
respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
side_effect=httpx.ConnectError("Connection refused")
)
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is None
def test_dry_run_mode(self, fast_config: TelemetryConfig) -> None:
"""Dry run mode logs but doesn't send."""
fast_config.dry_run = True
events = [make_event() for _ in range(5)]
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is not None
assert result.accepted == 5
assert result.rejected == 0
@respx.mock
def test_500_error_retries(self, fast_config: TelemetryConfig) -> None:
"""Server errors (500) trigger retries."""
events = [make_event()]
response_json = make_batch_response_json(events)
route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
route.side_effect = [
httpx.Response(500, text="Internal Server Error"),
httpx.Response(202, json=response_json),
]
with httpx.Client() as client:
result = submit_batch_sync(client, fast_config, events)
assert result is not None
assert result.accepted == 1
class TestSubmitBatchAsync:
"""Tests for asynchronous batch submission."""
@respx.mock
async def test_successful_submission(self, fast_config: TelemetryConfig) -> None:
"""Successful 202 response returns BatchEventResponse."""
events = [make_event() for _ in range(3)]
response_json = make_batch_response_json(events)
respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
return_value=httpx.Response(202, json=response_json)
)
async with httpx.AsyncClient() as client:
result = await submit_batch_async(client, fast_config, events)
assert result is not None
assert result.accepted == 3
@respx.mock
async def test_429_with_retry_after(self, fast_config: TelemetryConfig) -> None:
"""429 response respects Retry-After and retries asynchronously."""
events = [make_event()]
response_json = make_batch_response_json(events)
route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
route.side_effect = [
httpx.Response(429, headers={"Retry-After": "0.1"}),
httpx.Response(202, json=response_json),
]
async with httpx.AsyncClient() as client:
result = await submit_batch_async(client, fast_config, events)
assert result is not None
assert result.accepted == 1
@respx.mock
async def test_403_returns_none(self, fast_config: TelemetryConfig) -> None:
"""403 returns None immediately."""
events = [make_event()]
respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
return_value=httpx.Response(403, json={"error": "Forbidden"})
)
async with httpx.AsyncClient() as client:
result = await submit_batch_async(client, fast_config, events)
assert result is None
async def test_dry_run_mode(self, fast_config: TelemetryConfig) -> None:
"""Dry run mode returns mock response without HTTP."""
fast_config.dry_run = True
events = [make_event() for _ in range(3)]
async with httpx.AsyncClient() as client:
result = await submit_batch_async(client, fast_config, events)
assert result is not None
assert result.accepted == 3