This repository has been archived on 2026-03-28. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
bootstrap/tests/test_event_watcher.py

100 lines
3.2 KiB
Python

from __future__ import annotations
import json
import unittest
from tests.conftest import append_ndjson, make_temp_paths, sample_events, write_ndjson
from event_watcher import EventWatcher
class EventWatcherTests(unittest.TestCase):
    """Tests for EventWatcher polling, cursor handling and callbacks.

    Every test starts from the paths returned by ``make_temp_paths`` and the
    event list returned by ``sample_events``.
    """

    def setUp(self) -> None:
        """Obtain fresh paths and sample events for the test about to run."""
        self.tempdir, self.paths = make_temp_paths()
        self.events = sample_events()

    def tearDown(self) -> None:
        """Clean up the ``tempdir`` handle obtained in ``setUp``."""
        self.tempdir.cleanup()

    def watcher(self) -> EventWatcher:
        """Return a new watcher bound to this test's events and cursor paths."""
        return EventWatcher(
            self.paths.events_path,
            self.paths.cursor_path,
            poll_interval=0.1,
        )

    def test_poll_empty_file(self) -> None:
        """Polling with no events written returns ``[]`` and leaves no cursor file."""
        subject = self.watcher()
        self.assertEqual(subject.poll_once(), [])
        cursor_present = self.paths.cursor_path.exists()
        self.assertFalse(cursor_present)

    def test_poll_new_events(self) -> None:
        """Events written before the first poll are all returned by it."""
        write_ndjson(self.paths.events_path, self.events[:3])
        fresh_watcher = self.watcher()
        self.assertEqual(fresh_watcher.poll_once(), self.events[:3])

    def test_cursor_persistence(self) -> None:
        """A second poll on the same watcher is empty and a cursor is stored."""
        subject = self.watcher()
        write_ndjson(self.paths.events_path, self.events[:3])

        initial_batch = subject.poll_once()
        repeat_batch = subject.poll_once()
        self.assertEqual(initial_batch, self.events[:3])
        self.assertEqual(repeat_batch, [])

        # The cursor file is read back as JSON carrying a "position" entry.
        raw_cursor = self.paths.cursor_path.read_text(encoding="utf-8")
        stored = json.loads(raw_cursor)
        self.assertGreater(stored["position"], 0)

    def test_cursor_survives_restart(self) -> None:
        """A second watcher built on the same paths sees nothing new."""
        write_ndjson(self.paths.events_path, self.events[:3])

        before_restart = self.watcher()
        self.assertEqual(before_restart.poll_once(), self.events[:3])

        after_restart = self.watcher()
        self.assertEqual(after_restart.poll_once(), [])

    def test_corrupt_line_skipped(self) -> None:
        """A non-JSON line between two valid events is dropped from the result."""
        events_file = self.paths.events_path
        events_file.parent.mkdir(parents=True, exist_ok=True)

        with events_file.open("w", encoding="utf-8") as sink:
            # Valid event, a malformed line, then another valid event.
            sink.write(json.dumps(self.events[0]) + "\n")
            sink.write("{not-json}\n")
            sink.write(json.dumps(self.events[1]) + "\n")

        survivors = self.watcher().poll_once()
        self.assertEqual(survivors, [self.events[0], self.events[1]])

    def test_callback_filtering(self) -> None:
        """A callback registered for ``["task.completed"]`` gets only ``events[2]``."""
        write_ndjson(self.paths.events_path, self.events)
        delivered: list[dict[str, object]] = []

        subject = self.watcher()
        subject.on(["task.completed"], delivered.append)
        subject.poll_once()

        self.assertEqual(delivered, [self.events[2]])

    def test_callback_receives_events(self) -> None:
        """A callback registered with an empty list is handed every polled event."""
        write_ndjson(self.paths.events_path, self.events[:2])
        delivered: list[dict[str, object]] = []

        subject = self.watcher()
        subject.on([], delivered.append)
        returned = subject.poll_once()

        self.assertEqual(delivered, self.events[:2])
        self.assertEqual(returned, self.events[:2])

    def test_file_grows_between_polls(self) -> None:
        """Events appended after a poll are returned by the next poll only."""
        subject = self.watcher()

        write_ndjson(self.paths.events_path, self.events[:2])
        opening_batch = subject.poll_once()

        append_ndjson(self.paths.events_path, self.events[2:5])
        follow_up_batch = subject.poll_once()

        self.assertEqual(opening_batch, self.events[:2])
        self.assertEqual(follow_up_batch, self.events[2:5])
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()