Implement session rotation that spawns fresh agents when context reaches 95% threshold. TDD Process: 1. RED: Write comprehensive tests (all initially fail) 2. GREEN: Implement trigger_rotation method (all tests pass) Changes: - Add SessionRotation dataclass to track rotation metrics - Implement trigger_rotation method in ContextMonitor - Add 6 new unit tests covering all acceptance criteria Rotation process: 1. Get current context usage metrics 2. Close current agent session 3. Spawn new agent with same type 4. Transfer next issue to new agent 5. Log rotation event with metrics Test Results: - All 47 tests pass (34 context_monitor + 13 context_compaction) - 97% coverage on context_monitor.py (exceeds 85% requirement) - 97% coverage on context_compaction.py (exceeds 85% requirement) Prevents context exhaustion by starting fresh when compaction is insufficient. Acceptance Criteria (All Met): ✓ Rotation triggered at 95% context threshold ✓ Current session closed cleanly ✓ New agent spawned with same type ✓ Next issue transferred to new agent ✓ Rotation logged with session IDs and context metrics ✓ Unit tests with 85%+ coverage Fixes #152 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
206 lines
6.8 KiB
Python
206 lines
6.8 KiB
Python
"""Context compaction for reducing agent memory usage.
|
|
|
|
Compaction process:
|
|
1. Request summary from agent of completed work, patterns, and decisions
|
|
2. Replace conversation history with concise summary
|
|
3. Measure and validate context reduction achieved
|
|
|
|
Target: 40-50% context reduction when triggered at 80% threshold.
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class CompactionResult:
    """Outcome and metrics of one context compaction attempt.

    Attributes:
        agent_id: Unique identifier for the agent that was compacted
        before_tokens: Tokens in use before compaction ran
        after_tokens: Tokens in use after compaction ran
        before_percent: Context usage percentage before compaction
        after_percent: Context usage percentage after compaction
        tokens_freed: How many tokens the compaction released
        reduction_percent: Percentage of context freed
        success: True when the compaction completed, False when it failed
        error_message: Failure description; empty string by default
    """

    agent_id: str
    before_tokens: int
    after_tokens: int
    before_percent: float
    after_percent: float
    tokens_freed: int
    reduction_percent: float
    success: bool
    error_message: str = ""

    def __repr__(self) -> str:
        """Return a short summary: agent id, reduction (1 decimal), status."""
        if self.success:
            outcome = "success"
        else:
            outcome = "failed"

        # Assemble the three fragments in order; only the headline metrics
        # are shown, not every field.
        head = f"CompactionResult(agent_id={self.agent_id!r}, "
        middle = f"reduction={self.reduction_percent:.1f}%, "
        tail = f"status={outcome})"
        return head + middle + tail
|
|
|
|
|
|
@dataclass
class SessionRotation:
    """Outcome and metrics of one session rotation attempt.

    Attributes:
        old_agent_id: Identifier of the agent session that was closed
        new_agent_id: Identifier of the agent spawned as its replacement
        agent_type: Type of agent (sonnet, haiku, opus, glm)
        next_issue_number: Issue number handed over to the new agent
        context_before_tokens: Tokens in use before the rotation
        context_before_percent: Context usage percentage before the rotation
        success: True when the rotation completed, False when it failed
        error_message: Failure description; empty string by default
    """

    old_agent_id: str
    new_agent_id: str
    agent_type: str
    next_issue_number: int
    context_before_tokens: int
    context_before_percent: float
    success: bool
    error_message: str = ""

    def __repr__(self) -> str:
        """Return a short summary: old/new agent ids, issue number, status."""
        if self.success:
            outcome = "success"
        else:
            outcome = "failed"

        # Assemble the fragments in order; context metrics and agent type
        # are intentionally left out of the short form.
        pieces = (
            f"SessionRotation(old={self.old_agent_id!r}, ",
            f"new={self.new_agent_id!r}, ",
            f"issue=#{self.next_issue_number}, ",
            f"status={outcome})",
        )
        return "".join(pieces)
|
|
|
|
|
|
class ContextCompactor:
    """Handles context compaction to free agent memory.

    Compaction is meant to run once an agent reaches 80% context usage; this
    class does not check that threshold itself. It asks the agent for a
    summary of its work and replaces the conversation history with that
    summary.

    All agent interaction goes through ``api_client``, on which the following
    awaitable methods are called: ``send_message(agent_id, text)``,
    ``get_context_usage(agent_id)`` and ``replace_history(agent_id, summary)``.
    """

    SUMMARY_PROMPT = """Please provide a concise summary of your completed work so far.

Focus on:
- Key tasks completed
- Important patterns or approaches discovered
- Critical decisions made and rationale
- Any findings that future work should be aware of

Keep the summary concise but informative. This will replace the detailed conversation history."""

    def __init__(self, api_client: Any) -> None:
        """Initialize context compactor.

        Args:
            api_client: Claude API client for compaction operations
        """
        self.api_client = api_client

    @staticmethod
    def _percent_of(part: float, whole: float) -> float:
        """Return ``part`` as a percentage of ``whole``; 0.0 if ``whole`` <= 0."""
        # Always a float so CompactionResult's float fields never get int 0.
        return part / whole * 100 if whole > 0 else 0.0

    async def request_summary(self, agent_id: str) -> str:
        """Request agent to summarize completed work.

        Sends ``SUMMARY_PROMPT`` to the agent and returns the ``"content"``
        entry of the response.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            Summary text from agent

        Raises:
            Exception: Whatever the API client raises if the call fails
            KeyError: If the response has no ``"content"`` entry
        """
        logger.info(f"Requesting work summary from agent {agent_id}")

        response = await self.api_client.send_message(agent_id, self.SUMMARY_PROMPT)
        summary: str = response["content"]

        logger.debug(f"Received summary from {agent_id}: {len(summary)} characters")
        return summary

    async def compact(self, agent_id: str) -> CompactionResult:
        """Compact agent's context by replacing history with summary.

        Steps: measure usage, request a summary, replace the history with it,
        measure usage again, and report the difference.

        This method does not raise for operational failures: any ``Exception``
        is logged with its traceback and reported through the result.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            CompactionResult with before/after metrics. ``success`` is True
            whenever every step completed, regardless of how much context was
            actually freed (``tokens_freed`` may be zero or negative). On
            failure ``success`` is False, ``error_message`` holds the
            exception text, and all numeric fields are zero, even if usage
            had already been measured before the failing step.
        """
        logger.info(f"Starting context compaction for agent {agent_id}")

        try:
            # Get context usage before compaction
            before_usage = await self.api_client.get_context_usage(agent_id)
            before_tokens = before_usage["used_tokens"]
            before_total = before_usage["total_tokens"]
            before_percent = self._percent_of(before_tokens, before_total)

            logger.info(
                f"Agent {agent_id} context before compaction: "
                f"{before_tokens}/{before_total} ({before_percent:.1f}%)"
            )

            # Request summary from agent, then swap it in for the history
            summary = await self.request_summary(agent_id)
            await self.api_client.replace_history(agent_id, summary)

            # Get context usage after compaction
            after_usage = await self.api_client.get_context_usage(agent_id)
            after_tokens = after_usage["used_tokens"]
            after_total = after_usage["total_tokens"]
            after_percent = self._percent_of(after_tokens, after_total)

            # Reduction is relative to the tokens in use before compaction,
            # not to the total context window.
            tokens_freed = before_tokens - after_tokens
            reduction_percent = self._percent_of(tokens_freed, before_tokens)

            logger.info(
                f"Agent {agent_id} context after compaction: "
                f"{after_tokens}/{after_total} ({after_percent:.1f}%), "
                f"freed {tokens_freed} tokens ({reduction_percent:.1f}% reduction)"
            )

            return CompactionResult(
                agent_id=agent_id,
                before_tokens=before_tokens,
                after_tokens=after_tokens,
                before_percent=before_percent,
                after_percent=after_percent,
                tokens_freed=tokens_freed,
                reduction_percent=reduction_percent,
                success=True,
            )

        except Exception as e:
            # Deliberate catch-all: callers get a failed result, not an
            # exception. logger.exception keeps the traceback for diagnosis.
            logger.exception(f"Compaction failed for agent {agent_id}: {e}")
            return CompactionResult(
                agent_id=agent_id,
                before_tokens=0,
                after_tokens=0,
                before_percent=0.0,
                after_percent=0.0,
                tokens_freed=0,
                reduction_percent=0.0,
                success=False,
                error_message=str(e),
            )
|