"""Tests for EventQueue.""" from __future__ import annotations import threading from mosaicstack_telemetry.queue import EventQueue from tests.conftest import make_event class TestEventQueue: """Tests for the bounded thread-safe event queue.""" def test_put_and_drain(self) -> None: """Events can be put in and drained out in FIFO order.""" queue = EventQueue(max_size=10) e1 = make_event() e2 = make_event() queue.put(e1) queue.put(e2) drained = queue.drain(10) assert len(drained) == 2 assert drained[0].event_id == e1.event_id assert drained[1].event_id == e2.event_id def test_drain_max_items(self) -> None: """Drain respects the max_items limit.""" queue = EventQueue(max_size=10) for _ in range(5): queue.put(make_event()) drained = queue.drain(3) assert len(drained) == 3 assert queue.size == 2 def test_drain_empty_queue(self) -> None: """Draining an empty queue returns empty list.""" queue = EventQueue(max_size=10) drained = queue.drain(5) assert drained == [] def test_bounded_fifo_eviction(self) -> None: """When queue is full, oldest events are evicted.""" queue = EventQueue(max_size=3) events = [make_event() for _ in range(5)] for e in events: queue.put(e) assert queue.size == 3 drained = queue.drain(3) # Should have the last 3 events (oldest 2 were evicted) assert drained[0].event_id == events[2].event_id assert drained[1].event_id == events[3].event_id assert drained[2].event_id == events[4].event_id def test_size_property(self) -> None: """Size property reflects current queue length.""" queue = EventQueue(max_size=10) assert queue.size == 0 queue.put(make_event()) assert queue.size == 1 queue.put(make_event()) assert queue.size == 2 queue.drain(1) assert queue.size == 1 def test_is_empty_property(self) -> None: """is_empty property works correctly.""" queue = EventQueue(max_size=10) assert queue.is_empty is True queue.put(make_event()) assert queue.is_empty is False queue.drain(1) assert queue.is_empty is True def test_put_back(self) -> None: """put_back re-adds events to the front of the queue.""" queue = EventQueue(max_size=10) e1 = make_event() e2 = make_event() queue.put(e1) queue.put_back([e2]) drained = queue.drain(2) # e2 should be first (put_back adds to front) assert drained[0].event_id == e2.event_id assert drained[1].event_id == e1.event_id def test_put_back_respects_max_size(self) -> None: """put_back doesn't exceed max_size.""" queue = EventQueue(max_size=3) for _ in range(3): queue.put(make_event()) events_to_add = [make_event() for _ in range(5)] queue.put_back(events_to_add) assert queue.size == 3 def test_thread_safety_concurrent_put_drain(self) -> None: """Queue handles concurrent put and drain operations.""" queue = EventQueue(max_size=1000) total_puts = 500 errors: list[Exception] = [] def put_events() -> None: try: for _ in range(total_puts): queue.put(make_event()) except Exception as e: errors.append(e) def drain_events() -> None: try: drained_count = 0 while drained_count < total_puts: batch = queue.drain(10) drained_count += len(batch) if not batch: threading.Event().wait(0.001) except Exception as e: errors.append(e) put_thread = threading.Thread(target=put_events) drain_thread = threading.Thread(target=drain_events) put_thread.start() drain_thread.start() put_thread.join(timeout=5) drain_thread.join(timeout=5) assert not errors, f"Thread errors: {errors}"