fix(#338): Log queue corruption and backup corrupted file
- Log ERROR when queue corruption detected with error details - Create timestamped backup before discarding corrupted data - Add comprehensive tests for corruption handling Refs #338 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,13 +1,18 @@
|
|||||||
"""Queue manager for issue coordination."""
|
"""Queue manager for issue coordination."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
import shutil
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from src.models import IssueMetadata
|
from src.models import IssueMetadata
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class QueueItemStatus(str, Enum):
|
class QueueItemStatus(str, Enum):
|
||||||
"""Status of a queue item."""
|
"""Status of a queue item."""
|
||||||
@@ -229,6 +234,40 @@ class QueueManager:
|
|||||||
|
|
||||||
# Update ready status after loading
|
# Update ready status after loading
|
||||||
self._update_ready_status()
|
self._update_ready_status()
|
||||||
except (json.JSONDecodeError, KeyError, ValueError):
|
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
||||||
# If file is corrupted, start with empty queue
|
# Log corruption details and create backup before discarding
|
||||||
self._items = {}
|
self._handle_corrupted_queue(e)
|
||||||
|
|
||||||
|
def _handle_corrupted_queue(self, error: Exception) -> None:
|
||||||
|
"""Handle corrupted queue file by logging, backing up, and resetting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: The exception that was raised during loading
|
||||||
|
"""
|
||||||
|
# Log error with details
|
||||||
|
logger.error(
|
||||||
|
"Queue file corruption detected in '%s': %s - %s",
|
||||||
|
self.queue_file,
|
||||||
|
type(error).__name__,
|
||||||
|
str(error),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create backup of corrupted file with timestamp
|
||||||
|
if self.queue_file.exists():
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
backup_path = self.queue_file.with_suffix(f".corrupted.{timestamp}.json")
|
||||||
|
try:
|
||||||
|
shutil.copy2(self.queue_file, backup_path)
|
||||||
|
logger.error(
|
||||||
|
"Corrupted queue file backed up to '%s'",
|
||||||
|
backup_path,
|
||||||
|
)
|
||||||
|
except OSError as backup_error:
|
||||||
|
logger.error(
|
||||||
|
"Failed to create backup of corrupted queue file: %s",
|
||||||
|
backup_error,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reset to empty queue
|
||||||
|
self._items = {}
|
||||||
|
logger.error("Queue reset to empty state after corruption")
|
||||||
|
|||||||
@@ -474,3 +474,142 @@ class TestQueueManager:
|
|||||||
item = queue_manager.get_item(159)
|
item = queue_manager.get_item(159)
|
||||||
assert item is not None
|
assert item is not None
|
||||||
assert item.status == QueueItemStatus.COMPLETED
|
assert item.status == QueueItemStatus.COMPLETED
|
||||||
|
|
||||||
|
|
||||||
|
class TestQueueCorruptionHandling:
|
||||||
|
"""Tests for queue file corruption handling."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_queue_file(self) -> Generator[Path, None, None]:
|
||||||
|
"""Create a temporary file for queue persistence."""
|
||||||
|
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
|
||||||
|
temp_path = Path(f.name)
|
||||||
|
yield temp_path
|
||||||
|
# Cleanup - remove main file and any backup files
|
||||||
|
if temp_path.exists():
|
||||||
|
temp_path.unlink()
|
||||||
|
# Clean up backup files
|
||||||
|
for backup in temp_path.parent.glob(f"{temp_path.stem}.corrupted.*.json"):
|
||||||
|
backup.unlink()
|
||||||
|
|
||||||
|
def test_corrupted_json_logs_error_and_creates_backup(
|
||||||
|
self, temp_queue_file: Path, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that corrupted JSON file triggers logging and backup creation."""
|
||||||
|
# Write invalid JSON to the file
|
||||||
|
with open(temp_queue_file, "w") as f:
|
||||||
|
f.write("{ invalid json content }")
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
with caplog.at_level(logging.ERROR):
|
||||||
|
queue_manager = QueueManager(queue_file=temp_queue_file)
|
||||||
|
|
||||||
|
# Verify queue is empty after corruption
|
||||||
|
assert queue_manager.size() == 0
|
||||||
|
|
||||||
|
# Verify error was logged
|
||||||
|
assert "Queue file corruption detected" in caplog.text
|
||||||
|
assert "JSONDecodeError" in caplog.text
|
||||||
|
|
||||||
|
# Verify backup file was created
|
||||||
|
backup_files = list(temp_queue_file.parent.glob(f"{temp_queue_file.stem}.corrupted.*.json"))
|
||||||
|
assert len(backup_files) == 1
|
||||||
|
assert "Corrupted queue file backed up" in caplog.text
|
||||||
|
|
||||||
|
# Verify backup contains original corrupted content
|
||||||
|
with open(backup_files[0]) as f:
|
||||||
|
backup_content = f.read()
|
||||||
|
assert "invalid json content" in backup_content
|
||||||
|
|
||||||
|
def test_corrupted_structure_logs_error_and_creates_backup(
|
||||||
|
self, temp_queue_file: Path, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that valid JSON with invalid structure triggers logging and backup."""
|
||||||
|
# Write valid JSON but with missing required fields
|
||||||
|
with open(temp_queue_file, "w") as f:
|
||||||
|
json.dump(
|
||||||
|
{
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"issue_number": 159,
|
||||||
|
# Missing "status", "ready", "metadata" fields
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
f,
|
||||||
|
)
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
with caplog.at_level(logging.ERROR):
|
||||||
|
queue_manager = QueueManager(queue_file=temp_queue_file)
|
||||||
|
|
||||||
|
# Verify queue is empty after corruption
|
||||||
|
assert queue_manager.size() == 0
|
||||||
|
|
||||||
|
# Verify error was logged (KeyError for missing fields)
|
||||||
|
assert "Queue file corruption detected" in caplog.text
|
||||||
|
assert "KeyError" in caplog.text
|
||||||
|
|
||||||
|
# Verify backup file was created
|
||||||
|
backup_files = list(temp_queue_file.parent.glob(f"{temp_queue_file.stem}.corrupted.*.json"))
|
||||||
|
assert len(backup_files) == 1
|
||||||
|
|
||||||
|
def test_invalid_status_value_logs_error_and_creates_backup(
|
||||||
|
self, temp_queue_file: Path, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that invalid enum value triggers logging and backup."""
|
||||||
|
# Write valid JSON but with invalid status enum value
|
||||||
|
with open(temp_queue_file, "w") as f:
|
||||||
|
json.dump(
|
||||||
|
{
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"issue_number": 159,
|
||||||
|
"status": "invalid_status",
|
||||||
|
"ready": True,
|
||||||
|
"metadata": {
|
||||||
|
"estimated_context": 50000,
|
||||||
|
"difficulty": "medium",
|
||||||
|
"assigned_agent": "sonnet",
|
||||||
|
"blocks": [],
|
||||||
|
"blocked_by": [],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
f,
|
||||||
|
)
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
with caplog.at_level(logging.ERROR):
|
||||||
|
queue_manager = QueueManager(queue_file=temp_queue_file)
|
||||||
|
|
||||||
|
# Verify queue is empty after corruption
|
||||||
|
assert queue_manager.size() == 0
|
||||||
|
|
||||||
|
# Verify error was logged (ValueError for invalid enum)
|
||||||
|
assert "Queue file corruption detected" in caplog.text
|
||||||
|
assert "ValueError" in caplog.text
|
||||||
|
|
||||||
|
# Verify backup file was created
|
||||||
|
backup_files = list(temp_queue_file.parent.glob(f"{temp_queue_file.stem}.corrupted.*.json"))
|
||||||
|
assert len(backup_files) == 1
|
||||||
|
|
||||||
|
def test_queue_reset_logged_after_corruption(
|
||||||
|
self, temp_queue_file: Path, caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that queue reset is logged after handling corruption."""
|
||||||
|
# Write invalid JSON
|
||||||
|
with open(temp_queue_file, "w") as f:
|
||||||
|
f.write("not valid json")
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
with caplog.at_level(logging.ERROR):
|
||||||
|
QueueManager(queue_file=temp_queue_file)
|
||||||
|
|
||||||
|
# Verify the reset was logged
|
||||||
|
assert "Queue reset to empty state after corruption" in caplog.text
|
||||||
|
|||||||
Reference in New Issue
Block a user