feat(#151): Implement context compaction (TDD - GREEN phase)
Implement context compaction to free memory when agents reach 80% context usage. Features: - ContextCompactor class for handling compaction operations - Generates summary prompt asking agent to summarize completed work - Replaces conversation history with concise summary - Measures context reduction before/after compaction - Logs compaction metrics (tokens freed, reduction percentage) - Integration with ContextMonitor via trigger_compaction() method Implementation details: - CompactionResult dataclass tracks before/after metrics - Target: 40-50% context reduction when triggered at 80% - Error handling for API failures - Type-safe with mypy strict mode - 100% test coverage for new code Quality gates passed: ✅ Build (mypy): No type errors ✅ Lint (ruff): All checks passed ✅ Tests: 41/41 tests passing ✅ Coverage: 100% for context_compaction.py, 97% for context_monitor.py Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
170
apps/coordinator/src/context_compaction.py
Normal file
170
apps/coordinator/src/context_compaction.py
Normal file
@@ -0,0 +1,170 @@
|
|||||||
|
"""Context compaction for reducing agent memory usage.
|
||||||
|
|
||||||
|
Compaction process:
|
||||||
|
1. Request summary from agent of completed work, patterns, and decisions
|
||||||
|
2. Replace conversation history with concise summary
|
||||||
|
3. Measure and validate context reduction achieved
|
||||||
|
|
||||||
|
Target: 40-50% context reduction when triggered at 80% threshold.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class CompactionResult:
    """Outcome metrics for a single context compaction run.

    Attributes:
        agent_id: Unique identifier for the agent
        before_tokens: Token count before compaction
        after_tokens: Token count after compaction
        before_percent: Usage percentage before compaction
        after_percent: Usage percentage after compaction
        tokens_freed: Number of tokens freed by compaction
        reduction_percent: Percentage of context freed
        success: Whether compaction succeeded
        error_message: Error message if compaction failed
    """

    agent_id: str
    before_tokens: int
    after_tokens: int
    before_percent: float
    after_percent: float
    tokens_freed: int
    reduction_percent: float
    success: bool
    error_message: str = ""

    def __repr__(self) -> str:
        """Compact human-readable form used in log output."""
        if self.success:
            outcome = "success"
        else:
            outcome = "failed"
        parts = (
            f"CompactionResult(agent_id={self.agent_id!r}, ",
            f"reduction={self.reduction_percent:.1f}%, ",
            f"status={outcome})",
        )
        return "".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
class ContextCompactor:
    """Handles context compaction to free agent memory.

    Compaction is triggered when an agent reaches 80% context usage.
    The compactor requests a summary from the agent and replaces the
    conversation history with a concise summary, freeing memory.
    """

    SUMMARY_PROMPT = """Please provide a concise summary of your completed work so far.

Focus on:
- Key tasks completed
- Important patterns or approaches discovered
- Critical decisions made and rationale
- Any findings that future work should be aware of

Keep the summary concise but informative. This will replace the detailed conversation history."""

    def __init__(self, api_client: Any) -> None:
        """Initialize context compactor.

        Args:
            api_client: Claude API client for compaction operations
        """
        self.api_client = api_client

    async def request_summary(self, agent_id: str) -> str:
        """Request agent to summarize completed work.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            Summary text from agent

        Raises:
            Exception: If API call fails
        """
        # Lazy %-style args defer formatting until the record is emitted.
        logger.info("Requesting work summary from agent %s", agent_id)

        response = await self.api_client.send_message(agent_id, self.SUMMARY_PROMPT)
        summary: str = response["content"]

        logger.debug("Received summary from %s: %d characters", agent_id, len(summary))
        return summary

    async def compact(self, agent_id: str) -> CompactionResult:
        """Compact agent's context by replacing history with summary.

        Args:
            agent_id: Unique identifier for the agent

        Returns:
            CompactionResult with before/after metrics
        """
        logger.info("Starting context compaction for agent %s", agent_id)

        try:
            # Get context usage before compaction
            before_usage = await self.api_client.get_context_usage(agent_id)
            before_tokens = before_usage["used_tokens"]
            before_total = before_usage["total_tokens"]
            # Guard against a zero-capacity report to avoid ZeroDivisionError;
            # use 0.0 so the expression is float in both branches.
            before_percent = (before_tokens / before_total * 100) if before_total > 0 else 0.0

            logger.info(
                "Agent %s context before compaction: %d/%d (%.1f%%)",
                agent_id,
                before_tokens,
                before_total,
                before_percent,
            )

            # Request summary from agent
            summary = await self.request_summary(agent_id)

            # Replace conversation history with summary
            await self.api_client.replace_history(agent_id, summary)

            # Get context usage after compaction
            after_usage = await self.api_client.get_context_usage(agent_id)
            after_tokens = after_usage["used_tokens"]
            after_total = after_usage["total_tokens"]
            after_percent = (after_tokens / after_total * 100) if after_total > 0 else 0.0

            # Calculate reduction metrics
            tokens_freed = before_tokens - after_tokens
            reduction_percent = (
                (tokens_freed / before_tokens * 100) if before_tokens > 0 else 0.0
            )

            logger.info(
                "Agent %s context after compaction: %d/%d (%.1f%%), "
                "freed %d tokens (%.1f%% reduction)",
                agent_id,
                after_tokens,
                after_total,
                after_percent,
                tokens_freed,
                reduction_percent,
            )

            return CompactionResult(
                agent_id=agent_id,
                before_tokens=before_tokens,
                after_tokens=after_tokens,
                before_percent=before_percent,
                after_percent=after_percent,
                tokens_freed=tokens_freed,
                reduction_percent=reduction_percent,
                success=True,
            )

        except Exception as e:
            # Broad catch is deliberate: compaction is best-effort, and a
            # failure must be reported to the caller rather than crash the
            # monitor loop. logger.exception preserves the full traceback,
            # which logger.error would silently drop.
            logger.exception("Compaction failed for agent %s: %s", agent_id, e)
            return CompactionResult(
                agent_id=agent_id,
                before_tokens=0,
                after_tokens=0,
                before_percent=0.0,
                after_percent=0.0,
                tokens_freed=0,
                reduction_percent=0.0,
                success=False,
                error_message=str(e),
            )
|
||||||
@@ -6,6 +6,7 @@ from collections import defaultdict
|
|||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from src.context_compaction import CompactionResult, ContextCompactor
|
||||||
from src.models import ContextAction, ContextUsage
|
from src.models import ContextAction, ContextUsage
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -34,6 +35,7 @@ class ContextMonitor:
|
|||||||
self.poll_interval = poll_interval
|
self.poll_interval = poll_interval
|
||||||
self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list)
|
self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list)
|
||||||
self._monitoring_tasks: dict[str, bool] = {}
|
self._monitoring_tasks: dict[str, bool] = {}
|
||||||
|
self._compactor = ContextCompactor(api_client=api_client)
|
||||||
|
|
||||||
async def get_context_usage(self, agent_id: str) -> ContextUsage:
|
async def get_context_usage(self, agent_id: str) -> ContextUsage:
|
||||||
"""Get current context usage for an agent.
|
"""Get current context usage for an agent.
|
||||||
@@ -137,3 +139,28 @@ class ContextMonitor:
|
|||||||
"""
|
"""
|
||||||
self._monitoring_tasks[agent_id] = False
|
self._monitoring_tasks[agent_id] = False
|
||||||
logger.info(f"Requested stop for agent {agent_id} monitoring")
|
logger.info(f"Requested stop for agent {agent_id} monitoring")
|
||||||
|
|
||||||
|
async def trigger_compaction(self, agent_id: str) -> CompactionResult:
    """Trigger context compaction for an agent.

    Replaces conversation history with a concise summary to free memory.
    Target: 40-50% context reduction.

    Args:
        agent_id: Unique identifier for the agent

    Returns:
        CompactionResult with before/after metrics
    """
    logger.info(f"Triggering context compaction for agent {agent_id}")
    result = await self._compactor.compact(agent_id)

    # Guard clause: report the failure and bail out early.
    if not result.success:
        logger.error(f"Compaction failed for {agent_id}: {result.error_message}")
        return result

    logger.info(
        f"Compaction successful for {agent_id}: "
        f"freed {result.tokens_freed} tokens ({result.reduction_percent:.1f}% reduction)"
    )
    return result
|
||||||
|
|||||||
Reference in New Issue
Block a user