From d51b1bd749f35472407af185d2a7e0fbd7136216 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 20:30:28 -0600 Subject: [PATCH] feat(#151): Implement context compaction (TDD - GREEN phase) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement context compaction to free memory when agents reach 80% context usage. Features: - ContextCompactor class for handling compaction operations - Generates summary prompt asking agent to summarize completed work - Replaces conversation history with concise summary - Measures context reduction before/after compaction - Logs compaction metrics (tokens freed, reduction percentage) - Integration with ContextMonitor via trigger_compaction() method Implementation details: - CompactionResult dataclass tracks before/after metrics - Target: 40-50% context reduction when triggered at 80% - Error handling for API failures - Type-safe with mypy strict mode - 100% test coverage for new code Quality gates passed: ✅ Build (mypy): No type errors ✅ Lint (ruff): All checks passed ✅ Tests: 41/41 tests passing ✅ Coverage: 100% for context_compaction.py, 97% for context_monitor.py Co-Authored-By: Claude Opus 4.5 --- apps/coordinator/src/context_compaction.py | 170 +++++++++++++++++++++ apps/coordinator/src/context_monitor.py | 27 ++++ 2 files changed, 197 insertions(+) create mode 100644 apps/coordinator/src/context_compaction.py diff --git a/apps/coordinator/src/context_compaction.py b/apps/coordinator/src/context_compaction.py new file mode 100644 index 0000000..2de6699 --- /dev/null +++ b/apps/coordinator/src/context_compaction.py @@ -0,0 +1,170 @@ +"""Context compaction for reducing agent memory usage. + +Compaction process: +1. Request summary from agent of completed work, patterns, and decisions +2. Replace conversation history with concise summary +3. Measure and validate context reduction achieved + +Target: 40-50% context reduction when triggered at 80% threshold. +""" + +import logging +from dataclasses import dataclass +from typing import Any + +logger = logging.getLogger(__name__) + + +@dataclass +class CompactionResult: + """Result of context compaction operation. + + Attributes: + agent_id: Unique identifier for the agent + before_tokens: Token count before compaction + after_tokens: Token count after compaction + before_percent: Usage percentage before compaction + after_percent: Usage percentage after compaction + tokens_freed: Number of tokens freed by compaction + reduction_percent: Percentage of context freed + success: Whether compaction succeeded + error_message: Error message if compaction failed + """ + + agent_id: str + before_tokens: int + after_tokens: int + before_percent: float + after_percent: float + tokens_freed: int + reduction_percent: float + success: bool + error_message: str = "" + + def __repr__(self) -> str: + """String representation.""" + status = "success" if self.success else "failed" + return ( + f"CompactionResult(agent_id={self.agent_id!r}, " + f"reduction={self.reduction_percent:.1f}%, " + f"status={status})" + ) + + +class ContextCompactor: + """Handles context compaction to free agent memory. + + Compaction is triggered when an agent reaches 80% context usage. + The compactor requests a summary from the agent and replaces the + conversation history with a concise summary, freeing memory. + """ + + SUMMARY_PROMPT = """Please provide a concise summary of your completed work so far. + +Focus on: +- Key tasks completed +- Important patterns or approaches discovered +- Critical decisions made and rationale +- Any findings that future work should be aware of + +Keep the summary concise but informative. This will replace the detailed conversation history.""" + + def __init__(self, api_client: Any) -> None: + """Initialize context compactor. + + Args: + api_client: Claude API client for compaction operations + """ + self.api_client = api_client + + async def request_summary(self, agent_id: str) -> str: + """Request agent to summarize completed work. + + Args: + agent_id: Unique identifier for the agent + + Returns: + Summary text from agent + + Raises: + Exception: If API call fails + """ + logger.info(f"Requesting work summary from agent {agent_id}") + + response = await self.api_client.send_message(agent_id, self.SUMMARY_PROMPT) + summary: str = response["content"] + + logger.debug(f"Received summary from {agent_id}: {len(summary)} characters") + return summary + + async def compact(self, agent_id: str) -> CompactionResult: + """Compact agent's context by replacing history with summary. + + Args: + agent_id: Unique identifier for the agent + + Returns: + CompactionResult with before/after metrics + """ + logger.info(f"Starting context compaction for agent {agent_id}") + + try: + # Get context usage before compaction + before_usage = await self.api_client.get_context_usage(agent_id) + before_tokens = before_usage["used_tokens"] + before_total = before_usage["total_tokens"] + before_percent = (before_tokens / before_total * 100) if before_total > 0 else 0 + + logger.info( + f"Agent {agent_id} context before compaction: " + f"{before_tokens}/{before_total} ({before_percent:.1f}%)" + ) + + # Request summary from agent + summary = await self.request_summary(agent_id) + + # Replace conversation history with summary + await self.api_client.replace_history(agent_id, summary) + + # Get context usage after compaction + after_usage = await self.api_client.get_context_usage(agent_id) + after_tokens = after_usage["used_tokens"] + after_total = after_usage["total_tokens"] + after_percent = (after_tokens / after_total * 100) if after_total > 0 else 0 + + # Calculate reduction metrics + tokens_freed = before_tokens - after_tokens + reduction_percent = ( + (tokens_freed / before_tokens * 100) if before_tokens > 0 else 0 + ) + + logger.info( + f"Agent {agent_id} context after compaction: " + f"{after_tokens}/{after_total} ({after_percent:.1f}%), " + f"freed {tokens_freed} tokens ({reduction_percent:.1f}% reduction)" + ) + + return CompactionResult( + agent_id=agent_id, + before_tokens=before_tokens, + after_tokens=after_tokens, + before_percent=before_percent, + after_percent=after_percent, + tokens_freed=tokens_freed, + reduction_percent=reduction_percent, + success=True, + ) + + except Exception as e: + logger.error(f"Compaction failed for agent {agent_id}: {e}") + return CompactionResult( + agent_id=agent_id, + before_tokens=0, + after_tokens=0, + before_percent=0.0, + after_percent=0.0, + tokens_freed=0, + reduction_percent=0.0, + success=False, + error_message=str(e), + ) diff --git a/apps/coordinator/src/context_monitor.py b/apps/coordinator/src/context_monitor.py index 6d3f1e5..04c8835 100644 --- a/apps/coordinator/src/context_monitor.py +++ b/apps/coordinator/src/context_monitor.py @@ -6,6 +6,7 @@ from collections import defaultdict from collections.abc import Callable from typing import Any +from src.context_compaction import CompactionResult, ContextCompactor from src.models import ContextAction, ContextUsage logger = logging.getLogger(__name__) @@ -34,6 +35,7 @@ class ContextMonitor: self.poll_interval = poll_interval self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list) self._monitoring_tasks: dict[str, bool] = {} + self._compactor = ContextCompactor(api_client=api_client) async def get_context_usage(self, agent_id: str) -> ContextUsage: """Get current context usage for an agent. @@ -137,3 +139,28 @@ class ContextMonitor: """ self._monitoring_tasks[agent_id] = False logger.info(f"Requested stop for agent {agent_id} monitoring") + + async def trigger_compaction(self, agent_id: str) -> CompactionResult: + """Trigger context compaction for an agent. + + Replaces conversation history with a concise summary to free memory. + Target: 40-50% context reduction. + + Args: + agent_id: Unique identifier for the agent + + Returns: + CompactionResult with before/after metrics + """ + logger.info(f"Triggering context compaction for agent {agent_id}") + result = await self._compactor.compact(agent_id) + + if result.success: + logger.info( + f"Compaction successful for {agent_id}: " + f"freed {result.tokens_freed} tokens ({result.reduction_percent:.1f}% reduction)" + ) + else: + logger.error(f"Compaction failed for {agent_id}: {result.error_message}") + + return result