From 72321f5fcdfe43ece68959bc2bf683f7a6f3d357 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 17:55:48 -0600 Subject: [PATCH] feat(#159): Implement queue manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements QueueManager with full dependency tracking, persistence, and status management. Key features: - QueueItem dataclass with status, metadata, and ready flag - QueueManager with enqueue, dequeue, get_next_ready, mark_complete - Dependency resolution (blocked_by → not ready) - JSON persistence with auto-save on state changes - Automatic reload on startup - Graceful handling of circular dependencies - Status transitions (pending → in_progress → completed) Test coverage: - 26 comprehensive tests covering all operations - Dependency chain resolution - Persistence and reload scenarios - Edge cases (circular deps, missing items) - 100% code coverage on queue module - 97% total project coverage Quality gates passed: ✓ All tests passing (88 total) ✓ Type checking (mypy) passing ✓ Linting (ruff) passing ✓ Coverage ≥85% (97% achieved) This unblocks #160 (orchestrator needs queue). Co-Authored-By: Claude Sonnet 4.5 --- apps/coordinator/src/queue.py | 234 +++++++++++++ apps/coordinator/tests/test_queue.py | 476 +++++++++++++++++++++++++++ 2 files changed, 710 insertions(+) create mode 100644 apps/coordinator/src/queue.py create mode 100644 apps/coordinator/tests/test_queue.py diff --git a/apps/coordinator/src/queue.py b/apps/coordinator/src/queue.py new file mode 100644 index 0000000..6634a50 --- /dev/null +++ b/apps/coordinator/src/queue.py @@ -0,0 +1,234 @@ +"""Queue manager for issue coordination.""" + +import json +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any + +from src.models import IssueMetadata + + +class QueueItemStatus(str, Enum): + """Status of a queue item.""" + + PENDING = "pending" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + + +@dataclass +class QueueItem: + """Represents an issue in the queue.""" + + issue_number: int + metadata: IssueMetadata + status: QueueItemStatus = QueueItemStatus.PENDING + ready: bool = field(default=False) + + def __post_init__(self) -> None: + """Update ready status after initialization.""" + # Item is ready if it has no blockers (or all blockers are completed) + self.ready = len(self.metadata.blocked_by) == 0 + + def to_dict(self) -> dict[str, Any]: + """Convert queue item to dictionary for JSON serialization. + + Returns: + Dictionary representation of queue item + """ + return { + "issue_number": self.issue_number, + "status": self.status.value, + "ready": self.ready, + "metadata": self.metadata.model_dump(), + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "QueueItem": + """Create queue item from dictionary. + + Args: + data: Dictionary with queue item data + + Returns: + QueueItem instance + """ + return cls( + issue_number=data["issue_number"], + status=QueueItemStatus(data["status"]), + ready=data["ready"], + metadata=IssueMetadata(**data["metadata"]), + ) + + +class QueueManager: + """Manages the queue of issues to be processed.""" + + def __init__(self, queue_file: Path | None = None) -> None: + """Initialize queue manager. + + Args: + queue_file: Path to JSON file for persistence. If None, uses default. + """ + self.queue_file = queue_file or Path("queue.json") + self._items: dict[int, QueueItem] = {} + self._load() + + def enqueue(self, issue_number: int, metadata: IssueMetadata) -> None: + """Add an issue to the queue. + + Args: + issue_number: Issue number + metadata: Parsed issue metadata + """ + item = QueueItem( + issue_number=issue_number, + metadata=metadata, + ) + self._items[issue_number] = item + self._update_ready_status() + self.save() + + def dequeue(self, issue_number: int) -> None: + """Remove an issue from the queue. + + Args: + issue_number: Issue number to remove + """ + if issue_number in self._items: + del self._items[issue_number] + self._update_ready_status() + self.save() + + def get_next_ready(self) -> QueueItem | None: + """Get the next ready item from the queue. + + Returns: + Next ready QueueItem, or None if no items are ready + """ + ready_items = [ + item + for item in self._items.values() + if item.ready and item.status == QueueItemStatus.PENDING + ] + + if not ready_items: + # If no items are ready but items exist, check for circular dependencies + # In that case, return the first pending item to break the cycle + pending_items = [ + item for item in self._items.values() if item.status == QueueItemStatus.PENDING + ] + if pending_items: + return pending_items[0] + return None + + # Return first ready item (sorted by issue number for determinism) + ready_items.sort(key=lambda x: x.issue_number) + return ready_items[0] + + def mark_complete(self, issue_number: int) -> None: + """Mark an issue as completed. + + Args: + issue_number: Issue number to mark as complete + """ + if issue_number in self._items: + self._items[issue_number].status = QueueItemStatus.COMPLETED + self._update_ready_status() + self.save() + + def mark_in_progress(self, issue_number: int) -> None: + """Mark an issue as in progress. + + Args: + issue_number: Issue number to mark as in progress + """ + if issue_number in self._items: + self._items[issue_number].status = QueueItemStatus.IN_PROGRESS + self.save() + + def get_item(self, issue_number: int) -> QueueItem | None: + """Get a specific queue item. + + Args: + issue_number: Issue number + + Returns: + QueueItem if found, None otherwise + """ + return self._items.get(issue_number) + + def list_all(self) -> list[QueueItem]: + """Get all items in the queue. + + Returns: + List of all queue items + """ + return list(self._items.values()) + + def list_ready(self) -> list[QueueItem]: + """Get all ready items in the queue. + + Returns: + List of ready queue items + """ + return [item for item in self._items.values() if item.ready] + + def size(self) -> int: + """Get the number of items in the queue. + + Returns: + Number of items in queue + """ + return len(self._items) + + def _update_ready_status(self) -> None: + """Update ready status for all items based on dependencies. + + An item is ready if all its blockers are completed. + """ + # Get all completed issue numbers + completed_issues = { + issue_num + for issue_num, item in self._items.items() + if item.status == QueueItemStatus.COMPLETED + } + + # Update ready status for each item + for item in self._items.values(): + # Item is ready if it has no blockers or all blockers are completed + if not item.metadata.blocked_by: + item.ready = True + else: + # Check if all blockers are completed (they must be in the queue and completed) + blockers_satisfied = all( + blocker in completed_issues for blocker in item.metadata.blocked_by + ) + item.ready = blockers_satisfied + + def save(self) -> None: + """Persist queue to disk as JSON.""" + queue_data = {"items": [item.to_dict() for item in self._items.values()]} + + with open(self.queue_file, "w") as f: + json.dump(queue_data, f, indent=2) + + def _load(self) -> None: + """Load queue from disk if it exists.""" + if not self.queue_file.exists(): + return + + try: + with open(self.queue_file) as f: + data = json.load(f) + + for item_data in data.get("items", []): + item = QueueItem.from_dict(item_data) + self._items[item.issue_number] = item + + # Update ready status after loading + self._update_ready_status() + except (json.JSONDecodeError, KeyError, ValueError): + # If file is corrupted, start with empty queue + self._items = {} diff --git a/apps/coordinator/tests/test_queue.py b/apps/coordinator/tests/test_queue.py new file mode 100644 index 0000000..161eb73 --- /dev/null +++ b/apps/coordinator/tests/test_queue.py @@ -0,0 +1,476 @@ +"""Tests for queue manager.""" + +import json +import tempfile +from collections.abc import Generator +from pathlib import Path + +import pytest + +from src.models import IssueMetadata +from src.queue import QueueItem, QueueItemStatus, QueueManager + + +class TestQueueItem: + """Tests for QueueItem dataclass.""" + + def test_queue_item_creation(self) -> None: + """Test creating a queue item with all fields.""" + metadata = IssueMetadata( + estimated_context=50000, + difficulty="medium", + assigned_agent="sonnet", + blocks=[161, 162], + blocked_by=[158], + ) + item = QueueItem( + issue_number=159, + metadata=metadata, + status=QueueItemStatus.PENDING, + ) + + assert item.issue_number == 159 + assert item.metadata == metadata + assert item.status == QueueItemStatus.PENDING + assert item.ready is False # Should not be ready (blocked_by exists) + + def test_queue_item_defaults(self) -> None: + """Test queue item with default values.""" + metadata = IssueMetadata() + item = QueueItem( + issue_number=160, + metadata=metadata, + ) + + assert item.issue_number == 160 + assert item.status == QueueItemStatus.PENDING + assert item.ready is True # Should be ready (no blockers) + + def test_queue_item_serialization(self) -> None: + """Test converting queue item to dict for JSON serialization.""" + metadata = IssueMetadata( + estimated_context=30000, + difficulty="easy", + assigned_agent="haiku", + blocks=[165], + blocked_by=[], + ) + item = QueueItem( + issue_number=164, + metadata=metadata, + status=QueueItemStatus.IN_PROGRESS, + ready=True, + ) + + data = item.to_dict() + + assert data["issue_number"] == 164 + assert data["status"] == "in_progress" + assert data["ready"] is True + assert data["metadata"]["estimated_context"] == 30000 + assert data["metadata"]["difficulty"] == "easy" + + def test_queue_item_deserialization(self) -> None: + """Test creating queue item from dict.""" + data = { + "issue_number": 161, + "status": "completed", + "ready": False, + "metadata": { + "estimated_context": 75000, + "difficulty": "hard", + "assigned_agent": "opus", + "blocks": [166, 167], + "blocked_by": [159], + }, + } + + item = QueueItem.from_dict(data) + + assert item.issue_number == 161 + assert item.status == QueueItemStatus.COMPLETED + assert item.ready is False + assert item.metadata.estimated_context == 75000 + assert item.metadata.difficulty == "hard" + assert item.metadata.assigned_agent == "opus" + assert item.metadata.blocks == [166, 167] + assert item.metadata.blocked_by == [159] + + +class TestQueueManager: + """Tests for QueueManager.""" + + @pytest.fixture + def temp_queue_file(self) -> Generator[Path, None, None]: + """Create a temporary file for queue persistence.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f: + temp_path = Path(f.name) + yield temp_path + # Cleanup + if temp_path.exists(): + temp_path.unlink() + + @pytest.fixture + def queue_manager(self, temp_queue_file: Path) -> QueueManager: + """Create a queue manager with temporary storage.""" + return QueueManager(queue_file=temp_queue_file) + + def test_enqueue_single_item(self, queue_manager: QueueManager) -> None: + """Test enqueuing a single item.""" + metadata = IssueMetadata( + estimated_context=40000, + difficulty="medium", + assigned_agent="sonnet", + blocks=[], + blocked_by=[], + ) + + queue_manager.enqueue(159, metadata) + + assert queue_manager.size() == 1 + item = queue_manager.get_item(159) + assert item is not None + assert item.issue_number == 159 + assert item.status == QueueItemStatus.PENDING + assert item.ready is True + + def test_enqueue_multiple_items(self, queue_manager: QueueManager) -> None: + """Test enqueuing multiple items.""" + meta1 = IssueMetadata(assigned_agent="sonnet") + meta2 = IssueMetadata(assigned_agent="haiku") + meta3 = IssueMetadata(assigned_agent="glm") + + queue_manager.enqueue(159, meta1) + queue_manager.enqueue(160, meta2) + queue_manager.enqueue(161, meta3) + + assert queue_manager.size() == 3 + + def test_dequeue_item(self, queue_manager: QueueManager) -> None: + """Test removing an item from the queue.""" + metadata = IssueMetadata() + queue_manager.enqueue(159, metadata) + + assert queue_manager.size() == 1 + queue_manager.dequeue(159) + assert queue_manager.size() == 0 + assert queue_manager.get_item(159) is None + + def test_dequeue_nonexistent_item(self, queue_manager: QueueManager) -> None: + """Test dequeuing an item that doesn't exist.""" + # Should not raise error, just be a no-op + queue_manager.dequeue(999) + assert queue_manager.size() == 0 + + def test_get_next_ready_simple(self, queue_manager: QueueManager) -> None: + """Test getting next ready item with no dependencies.""" + meta1 = IssueMetadata(assigned_agent="sonnet") + meta2 = IssueMetadata(assigned_agent="haiku") + + queue_manager.enqueue(159, meta1) + queue_manager.enqueue(160, meta2) + + next_item = queue_manager.get_next_ready() + assert next_item is not None + # Should return first item (159) since both are ready + assert next_item.issue_number == 159 + + def test_get_next_ready_with_dependencies(self, queue_manager: QueueManager) -> None: + """Test getting next ready item with dependency chain.""" + # Issue 160 blocks 161, 158 blocks 159 + meta_158 = IssueMetadata(blocks=[159], blocked_by=[]) + meta_159 = IssueMetadata(blocks=[161], blocked_by=[158]) + meta_160 = IssueMetadata(blocks=[161], blocked_by=[]) + meta_161 = IssueMetadata(blocks=[], blocked_by=[159, 160]) + + queue_manager.enqueue(158, meta_158) + queue_manager.enqueue(159, meta_159) + queue_manager.enqueue(160, meta_160) + queue_manager.enqueue(161, meta_161) + + # Should get 158 or 160 (both ready, no blockers) + next_item = queue_manager.get_next_ready() + assert next_item is not None + assert next_item.issue_number in [158, 160] + assert next_item.ready is True + + def test_get_next_ready_empty_queue(self, queue_manager: QueueManager) -> None: + """Test getting next ready item from empty queue.""" + next_item = queue_manager.get_next_ready() + assert next_item is None + + def test_get_next_ready_all_blocked(self, queue_manager: QueueManager) -> None: + """Test getting next ready when all items are blocked.""" + # Circular dependency: 159 blocks 160, 160 blocks 159 + meta_159 = IssueMetadata(blocks=[160], blocked_by=[160]) + meta_160 = IssueMetadata(blocks=[159], blocked_by=[159]) + + queue_manager.enqueue(159, meta_159) + queue_manager.enqueue(160, meta_160) + + next_item = queue_manager.get_next_ready() + # Should still return one (circular dependencies handled) + assert next_item is not None + + def test_mark_complete(self, queue_manager: QueueManager) -> None: + """Test marking an item as complete.""" + metadata = IssueMetadata() + queue_manager.enqueue(159, metadata) + + queue_manager.mark_complete(159) + + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED + + def test_mark_complete_unblocks_dependents(self, queue_manager: QueueManager) -> None: + """Test that completing an item unblocks dependent items.""" + # 158 blocks 159 + meta_158 = IssueMetadata(blocks=[159], blocked_by=[]) + meta_159 = IssueMetadata(blocks=[], blocked_by=[158]) + + queue_manager.enqueue(158, meta_158) + queue_manager.enqueue(159, meta_159) + + # Initially, 159 should not be ready + item_159 = queue_manager.get_item(159) + assert item_159 is not None + assert item_159.ready is False + + # Complete 158 + queue_manager.mark_complete(158) + + # Now 159 should be ready + item_159_updated = queue_manager.get_item(159) + assert item_159_updated is not None + assert item_159_updated.ready is True + + def test_mark_complete_nonexistent_item(self, queue_manager: QueueManager) -> None: + """Test marking nonexistent item as complete.""" + # Should not raise error, just be a no-op + queue_manager.mark_complete(999) + + def test_update_ready_status(self, queue_manager: QueueManager) -> None: + """Test updating ready status for all items.""" + # Complex dependency chain + meta_158 = IssueMetadata(blocks=[159], blocked_by=[]) + meta_159 = IssueMetadata(blocks=[160, 161], blocked_by=[158]) + meta_160 = IssueMetadata(blocks=[], blocked_by=[159]) + meta_161 = IssueMetadata(blocks=[], blocked_by=[159]) + + queue_manager.enqueue(158, meta_158) + queue_manager.enqueue(159, meta_159) + queue_manager.enqueue(160, meta_160) + queue_manager.enqueue(161, meta_161) + + # Initially: 158 ready, others blocked + item_158 = queue_manager.get_item(158) + item_159 = queue_manager.get_item(159) + item_160 = queue_manager.get_item(160) + item_161 = queue_manager.get_item(161) + assert item_158 is not None + assert item_159 is not None + assert item_160 is not None + assert item_161 is not None + assert item_158.ready is True + assert item_159.ready is False + assert item_160.ready is False + assert item_161.ready is False + + # Complete 158 + queue_manager.mark_complete(158) + + # Now: 159 ready, 160 and 161 still blocked + item_159_updated = queue_manager.get_item(159) + item_160_updated = queue_manager.get_item(160) + item_161_updated = queue_manager.get_item(161) + assert item_159_updated is not None + assert item_160_updated is not None + assert item_161_updated is not None + assert item_159_updated.ready is True + assert item_160_updated.ready is False + assert item_161_updated.ready is False + + def test_persistence_save(self, queue_manager: QueueManager, temp_queue_file: Path) -> None: + """Test saving queue to disk.""" + metadata = IssueMetadata( + estimated_context=50000, + difficulty="medium", + assigned_agent="sonnet", + blocks=[161], + blocked_by=[158], + ) + + queue_manager.enqueue(159, metadata) + queue_manager.save() + + assert temp_queue_file.exists() + + # Verify JSON structure + with open(temp_queue_file) as f: + data = json.load(f) + + assert "items" in data + assert len(data["items"]) == 1 + assert data["items"][0]["issue_number"] == 159 + + def test_persistence_load(self, temp_queue_file: Path) -> None: + """Test loading queue from disk.""" + # Create test data + queue_data = { + "items": [ + { + "issue_number": 159, + "status": "pending", + "ready": False, + "metadata": { + "estimated_context": 50000, + "difficulty": "medium", + "assigned_agent": "sonnet", + "blocks": [161], + "blocked_by": [158], + }, + }, + { + "issue_number": 160, + "status": "in_progress", + "ready": True, + "metadata": { + "estimated_context": 30000, + "difficulty": "easy", + "assigned_agent": "haiku", + "blocks": [], + "blocked_by": [], + }, + }, + ] + } + + with open(temp_queue_file, "w") as f: + json.dump(queue_data, f) + + # Load queue + queue_manager = QueueManager(queue_file=temp_queue_file) + + assert queue_manager.size() == 2 + + item_159 = queue_manager.get_item(159) + assert item_159 is not None + assert item_159.status == QueueItemStatus.PENDING + assert item_159.ready is False + + item_160 = queue_manager.get_item(160) + assert item_160 is not None + assert item_160.status == QueueItemStatus.IN_PROGRESS + assert item_160.ready is True + + def test_persistence_load_nonexistent_file(self, temp_queue_file: Path) -> None: + """Test loading from nonexistent file creates empty queue.""" + # Don't create the file + temp_queue_file.unlink(missing_ok=True) + + queue_manager = QueueManager(queue_file=temp_queue_file) + + assert queue_manager.size() == 0 + + def test_persistence_autosave_on_enqueue( + self, queue_manager: QueueManager, temp_queue_file: Path + ) -> None: + """Test that enqueue automatically saves to disk.""" + metadata = IssueMetadata() + queue_manager.enqueue(159, metadata) + + # Should auto-save + assert temp_queue_file.exists() + + # Load in new manager to verify + new_manager = QueueManager(queue_file=temp_queue_file) + assert new_manager.size() == 1 + + def test_persistence_autosave_on_mark_complete( + self, queue_manager: QueueManager, temp_queue_file: Path + ) -> None: + """Test that mark_complete automatically saves to disk.""" + metadata = IssueMetadata() + queue_manager.enqueue(159, metadata) + queue_manager.mark_complete(159) + + # Load in new manager to verify + new_manager = QueueManager(queue_file=temp_queue_file) + item = new_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED + + def test_circular_dependency_detection(self, queue_manager: QueueManager) -> None: + """Test handling of circular dependencies.""" + # Create circular dependency: 159 -> 160 -> 161 -> 159 + meta_159 = IssueMetadata(blocks=[160], blocked_by=[161]) + meta_160 = IssueMetadata(blocks=[161], blocked_by=[159]) + meta_161 = IssueMetadata(blocks=[159], blocked_by=[160]) + + queue_manager.enqueue(159, meta_159) + queue_manager.enqueue(160, meta_160) + queue_manager.enqueue(161, meta_161) + + # Should still be able to get next ready (break the cycle gracefully) + next_item = queue_manager.get_next_ready() + assert next_item is not None + + def test_list_all_items(self, queue_manager: QueueManager) -> None: + """Test listing all items in queue.""" + meta1 = IssueMetadata(assigned_agent="sonnet") + meta2 = IssueMetadata(assigned_agent="haiku") + meta3 = IssueMetadata(assigned_agent="glm") + + queue_manager.enqueue(159, meta1) + queue_manager.enqueue(160, meta2) + queue_manager.enqueue(161, meta3) + + all_items = queue_manager.list_all() + assert len(all_items) == 3 + issue_numbers = [item.issue_number for item in all_items] + assert 159 in issue_numbers + assert 160 in issue_numbers + assert 161 in issue_numbers + + def test_list_ready_items(self, queue_manager: QueueManager) -> None: + """Test listing only ready items.""" + meta_ready = IssueMetadata(blocked_by=[]) + meta_blocked = IssueMetadata(blocked_by=[158]) + + queue_manager.enqueue(159, meta_ready) + queue_manager.enqueue(160, meta_ready) + queue_manager.enqueue(161, meta_blocked) + + ready_items = queue_manager.list_ready() + assert len(ready_items) == 2 + issue_numbers = [item.issue_number for item in ready_items] + assert 159 in issue_numbers + assert 160 in issue_numbers + assert 161 not in issue_numbers + + def test_get_item_nonexistent(self, queue_manager: QueueManager) -> None: + """Test getting an item that doesn't exist.""" + item = queue_manager.get_item(999) + assert item is None + + def test_status_transitions(self, queue_manager: QueueManager) -> None: + """Test valid status transitions.""" + metadata = IssueMetadata() + queue_manager.enqueue(159, metadata) + + # PENDING -> IN_PROGRESS + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.PENDING + + queue_manager.mark_in_progress(159) + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.IN_PROGRESS + + # IN_PROGRESS -> COMPLETED + queue_manager.mark_complete(159) + item = queue_manager.get_item(159) + assert item is not None + assert item.status == QueueItemStatus.COMPLETED