Files
stack/apps/coordinator/src/queue.py
Jason Woltje 72321f5fcd feat(#159): Implement queue manager
Implements QueueManager with full dependency tracking, persistence, and status management.

Key features:
- QueueItem dataclass with status, metadata, and ready flag
- QueueManager with enqueue, dequeue, get_next_ready, mark_complete
- Dependency resolution (blocked_by → not ready)
- JSON persistence with auto-save on state changes
- Automatic reload on startup
- Graceful handling of circular dependencies
- Status transitions (pending → in_progress → completed)

Test coverage:
- 26 comprehensive tests covering all operations
- Dependency chain resolution
- Persistence and reload scenarios
- Edge cases (circular deps, missing items)
- 100% code coverage on queue module
- 97% total project coverage

Quality gates passed:
✓ All tests passing (88 total)
✓ Type checking (mypy) passing
✓ Linting (ruff) passing
✓ Coverage ≥85% (97% achieved)

This unblocks #160 (orchestrator needs queue).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-01 17:55:48 -06:00

235 lines
7.0 KiB
Python

"""Queue manager for issue coordination."""
import json
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
from src.models import IssueMetadata
class QueueItemStatus(str, Enum):
    """Lifecycle states of an issue held in the queue.

    Every member is also a ``str``, so it compares equal to its raw value
    (for example ``QueueItemStatus.PENDING == "pending"``).
    """

    # Enqueued and not yet started.
    PENDING = "pending"
    # Work on the issue has started.
    IN_PROGRESS = "in_progress"
    # Work on the issue has finished.
    COMPLETED = "completed"
@dataclass
class QueueItem:
    """Represents an issue in the queue.

    The ``ready`` value passed to the constructor is always recomputed in
    ``__post_init__``; use :meth:`from_dict` to rebuild an item with a
    previously stored flag.
    """

    # Issue number; used by the queue as the item's key.
    issue_number: int
    # Parsed issue metadata; ``metadata.blocked_by`` lists blocking issues.
    metadata: IssueMetadata
    # Current lifecycle state; new items start as pending.
    status: QueueItemStatus = QueueItemStatus.PENDING
    # Whether the item may be picked up for work.
    ready: bool = field(default=False)

    def __post_init__(self) -> None:
        """Derive the initial ``ready`` flag from the blocker list.

        A single item cannot see the state of its blockers, so at this point
        it is ready only when it has no blockers at all.
        """
        self.ready = not self.metadata.blocked_by

    def to_dict(self) -> dict[str, Any]:
        """Convert queue item to dictionary for JSON serialization.

        Returns:
            Dictionary with the keys ``issue_number``, ``status`` (the enum's
            string value), ``ready`` and ``metadata`` (the result of
            ``metadata.model_dump()``).
        """
        return {
            "issue_number": self.issue_number,
            "status": self.status.value,
            "ready": self.ready,
            "metadata": self.metadata.model_dump(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "QueueItem":
        """Create queue item from dictionary.

        Args:
            data: Dictionary with the keys produced by :meth:`to_dict`.

        Returns:
            QueueItem instance whose ``ready`` flag equals the stored value.

        Raises:
            KeyError: If a required key is missing from ``data``.
            ValueError: If ``data["status"]`` is not a valid status value.
        """
        item = cls(
            issue_number=data["issue_number"],
            status=QueueItemStatus(data["status"]),
            ready=data["ready"],
            metadata=IssueMetadata(**data["metadata"]),
        )
        # __post_init__ has just recomputed ``ready`` from the blocker list
        # alone, which would discard the stored flag (e.g. an item whose
        # blockers had all completed). Restore it so that
        # from_dict(to_dict(item)) round-trips.
        item.ready = data["ready"]
        return item
class QueueManager:
    """Manages the queue of issues to be processed.

    Items are keyed by issue number and written to a JSON file after every
    state change. An existing file is loaded when the manager is created.
    """

    def __init__(self, queue_file: Path | None = None) -> None:
        """Initialize queue manager and load any previously saved queue.

        Args:
            queue_file: Path to JSON file for persistence. If None, uses
                ``queue.json`` relative to the current working directory.
        """
        self.queue_file = queue_file or Path("queue.json")
        self._items: dict[int, QueueItem] = {}
        self._load()

    def enqueue(self, issue_number: int, metadata: IssueMetadata) -> None:
        """Add an issue to the queue and persist the change.

        An existing entry with the same issue number is replaced by a fresh
        pending item.

        Args:
            issue_number: Issue number
            metadata: Parsed issue metadata
        """
        self._items[issue_number] = QueueItem(
            issue_number=issue_number,
            metadata=metadata,
        )
        self._update_ready_status()
        self.save()

    def dequeue(self, issue_number: int) -> None:
        """Remove an issue from the queue.

        Does nothing (and does not write the file) if the issue is unknown.

        Args:
            issue_number: Issue number to remove
        """
        if issue_number not in self._items:
            return
        del self._items[issue_number]
        self._update_ready_status()
        self.save()

    def get_next_ready(self) -> QueueItem | None:
        """Get the next item that should be worked on.

        Ready pending items are preferred, lowest issue number first. If
        pending items exist but none is ready, the first pending item (in
        insertion order) is returned to break the stall, for example a
        circular dependency. That fallback only applies while nothing is in
        progress.

        Returns:
            Next QueueItem to process, or None if nothing should be started
            right now.
        """
        pending = [
            item for item in self._items.values() if item.status == QueueItemStatus.PENDING
        ]
        ready_items = [item for item in pending if item.ready]
        if ready_items:
            # Lowest issue number first, for determinism.
            return min(ready_items, key=lambda item: item.issue_number)

        if not pending:
            return None

        # Nothing is ready. If some item is still in progress, its completion
        # may unblock the pending ones, so this is not a deadlock: wait rather
        # than hand out an item whose blocker has not finished.
        if any(item.status == QueueItemStatus.IN_PROGRESS for item in self._items.values()):
            return None

        # Nothing is running and nothing is ready: the queue is stalled
        # (e.g. circular dependencies). Return the first pending item to
        # break the stall.
        return pending[0]

    def mark_complete(self, issue_number: int) -> None:
        """Mark an issue as completed and refresh dependants' ready flags.

        Does nothing if the issue is not in the queue.

        Args:
            issue_number: Issue number to mark as complete
        """
        item = self._items.get(issue_number)
        if item is None:
            return
        item.status = QueueItemStatus.COMPLETED
        self._update_ready_status()
        self.save()

    def mark_in_progress(self, issue_number: int) -> None:
        """Mark an issue as in progress.

        Does nothing if the issue is not in the queue.

        Args:
            issue_number: Issue number to mark as in progress
        """
        item = self._items.get(issue_number)
        if item is None:
            return
        item.status = QueueItemStatus.IN_PROGRESS
        self.save()

    def get_item(self, issue_number: int) -> QueueItem | None:
        """Get a specific queue item.

        Args:
            issue_number: Issue number

        Returns:
            QueueItem if found, None otherwise
        """
        return self._items.get(issue_number)

    def list_all(self) -> list[QueueItem]:
        """Get all items in the queue.

        Returns:
            New list of all queue items, in insertion order
        """
        return list(self._items.values())

    def list_ready(self) -> list[QueueItem]:
        """Get all items whose ready flag is set, regardless of status.

        Returns:
            List of ready queue items
        """
        return [item for item in self._items.values() if item.ready]

    def size(self) -> int:
        """Get the number of items in the queue.

        Returns:
            Number of items in queue
        """
        return len(self._items)

    def _update_ready_status(self) -> None:
        """Update ready status for all items based on dependencies.

        An item is ready if it has no blockers, or if every blocker is
        present in the queue with status completed.
        """
        completed_issues = {
            issue_num
            for issue_num, item in self._items.items()
            if item.status == QueueItemStatus.COMPLETED
        }
        for item in self._items.values():
            blockers = item.metadata.blocked_by
            item.ready = not blockers or all(
                blocker in completed_issues for blocker in blockers
            )

    def save(self) -> None:
        """Persist queue to disk as JSON.

        The data is serialized first and written to a sibling ``.tmp`` file
        that then replaces the queue file, so a failed or interrupted write
        does not leave a truncated queue file behind.

        Raises:
            OSError: If the file cannot be written or replaced.
        """
        queue_data = {"items": [item.to_dict() for item in self._items.values()]}
        # Serialize before touching the disk: a serialization error must not
        # damage the existing file.
        payload = json.dumps(queue_data, indent=2)
        tmp_file = self.queue_file.with_name(self.queue_file.name + ".tmp")
        try:
            tmp_file.write_text(payload, encoding="utf-8")
            tmp_file.replace(self.queue_file)
        except OSError:
            # Do not leave a stray temp file next to the queue file.
            tmp_file.unlink(missing_ok=True)
            raise

    def _load(self) -> None:
        """Load queue from disk if it exists.

        A file that is not valid JSON, or whose content does not have the
        expected structure, results in an empty queue instead of an error.
        """
        if not self.queue_file.exists():
            return
        try:
            with open(self.queue_file, encoding="utf-8") as f:
                data = json.load(f)
            for item_data in data.get("items", []):
                item = QueueItem.from_dict(item_data)
                self._items[item.issue_number] = item
            # Update ready status after loading
            self._update_ready_status()
        except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError):
            # If file is corrupted, start with empty queue. TypeError and
            # AttributeError cover valid JSON of the wrong shape (e.g. a
            # top-level list, or items that are not objects).
            self._items = {}