feat(#159): Implement queue manager

Implements QueueManager with full dependency tracking, persistence, and status management.

Key features:
- QueueItem dataclass with status, metadata, and ready flag
- QueueManager with enqueue, dequeue, get_next_ready, mark_complete
- Dependency resolution (blocked_by → not ready)
- JSON persistence with auto-save on state changes
- Automatic reload on startup
- Graceful handling of circular dependencies
- Status transitions (pending → in_progress → completed)

Test coverage:
- 26 comprehensive tests covering all operations
- Dependency chain resolution
- Persistence and reload scenarios
- Edge cases (circular deps, missing items)
- 100% code coverage on queue module
- 97% total project coverage

Quality gates passed:
✓ All tests passing (88 total)
✓ Type checking (mypy) passing
✓ Linting (ruff) passing
✓ Coverage ≥85% (97% achieved)

This unblocks #160 (orchestrator needs queue).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 17:55:48 -06:00
parent dad4b68f66
commit 72321f5fcd
2 changed files with 710 additions and 0 deletions

View File

@@ -0,0 +1,234 @@
"""Queue manager for issue coordination."""
import json
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
from src.models import IssueMetadata
class QueueItemStatus(str, Enum):
"""Status of a queue item."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
@dataclass
class QueueItem:
"""Represents an issue in the queue."""
issue_number: int
metadata: IssueMetadata
status: QueueItemStatus = QueueItemStatus.PENDING
ready: bool = field(default=False)
def __post_init__(self) -> None:
"""Update ready status after initialization."""
# Item is ready if it has no blockers (or all blockers are completed)
self.ready = len(self.metadata.blocked_by) == 0
def to_dict(self) -> dict[str, Any]:
"""Convert queue item to dictionary for JSON serialization.
Returns:
Dictionary representation of queue item
"""
return {
"issue_number": self.issue_number,
"status": self.status.value,
"ready": self.ready,
"metadata": self.metadata.model_dump(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "QueueItem":
"""Create queue item from dictionary.
Args:
data: Dictionary with queue item data
Returns:
QueueItem instance
"""
return cls(
issue_number=data["issue_number"],
status=QueueItemStatus(data["status"]),
ready=data["ready"],
metadata=IssueMetadata(**data["metadata"]),
)
class QueueManager:
"""Manages the queue of issues to be processed."""
def __init__(self, queue_file: Path | None = None) -> None:
"""Initialize queue manager.
Args:
queue_file: Path to JSON file for persistence. If None, uses default.
"""
self.queue_file = queue_file or Path("queue.json")
self._items: dict[int, QueueItem] = {}
self._load()
def enqueue(self, issue_number: int, metadata: IssueMetadata) -> None:
"""Add an issue to the queue.
Args:
issue_number: Issue number
metadata: Parsed issue metadata
"""
item = QueueItem(
issue_number=issue_number,
metadata=metadata,
)
self._items[issue_number] = item
self._update_ready_status()
self.save()
def dequeue(self, issue_number: int) -> None:
"""Remove an issue from the queue.
Args:
issue_number: Issue number to remove
"""
if issue_number in self._items:
del self._items[issue_number]
self._update_ready_status()
self.save()
def get_next_ready(self) -> QueueItem | None:
"""Get the next ready item from the queue.
Returns:
Next ready QueueItem, or None if no items are ready
"""
ready_items = [
item
for item in self._items.values()
if item.ready and item.status == QueueItemStatus.PENDING
]
if not ready_items:
# If no items are ready but items exist, check for circular dependencies
# In that case, return the first pending item to break the cycle
pending_items = [
item for item in self._items.values() if item.status == QueueItemStatus.PENDING
]
if pending_items:
return pending_items[0]
return None
# Return first ready item (sorted by issue number for determinism)
ready_items.sort(key=lambda x: x.issue_number)
return ready_items[0]
def mark_complete(self, issue_number: int) -> None:
"""Mark an issue as completed.
Args:
issue_number: Issue number to mark as complete
"""
if issue_number in self._items:
self._items[issue_number].status = QueueItemStatus.COMPLETED
self._update_ready_status()
self.save()
def mark_in_progress(self, issue_number: int) -> None:
"""Mark an issue as in progress.
Args:
issue_number: Issue number to mark as in progress
"""
if issue_number in self._items:
self._items[issue_number].status = QueueItemStatus.IN_PROGRESS
self.save()
def get_item(self, issue_number: int) -> QueueItem | None:
"""Get a specific queue item.
Args:
issue_number: Issue number
Returns:
QueueItem if found, None otherwise
"""
return self._items.get(issue_number)
def list_all(self) -> list[QueueItem]:
"""Get all items in the queue.
Returns:
List of all queue items
"""
return list(self._items.values())
def list_ready(self) -> list[QueueItem]:
"""Get all ready items in the queue.
Returns:
List of ready queue items
"""
return [item for item in self._items.values() if item.ready]
def size(self) -> int:
"""Get the number of items in the queue.
Returns:
Number of items in queue
"""
return len(self._items)
def _update_ready_status(self) -> None:
"""Update ready status for all items based on dependencies.
An item is ready if all its blockers are completed.
"""
# Get all completed issue numbers
completed_issues = {
issue_num
for issue_num, item in self._items.items()
if item.status == QueueItemStatus.COMPLETED
}
# Update ready status for each item
for item in self._items.values():
# Item is ready if it has no blockers or all blockers are completed
if not item.metadata.blocked_by:
item.ready = True
else:
# Check if all blockers are completed (they must be in the queue and completed)
blockers_satisfied = all(
blocker in completed_issues for blocker in item.metadata.blocked_by
)
item.ready = blockers_satisfied
def save(self) -> None:
"""Persist queue to disk as JSON."""
queue_data = {"items": [item.to_dict() for item in self._items.values()]}
with open(self.queue_file, "w") as f:
json.dump(queue_data, f, indent=2)
def _load(self) -> None:
"""Load queue from disk if it exists."""
if not self.queue_file.exists():
return
try:
with open(self.queue_file) as f:
data = json.load(f)
for item_data in data.get("items", []):
item = QueueItem.from_dict(item_data)
self._items[item.issue_number] = item
# Update ready status after loading
self._update_ready_status()
except (json.JSONDecodeError, KeyError, ValueError):
# If file is corrupted, start with empty queue
self._items = {}

View File

@@ -0,0 +1,476 @@
"""Tests for queue manager."""
import json
import tempfile
from collections.abc import Generator
from pathlib import Path
import pytest
from src.models import IssueMetadata
from src.queue import QueueItem, QueueItemStatus, QueueManager
class TestQueueItem:
"""Tests for QueueItem dataclass."""
def test_queue_item_creation(self) -> None:
"""Test creating a queue item with all fields."""
metadata = IssueMetadata(
estimated_context=50000,
difficulty="medium",
assigned_agent="sonnet",
blocks=[161, 162],
blocked_by=[158],
)
item = QueueItem(
issue_number=159,
metadata=metadata,
status=QueueItemStatus.PENDING,
)
assert item.issue_number == 159
assert item.metadata == metadata
assert item.status == QueueItemStatus.PENDING
assert item.ready is False # Should not be ready (blocked_by exists)
def test_queue_item_defaults(self) -> None:
"""Test queue item with default values."""
metadata = IssueMetadata()
item = QueueItem(
issue_number=160,
metadata=metadata,
)
assert item.issue_number == 160
assert item.status == QueueItemStatus.PENDING
assert item.ready is True # Should be ready (no blockers)
def test_queue_item_serialization(self) -> None:
"""Test converting queue item to dict for JSON serialization."""
metadata = IssueMetadata(
estimated_context=30000,
difficulty="easy",
assigned_agent="haiku",
blocks=[165],
blocked_by=[],
)
item = QueueItem(
issue_number=164,
metadata=metadata,
status=QueueItemStatus.IN_PROGRESS,
ready=True,
)
data = item.to_dict()
assert data["issue_number"] == 164
assert data["status"] == "in_progress"
assert data["ready"] is True
assert data["metadata"]["estimated_context"] == 30000
assert data["metadata"]["difficulty"] == "easy"
def test_queue_item_deserialization(self) -> None:
"""Test creating queue item from dict."""
data = {
"issue_number": 161,
"status": "completed",
"ready": False,
"metadata": {
"estimated_context": 75000,
"difficulty": "hard",
"assigned_agent": "opus",
"blocks": [166, 167],
"blocked_by": [159],
},
}
item = QueueItem.from_dict(data)
assert item.issue_number == 161
assert item.status == QueueItemStatus.COMPLETED
assert item.ready is False
assert item.metadata.estimated_context == 75000
assert item.metadata.difficulty == "hard"
assert item.metadata.assigned_agent == "opus"
assert item.metadata.blocks == [166, 167]
assert item.metadata.blocked_by == [159]
class TestQueueManager:
"""Tests for QueueManager."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
def test_enqueue_single_item(self, queue_manager: QueueManager) -> None:
"""Test enqueuing a single item."""
metadata = IssueMetadata(
estimated_context=40000,
difficulty="medium",
assigned_agent="sonnet",
blocks=[],
blocked_by=[],
)
queue_manager.enqueue(159, metadata)
assert queue_manager.size() == 1
item = queue_manager.get_item(159)
assert item is not None
assert item.issue_number == 159
assert item.status == QueueItemStatus.PENDING
assert item.ready is True
def test_enqueue_multiple_items(self, queue_manager: QueueManager) -> None:
"""Test enqueuing multiple items."""
meta1 = IssueMetadata(assigned_agent="sonnet")
meta2 = IssueMetadata(assigned_agent="haiku")
meta3 = IssueMetadata(assigned_agent="glm")
queue_manager.enqueue(159, meta1)
queue_manager.enqueue(160, meta2)
queue_manager.enqueue(161, meta3)
assert queue_manager.size() == 3
def test_dequeue_item(self, queue_manager: QueueManager) -> None:
"""Test removing an item from the queue."""
metadata = IssueMetadata()
queue_manager.enqueue(159, metadata)
assert queue_manager.size() == 1
queue_manager.dequeue(159)
assert queue_manager.size() == 0
assert queue_manager.get_item(159) is None
def test_dequeue_nonexistent_item(self, queue_manager: QueueManager) -> None:
"""Test dequeuing an item that doesn't exist."""
# Should not raise error, just be a no-op
queue_manager.dequeue(999)
assert queue_manager.size() == 0
def test_get_next_ready_simple(self, queue_manager: QueueManager) -> None:
"""Test getting next ready item with no dependencies."""
meta1 = IssueMetadata(assigned_agent="sonnet")
meta2 = IssueMetadata(assigned_agent="haiku")
queue_manager.enqueue(159, meta1)
queue_manager.enqueue(160, meta2)
next_item = queue_manager.get_next_ready()
assert next_item is not None
# Should return first item (159) since both are ready
assert next_item.issue_number == 159
def test_get_next_ready_with_dependencies(self, queue_manager: QueueManager) -> None:
"""Test getting next ready item with dependency chain."""
# Issue 160 blocks 161, 158 blocks 159
meta_158 = IssueMetadata(blocks=[159], blocked_by=[])
meta_159 = IssueMetadata(blocks=[161], blocked_by=[158])
meta_160 = IssueMetadata(blocks=[161], blocked_by=[])
meta_161 = IssueMetadata(blocks=[], blocked_by=[159, 160])
queue_manager.enqueue(158, meta_158)
queue_manager.enqueue(159, meta_159)
queue_manager.enqueue(160, meta_160)
queue_manager.enqueue(161, meta_161)
# Should get 158 or 160 (both ready, no blockers)
next_item = queue_manager.get_next_ready()
assert next_item is not None
assert next_item.issue_number in [158, 160]
assert next_item.ready is True
def test_get_next_ready_empty_queue(self, queue_manager: QueueManager) -> None:
"""Test getting next ready item from empty queue."""
next_item = queue_manager.get_next_ready()
assert next_item is None
def test_get_next_ready_all_blocked(self, queue_manager: QueueManager) -> None:
"""Test getting next ready when all items are blocked."""
# Circular dependency: 159 blocks 160, 160 blocks 159
meta_159 = IssueMetadata(blocks=[160], blocked_by=[160])
meta_160 = IssueMetadata(blocks=[159], blocked_by=[159])
queue_manager.enqueue(159, meta_159)
queue_manager.enqueue(160, meta_160)
next_item = queue_manager.get_next_ready()
# Should still return one (circular dependencies handled)
assert next_item is not None
def test_mark_complete(self, queue_manager: QueueManager) -> None:
"""Test marking an item as complete."""
metadata = IssueMetadata()
queue_manager.enqueue(159, metadata)
queue_manager.mark_complete(159)
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.COMPLETED
def test_mark_complete_unblocks_dependents(self, queue_manager: QueueManager) -> None:
"""Test that completing an item unblocks dependent items."""
# 158 blocks 159
meta_158 = IssueMetadata(blocks=[159], blocked_by=[])
meta_159 = IssueMetadata(blocks=[], blocked_by=[158])
queue_manager.enqueue(158, meta_158)
queue_manager.enqueue(159, meta_159)
# Initially, 159 should not be ready
item_159 = queue_manager.get_item(159)
assert item_159 is not None
assert item_159.ready is False
# Complete 158
queue_manager.mark_complete(158)
# Now 159 should be ready
item_159_updated = queue_manager.get_item(159)
assert item_159_updated is not None
assert item_159_updated.ready is True
def test_mark_complete_nonexistent_item(self, queue_manager: QueueManager) -> None:
"""Test marking nonexistent item as complete."""
# Should not raise error, just be a no-op
queue_manager.mark_complete(999)
def test_update_ready_status(self, queue_manager: QueueManager) -> None:
"""Test updating ready status for all items."""
# Complex dependency chain
meta_158 = IssueMetadata(blocks=[159], blocked_by=[])
meta_159 = IssueMetadata(blocks=[160, 161], blocked_by=[158])
meta_160 = IssueMetadata(blocks=[], blocked_by=[159])
meta_161 = IssueMetadata(blocks=[], blocked_by=[159])
queue_manager.enqueue(158, meta_158)
queue_manager.enqueue(159, meta_159)
queue_manager.enqueue(160, meta_160)
queue_manager.enqueue(161, meta_161)
# Initially: 158 ready, others blocked
item_158 = queue_manager.get_item(158)
item_159 = queue_manager.get_item(159)
item_160 = queue_manager.get_item(160)
item_161 = queue_manager.get_item(161)
assert item_158 is not None
assert item_159 is not None
assert item_160 is not None
assert item_161 is not None
assert item_158.ready is True
assert item_159.ready is False
assert item_160.ready is False
assert item_161.ready is False
# Complete 158
queue_manager.mark_complete(158)
# Now: 159 ready, 160 and 161 still blocked
item_159_updated = queue_manager.get_item(159)
item_160_updated = queue_manager.get_item(160)
item_161_updated = queue_manager.get_item(161)
assert item_159_updated is not None
assert item_160_updated is not None
assert item_161_updated is not None
assert item_159_updated.ready is True
assert item_160_updated.ready is False
assert item_161_updated.ready is False
def test_persistence_save(self, queue_manager: QueueManager, temp_queue_file: Path) -> None:
"""Test saving queue to disk."""
metadata = IssueMetadata(
estimated_context=50000,
difficulty="medium",
assigned_agent="sonnet",
blocks=[161],
blocked_by=[158],
)
queue_manager.enqueue(159, metadata)
queue_manager.save()
assert temp_queue_file.exists()
# Verify JSON structure
with open(temp_queue_file) as f:
data = json.load(f)
assert "items" in data
assert len(data["items"]) == 1
assert data["items"][0]["issue_number"] == 159
def test_persistence_load(self, temp_queue_file: Path) -> None:
"""Test loading queue from disk."""
# Create test data
queue_data = {
"items": [
{
"issue_number": 159,
"status": "pending",
"ready": False,
"metadata": {
"estimated_context": 50000,
"difficulty": "medium",
"assigned_agent": "sonnet",
"blocks": [161],
"blocked_by": [158],
},
},
{
"issue_number": 160,
"status": "in_progress",
"ready": True,
"metadata": {
"estimated_context": 30000,
"difficulty": "easy",
"assigned_agent": "haiku",
"blocks": [],
"blocked_by": [],
},
},
]
}
with open(temp_queue_file, "w") as f:
json.dump(queue_data, f)
# Load queue
queue_manager = QueueManager(queue_file=temp_queue_file)
assert queue_manager.size() == 2
item_159 = queue_manager.get_item(159)
assert item_159 is not None
assert item_159.status == QueueItemStatus.PENDING
assert item_159.ready is False
item_160 = queue_manager.get_item(160)
assert item_160 is not None
assert item_160.status == QueueItemStatus.IN_PROGRESS
assert item_160.ready is True
def test_persistence_load_nonexistent_file(self, temp_queue_file: Path) -> None:
"""Test loading from nonexistent file creates empty queue."""
# Don't create the file
temp_queue_file.unlink(missing_ok=True)
queue_manager = QueueManager(queue_file=temp_queue_file)
assert queue_manager.size() == 0
def test_persistence_autosave_on_enqueue(
self, queue_manager: QueueManager, temp_queue_file: Path
) -> None:
"""Test that enqueue automatically saves to disk."""
metadata = IssueMetadata()
queue_manager.enqueue(159, metadata)
# Should auto-save
assert temp_queue_file.exists()
# Load in new manager to verify
new_manager = QueueManager(queue_file=temp_queue_file)
assert new_manager.size() == 1
def test_persistence_autosave_on_mark_complete(
self, queue_manager: QueueManager, temp_queue_file: Path
) -> None:
"""Test that mark_complete automatically saves to disk."""
metadata = IssueMetadata()
queue_manager.enqueue(159, metadata)
queue_manager.mark_complete(159)
# Load in new manager to verify
new_manager = QueueManager(queue_file=temp_queue_file)
item = new_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.COMPLETED
def test_circular_dependency_detection(self, queue_manager: QueueManager) -> None:
"""Test handling of circular dependencies."""
# Create circular dependency: 159 -> 160 -> 161 -> 159
meta_159 = IssueMetadata(blocks=[160], blocked_by=[161])
meta_160 = IssueMetadata(blocks=[161], blocked_by=[159])
meta_161 = IssueMetadata(blocks=[159], blocked_by=[160])
queue_manager.enqueue(159, meta_159)
queue_manager.enqueue(160, meta_160)
queue_manager.enqueue(161, meta_161)
# Should still be able to get next ready (break the cycle gracefully)
next_item = queue_manager.get_next_ready()
assert next_item is not None
def test_list_all_items(self, queue_manager: QueueManager) -> None:
"""Test listing all items in queue."""
meta1 = IssueMetadata(assigned_agent="sonnet")
meta2 = IssueMetadata(assigned_agent="haiku")
meta3 = IssueMetadata(assigned_agent="glm")
queue_manager.enqueue(159, meta1)
queue_manager.enqueue(160, meta2)
queue_manager.enqueue(161, meta3)
all_items = queue_manager.list_all()
assert len(all_items) == 3
issue_numbers = [item.issue_number for item in all_items]
assert 159 in issue_numbers
assert 160 in issue_numbers
assert 161 in issue_numbers
def test_list_ready_items(self, queue_manager: QueueManager) -> None:
"""Test listing only ready items."""
meta_ready = IssueMetadata(blocked_by=[])
meta_blocked = IssueMetadata(blocked_by=[158])
queue_manager.enqueue(159, meta_ready)
queue_manager.enqueue(160, meta_ready)
queue_manager.enqueue(161, meta_blocked)
ready_items = queue_manager.list_ready()
assert len(ready_items) == 2
issue_numbers = [item.issue_number for item in ready_items]
assert 159 in issue_numbers
assert 160 in issue_numbers
assert 161 not in issue_numbers
def test_get_item_nonexistent(self, queue_manager: QueueManager) -> None:
"""Test getting an item that doesn't exist."""
item = queue_manager.get_item(999)
assert item is None
def test_status_transitions(self, queue_manager: QueueManager) -> None:
"""Test valid status transitions."""
metadata = IssueMetadata()
queue_manager.enqueue(159, metadata)
# PENDING -> IN_PROGRESS
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.PENDING
queue_manager.mark_in_progress(159)
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.IN_PROGRESS
# IN_PROGRESS -> COMPLETED
queue_manager.mark_complete(159)
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.COMPLETED