feat(#160): Implement basic orchestration loop

Implements the Coordinator class with main orchestration loop:
- Async loop architecture with configurable poll interval
- process_queue() method gets next ready issue and spawns agent (stub)
- Graceful shutdown handling with stop() method
- Error handling that allows loop to continue after failures
- Logging for all actions (start, stop, processing, errors)
- Integration with QueueManager from #159
- Active agent tracking for future agent management

Configuration settings added:
- COORDINATOR_POLL_INTERVAL (default: 5.0s)
- COORDINATOR_MAX_CONCURRENT_AGENTS (default: 10)
- COORDINATOR_ENABLED (default: true)

Tests: 27 new tests covering all acceptance criteria
Coverage: 92% overall (100% for coordinator.py)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 18:03:12 -06:00
parent f0fd0bed41
commit 88953fc998
9 changed files with 1043 additions and 24 deletions

View File

@@ -112,8 +112,9 @@ def client(webhook_secret: str, gitea_url: str, monkeypatch: pytest.MonkeyPatch)
monkeypatch.setenv("LOG_LEVEL", "debug")
# Force reload of settings
from src import config
import importlib
from src import config
importlib.reload(config)
# Import app after settings are configured

View File

@@ -1,8 +1,7 @@
"""Tests for context monitoring."""
import asyncio
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock
import pytest
@@ -60,7 +59,9 @@ class TestContextMonitor:
assert monitor.ROTATE_THRESHOLD == 0.95
@pytest.mark.asyncio
async def test_get_context_usage_api_call(self, monitor: ContextMonitor, mock_claude_api: AsyncMock) -> None:
async def test_get_context_usage_api_call(
self, monitor: ContextMonitor, mock_claude_api: AsyncMock
) -> None:
"""Should call Claude API to get context usage."""
# Mock API response
mock_claude_api.get_context_usage.return_value = {

View File

@@ -0,0 +1,746 @@
"""Tests for the Coordinator orchestration loop."""
import asyncio
import tempfile
from collections.abc import Generator
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
from src.models import IssueMetadata
from src.queue import QueueItem, QueueItemStatus, QueueManager
class TestCoordinator:
"""Tests for the Coordinator class."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
def test_coordinator_initialization(self, queue_manager: QueueManager) -> None:
"""Test creating a Coordinator with required dependencies."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager)
assert coordinator.queue_manager is queue_manager
assert coordinator.is_running is False
assert coordinator.poll_interval == 5.0 # Default poll interval
def test_coordinator_custom_poll_interval(self, queue_manager: QueueManager) -> None:
"""Test creating a Coordinator with custom poll interval."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=2.0)
assert coordinator.poll_interval == 2.0
@pytest.mark.asyncio
async def test_process_queue_no_items(self, queue_manager: QueueManager) -> None:
"""Test process_queue when queue is empty."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager)
result = await coordinator.process_queue()
# Should return None when no items to process
assert result is None
@pytest.mark.asyncio
async def test_process_queue_gets_next_ready(self, queue_manager: QueueManager) -> None:
"""Test process_queue gets the next ready item from queue."""
from src.coordinator import Coordinator
# Add items to queue
meta1 = IssueMetadata(assigned_agent="sonnet")
meta2 = IssueMetadata(assigned_agent="haiku")
queue_manager.enqueue(159, meta1)
queue_manager.enqueue(160, meta2)
coordinator = Coordinator(queue_manager=queue_manager)
result = await coordinator.process_queue()
# Should return the first ready item (159)
assert result is not None
assert result.issue_number == 159
@pytest.mark.asyncio
async def test_process_queue_marks_item_in_progress(
self, queue_manager: QueueManager
) -> None:
"""Test process_queue marks the item as in_progress before spawning agent."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
status_during_spawn: QueueItemStatus | None = None
original_spawn_agent = coordinator.spawn_agent
async def capturing_spawn_agent(item: QueueItem) -> bool:
nonlocal status_during_spawn
# Capture status while agent is "running"
queue_item = queue_manager.get_item(159)
if queue_item:
status_during_spawn = queue_item.status
return await original_spawn_agent(item)
coordinator.spawn_agent = capturing_spawn_agent # type: ignore[method-assign]
await coordinator.process_queue()
# Status during spawn should have been IN_PROGRESS
assert status_during_spawn == QueueItemStatus.IN_PROGRESS
@pytest.mark.asyncio
async def test_process_queue_spawns_agent_stub(self, queue_manager: QueueManager) -> None:
"""Test process_queue calls spawn_agent (stub implementation)."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn:
mock_spawn.return_value = True
await coordinator.process_queue()
mock_spawn.assert_called_once()
# Verify it was called with the correct item
call_args = mock_spawn.call_args[0]
assert call_args[0].issue_number == 159
@pytest.mark.asyncio
async def test_process_queue_marks_complete_on_success(
self, queue_manager: QueueManager
) -> None:
"""Test process_queue marks item complete after successful agent spawn."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn:
mock_spawn.return_value = True
await coordinator.process_queue()
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.COMPLETED
@pytest.mark.asyncio
async def test_process_queue_handles_agent_failure(
self, queue_manager: QueueManager
) -> None:
"""Test process_queue handles agent spawn failure gracefully."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn:
mock_spawn.return_value = False # Agent failed
await coordinator.process_queue()
# Item should remain in progress (not completed) on failure
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.IN_PROGRESS
@pytest.mark.asyncio
async def test_spawn_agent_stub_returns_true(self, queue_manager: QueueManager) -> None:
"""Test spawn_agent stub implementation returns True."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
item = QueueItem(issue_number=159, metadata=meta)
coordinator = Coordinator(queue_manager=queue_manager)
result = await coordinator.spawn_agent(item)
# Stub always returns True
assert result is True
@pytest.mark.asyncio
async def test_spawn_agent_logs_agent_type(self, queue_manager: QueueManager) -> None:
"""Test spawn_agent logs the agent type being spawned."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="opus")
item = QueueItem(issue_number=159, metadata=meta)
coordinator = Coordinator(queue_manager=queue_manager)
with patch("src.coordinator.logger") as mock_logger:
await coordinator.spawn_agent(item)
# Should log that we're spawning an agent
mock_logger.info.assert_called()
call_str = str(mock_logger.info.call_args)
assert "159" in call_str or "opus" in call_str
class TestCoordinatorLoop:
"""Tests for the Coordinator orchestration loop."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
@pytest.mark.asyncio
async def test_start_begins_running(self, queue_manager: QueueManager) -> None:
"""Test that start() sets is_running to True."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1)
# Start in background
task = asyncio.create_task(coordinator.start())
# Give it a moment to start
await asyncio.sleep(0.05)
assert coordinator.is_running is True
# Cleanup
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_stop_halts_loop(self, queue_manager: QueueManager) -> None:
"""Test that stop() halts the orchestration loop."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1)
# Start and then stop
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.05)
await coordinator.stop()
await asyncio.sleep(0.15)
assert coordinator.is_running is False
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_loop_processes_queue_repeatedly(self, queue_manager: QueueManager) -> None:
"""Test that the loop calls process_queue repeatedly."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
call_count = 0
original_process_queue = coordinator.process_queue
async def counting_process_queue() -> QueueItem | None:
nonlocal call_count
call_count += 1
return await original_process_queue()
coordinator.process_queue = counting_process_queue # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.2) # Allow time for multiple iterations
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should have been called multiple times
assert call_count >= 2
@pytest.mark.asyncio
async def test_loop_respects_poll_interval(self, queue_manager: QueueManager) -> None:
"""Test that the loop waits for poll_interval between iterations."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1)
timestamps: list[float] = []
original_process_queue = coordinator.process_queue
async def tracking_process_queue() -> QueueItem | None:
timestamps.append(asyncio.get_event_loop().time())
return await original_process_queue()
coordinator.process_queue = tracking_process_queue # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.35) # Allow time for 3-4 iterations
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Check intervals between calls
if len(timestamps) >= 2:
for i in range(1, len(timestamps)):
interval = timestamps[i] - timestamps[i - 1]
# Should be approximately poll_interval (with some tolerance)
assert interval >= 0.08, f"Interval {interval} is too short"
assert interval <= 0.15, f"Interval {interval} is too long"
class TestCoordinatorErrorHandling:
"""Tests for Coordinator error handling."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
@pytest.mark.asyncio
async def test_loop_continues_after_process_queue_error(
self, queue_manager: QueueManager
) -> None:
"""Test that the loop continues running after process_queue raises an error."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
call_count = 0
error_raised = False
async def failing_process_queue() -> QueueItem | None:
nonlocal call_count, error_raised
call_count += 1
if call_count == 1:
error_raised = True
raise RuntimeError("Simulated error")
return None
coordinator.process_queue = failing_process_queue # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.2)
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should have continued after the error
assert error_raised is True
assert call_count >= 2
@pytest.mark.asyncio
async def test_error_is_logged(self, queue_manager: QueueManager) -> None:
"""Test that errors are logged properly."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
async def failing_process_queue() -> QueueItem | None:
raise RuntimeError("Test error message")
coordinator.process_queue = failing_process_queue # type: ignore[method-assign]
with patch("src.coordinator.logger") as mock_logger:
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.1)
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should have logged the error
mock_logger.error.assert_called()
@pytest.mark.asyncio
async def test_spawn_agent_exception_handled(self, queue_manager: QueueManager) -> None:
"""Test that exceptions in spawn_agent are handled gracefully."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
with patch.object(coordinator, "spawn_agent", new_callable=AsyncMock) as mock_spawn:
mock_spawn.side_effect = RuntimeError("Agent spawn failed")
# Should not raise - error handled internally
await coordinator.process_queue()
# Item should remain in progress
item = queue_manager.get_item(159)
assert item is not None
assert item.status == QueueItemStatus.IN_PROGRESS
class TestCoordinatorGracefulShutdown:
"""Tests for Coordinator graceful shutdown."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
@pytest.mark.asyncio
async def test_stop_is_idempotent(self, queue_manager: QueueManager) -> None:
"""Test that stop() can be called multiple times safely."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1)
# Call stop multiple times without starting
await coordinator.stop()
await coordinator.stop()
await coordinator.stop()
# Should not raise any errors
assert coordinator.is_running is False
@pytest.mark.asyncio
async def test_stop_waits_for_current_process(self, queue_manager: QueueManager) -> None:
"""Test that stop() waits for current process_queue to complete."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.5)
processing_started = asyncio.Event()
processing_done = asyncio.Event()
original_process_queue = coordinator.process_queue
async def slow_process_queue() -> QueueItem | None:
processing_started.set()
await asyncio.sleep(0.2) # Simulate slow processing
result = await original_process_queue()
processing_done.set()
return result
coordinator.process_queue = slow_process_queue # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
# Wait for processing to start
await processing_started.wait()
# Request stop while processing
stop_task = asyncio.create_task(coordinator.stop())
# Wait for both to complete
await asyncio.wait_for(processing_done.wait(), timeout=1.0)
await stop_task
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert coordinator.is_running is False
@pytest.mark.asyncio
async def test_shutdown_logs_message(self, queue_manager: QueueManager) -> None:
"""Test that shutdown logs appropriate messages."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.1)
with patch("src.coordinator.logger") as mock_logger:
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.05)
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should log startup and shutdown
info_calls = [str(call) for call in mock_logger.info.call_args_list]
assert any("start" in call.lower() or "stop" in call.lower() for call in info_calls)
class TestCoordinatorIntegration:
"""Integration tests for Coordinator with QueueManager."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
@pytest.mark.asyncio
async def test_processes_multiple_items_in_order(
self, queue_manager: QueueManager
) -> None:
"""Test that coordinator processes items in dependency order."""
from src.coordinator import Coordinator
# 158 blocks 159
meta_158 = IssueMetadata(blocks=[159], blocked_by=[], assigned_agent="sonnet")
meta_159 = IssueMetadata(blocks=[], blocked_by=[158], assigned_agent="haiku")
queue_manager.enqueue(158, meta_158)
queue_manager.enqueue(159, meta_159)
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
processed_items: list[int] = []
original_spawn_agent = coordinator.spawn_agent
async def tracking_spawn_agent(item: QueueItem) -> bool:
processed_items.append(item.issue_number)
return await original_spawn_agent(item)
coordinator.spawn_agent = tracking_spawn_agent # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.3) # Allow time for processing
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# 158 should be processed before 159 (dependency order)
assert 158 in processed_items
assert 159 in processed_items
assert processed_items.index(158) < processed_items.index(159)
@pytest.mark.asyncio
async def test_completes_all_items_in_queue(self, queue_manager: QueueManager) -> None:
"""Test that coordinator eventually completes all items."""
from src.coordinator import Coordinator
# Add multiple items without dependencies
for i in range(157, 162):
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(i, meta)
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.02)
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.5) # Allow time for processing
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# All items should be completed
for i in range(157, 162):
item = queue_manager.get_item(i)
assert item is not None
assert item.status == QueueItemStatus.COMPLETED
@pytest.mark.asyncio
async def test_skips_already_completed_items(self, queue_manager: QueueManager) -> None:
"""Test that coordinator skips items already marked as completed."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
queue_manager.mark_complete(159) # Pre-complete it
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
spawn_count = 0
original_spawn_agent = coordinator.spawn_agent
async def counting_spawn_agent(item: QueueItem) -> bool:
nonlocal spawn_count
spawn_count += 1
return await original_spawn_agent(item)
coordinator.spawn_agent = counting_spawn_agent # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.2)
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should not have spawned any agents (item already completed)
assert spawn_count == 0
@pytest.mark.asyncio
async def test_skips_in_progress_items(self, queue_manager: QueueManager) -> None:
"""Test that coordinator skips items already in progress."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
queue_manager.mark_in_progress(159) # Pre-mark as in progress
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.05)
spawn_count = 0
original_spawn_agent = coordinator.spawn_agent
async def counting_spawn_agent(item: QueueItem) -> bool:
nonlocal spawn_count
spawn_count += 1
return await original_spawn_agent(item)
coordinator.spawn_agent = counting_spawn_agent # type: ignore[method-assign]
task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.2)
await coordinator.stop()
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Should not have spawned any agents (item already in progress)
assert spawn_count == 0
class TestCoordinatorActiveAgents:
"""Tests for tracking active agents."""
@pytest.fixture
def temp_queue_file(self) -> Generator[Path, None, None]:
"""Create a temporary file for queue persistence."""
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_path = Path(f.name)
yield temp_path
# Cleanup
if temp_path.exists():
temp_path.unlink()
@pytest.fixture
def queue_manager(self, temp_queue_file: Path) -> QueueManager:
"""Create a queue manager with temporary storage."""
return QueueManager(queue_file=temp_queue_file)
def test_active_agents_initially_empty(self, queue_manager: QueueManager) -> None:
"""Test that active_agents is empty on initialization."""
from src.coordinator import Coordinator
coordinator = Coordinator(queue_manager=queue_manager)
assert coordinator.active_agents == {}
@pytest.mark.asyncio
async def test_active_agents_tracks_spawned_agents(
self, queue_manager: QueueManager
) -> None:
"""Test that active_agents tracks agents as they are spawned."""
from src.coordinator import Coordinator
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(159, meta)
coordinator = Coordinator(queue_manager=queue_manager)
await coordinator.process_queue()
# Agent should be tracked (stub stores issue number)
assert 159 in coordinator.active_agents
@pytest.mark.asyncio
async def test_get_active_agent_count(self, queue_manager: QueueManager) -> None:
"""Test getting count of active agents."""
from src.coordinator import Coordinator
for i in range(157, 160):
meta = IssueMetadata(assigned_agent="sonnet")
queue_manager.enqueue(i, meta)
coordinator = Coordinator(queue_manager=queue_manager)
# Process all items
await coordinator.process_queue()
await coordinator.process_queue()
await coordinator.process_queue()
assert coordinator.get_active_agent_count() == 3

View File

@@ -1,13 +1,12 @@
"""Tests for issue parser agent."""
import os
from unittest.mock import Mock, patch
import pytest
from unittest.mock import Mock, patch, AsyncMock
from anthropic import Anthropic
from anthropic.types import Message, TextBlock, Usage
from src.parser import parse_issue_metadata, clear_cache
from src.models import IssueMetadata
from src.parser import clear_cache, parse_issue_metadata
@pytest.fixture(autouse=True)
@@ -88,7 +87,10 @@ def mock_anthropic_response() -> Message:
content=[
TextBlock(
type="text",
text='{"estimated_context": 46800, "difficulty": "medium", "assigned_agent": "sonnet", "blocks": [159], "blocked_by": [157]}'
text=(
'{"estimated_context": 46800, "difficulty": "medium", '
'"assigned_agent": "sonnet", "blocks": [159], "blocked_by": [157]}'
),
)
],
model="claude-sonnet-4.5-20250929",
@@ -107,7 +109,10 @@ def mock_anthropic_minimal_response() -> Message:
content=[
TextBlock(
type="text",
text='{"estimated_context": 50000, "difficulty": "medium", "assigned_agent": "sonnet", "blocks": [], "blocked_by": []}'
text=(
'{"estimated_context": 50000, "difficulty": "medium", '
'"assigned_agent": "sonnet", "blocks": [], "blocked_by": []}'
),
)
],
model="claude-sonnet-4.5-20250929",
@@ -306,7 +311,10 @@ class TestParseIssueMetadata:
content=[
TextBlock(
type="text",
text='{"estimated_context": 10000, "difficulty": "invalid", "assigned_agent": "sonnet", "blocks": [], "blocked_by": []}'
text=(
'{"estimated_context": 10000, "difficulty": "invalid", '
'"assigned_agent": "sonnet", "blocks": [], "blocked_by": []}'
),
)
],
model="claude-sonnet-4.5-20250929",
@@ -341,7 +349,10 @@ class TestParseIssueMetadata:
content=[
TextBlock(
type="text",
text='{"estimated_context": 10000, "difficulty": "medium", "assigned_agent": "invalid_agent", "blocks": [], "blocked_by": []}'
text=(
'{"estimated_context": 10000, "difficulty": "medium", '
'"assigned_agent": "invalid_agent", "blocks": [], "blocked_by": []}'
),
)
],
model="claude-sonnet-4.5-20250929",

View File

@@ -3,8 +3,6 @@
import hmac
import json
import pytest
class TestSignatureVerification:
"""Test suite for HMAC SHA256 signature verification."""