208 lines
6.8 KiB
Python
208 lines
6.8 KiB
Python
"""Tests for batch submission logic."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import httpx
|
|
import pytest
|
|
import respx
|
|
|
|
from mosaicstack_telemetry.config import TelemetryConfig
|
|
from mosaicstack_telemetry.submitter import submit_batch_async, submit_batch_sync
|
|
from tests.conftest import (
|
|
TEST_API_KEY,
|
|
TEST_INSTANCE_ID,
|
|
TEST_SERVER_URL,
|
|
make_batch_response_json,
|
|
make_event,
|
|
)
|
|
|
|
|
|
@pytest.fixture()
def fast_config() -> TelemetryConfig:
    """Provide a TelemetryConfig tuned so the submission tests finish quickly.

    The config points at the shared test server URL, API key and instance id,
    and sets ``max_retries`` to 1 and ``request_timeout_seconds`` to 2.0.
    """
    config = TelemetryConfig(
        server_url=TEST_SERVER_URL,
        api_key=TEST_API_KEY,
        instance_id=TEST_INSTANCE_ID,
        max_retries=1,
        request_timeout_seconds=2.0,
    )
    return config
|
|
|
|
|
|
class TestSubmitBatchSync:
    """Tests for synchronous batch submission.

    Every test calls ``submit_batch_sync`` with an ``httpx.Client`` and the
    ``fast_config`` fixture. All tests run under ``respx.mock`` so that HTTP
    traffic to the batch endpoint is intercepted rather than sent.
    """

    @respx.mock
    def test_successful_submission(self, fast_config: TelemetryConfig) -> None:
        """Successful 202 response returns BatchEventResponse."""
        events = [make_event() for _ in range(3)]
        response_json = make_batch_response_json(events)

        respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
            return_value=httpx.Response(202, json=response_json)
        )

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is not None
        assert result.accepted == 3
        assert result.rejected == 0

    @respx.mock
    def test_429_with_retry_after(self, fast_config: TelemetryConfig) -> None:
        """429 response respects Retry-After header and retries."""
        events = [make_event()]
        response_json = make_batch_response_json(events)

        # First reply is a rate-limit with a short Retry-After; the second
        # reply is the successful 202.
        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.side_effect = [
            httpx.Response(429, headers={"Retry-After": "0.1"}),
            httpx.Response(202, json=response_json),
        ]

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is not None
        assert result.accepted == 1

    @respx.mock
    def test_403_returns_none(self, fast_config: TelemetryConfig) -> None:
        """403 response returns None immediately."""
        events = [make_event()]

        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.mock(return_value=httpx.Response(403, json={"error": "Forbidden"}))

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is None
        # "Immediately" means the request must not be retried after the 403.
        assert route.call_count == 1

    @respx.mock
    def test_network_error_retries(self, fast_config: TelemetryConfig) -> None:
        """Network errors trigger retry with backoff."""
        events = [make_event()]
        response_json = make_batch_response_json(events)

        # First attempt raises a connection error; the second succeeds.
        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.side_effect = [
            httpx.ConnectError("Connection refused"),
            httpx.Response(202, json=response_json),
        ]

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is not None
        assert result.accepted == 1

    @respx.mock
    def test_all_retries_exhausted(self, fast_config: TelemetryConfig) -> None:
        """When all retries fail, returns None."""
        events = [make_event()]

        # Every attempt raises the same connection error.
        respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
            side_effect=httpx.ConnectError("Connection refused")
        )

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is None

    # No route is registered here on purpose: under respx.mock an unmatched
    # request fails (respx's default assert_all_mocked behaviour) instead of
    # reaching the network, so a dry-run regression cannot send real traffic.
    @respx.mock
    def test_dry_run_mode(self, fast_config: TelemetryConfig) -> None:
        """Dry run mode logs but doesn't send."""
        fast_config.dry_run = True
        events = [make_event() for _ in range(5)]

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is not None
        assert result.accepted == 5
        assert result.rejected == 0

    @respx.mock
    def test_500_error_retries(self, fast_config: TelemetryConfig) -> None:
        """Server errors (500) trigger retries."""
        events = [make_event()]
        response_json = make_batch_response_json(events)

        # First reply is a 500; the second reply is the successful 202.
        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.side_effect = [
            httpx.Response(500, text="Internal Server Error"),
            httpx.Response(202, json=response_json),
        ]

        with httpx.Client() as client:
            result = submit_batch_sync(client, fast_config, events)

        assert result is not None
        assert result.accepted == 1
|
|
|
|
|
|
class TestSubmitBatchAsync:
    """Tests for asynchronous batch submission.

    Every test awaits ``submit_batch_async`` with an ``httpx.AsyncClient`` and
    the ``fast_config`` fixture. All tests run under ``respx.mock`` so that
    HTTP traffic to the batch endpoint is intercepted rather than sent.

    NOTE(review): the coroutine tests carry no explicit asyncio marker;
    presumably the project's pytest configuration enables an auto async
    mode -- confirm.
    """

    @respx.mock
    async def test_successful_submission(self, fast_config: TelemetryConfig) -> None:
        """Successful 202 response returns BatchEventResponse."""
        events = [make_event() for _ in range(3)]
        response_json = make_batch_response_json(events)

        respx.post(f"{TEST_SERVER_URL}/v1/events/batch").mock(
            return_value=httpx.Response(202, json=response_json)
        )

        async with httpx.AsyncClient() as client:
            result = await submit_batch_async(client, fast_config, events)

        assert result is not None
        assert result.accepted == 3

    @respx.mock
    async def test_429_with_retry_after(self, fast_config: TelemetryConfig) -> None:
        """429 response respects Retry-After and retries asynchronously."""
        events = [make_event()]
        response_json = make_batch_response_json(events)

        # First reply is a rate-limit with a short Retry-After; the second
        # reply is the successful 202.
        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.side_effect = [
            httpx.Response(429, headers={"Retry-After": "0.1"}),
            httpx.Response(202, json=response_json),
        ]

        async with httpx.AsyncClient() as client:
            result = await submit_batch_async(client, fast_config, events)

        assert result is not None
        assert result.accepted == 1

    @respx.mock
    async def test_403_returns_none(self, fast_config: TelemetryConfig) -> None:
        """403 returns None immediately."""
        events = [make_event()]

        route = respx.post(f"{TEST_SERVER_URL}/v1/events/batch")
        route.mock(return_value=httpx.Response(403, json={"error": "Forbidden"}))

        async with httpx.AsyncClient() as client:
            result = await submit_batch_async(client, fast_config, events)

        assert result is None
        # "Immediately" means the request must not be retried after the 403.
        assert route.call_count == 1

    # No route is registered here on purpose: under respx.mock an unmatched
    # request fails (respx's default assert_all_mocked behaviour) instead of
    # reaching the network, so a dry-run regression cannot send real traffic.
    @respx.mock
    async def test_dry_run_mode(self, fast_config: TelemetryConfig) -> None:
        """Dry run mode returns mock response without HTTP."""
        fast_config.dry_run = True
        events = [make_event() for _ in range(3)]

        async with httpx.AsyncClient() as client:
            result = await submit_batch_async(client, fast_config, events)

        assert result is not None
        assert result.accepted == 3
|