- Log ERROR when queue corruption detected with error details - Create timestamped backup before discarding corrupted data - Add comprehensive tests for corruption handling Refs #338 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
274 lines
8.3 KiB
Python
274 lines
8.3 KiB
Python
"""Queue manager for issue coordination."""
|
|
|
|
import json
|
|
import logging
|
|
import shutil
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from src.models import IssueMetadata
|
|
|
|
# Module-level logger named after this module; the queue manager reports
# corruption handling through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class QueueItemStatus(str, Enum):
    """Lifecycle state of an issue held in the queue.

    The ``str`` mixin makes every member compare equal to its plain string
    value, and ``.value`` is what gets written to the persisted JSON.
    """

    # Waiting in the queue; nobody has picked it up yet.
    PENDING = "pending"

    # Picked up and currently being worked on.
    IN_PROGRESS = "in_progress"

    # Finished; only items in this state count as satisfied blockers.
    COMPLETED = "completed"
|
|
|
|
|
|
@dataclass
class QueueItem:
    """One issue tracked by the queue, with its status and readiness flag."""

    issue_number: int
    metadata: IssueMetadata
    status: QueueItemStatus = QueueItemStatus.PENDING
    ready: bool = False

    def __post_init__(self) -> None:
        """Derive the initial ``ready`` flag from the metadata's blocker list."""
        # Any value passed for ``ready`` is overwritten here: a fresh item is
        # ready only when its metadata lists no blockers at all.
        blocker_count = len(self.metadata.blocked_by)
        self.ready = blocker_count == 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize this item into a JSON-compatible dictionary.

        Returns:
            Dictionary with the keys ``issue_number``, ``status`` (the enum's
            string value), ``ready`` and ``metadata`` (the result of
            ``metadata.model_dump()``).
        """
        serialized: dict[str, Any] = {"issue_number": self.issue_number}
        serialized["status"] = self.status.value
        serialized["ready"] = self.ready
        serialized["metadata"] = self.metadata.model_dump()
        return serialized

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "QueueItem":
        """Rebuild a queue item from the dictionary form produced by ``to_dict``.

        Args:
            data: Dictionary holding ``issue_number``, ``status``, ``ready``
                and ``metadata`` entries.

        Returns:
            QueueItem instance built from ``data``.

        Raises:
            KeyError: If one of the expected keys is missing.
            ValueError: If ``status`` is not a valid QueueItemStatus value.
        """
        number = data["issue_number"]
        status = QueueItemStatus(data["status"])
        is_ready = data["ready"]
        parsed_metadata = IssueMetadata(**data["metadata"])
        return cls(
            issue_number=number,
            metadata=parsed_metadata,
            status=status,
            ready=is_ready,
        )
|
|
|
|
|
|
class QueueManager:
    """Manages the queue of issues to be processed.

    Items are kept in memory keyed by issue number and mirrored to a JSON
    file: every mutating method rewrites that file, and the constructor loads
    it back if it exists.
    """

    def __init__(self, queue_file: Path | None = None) -> None:
        """Initialize queue manager and load any previously persisted queue.

        Args:
            queue_file: Path to JSON file for persistence. If None, uses
                ``queue.json`` relative to the current working directory.
        """
        self.queue_file = queue_file or Path("queue.json")
        self._items: dict[int, QueueItem] = {}
        self._load()

    def enqueue(self, issue_number: int, metadata: IssueMetadata) -> None:
        """Add an issue to the queue and persist the change.

        An existing entry with the same issue number is replaced by a fresh
        pending item.

        Args:
            issue_number: Issue number
            metadata: Parsed issue metadata
        """
        self._items[issue_number] = QueueItem(
            issue_number=issue_number,
            metadata=metadata,
        )
        self._update_ready_status()
        self.save()

    def dequeue(self, issue_number: int) -> None:
        """Remove an issue from the queue and persist the change.

        Unknown issue numbers are ignored and nothing is written.

        Args:
            issue_number: Issue number to remove
        """
        if issue_number in self._items:
            del self._items[issue_number]
            self._update_ready_status()
            self.save()

    def get_next_ready(self) -> QueueItem | None:
        """Get the next item that should be worked on.

        Returns:
            The ready, pending item with the lowest issue number. If no
            pending item is ready, the first pending item in insertion order
            is returned instead so that circular dependencies cannot stall
            the queue. None if there are no pending items at all.
        """
        pending_items = [
            item for item in self._items.values() if item.status == QueueItemStatus.PENDING
        ]
        ready_items = [item for item in pending_items if item.ready]

        if ready_items:
            # Lowest issue number first, for determinism.
            return min(ready_items, key=lambda item: item.issue_number)

        # Nothing is ready: fall back to the first pending item (if any) to
        # break a potential dependency cycle.
        return pending_items[0] if pending_items else None

    def mark_complete(self, issue_number: int) -> None:
        """Mark an issue as completed and persist the change.

        Completing an item may unblock others, so readiness is recomputed.
        Unknown issue numbers are ignored.

        Args:
            issue_number: Issue number to mark as complete
        """
        if issue_number in self._items:
            self._items[issue_number].status = QueueItemStatus.COMPLETED
            self._update_ready_status()
            self.save()

    def mark_in_progress(self, issue_number: int) -> None:
        """Mark an issue as in progress and persist the change.

        Unknown issue numbers are ignored.

        Args:
            issue_number: Issue number to mark as in progress
        """
        if issue_number in self._items:
            self._items[issue_number].status = QueueItemStatus.IN_PROGRESS
            # Readiness depends on which items are COMPLETED; moving a
            # completed item back to in-progress must re-block its dependents.
            self._update_ready_status()
            self.save()

    def get_item(self, issue_number: int) -> QueueItem | None:
        """Get a specific queue item.

        Args:
            issue_number: Issue number

        Returns:
            QueueItem if found, None otherwise
        """
        return self._items.get(issue_number)

    def list_all(self) -> list[QueueItem]:
        """Get all items in the queue.

        Returns:
            List of all queue items, in insertion order
        """
        return list(self._items.values())

    def list_ready(self) -> list[QueueItem]:
        """Get all items whose ``ready`` flag is set.

        The status is not considered, so in-progress and completed items
        with satisfied blockers are included.

        Returns:
            List of ready queue items
        """
        return [item for item in self._items.values() if item.ready]

    def size(self) -> int:
        """Get the number of items in the queue.

        Returns:
            Number of items in queue, regardless of status
        """
        return len(self._items)

    def _update_ready_status(self) -> None:
        """Update ready status for all items based on dependencies.

        An item is ready if it has no blockers, or if every blocker is
        present in the queue with COMPLETED status.
        """
        completed_issues = {
            issue_num
            for issue_num, item in self._items.items()
            if item.status == QueueItemStatus.COMPLETED
        }

        for item in self._items.values():
            blockers = item.metadata.blocked_by
            # A blocker that is not in the queue at all is never satisfied.
            item.ready = not blockers or all(
                blocker in completed_issues for blocker in blockers
            )

    def save(self) -> None:
        """Persist queue to disk as JSON.

        The data is written to a temporary sibling file which then replaces
        the queue file, so an interrupted write cannot leave a truncated
        (corrupt) queue file behind.

        Raises:
            OSError: If the temporary file cannot be written or moved into
                place.
        """
        queue_data = {"items": [item.to_dict() for item in self._items.values()]}

        # Same directory as the target so the final replace stays on one
        # filesystem.
        tmp_path = self.queue_file.with_name(f"{self.queue_file.name}.tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(queue_data, f, indent=2)
            tmp_path.replace(self.queue_file)
        finally:
            # No-op after a successful replace; removes the partial temp file
            # when writing or replacing failed.
            tmp_path.unlink(missing_ok=True)

    def _load(self) -> None:
        """Load queue from disk if it exists.

        Content that cannot be parsed or does not have the expected structure
        is handed to ``_handle_corrupted_queue``, which backs the file up and
        resets the queue to empty. Errors opening the file (OSError)
        propagate to the caller.
        """
        if not self.queue_file.exists():
            return

        try:
            with open(self.queue_file, encoding="utf-8") as f:
                data = json.load(f)

            for item_data in data.get("items", []):
                item = QueueItem.from_dict(item_data)
                self._items[item.issue_number] = item

            # Update ready status after loading
            self._update_ready_status()
        except (
            json.JSONDecodeError,
            KeyError,
            ValueError,
            # Valid JSON with the wrong shape: e.g. a top-level list has no
            # ``.get`` (AttributeError), and a null "items" value or a
            # non-dict entry fails with TypeError.
            TypeError,
            AttributeError,
        ) as e:
            # Log corruption details and create backup before discarding
            self._handle_corrupted_queue(e)

    def _handle_corrupted_queue(self, error: Exception) -> None:
        """Handle corrupted queue file by logging, backing up, and resetting.

        The corrupted file itself is left in place; it is overwritten by the
        next ``save()``.

        Args:
            error: The exception that was raised during loading
        """
        logger.error(
            "Queue file corruption detected in '%s': %s - %s",
            self.queue_file,
            type(error).__name__,
            str(error),
        )

        # Create backup of corrupted file with timestamp
        if self.queue_file.exists():
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # with_suffix swaps the final suffix, e.g.
            # queue.json -> queue.corrupted.<timestamp>.json
            backup_path = self.queue_file.with_suffix(f".corrupted.{timestamp}.json")
            try:
                shutil.copy2(self.queue_file, backup_path)
                logger.error(
                    "Corrupted queue file backed up to '%s'",
                    backup_path,
                )
            except OSError as backup_error:
                # Best effort: a failed backup must not prevent the reset.
                logger.error(
                    "Failed to create backup of corrupted queue file: %s",
                    backup_error,
                )

        # Reset to empty queue, discarding any partially loaded items
        self._items = {}
        logger.error("Queue reset to empty state after corruption")
|