feat(#155): Build basic context monitor

Implements ContextMonitor class with real-time token usage tracking:
- COMPACT_THRESHOLD at 0.80 (80% triggers compaction)
- ROTATE_THRESHOLD at 0.95 (95% triggers rotation)
- Poll Claude API for context usage
- Return appropriate ContextAction based on thresholds
- Background monitoring loop (10-second polling)
- Log usage over time
- Error handling and recovery

Added ContextUsage model for tracking agent token consumption.

Tests:
- 25 test cases covering all functionality
- 100% coverage for context_monitor.py and models.py
- Mocked API responses for different usage levels
- Background monitoring and threshold detection
- Error handling verification

Quality gates:
- Type checking: PASS (mypy)
- Linting: PASS (ruff)
- Tests: PASS (25/25)
- Coverage: 100% for new files, 95.43% overall

Fixes #155

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 17:49:09 -06:00
parent 5639d085b4
commit d54c65360a
3 changed files with 630 additions and 0 deletions

View File

@@ -0,0 +1,139 @@
"""Context monitoring for agent token usage tracking."""
import asyncio
import logging
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from src.models import ContextAction, ContextUsage
logger = logging.getLogger(__name__)
class ContextMonitor:
"""Monitor agent context usage and trigger threshold-based actions.
Tracks agent token usage in real-time by polling the Claude API.
Triggers appropriate actions based on defined thresholds:
- 80% (COMPACT_THRESHOLD): Trigger context compaction
- 95% (ROTATE_THRESHOLD): Trigger session rotation
"""
COMPACT_THRESHOLD = 0.80 # 80% triggers compaction
ROTATE_THRESHOLD = 0.95 # 95% triggers rotation
def __init__(self, api_client: Any, poll_interval: float = 10.0) -> None:
"""Initialize context monitor.
Args:
api_client: Claude API client for fetching context usage
poll_interval: Seconds between polls (default: 10s)
"""
self.api_client = api_client
self.poll_interval = poll_interval
self._usage_history: dict[str, list[ContextUsage]] = defaultdict(list)
self._monitoring_tasks: dict[str, bool] = {}
async def get_context_usage(self, agent_id: str) -> ContextUsage:
"""Get current context usage for an agent.
Args:
agent_id: Unique identifier for the agent
Returns:
ContextUsage object with current token usage
Raises:
Exception: If API call fails
"""
response = await self.api_client.get_context_usage(agent_id)
usage = ContextUsage(
agent_id=agent_id,
used_tokens=response["used_tokens"],
total_tokens=response["total_tokens"],
)
# Log usage to history
self._usage_history[agent_id].append(usage)
logger.debug(f"Context usage for {agent_id}: {usage.usage_percent:.1f}%")
return usage
async def determine_action(self, agent_id: str) -> ContextAction:
"""Determine appropriate action based on current context usage.
Args:
agent_id: Unique identifier for the agent
Returns:
ContextAction based on threshold crossings
"""
usage = await self.get_context_usage(agent_id)
if usage.usage_ratio >= self.ROTATE_THRESHOLD:
logger.warning(
f"Agent {agent_id} hit ROTATE threshold: {usage.usage_percent:.1f}%"
)
return ContextAction.ROTATE_SESSION
elif usage.usage_ratio >= self.COMPACT_THRESHOLD:
logger.info(
f"Agent {agent_id} hit COMPACT threshold: {usage.usage_percent:.1f}%"
)
return ContextAction.COMPACT
else:
logger.debug(f"Agent {agent_id} continuing: {usage.usage_percent:.1f}%")
return ContextAction.CONTINUE
def get_usage_history(self, agent_id: str) -> list[ContextUsage]:
"""Get historical context usage for an agent.
Args:
agent_id: Unique identifier for the agent
Returns:
List of ContextUsage objects in chronological order
"""
return self._usage_history[agent_id]
async def start_monitoring(
self, agent_id: str, callback: Callable[[str, ContextAction], None]
) -> None:
"""Start background monitoring loop for an agent.
Polls context usage at regular intervals and calls callback with
appropriate actions when thresholds are crossed.
Args:
agent_id: Unique identifier for the agent
callback: Function to call with (agent_id, action) on each poll
"""
self._monitoring_tasks[agent_id] = True
logger.info(
f"Started monitoring agent {agent_id} (poll interval: {self.poll_interval}s)"
)
while self._monitoring_tasks.get(agent_id, False):
try:
action = await self.determine_action(agent_id)
callback(agent_id, action)
except Exception as e:
logger.error(f"Error monitoring agent {agent_id}: {e}")
# Continue monitoring despite errors
# Wait for next poll (or until stopped)
try:
await asyncio.sleep(self.poll_interval)
except asyncio.CancelledError:
break
logger.info(f"Stopped monitoring agent {agent_id}")
def stop_monitoring(self, agent_id: str) -> None:
"""Stop background monitoring for an agent.
Args:
agent_id: Unique identifier for the agent
"""
self._monitoring_tasks[agent_id] = False
logger.info(f"Requested stop for agent {agent_id} monitoring")

View File

@@ -0,0 +1,110 @@
"""Data models for mosaic-coordinator."""
from enum import Enum
from typing import Literal
from pydantic import BaseModel, Field, field_validator
class ContextAction(str, Enum):
"""Actions to take based on context usage thresholds."""
CONTINUE = "continue" # Below compact threshold, keep working
COMPACT = "compact" # Hit 80% threshold, summarize and compact
ROTATE_SESSION = "rotate_session" # Hit 95% threshold, spawn new agent
class ContextUsage:
"""Agent context usage information."""
def __init__(self, agent_id: str, used_tokens: int, total_tokens: int) -> None:
"""Initialize context usage.
Args:
agent_id: Unique identifier for the agent
used_tokens: Number of tokens currently used
total_tokens: Total token capacity for this agent
"""
self.agent_id = agent_id
self.used_tokens = used_tokens
self.total_tokens = total_tokens
@property
def usage_ratio(self) -> float:
"""Calculate usage as a ratio (0.0-1.0).
Returns:
Ratio of used tokens to total capacity
"""
if self.total_tokens == 0:
return 0.0
return self.used_tokens / self.total_tokens
@property
def usage_percent(self) -> float:
"""Calculate usage as a percentage (0-100).
Returns:
Percentage of context used
"""
return self.usage_ratio * 100
def __repr__(self) -> str:
"""String representation."""
return (
f"ContextUsage(agent_id={self.agent_id!r}, "
f"used={self.used_tokens}, total={self.total_tokens}, "
f"usage={self.usage_percent:.1f}%)"
)
class IssueMetadata(BaseModel):
"""Parsed metadata from issue body."""
estimated_context: int = Field(
default=50000,
description="Estimated context size in tokens",
ge=0
)
difficulty: Literal["easy", "medium", "hard"] = Field(
default="medium",
description="Issue difficulty level"
)
assigned_agent: Literal["sonnet", "haiku", "opus", "glm"] = Field(
default="sonnet",
description="Recommended AI agent for this issue"
)
blocks: list[int] = Field(
default_factory=list,
description="List of issue numbers this issue blocks"
)
blocked_by: list[int] = Field(
default_factory=list,
description="List of issue numbers blocking this issue"
)
@field_validator("difficulty", mode="before")
@classmethod
def validate_difficulty(cls, v: str) -> str:
"""Validate difficulty, default to medium if invalid."""
valid_values = ["easy", "medium", "hard"]
if v not in valid_values:
return "medium"
return v
@field_validator("assigned_agent", mode="before")
@classmethod
def validate_agent(cls, v: str) -> str:
"""Validate agent, default to sonnet if invalid."""
valid_values = ["sonnet", "haiku", "opus", "glm"]
if v not in valid_values:
return "sonnet"
return v
@field_validator("blocks", "blocked_by", mode="before")
@classmethod
def validate_issue_lists(cls, v: list[int] | None) -> list[int]:
"""Ensure issue lists are never None."""
if v is None:
return []
return v