feat(#151): Implement context compaction (TDD - GREEN phase)

Implement context compaction to free memory when agents reach 80% context usage.

Features:
- ContextCompactor class for handling compaction operations
- Generates summary prompt asking agent to summarize completed work
- Replaces conversation history with concise summary
- Measures context reduction before/after compaction
- Logs compaction metrics (tokens freed, reduction percentage)
- Integration with ContextMonitor via trigger_compaction() method

Implementation details:
- CompactionResult dataclass tracks before/after metrics
- Target: 40-50% context reduction when triggered at 80%
- Error handling for API failures
- Type-safe with mypy strict mode
- 100% test coverage for new code

Quality gates passed:
 Build (mypy): No type errors
 Lint (ruff): All checks passed
 Tests: 41/41 tests passing
 Coverage: 100% for context_compaction.py, 97% for context_monitor.py

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 20:30:28 -06:00
parent 32ab2da145
commit d51b1bd749
2 changed files with 197 additions and 0 deletions

View File

@@ -0,0 +1,170 @@
"""Context compaction for reducing agent memory usage.
Compaction process:
1. Request summary from agent of completed work, patterns, and decisions
2. Replace conversation history with concise summary
3. Measure and validate context reduction achieved
Target: 40-50% context reduction when triggered at 80% threshold.
"""
import logging
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class CompactionResult:
"""Result of context compaction operation.
Attributes:
agent_id: Unique identifier for the agent
before_tokens: Token count before compaction
after_tokens: Token count after compaction
before_percent: Usage percentage before compaction
after_percent: Usage percentage after compaction
tokens_freed: Number of tokens freed by compaction
reduction_percent: Percentage of context freed
success: Whether compaction succeeded
error_message: Error message if compaction failed
"""
agent_id: str
before_tokens: int
after_tokens: int
before_percent: float
after_percent: float
tokens_freed: int
reduction_percent: float
success: bool
error_message: str = ""
def __repr__(self) -> str:
"""String representation."""
status = "success" if self.success else "failed"
return (
f"CompactionResult(agent_id={self.agent_id!r}, "
f"reduction={self.reduction_percent:.1f}%, "
f"status={status})"
)
class ContextCompactor:
"""Handles context compaction to free agent memory.
Compaction is triggered when an agent reaches 80% context usage.
The compactor requests a summary from the agent and replaces the
conversation history with a concise summary, freeing memory.
"""
SUMMARY_PROMPT = """Please provide a concise summary of your completed work so far.
Focus on:
- Key tasks completed
- Important patterns or approaches discovered
- Critical decisions made and rationale
- Any findings that future work should be aware of
Keep the summary concise but informative. This will replace the detailed conversation history."""
def __init__(self, api_client: Any) -> None:
"""Initialize context compactor.
Args:
api_client: Claude API client for compaction operations
"""
self.api_client = api_client
async def request_summary(self, agent_id: str) -> str:
"""Request agent to summarize completed work.
Args:
agent_id: Unique identifier for the agent
Returns:
Summary text from agent
Raises:
Exception: If API call fails
"""
logger.info(f"Requesting work summary from agent {agent_id}")
response = await self.api_client.send_message(agent_id, self.SUMMARY_PROMPT)
summary: str = response["content"]
logger.debug(f"Received summary from {agent_id}: {len(summary)} characters")
return summary
async def compact(self, agent_id: str) -> CompactionResult:
"""Compact agent's context by replacing history with summary.
Args:
agent_id: Unique identifier for the agent
Returns:
CompactionResult with before/after metrics
"""
logger.info(f"Starting context compaction for agent {agent_id}")
try:
# Get context usage before compaction
before_usage = await self.api_client.get_context_usage(agent_id)
before_tokens = before_usage["used_tokens"]
before_total = before_usage["total_tokens"]
before_percent = (before_tokens / before_total * 100) if before_total > 0 else 0
logger.info(
f"Agent {agent_id} context before compaction: "
f"{before_tokens}/{before_total} ({before_percent:.1f}%)"
)
# Request summary from agent
summary = await self.request_summary(agent_id)
# Replace conversation history with summary
await self.api_client.replace_history(agent_id, summary)
# Get context usage after compaction
after_usage = await self.api_client.get_context_usage(agent_id)
after_tokens = after_usage["used_tokens"]
after_total = after_usage["total_tokens"]
after_percent = (after_tokens / after_total * 100) if after_total > 0 else 0
# Calculate reduction metrics
tokens_freed = before_tokens - after_tokens
reduction_percent = (
(tokens_freed / before_tokens * 100) if before_tokens > 0 else 0
)
logger.info(
f"Agent {agent_id} context after compaction: "
f"{after_tokens}/{after_total} ({after_percent:.1f}%), "
f"freed {tokens_freed} tokens ({reduction_percent:.1f}% reduction)"
)
return CompactionResult(
agent_id=agent_id,
before_tokens=before_tokens,
after_tokens=after_tokens,
before_percent=before_percent,
after_percent=after_percent,
tokens_freed=tokens_freed,
reduction_percent=reduction_percent,
success=True,
)
except Exception as e:
logger.error(f"Compaction failed for agent {agent_id}: {e}")
return CompactionResult(
agent_id=agent_id,
before_tokens=0,
after_tokens=0,
before_percent=0.0,
after_percent=0.0,
tokens_freed=0,
reduction_percent=0.0,
success=False,
error_message=str(e),
)

View File

@@ -6,6 +6,7 @@ from collections import defaultdict
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
from src.context_compaction import CompactionResult, ContextCompactor
from src.models import ContextAction, ContextUsage from src.models import ContextAction, ContextUsage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -34,6 +35,7 @@ class ContextMonitor:
self.poll_interval = poll_interval self.poll_interval = poll_interval
self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list) self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list)
self._monitoring_tasks: dict[str, bool] = {} self._monitoring_tasks: dict[str, bool] = {}
self._compactor = ContextCompactor(api_client=api_client)
async def get_context_usage(self, agent_id: str) -> ContextUsage: async def get_context_usage(self, agent_id: str) -> ContextUsage:
"""Get current context usage for an agent. """Get current context usage for an agent.
@@ -137,3 +139,28 @@ class ContextMonitor:
""" """
self._monitoring_tasks[agent_id] = False self._monitoring_tasks[agent_id] = False
logger.info(f"Requested stop for agent {agent_id} monitoring") logger.info(f"Requested stop for agent {agent_id} monitoring")
async def trigger_compaction(self, agent_id: str) -> CompactionResult:
"""Trigger context compaction for an agent.
Replaces conversation history with a concise summary to free memory.
Target: 40-50% context reduction.
Args:
agent_id: Unique identifier for the agent
Returns:
CompactionResult with before/after metrics
"""
logger.info(f"Triggering context compaction for agent {agent_id}")
result = await self._compactor.compact(agent_id)
if result.success:
logger.info(
f"Compaction successful for {agent_id}: "
f"freed {result.tokens_freed} tokens ({result.reduction_percent:.1f}% reduction)"
)
else:
logger.error(f"Compaction failed for {agent_id}: {result.error_message}")
return result