Agent Orchestration Layer
Version: 1.0
Status: Design Phase
Author: Mosaic Stack Team
Date: 2025-01-29
Table of Contents
- Problem Statement
- Architecture Overview
- Database Schema
- Valkey/Redis Key Patterns
- API Endpoints
- Coordinator Service Design
- Recovery & Resilience Patterns
- Implementation Phases
Problem Statement
Current Limitations
- No Persistent Orchestration
  - Agents spawned in sessions have no lifecycle management beyond the session
  - Work continuity is lost when Gateway restarts or sessions end
  - No coordination between agents working on the same project
- No Health Monitoring
  - No visibility into agent status after spawn
  - Failed agents leave tasks in limbo
  - No automated recovery from agent failures
- No Task Persistence
  - Task state only exists in session memory
  - Users can't track long-running work
  - No audit trail of agent activity
- Conversation Interference
  - Main session conversations derail active development work
  - Context switching disrupts agent focus
  - No separation between user chat and background work
Requirements
The Agent Orchestration Layer must provide:
- Persistent Task Management — Task definitions, state, and history survive restarts
- Autonomous Coordination — Independent of main session; agents work continuously
- Health Monitoring — Real-time tracking of agent status and progress
- Automatic Recovery — Detect failures and resume work intelligently
- Complete Audit Trail — Every state transition logged for debugging and accountability
- Multi-Workspace Support — Row-level security and tenant isolation
Architecture Overview
High-Level Design
┌─────────────────────────────────────────────────────────────────┐
│ API Layer │
│ (NestJS Controllers + Guards + Interceptors) │
└────────────┬─────────────────────────────────────┬──────────────┘
│ │
│ │
┌────────▼────────┐ ┌────────▼─────────┐
│ Task Manager │ │ Agent Manager │
│ Service │ │ Service │
└────────┬────────┘ └────────┬─────────┘
│ │
│ ┌───────────────────┐ │
└─────────► Coordinator ◄──────┘
│ Service │
└─────────┬─────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
┌───────▼────────┐ ┌────────▼────────┐ ┌──────▼──────┐
│ PostgreSQL │ │ Valkey/Redis │ │ Gateway │
│ (Persistent) │ │ (Runtime) │ │ (Agent │
│ │ │ │ │ Spawner) │
└────────────────┘ └─────────────────┘ └─────────────┘
Component Responsibilities
| Component | Responsibility |
|---|---|
| Task Manager | CRUD operations on tasks, state transitions, assignment logic |
| Agent Manager | Agent lifecycle, health tracking, session management |
| Coordinator | Heartbeat processing, failure detection, recovery orchestration |
| PostgreSQL | Persistent storage of tasks, agents, sessions, logs |
| Valkey/Redis | Runtime state, heartbeats, quick lookups, pub/sub |
| Gateway | Agent spawning, session management, message routing |
Database Schema
Task State Machine
┌──────────────┐
│ PENDING │
└──────┬───────┘
│
┌──────▼───────┐
┌────┤ ASSIGNED ├────┐
│ └──────┬───────┘ │
│ │ │
┌──────▼───────┐ │ ┌────────▼──────┐
│ RUNNING │ │ │ PAUSED │
└──────┬───────┘ │ └────────┬──────┘
│ │ │
│ ┌──────▼───────┐ │
└────► COMPLETED ◄────┘
└──────┬───────┘
│
┌──────▼───────┐
│ ARCHIVED │
└──────────────┘
┌──────────────┐
│ FAILED │
└──────┬───────┘
│
┌──────▼───────┐
│ ABORTED │────┐
└──────────────┘ │
│ │
└────────────┘
(retry → PENDING)
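The diagram leaves a few edges implicit, so the following is only a sketch of how the state machine could be encoded and checked. The exact edge set (for example, RUNNING moving directly to FAILED once retries are exhausted) is an assumption read from the diagram and the coordinator code later in this document, and `canTransition` is a hypothetical helper rather than part of the existing codebase.

```typescript
// Hypothetical transition table; the edge set is an assumption, not a confirmed spec.
type TaskStatus =
  | "PENDING" | "ASSIGNED" | "RUNNING" | "PAUSED"
  | "COMPLETED" | "FAILED" | "ABORTED" | "ARCHIVED";

const ALLOWED_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ["ASSIGNED"],
  ASSIGNED: ["RUNNING", "PAUSED", "ABORTED"],
  RUNNING: ["PAUSED", "COMPLETED", "FAILED", "ABORTED"],
  PAUSED: ["RUNNING", "COMPLETED", "ABORTED"],
  COMPLETED: ["ARCHIVED"],
  FAILED: ["ABORTED", "PENDING"], // PENDING via the manual retry endpoint (assumed)
  ABORTED: ["PENDING"],           // requeued after the backoff period
  ARCHIVED: [],
};

// True when moving from `from` to `to` is listed in the table above.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

A Task Manager update path could call `canTransition(task.status, nextStatus)` before writing, and reject the update (and log it to agent_task_logs) when it returns false.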
New Tables
agent_tasks
Extends existing task management with orchestration-specific fields.
CREATE TYPE task_orchestration_status AS ENUM (
'PENDING', -- Awaiting assignment
'ASSIGNED', -- Assigned to an agent
'RUNNING', -- Agent actively working
'PAUSED', -- User paused
'COMPLETED', -- Successfully finished
'FAILED', -- Failed with error
'ABORTED', -- Terminated (stale agent, manual cancel)
'ARCHIVED' -- Historical record
);
CREATE TABLE agent_tasks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
-- Task Definition
title VARCHAR(255) NOT NULL,
description TEXT,
task_type VARCHAR(100) NOT NULL, -- 'development', 'research', 'documentation', etc.
-- Status & Priority
status task_orchestration_status DEFAULT 'PENDING',
priority INT DEFAULT 5, -- 1 (low) to 10 (high)
-- Assignment
agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,
session_key VARCHAR(255), -- Current active session
-- Progress Tracking
progress_percent INT DEFAULT 0 CHECK (progress_percent BETWEEN 0 AND 100),
current_step TEXT,
estimated_completion_at TIMESTAMPTZ,
-- Retry Logic
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
retry_backoff_seconds INT DEFAULT 300, -- 5 minutes
last_error TEXT,
-- Dependencies
depends_on UUID[] DEFAULT ARRAY[]::UUID[], -- Array of task IDs
blocks UUID[] DEFAULT ARRAY[]::UUID[], -- Tasks blocked by this one
-- Context
input_context JSONB DEFAULT '{}', -- Input data/params for agent
output_result JSONB, -- Final result from agent
checkpoint_data JSONB DEFAULT '{}', -- Resumable state
-- Metadata
metadata JSONB DEFAULT '{}',
tags VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[],
-- Audit
created_by UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
assigned_at TIMESTAMPTZ,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
failed_at TIMESTAMPTZ
-- Foreign keys for workspace_id and agent_id are already declared inline above,
-- so no separate table-level constraints are needed.
);
CREATE INDEX idx_agent_tasks_workspace ON agent_tasks(workspace_id);
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status) WHERE status IN ('PENDING', 'ASSIGNED', 'RUNNING');
CREATE INDEX idx_agent_tasks_agent ON agent_tasks(agent_id) WHERE agent_id IS NOT NULL;
CREATE INDEX idx_agent_tasks_priority ON agent_tasks(priority DESC, created_at ASC) WHERE status = 'PENDING';
CREATE INDEX idx_agent_tasks_depends_on ON agent_tasks USING gin(depends_on);
CREATE INDEX idx_agent_tasks_tags ON agent_tasks USING gin(tags);
agent_task_logs
Immutable audit trail of all task state transitions and agent actions.
CREATE TYPE task_log_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL');
CREATE TABLE agent_task_logs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
task_id UUID NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,
-- Log Entry
level task_log_level DEFAULT 'INFO',
event VARCHAR(100) NOT NULL, -- 'state_transition', 'progress_update', 'error', etc.
message TEXT,
details JSONB DEFAULT '{}',
-- State Snapshot
previous_status task_orchestration_status,
new_status task_orchestration_status,
-- Context
session_key VARCHAR(255),
stack_trace TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_task_logs_task ON agent_task_logs(task_id, created_at DESC);
CREATE INDEX idx_task_logs_workspace ON agent_task_logs(workspace_id, created_at DESC);
CREATE INDEX idx_task_logs_level ON agent_task_logs(level) WHERE level IN ('ERROR', 'CRITICAL');
CREATE INDEX idx_task_logs_event ON agent_task_logs(event);
agent_heartbeats
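Short-lived records for health monitoring. The live heartbeat TTL is enforced in Valkey; rows in this table are a recent history and are purged by the cleanup function at the end of this block.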
CREATE TABLE agent_heartbeats (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
-- Health Data
status VARCHAR(50) NOT NULL, -- 'healthy', 'degraded', 'stale'
current_task_id UUID REFERENCES agent_tasks(id) ON DELETE SET NULL,
progress_percent INT,
-- Resource Usage
memory_mb INT,
cpu_percent INT,
-- Timing
last_seen_at TIMESTAMPTZ DEFAULT NOW(),
next_expected_at TIMESTAMPTZ,
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_heartbeats_agent ON agent_heartbeats(agent_id, last_seen_at DESC);
CREATE INDEX idx_heartbeats_stale ON agent_heartbeats(last_seen_at) WHERE status != 'healthy';
-- Cleanup function (run periodically)
CREATE OR REPLACE FUNCTION cleanup_old_heartbeats()
RETURNS void AS $$
BEGIN
DELETE FROM agent_heartbeats
WHERE last_seen_at < NOW() - INTERVAL '1 hour';
END;
$$ LANGUAGE plpgsql;
Updates to Existing Tables
Extend agents table
-- Add orchestration fields to existing agents table
ALTER TABLE agents ADD COLUMN IF NOT EXISTS coordinator_enabled BOOLEAN DEFAULT true;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS max_concurrent_tasks INT DEFAULT 1;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS heartbeat_interval_seconds INT DEFAULT 30;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS stale_threshold_seconds INT DEFAULT 180; -- 3 minutes
ALTER TABLE agents ADD COLUMN IF NOT EXISTS capabilities VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[];
CREATE INDEX IF NOT EXISTS idx_agents_coordinator ON agents(coordinator_enabled) WHERE coordinator_enabled = true;
Valkey/Redis Key Patterns
Valkey is used for:
- Real-time state (fast reads/writes)
- Pub/Sub messaging (coordination events)
- Distributed locks (prevent race conditions)
- Session data (ephemeral context)
Key Naming Convention
{namespace}:{workspace_id}:{entity_type}:{entity_id}:{property}
Key Patterns
Task Queue
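# Pending tasks (sorted set). The score must be a single number, so priority and
# timestamp are combined into one numeric {score} rather than a "priority:timestamp" string.
ZADD tasks:pending:{workspace_id} {score} {task_id}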
# Assigned tasks (hash per agent)
HSET agent:tasks:{agent_id} {task_id} {assigned_at}
# Active sessions (set)
SADD tasks:active:{workspace_id} {task_id}:{session_key}
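Sorted-set scores are floating-point numbers, so ordering by priority and then by age needs both values packed into one number. The sketch below shows one possible encoding; `pendingScore`, `MAX_PRIORITY`, and `PRIORITY_STRIDE` are illustrative names, and the 1 to 10 priority range is taken from the agent_tasks schema.

```typescript
// Assumed priority range, matching the 1 (low) to 10 (high) scale in agent_tasks.
const MAX_PRIORITY = 10;
// Larger than any millisecond timestamp until the year 2286, so the two parts never overlap.
const PRIORITY_STRIDE = 1e13;

// Lower score = read first with an ascending ZRANGE:
// higher priority wins, and within one priority the older task wins.
function pendingScore(priority: number, createdAtMs: number): number {
  if (!Number.isInteger(priority) || priority < 1 || priority > MAX_PRIORITY) {
    throw new RangeError("priority must be an integer in 1.." + MAX_PRIORITY);
  }
  return (MAX_PRIORITY - priority) * PRIORITY_STRIDE + createdAtMs;
}
```

The largest possible score stays below 2^53, so every score is represented exactly as a double and the ordering is not affected by rounding.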
Agent Status
# Agent heartbeat (string with TTL)
SETEX agent:heartbeat:{agent_id} 60 "{\"status\":\"running\",\"task_id\":\"{task_id}\",\"timestamp\":{ts}}"
# Agent status (hash)
HSET agent:status:{agent_id}
status "running"
current_task "{task_id}"
last_heartbeat {timestamp}
# Stale agents (sorted set by last heartbeat)
ZADD coordinator:stale_agents {timestamp} {agent_id}
Coordination
# Coordinator lock (to prevent multiple coordinators)
SET coordinator:lock:{workspace_id} {coordinator_instance_id} NX EX 30
# Task assignment lock
SET task:assign_lock:{task_id} {agent_id} NX EX 5
# Recovery queue (list)
LPUSH coordinator:recovery_queue:{workspace_id} {task_id}
Pub/Sub Channels
# Task events
PUBLISH tasks:events:{workspace_id} "{\"event\":\"task_completed\",\"task_id\":\"{task_id}\"}"
# Agent events
PUBLISH agents:events:{workspace_id} "{\"event\":\"agent_stale\",\"agent_id\":\"{agent_id}\"}"
# Coordinator commands
PUBLISH coordinator:commands:{workspace_id} "{\"command\":\"reassign_task\",\"task_id\":\"{task_id}\"}"
Session Data (TTL: 1 hour)
# Session context (hash with TTL)
HSET session:context:{session_key}
workspace_id "{workspace_id}"
agent_id "{agent_id}"
task_id "{task_id}"
started_at {timestamp}
EXPIRE session:context:{session_key} 3600
Data Lifecycle
| Key Type | TTL | Cleanup Strategy |
|---|---|---|
| agent:heartbeat:* | 60s | Auto-expire |
| agent:status:* | None | Delete on agent termination |
| session:context:* | 1h | Auto-expire |
| tasks:pending:* | None | Remove on assignment |
| coordinator:lock:* | 30s | Auto-expire (renewed by active coordinator) |
| task:assign_lock:* | 5s | Auto-expire after assignment |
API Endpoints
Task Management
// Create a new orchestration task
POST /api/v1/agent-tasks
{
"title": "Fix TypeScript strict errors in U-Connect",
"description": "...",
"taskType": "development",
"priority": 8,
"inputContext": {
"repository": "u-connect",
"branch": "main",
"scope": ["packages/shared"]
},
"dependsOn": [],
"maxRetries": 3
}
// List tasks (with filtering)
GET /api/v1/agent-tasks?status=RUNNING&priority>=8&tags=typescript
// Get task details
GET /api/v1/agent-tasks/:id
// Update task (limited fields)
PATCH /api/v1/agent-tasks/:id
{
"priority": 10,
"status": "PAUSED"
}
// Cancel task
POST /api/v1/agent-tasks/:id/cancel
{
"reason": "Requirements changed"
}
// Retry failed task
POST /api/v1/agent-tasks/:id/retry
// Get task logs
GET /api/v1/agent-tasks/:id/logs?level=ERROR&limit=100
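As a rough illustration of the create-task payload, the sketch below types the request body and checks it against the limits implied by the agent_tasks schema (255-character title, priority from 1 to 10). `CreateAgentTaskRequest` and `validateCreateTask` are hypothetical names. A NestJS service would more likely express these rules through its usual DTO validation, so treat this as a dependency-free stand-in.

```typescript
// Hypothetical request shape mirroring the POST /api/v1/agent-tasks example.
interface CreateAgentTaskRequest {
  title: string;
  description?: string;
  taskType: string;
  priority?: number;
  inputContext?: Record<string, unknown>;
  dependsOn?: string[];
  maxRetries?: number;
}

// Returns a list of problems; an empty list means the payload passed these checks.
function validateCreateTask(req: CreateAgentTaskRequest): string[] {
  const errors: string[] = [];
  if (req.title.trim().length === 0) errors.push("title is required");
  if (req.title.length > 255) errors.push("title exceeds 255 characters");
  if (req.taskType.trim().length === 0) errors.push("taskType is required");
  if (
    req.priority !== undefined &&
    (!Number.isInteger(req.priority) || req.priority < 1 || req.priority > 10)
  ) {
    errors.push("priority must be an integer from 1 to 10");
  }
  if (
    req.maxRetries !== undefined &&
    (!Number.isInteger(req.maxRetries) || req.maxRetries < 0)
  ) {
    errors.push("maxRetries must be a non-negative integer");
  }
  return errors;
}
```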
Agent Management
// List agents
GET /api/v1/agents?status=WORKING&coordinatorEnabled=true
// Get agent details
GET /api/v1/agents/:id
// Register/update agent
PUT /api/v1/agents/:agentId
{
"name": "Dev Agent #1",
"model": "claude-sonnet-4",
"capabilities": ["typescript", "nestjs", "prisma"],
"maxConcurrentTasks": 2,
"heartbeatIntervalSeconds": 30
}
// Get agent health
GET /api/v1/agents/:id/health
// Get agent tasks
GET /api/v1/agents/:id/tasks?status=RUNNING
// Terminate agent
POST /api/v1/agents/:id/terminate
{
"reason": "Manual shutdown",
"graceful": true // Allow task completion
}
Coordination
// Coordinator status (internal/admin only)
GET /api/v1/coordinator/status
{
"active": true,
"instanceId": "coord-abc123",
"workspaces": ["ws-1", "ws-2"],
"lastHeartbeat": "2025-01-29T14:30:00Z",
"stats": {
"pendingTasks": 3,
"runningTasks": 5,
"healthyAgents": 8,
"staleAgents": 1
}
}
// Force recovery check (admin only)
POST /api/v1/coordinator/recover
{
"workspaceId": "optional - all if omitted"
}
// Get coordination logs (admin only)
GET /api/v1/coordinator/logs?workspace={id}&level=ERROR
Webhooks (for external integration)
// Task state change webhook
POST {configured_webhook_url}
{
"event": "task.status_changed",
"taskId": "task-123",
"previousStatus": "RUNNING",
"newStatus": "COMPLETED",
"workspaceId": "ws-1",
"timestamp": "2025-01-29T14:30:00Z"
}
Coordinator Service Design
Core Responsibilities
- Health Monitoring — Check agent heartbeats, mark stale agents
- Task Assignment — Match pending tasks to available agents
- Recovery Orchestration — Reassign tasks from failed/stale agents
- Dependency Resolution — Ensure tasks wait for dependencies
- Resource Management — Enforce agent concurrency limits
Architecture
// Coordinator Service (NestJS)
@Injectable()
export class CoordinatorService {
private coordinatorLock: string;
private isRunning: boolean = false;
constructor(
private readonly taskManager: TaskManagerService,
private readonly agentManager: AgentManagerService,
private readonly valkey: Valkey,
private readonly prisma: PrismaService,
private readonly logger: Logger
) {}
// Main coordination loop
@Cron("*/30 * * * * *") // Every 30 seconds
async coordinate() {
if (!(await this.acquireLock())) {
return; // Another coordinator is active
}
try {
await this.checkAgentHealth();
await this.assignPendingTasks();
await this.resolveDependencies();
await this.recoverFailedTasks();
} catch (error) {
this.logger.error("Coordination cycle failed", error);
} finally {
await this.releaseLock();
}
}
// Distributed lock to prevent multiple coordinators
private async acquireLock(): Promise<boolean> {
const lockKey = `coordinator:lock:global`;
const result = await this.valkey.set(
lockKey,
process.env.HOSTNAME || "coordinator",
"NX",
"EX",
30
);
return result === "OK";
}
// Check agent heartbeats and mark stale
private async checkAgentHealth() {
const agents = await this.agentManager.getCoordinatorManagedAgents();
const now = Date.now();
for (const agent of agents) {
const heartbeatKey = `agent:heartbeat:${agent.id}`;
const lastHeartbeat = await this.valkey.get(heartbeatKey);
if (!lastHeartbeat) {
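// No heartbeat key: it expired or was never written, so the agent is stale.
// With the 60s key TTL and the default 180s stale threshold, this branch fires first in practice.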
await this.handleStaleAgent(agent);
} else {
const heartbeatData = JSON.parse(lastHeartbeat);
const age = now - heartbeatData.timestamp;
if (age > agent.staleThresholdSeconds * 1000) {
await this.handleStaleAgent(agent);
}
}
}
}
// Assign pending tasks to available agents
private async assignPendingTasks() {
const workspaces = await this.getActiveWorkspaces();
for (const workspace of workspaces) {
const pendingTasks = await this.taskManager.getPendingTasks(workspace.id, {
orderBy: { priority: "desc", createdAt: "asc" },
});
for (const task of pendingTasks) {
// Check dependencies
if (!(await this.areDependenciesMet(task))) {
continue;
}
// Find available agent
const agent = await this.agentManager.findAvailableAgent(
workspace.id,
task.taskType,
task.metadata.requiredCapabilities
);
if (agent) {
await this.assignTask(task, agent);
}
}
}
}
// Handle stale agents
private async handleStaleAgent(agent: Agent) {
this.logger.warn(`Agent ${agent.id} is stale - recovering tasks`);
// Mark agent as ERROR
await this.agentManager.updateAgentStatus(agent.id, AgentStatus.ERROR);
// Get assigned tasks
const tasks = await this.taskManager.getTasksForAgent(agent.id);
for (const task of tasks) {
await this.recoverTask(task, "agent_stale");
}
}
// Recover a task from failure
private async recoverTask(task: AgentTask, reason: string) {
// Log the failure
await this.taskManager.logTaskEvent(task.id, {
level: "ERROR",
event: "task_recovery",
message: `Task recovery initiated: ${reason}`,
previousStatus: task.status,
newStatus: "ABORTED",
});
// Check retry limit
if (task.retryCount >= task.maxRetries) {
await this.taskManager.updateTask(task.id, {
status: "FAILED",
lastError: `Max retries exceeded (${task.retryCount}/${task.maxRetries})`,
failedAt: new Date(),
});
return;
}
// Abort current assignment
await this.taskManager.updateTask(task.id, {
status: "ABORTED",
agentId: null,
sessionKey: null,
retryCount: task.retryCount + 1,
});
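// Wait for backoff period before requeuing.
// NOTE: this in-process timer is lost if the coordinator restarts, leaving the task ABORTED.
// Persisting a requeue time and letting the coordination cycle act on it would survive restarts.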
const backoffMs = task.retryBackoffSeconds * 1000 * Math.pow(2, task.retryCount);
setTimeout(async () => {
await this.taskManager.updateTask(task.id, {
status: "PENDING",
});
}, backoffMs);
}
// Assign task to agent
private async assignTask(task: AgentTask, agent: Agent) {
// Acquire assignment lock
const lockKey = `task:assign_lock:${task.id}`;
const locked = await this.valkey.set(lockKey, agent.id, "NX", "EX", 5);
if (!locked) {
return; // Another coordinator already assigned this task
}
try {
// Update task
await this.taskManager.updateTask(task.id, {
status: "ASSIGNED",
agentId: agent.id,
assignedAt: new Date(),
});
// Spawn agent session via Gateway
const session = await this.spawnAgentSession(agent, task);
// Update task with session
await this.taskManager.updateTask(task.id, {
sessionKey: session.sessionKey,
status: "RUNNING",
startedAt: new Date(),
});
// Log assignment
await this.taskManager.logTaskEvent(task.id, {
level: "INFO",
event: "task_assigned",
message: `Task assigned to agent ${agent.id}`,
details: { agentId: agent.id, sessionKey: session.sessionKey },
});
} finally {
await this.valkey.del(lockKey);
}
}
// Spawn agent session via Gateway
private async spawnAgentSession(agent: Agent, task: AgentTask): Promise<AgentSession> {
// Call Gateway API to spawn subagent with task context
const response = await fetch(`${process.env.GATEWAY_URL}/api/agents/spawn`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
workspaceId: task.workspaceId,
agentId: agent.id,
taskId: task.id,
label: `task:${task.id}:${task.title.slice(0, 30)}`,
context: {
taskTitle: task.title,
taskDescription: task.description,
inputContext: task.inputContext,
checkpointData: task.checkpointData,
},
}),
});
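// Fail loudly on a non-2xx response instead of parsing an error body as a session
if (!response.ok) {
throw new Error(`Gateway spawn failed: ${response.status} ${response.statusText}`);
}
const data = await response.json();
return data.session;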
}
}
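The coordinate() loop calls releaseLock(), which is not shown above. A common pattern is to delete the lock only if it still holds this instance's value, so that a coordinator whose lock has already expired cannot remove a lock acquired by another instance. The sketch below assumes a client exposing `eval(script, numKeys, ...args)`, as ioredis does; `LockClient`, `RELEASE_SCRIPT`, and this standalone `releaseLock` signature are illustrative, not the project's actual implementation.

```typescript
// Minimal client surface this sketch relies on (ioredis exposes a compatible eval).
interface LockClient {
  eval(script: string, numKeys: number, ...args: string[]): Promise<unknown>;
}

// Server-side compare-and-delete: removes the key only when its value matches the owner ID.
const RELEASE_SCRIPT = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("DEL", KEYS[1])
end
return 0
`;

// Resolves to true when the lock was held by ownerId and has been deleted.
async function releaseLock(
  client: LockClient,
  lockKey: string,
  ownerId: string
): Promise<boolean> {
  const deleted = await client.eval(RELEASE_SCRIPT, 1, lockKey, ownerId);
  return deleted === 1;
}
```

For this to work, the value written by acquireLock() must be unique per coordinator instance. The `process.env.HOSTNAME || "coordinator"` fallback would not be unique if several instances run without HOSTNAME set.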
Coordination State Machine
┌─────────────────────────────────────────────────────────┐
│ Coordinator Cycle │
└─────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────┐
│ Acquire Lock (30s) │
└───────────┬───────────┘
│
┌────────▼────────┐
│ Check Health │
│ - Read agents │
│ - Check TTLs │
│ - Mark stale │
└────────┬────────┘
│
┌────────▼────────┐
│ Assign Tasks │
│ - Read pending │
│ - Match agents │
│ - Spawn sessions│
└────────┬────────┘
│
┌────────▼────────┐
│ Resolve Deps │
│ - Check waiting │
│ - Unblock ready │
└────────┬────────┘
│
┌────────▼────────┐
│ Recover Failed │
│ - Requeue retry │
│ - Log failures │
└────────┬────────┘
│
┌───────────▼───────────┐
│ Release Lock │
└───────────────────────┘
Recovery & Resilience Patterns
1. Agent Failure Recovery
Scenario: Agent crashes mid-task.
Detection:
- Heartbeat TTL expires in Valkey
- Coordinator detects missing heartbeat
Recovery:
- Mark agent as ERROR in database
- Abort assigned tasks with status = ABORTED
- Log failure with stack trace (if available)
- Apply exponential backoff: backoff = retryBackoffSeconds * 2^retryCount
- Requeue task to PENDING after backoff
- Coordinator reassigns on next cycle
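To make the backoff formula concrete, here is a small sketch that evaluates it for the schema defaults. `retryBackoffMs` is a hypothetical helper, and the one-hour cap is an added assumption that the design does not specify.

```typescript
// backoff = retryBackoffSeconds * 2^retryCount, returned in milliseconds.
// maxBackoffMs is an assumed safety cap, not something the design specifies.
function retryBackoffMs(
  retryBackoffSeconds: number,
  retryCount: number,
  maxBackoffMs: number = 60 * 60 * 1000
): number {
  const raw = retryBackoffSeconds * 1000 * Math.pow(2, retryCount);
  return Math.min(raw, maxBackoffMs);
}

// With the schema defaults (300 s base, max_retries = 3), the waits before
// the first, second, and third retry are 5, 10, and 20 minutes.
const delaysMinutes = [0, 1, 2].map((n) => retryBackoffMs(300, n) / 60000);
```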
2. Gateway Restart Recovery
Scenario: Gateway restarts, killing all agent sessions.
Detection:
- All agent heartbeats stop simultaneously
- Coordinator detects mass stale agents
Recovery:
- Coordinator marks all RUNNING tasks as ABORTED
- Tasks with checkpointData can resume from last checkpoint
- Tasks without checkpoints restart from scratch
- Exponential backoff prevents thundering herd
3. Task Dependency Deadlock
Scenario: Task A depends on Task B, which depends on Task A (circular dependency).
Detection:
- Coordinator builds dependency graph
- Detects cycles during resolveDependencies()
Recovery:
- Log ERROR with cycle details
- Mark all tasks in cycle as FAILED with reason dependency_cycle
- Notify workspace owner via webhook
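Cycle detection over the depends_on graph can be done with a depth-first search that tracks which tasks are on the current path. The following is a minimal sketch of that idea, not the actual resolveDependencies() implementation; `DependencyGraph` and `findDependencyCycle` are illustrative names.

```typescript
// Maps each task ID to the IDs it depends on (the depends_on column).
type DependencyGraph = Map<string, string[]>;

// Returns the task IDs forming one cycle, or null when the graph is acyclic.
function findDependencyCycle(graph: DependencyGraph): string[] | null {
  const state = new Map<string, "visiting" | "done">();
  const path: string[] = [];

  const visit = (id: string): string[] | null => {
    if (state.get(id) === "done") return null;
    if (state.get(id) === "visiting") {
      // id is already on the current path, so the path from it onward is a cycle
      return path.slice(path.indexOf(id));
    }
    state.set(id, "visiting");
    path.push(id);
    for (const dep of graph.get(id) || []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    path.pop();
    state.set(id, "done");
    return null;
  };

  for (const id of Array.from(graph.keys())) {
    const cycle = visit(id);
    if (cycle) return cycle;
  }
  return null;
}
```

The returned IDs are the tasks that would be marked FAILED with reason dependency_cycle. Running the same check when a task is created would reject a cycle before it ever reaches the coordinator.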
4. Database Failure
Scenario: PostgreSQL becomes unavailable.
Detection:
- Prisma query fails with connection error
Recovery:
- Coordinator catches error, logs to stderr
- Releases lock (allowing failover to another instance)
- Retries with exponential backoff: 5s, 10s, 20s, 40s
- If database remains down >5 minutes, alert admin
5. Split-Brain Prevention
Scenario: Network partition causes two coordinators to run simultaneously.
Prevention:
- Distributed lock in Valkey with 30s TTL
- Coordinators must renew lock every cycle
- Only one coordinator can hold lock at a time
Detection:
- Task assigned to multiple agents (conflict detection)
Recovery:
- Newer assignment wins (based on assignedAt timestamp)
- Cancel older session
- Log conflict for investigation
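The "newer assignment wins" rule can be sketched as a pure function over the conflicting assignments. The `Assignment` shape and `resolveConflict` name are assumptions for illustration. The sketch also assumes all assignedAt values come from one clock (for example, the database), since timestamps taken on different coordinator hosts may be skewed.

```typescript
// Illustrative shape of one task-to-agent assignment record.
interface Assignment {
  taskId: string;
  agentId: string;
  sessionKey: string;
  assignedAt: Date;
}

// Picks the most recent assignment as the winner; the rest should be cancelled.
function resolveConflict(assignments: Assignment[]): {
  winner: Assignment;
  toCancel: Assignment[];
} {
  if (assignments.length === 0) {
    throw new Error("resolveConflict requires at least one assignment");
  }
  // Copy before sorting so the caller's array is not reordered
  const sorted = assignments
    .slice()
    .sort((a, b) => b.assignedAt.getTime() - a.assignedAt.getTime());
  return { winner: sorted[0], toCancel: sorted.slice(1) };
}
```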
6. Stale Task Timeout
Scenario: Task runs longer than estimatedCompletionAt + grace period.
Detection:
- Coordinator checks estimatedCompletionAt field
- If exceeded by >30 minutes, mark as potentially hung
Recovery:
- Send warning to agent session (via pub/sub)
- If no progress update in 10 minutes, abort task
- Log timeout error
- Requeue with higher priority
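The timeout rules above can be expressed as a small decision function. This is a sketch under one interpretation: the 10-minute window is measured from the task's last progress update. `lastProgressAt` is a hypothetical field (the schema only has updated_at), and `checkTaskTimeout` is an illustrative name.

```typescript
const GRACE_MS = 30 * 60 * 1000;             // 30 minutes past the estimate
const NO_PROGRESS_ABORT_MS = 10 * 60 * 1000; // 10 minutes without a progress update

type TimeoutAction = "none" | "warn" | "abort";

// Decides what the coordinator should do with a running task at time `now`.
function checkTaskTimeout(
  estimatedCompletionAt: Date,
  lastProgressAt: Date,
  now: Date
): TimeoutAction {
  const overdueMs = now.getTime() - estimatedCompletionAt.getTime();
  if (overdueMs <= GRACE_MS) return "none";
  // Past the grace period: warn while progress is still arriving, abort once it stops
  const idleMs = now.getTime() - lastProgressAt.getTime();
  return idleMs > NO_PROGRESS_ABORT_MS ? "abort" : "warn";
}
```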
Usage Budget Management
Version: 1.0
Date: 2026-02-04
Status: Required for MVP
Problem Statement
Autonomous agents using Claude Code can consume significant API tokens without proper governance. Without real-time usage tracking and budgeting, projects risk:
- Cost overruns — Agents exceed budget before milestone completion
- Service disruption — Hit API rate limits mid-task
- Unpredictable momentum — Can't estimate project velocity
- Budget exhaustion — Agents consume entire monthly budget in days
Requirements
The orchestration layer must provide:
- Real-time usage tracking — Current token usage across all active agents
- Budget allocation — Pre-allocate budgets per task/milestone/project
- Usage projection — Estimate remaining work vs remaining budget
- Throttling decisions — Pause/slow agents approaching limits
- Cost optimization — Route tasks to appropriate model tiers (Haiku/Sonnet/Opus)
Architecture
┌─────────────────────────────────────────────────────────┐
│ Usage Budget Manager Service │
│ ┌───────────────────┐ ┌────────────────────────────┐ │
│ │ Usage Tracker │ │ Budget Allocator │ │
│ │ - Query API │ │ - Per-task budgets │ │
│ │ - Real-time sum │ │ - Per-milestone budgets │ │
│ │ - Agent rollup │ │ - Global limits │ │
│ └────────┬──────────┘ └──────────┬─────────────────┘ │
│ │ │ │
│ ┌────────▼────────────────────────▼─────────────────┐ │
│ │ Projection Engine │ │
│ │ - Estimate remaining work │ │
│ │ - Calculate burn rate │ │
│ │ - Predict budget exhaustion │ │
│ │ - Recommend throttle/pause │ │
│ └───────────────────────────┬───────────────────────┘ │
└────────────────────────────────┼─────────────────────────┘
│
┌────────────┼────────────┐
│ │ │
┌───────▼──────┐ ┌──▼──────┐ ┌──▼────────────┐
│Queue Manager │ │Agent Mgr│ │ Coordinator │
│- Check budget│ │- Spawn │ │ - Pre-commit │
│ before queue│ │ check │ │ validation │
└──────────────┘ └─────────┘ └───────────────┘
Budget Check Points
1. Task Assignment (Queue Manager)
async canAffordTask(taskId: string): Promise<BudgetDecision> {
const task = await getTask(taskId);
const usage = await usageTracker.getTaskUsage(taskId);
const budget = await budgetAllocator.getTaskBudget(taskId);
const projectedCost = estimateTaskCost(task);
// Compare against this task's own allocation rather than workspace-wide usage
const remaining = budget.allocated - usage.current;
if (projectedCost > remaining) {
return {
canProceed: false,
reason: 'Insufficient budget',
recommendation: 'Pause or reallocate budget',
};
}
return { canProceed: true };
}
2. Agent Spawn (Agent Manager)
Before spawning a Claude Code agent, verify budget headroom:
async spawnAgent(config: AgentConfig): Promise<Agent> {
const budgetCheck = await usageBudgetManager.canAffordTask(config.taskId);
if (!budgetCheck.canProceed) {
throw new InsufficientBudgetError(budgetCheck.reason);
}
// Proceed with spawn
const agent = await claudeCode.spawn(config);
// Track agent for usage rollup
await usageTracker.registerAgent(agent.id, config.taskId);
return agent;
}
3. Checkpoint Intervals (Coordinator)
During task execution, periodically verify budget compliance:
async checkpointBudgetCompliance(taskId: string): Promise<void> {
const usage = await usageTracker.getTaskUsage(taskId);
const budget = await budgetAllocator.getTaskBudget(taskId);
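// A zero allocation counts as exhausted, which also avoids dividing by zero
const percentUsed = budget.allocated > 0 ? (usage.current / budget.allocated) * 100 : Infinity;
// Check the hard limit first so an over-budget task is paused rather than only warned
if (percentUsed > 100) {
await coordinator.pauseTask(taskId, 'Budget exceeded');
await notifyUser(taskId, 'Task paused: budget exhausted');
} else if (percentUsed > 90) {
await coordinator.sendWarning(taskId, 'Approaching budget limit');
}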
}
4. Pre-commit Validation (Quality Gates)
Before committing work, verify usage is reasonable:
async validateUsageEfficiency(taskId: string): Promise<ValidationResult> {
const usage = await usageTracker.getTaskUsage(taskId);
const linesChanged = await git.getChangedLines(taskId);
const testsAdded = await git.getTestFiles(taskId);
// Cost per line heuristic (adjust based on learnings)
const expectedTokensPerLine = 150; // baseline + TDD overhead
const expectedUsage = linesChanged * expectedTokensPerLine;
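// No recorded usage means there is nothing to judge yet; this also avoids dividing by zero
if (usage.current === 0) {
return { valid: true };
}
const efficiency = expectedUsage / usage.current;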
if (efficiency < 0.5) {
return {
valid: false,
reason: 'Usage appears inefficient',
recommendation: 'Review agent logs for token waste',
};
}
return { valid: true };
}
Data Model
usage_budgets Table
CREATE TABLE usage_budgets (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id),
-- Scope: 'global', 'project', 'milestone', 'task'
scope VARCHAR(20) NOT NULL,
scope_id VARCHAR(100), -- project_id, milestone_id, or task_id
-- Budget limits (tokens)
allocated BIGINT NOT NULL,
consumed BIGINT NOT NULL DEFAULT 0,
remaining BIGINT GENERATED ALWAYS AS (allocated - consumed) STORED,
-- Tracking
period_start TIMESTAMPTZ NOT NULL,
period_end TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_usage_budgets_scope ON usage_budgets(scope, scope_id);
CREATE INDEX idx_usage_budgets_workspace ON usage_budgets(workspace_id);
agent_usage_logs Table
CREATE TABLE agent_usage_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id),
agent_session_id UUID NOT NULL,
task_id UUID REFERENCES agent_tasks(id),
-- Usage details
input_tokens BIGINT NOT NULL,
output_tokens BIGINT NOT NULL,
total_tokens BIGINT NOT NULL,
model VARCHAR(100) NOT NULL, -- 'claude-sonnet-4', 'claude-haiku-3.5', etc.
-- Cost tracking
estimated_cost_usd DECIMAL(10, 6),
-- Context
operation VARCHAR(100), -- 'task_execution', 'quality_review', etc.
logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_agent_usage_task ON agent_usage_logs(task_id);
CREATE INDEX idx_agent_usage_session ON agent_usage_logs(agent_session_id);
CREATE INDEX idx_agent_usage_workspace ON agent_usage_logs(workspace_id);
Valkey/Redis Keys
# Current usage (real-time)
usage:current:{workspace_id} -> HASH
{
"total_tokens": "1234567",
"total_cost_usd": "12.34",
"last_updated": "2026-02-04T10:30:00Z"
}
# Per-task usage
usage:task:{task_id} -> HASH
{
"allocated": "50000",
"consumed": "23456",
"remaining": "26544",
"agents": ["agent-1", "agent-2"]
}
# Budget alerts
usage:alerts:{workspace_id} -> LIST
[
{
"task_id": "task-123",
"level": "warning",
"percent_used": 92,
"message": "Task approaching budget limit"
}
]
Cost Estimation Formulas
Based on autonomous execution learnings:
function estimateTaskCost(task: Task): number {
const baselineTokens = task.estimatedComplexity * 1000; // tokens per complexity point
// Overhead factors
const tddOverhead = 1.2; // +20% for test writing
const baselineBuffer = 1.3; // +30% general buffer
const phaseBuffer = 1.15; // +15% phase-specific uncertainty
const estimated = baselineTokens * tddOverhead * baselineBuffer * phaseBuffer;
return Math.ceil(estimated);
}
Model Tier Optimization
Route tasks to appropriate model tiers for cost efficiency:
| Model | Cost/MTok (input) | Cost/MTok (output) | Use Case |
|---|---|---|---|
| Claude Haiku 3.5 | $0.80 | $4.00 | Simple CRUD, boilerplate, linting |
| Claude Sonnet 4 | $3.00 | $15.00 | Standard development, refactoring |
| Claude Opus 4 | $15.00 | $75.00 | Complex architecture, critical fixes |
Routing logic:
function selectModel(task: Task): ModelTier {
if (task.priority === "critical" || task.complexity > 8) {
return "opus";
}
if (task.type === "boilerplate" || task.estimatedTokens < 10000) {
return "haiku";
}
return "sonnet"; // default
}
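To show what the tier choice means in dollars, the sketch below converts token counts to an estimated cost using the prices from the table above. `PRICE_PER_MTOK` and `estimateCostUsd` are illustrative names, and the prices are copied from this document rather than from a live price list, so they should be verified before use.

```typescript
type ModelTier = "haiku" | "sonnet" | "opus";

// USD per million tokens, copied from the table in this section; verify against current pricing.
const PRICE_PER_MTOK: Record<ModelTier, { input: number; output: number }> = {
  haiku: { input: 0.8, output: 4.0 },
  sonnet: { input: 3.0, output: 15.0 },
  opus: { input: 15.0, output: 75.0 },
};

// Estimated cost in USD for a given token mix on a given tier.
function estimateCostUsd(
  tier: ModelTier,
  inputTokens: number,
  outputTokens: number
): number {
  const price = PRICE_PER_MTOK[tier];
  return (inputTokens / 1e6) * price.input + (outputTokens / 1e6) * price.output;
}
```

For example, a workload of 2M input tokens and 500K output tokens comes to 13.50 USD on Sonnet and 67.50 USD on Opus at these prices, which is the kind of gap the routing logic is meant to exploit.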
Projection Algorithm
Predict budget exhaustion:
async function projectBudgetExhaustion(workspaceId: string): Promise<Projection> {
const usage = await usageTracker.getCurrentUsage(workspaceId);
const budget = await budgetAllocator.getGlobalBudget(workspaceId);
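const dailyBurnRate = usage.tokensLast24h;
// A zero burn rate means the budget never runs out at the current pace
const daysRemaining = dailyBurnRate > 0 ? budget.remaining / dailyBurnRate : Infinity;
// Use millisecond arithmetic so fractional days are kept; null when there is no exhaustion date
const exhaustionDate = Number.isFinite(daysRemaining)
? new Date(Date.now() + daysRemaining * 24 * 60 * 60 * 1000)
: null;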
return {
remaining_tokens: budget.remaining,
daily_burn_rate: dailyBurnRate,
days_until_exhaustion: daysRemaining,
projected_exhaustion_date: exhaustionDate,
recommendation: daysRemaining < 7 ? "THROTTLE" : "CONTINUE",
};
}
Implementation Priority
MVP (M6 Phase 3):
- ✅ Basic usage tracking (log tokens per task)
- ✅ Simple budget checks (can afford this task?)
- ✅ Alert on budget exceeded
Post-MVP (M6 Phase 5):
- Projection engine (when will budget run out?)
- Model tier optimization (Haiku/Sonnet/Opus routing)
- Historical analysis (actual vs estimated)
- Budget reallocation (move budget between projects)
Success Metrics
- Budget accuracy: Estimated vs actual within 20%
- Cost optimization: 40%+ savings from model tier routing
- No surprise exhaustion: Zero instances of unexpected budget depletion
- Steady momentum: Projects maintain velocity without budget interruptions
Implementation Phases
Phase 1: Foundation (Week 1-2)
Goal: Basic task and agent models, no coordination yet.
Deliverables:
- Database schema migration (tables, indexes)
- Prisma models for AgentTask, AgentTaskLog, AgentHeartbeat
- Basic CRUD API endpoints for tasks
- Agent registration API
- Manual task assignment (no automation)
Testing:
- Unit tests for task state machine
- Integration tests for task CRUD
- Manual testing: create task, assign to agent, complete
Phase 2: Coordination Core (Week 3-4)
Goal: Autonomous coordinator with health monitoring.
Deliverables:
- CoordinatorService with distributed locking
- Health monitoring (heartbeat TTL checks)
- Automatic task assignment to available agents
- Valkey integration for runtime state
- Pub/Sub for coordination events
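The heartbeat TTL check in the deliverables above reduces to comparing each agent's last heartbeat time against a cutoff. This is a rough sketch only: the `HeartbeatRecord` shape, the `findStaleAgents` name, and the 90-second TTL (three missed 30-second heartbeats) are assumptions made for illustration.

```typescript
// Hypothetical record shape; the real AgentHeartbeat model may differ.
interface HeartbeatRecord {
  agentId: string;
  lastSeenMs: number; // epoch milliseconds of the most recent heartbeat
}

// Assumption: an agent is stale after missing three 30-second heartbeats.
const HEARTBEAT_TTL_MS = 90000;

// Returns the ids of agents whose last heartbeat is older than the TTL.
function findStaleAgents(
  records: HeartbeatRecord[],
  nowMs: number,
  ttlMs: number = HEARTBEAT_TTL_MS,
): string[] {
  return records
    .filter((record) => nowMs - record.lastSeenMs > ttlMs)
    .map((record) => record.agentId);
}
```

Passing `nowMs` in explicitly keeps the check deterministic, which helps with the unit tests planned for the coordinator logic.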
Testing:
- Unit tests for coordinator logic
- Integration tests with Valkey
- Chaos testing: kill agents, verify recovery
- Load testing: 10+ agents, 50+ tasks
Phase 3: Recovery & Resilience (Week 5)
Goal: Fault-tolerant operation with automatic recovery.
Deliverables:
- Agent failure detection and task recovery
- Exponential backoff for retries
- Checkpoint/resume support for long-running tasks
- Dependency resolution
- Deadlock detection
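Exponential backoff for retries, listed above, is commonly computed as a base delay doubled per attempt and capped at a maximum. A minimal sketch follows; the `retryDelayMs` name, the 1-second base, and the 60-second cap are illustrative assumptions, not values taken from this design.

```typescript
// Hypothetical helper: base and cap values are assumptions.
// attempt is zero-based: 0 is the first retry.
function retryDelayMs(attempt: number, baseMs = 1000, maxMs = 60000): number {
  if (!Number.isInteger(attempt) || attempt < 0) {
    throw new RangeError("attempt must be a non-negative integer");
  }
  // Double the delay on each attempt, never exceeding the cap.
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}
```

In practice a random jitter is often added to the result so that many failed tasks do not retry at the same instant; it is left out here to keep the function deterministic.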
Testing:
- Fault injection: kill agents, restart Gateway
- Dependency cycle testing
- Retry exhaustion testing
- Split-brain simulation
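Dependency cycle testing needs a way to detect cycles in the task dependency graph. One common approach is a depth-first search that tracks the nodes on the current path. The sketch below assumes dependencies are available as a plain map from task id to the ids it depends on; `DependencyGraph` and `findDependencyCycle` are hypothetical names.

```typescript
// Hypothetical representation: task id -> ids of the tasks it depends on.
type DependencyGraph = Record<string, string[]>;

// Returns one cycle as a list of task ids (first id repeated at the end),
// or null when the graph is acyclic.
function findDependencyCycle(graph: DependencyGraph): string[] | null {
  const state = new Map<string, "visiting" | "done">();
  const path: string[] = [];

  function visit(taskId: string): string[] | null {
    const current = state.get(taskId);
    if (current === "done") return null;
    if (current === "visiting") {
      // taskId is already on the current path, so the path loops back to it.
      return path.slice(path.indexOf(taskId)).concat(taskId);
    }
    state.set(taskId, "visiting");
    path.push(taskId);
    for (const dependency of graph[taskId] ?? []) {
      const cycle = visit(dependency);
      if (cycle) return cycle;
    }
    path.pop();
    state.set(taskId, "done");
    return null;
  }

  for (const taskId of Object.keys(graph)) {
    const cycle = visit(taskId);
    if (cycle) return cycle;
  }
  return null;
}
```

The recursion depth equals the longest dependency chain, which should be acceptable for task graphs of modest size; an explicit stack would be safer for very deep chains.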
Phase 4: Observability (Week 6)
Goal: Full visibility into orchestration state.
Deliverables:
- Coordinator status dashboard
- Task progress tracking UI
- Real-time logs API
- Metrics export (Prometheus format)
- Webhook integration for external monitoring
Testing:
- Load testing with metrics collection
- Dashboard usability testing
- Webhook reliability testing
Phase 5: Advanced Features (Future)
Goal: Production-grade features.
Deliverables:
- Task prioritization algorithms (SJF, priority queues)
- Agent capability matching (skills-based routing)
- Task batching (group similar tasks)
- Cost tracking (token usage, compute time)
- Multi-region coordination (geo-distributed agents)
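As an illustration of the prioritization item above, priority queues and shortest-job-first (SJF) can be combined by sorting on priority first and using the estimated duration as a tie-breaker. This is only a sketch: it assumes a higher `priority` number means more urgent, and the `PendingTask` shape and `orderPendingTasks` name are hypothetical.

```typescript
// Hypothetical task shape, reusing the priority and estimatedDurationMinutes
// field names from the task creation payload.
interface PendingTask {
  id: string;
  priority: number; // assumption: higher means more urgent
  estimatedDurationMinutes: number;
}

// Orders tasks by priority (descending), then shortest job first.
// Returns a new array and leaves the input untouched.
function orderPendingTasks(tasks: PendingTask[]): PendingTask[] {
  return tasks.slice().sort(
    (a, b) =>
      b.priority - a.priority ||
      a.estimatedDurationMinutes - b.estimatedDurationMinutes,
  );
}
```

A strict priority ordering like this can starve low-priority tasks under sustained load; aging or a weighted score would be one way to address that if it becomes a problem.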
Security Considerations
Row-Level Security (RLS)
All queries must enforce workspace isolation:
// Example: Only return tasks for user's workspace
async getTasks(userId: string, workspaceId: string) {
const membership = await this.prisma.workspaceMember.findUnique({
where: { workspaceId_userId: { workspaceId, userId } }
});
if (!membership) {
throw new ForbiddenException('Not a member of this workspace');
}
return this.prisma.agentTask.findMany({
where: { workspaceId }
});
}
API Authentication
- All endpoints require valid session token
- Coordinator endpoints restricted to ADMIN or OWNER roles
- Agent heartbeat endpoints use JWT with agent-specific claims
Secrets Management
- Task inputContext may contain secrets (API keys, passwords)
- Encrypt sensitive fields at rest using Prisma middleware
- Never log secrets in task logs
- Redact secrets in API responses
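Redaction for logs and API responses could be handled by a recursive helper that masks values under sensitive-looking keys. The sketch below is one possible shape, not the planned implementation: the `redactSecrets` name and the key pattern are assumptions, and a real deployment would likely need an explicit allow-list or deny-list per field.

```typescript
// Assumption: keys matching this pattern hold secrets. The pattern is
// illustrative and would need tuning against the real inputContext fields.
const SENSITIVE_KEY_PATTERN = /(secret|password|passwd|api[_-]?key)/i;

// Returns a deep copy of the value with sensitive fields masked.
// The input is not mutated.
function redactSecrets(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map((item) => redactSecrets(item));
  }
  if (value !== null && typeof value === "object") {
    const result: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      result[key] = SENSITIVE_KEY_PATTERN.test(key)
        ? "[REDACTED]"
        : redactSecrets(inner);
    }
    return result;
  }
  return value;
}
```

Key-based matching cannot catch secrets embedded in free-text values such as command strings, so it complements rather than replaces encryption at rest.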
Monitoring & Alerting
Key Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| coordinator.cycle.duration_ms | Coordination cycle execution time | >5000ms |
| coordinator.stale_agents.count | Number of stale agents detected | >5 |
| tasks.pending.count | Tasks waiting for assignment | >50 |
| tasks.failed.count | Total failed tasks (last 1h) | >10 |
| tasks.retry.exhausted.count | Tasks exceeding max retries | >0 |
| agents.spawned.count | Agent spawn rate | >100/min |
| valkey.connection.errors | Valkey connection failures | >0 |
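The alert thresholds above could be evaluated with a simple lookup against the latest metric samples. The following sketch hard-codes the thresholds from the table for illustration; the `ALERT_THRESHOLDS` constant and `findBreachedMetrics` function are hypothetical, and in practice this logic would more likely live in Prometheus alerting rules than in application code.

```typescript
// Thresholds copied from the Key Metrics table; a metric alerts when its
// sample is strictly greater than the threshold.
const ALERT_THRESHOLDS: Record<string, number> = {
  "coordinator.cycle.duration_ms": 5000,
  "coordinator.stale_agents.count": 5,
  "tasks.pending.count": 50,
  "tasks.failed.count": 10,
  "tasks.retry.exhausted.count": 0,
  "agents.spawned.count": 100, // per minute
  "valkey.connection.errors": 0,
};

// Returns the names of metrics whose current sample exceeds its threshold.
// Metrics without a sample are skipped.
function findBreachedMetrics(samples: Record<string, number>): string[] {
  return Object.keys(ALERT_THRESHOLDS).filter((name) => {
    const value = samples[name];
    const threshold = ALERT_THRESHOLDS[name];
    return value !== undefined && threshold !== undefined && value > threshold;
  });
}
```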
Health Checks
GET /health/coordinator
{
"status": "healthy",
"coordinator": {
"active": true,
"lastCycle": "2025-01-29T14:30:00Z",
"cycleCount": 1234
},
"database": "connected",
"valkey": "connected",
"gateway": "reachable"
}
API Integration Examples
Creating a Task from Main Agent
// Main agent creates a development task
const task = await fetch("/api/v1/agent-tasks", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${sessionToken}`,
},
body: JSON.stringify({
title: "Fix TypeScript strict errors in U-Connect",
description: "Run tsc --noEmit, fix all errors, commit changes",
taskType: "development",
priority: 8,
inputContext: {
repository: "u-connect",
branch: "main",
commands: ["pnpm install", "pnpm tsc:check"],
},
maxRetries: 2,
estimatedDurationMinutes: 30,
}),
});
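// fetch() resolves even on 4xx/5xx responses, so check the status explicitly
if (!task.ok) {
throw new Error(`Task creation failed with status ${task.status}`);
}
const { id } = await task.json();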
console.log(`Task created: ${id}`);
Agent Heartbeat (from spawned subagent)
// Subagent sends heartbeat every 30s
setInterval(async () => {
await fetch(`/api/v1/agents/${agentId}/heartbeat`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${agentToken}`,
},
body: JSON.stringify({
status: "healthy",
currentTaskId: taskId,
progressPercent: 45,
currentStep: "Running tsc --noEmit",
memoryMb: 512,
cpuPercent: 35,
}),
});
}, 30000);
Task Progress Update
// Agent updates task progress
await fetch(`/api/v1/agent-tasks/${taskId}/progress`, {
method: "PATCH",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${agentToken}`,
},
body: JSON.stringify({
progressPercent: 70,
currentStep: "Fixing type errors in packages/shared",
checkpointData: {
filesProcessed: 15,
errorsFixed: 8,
remainingFiles: 5,
},
}),
});
Task Completion
// Agent marks task complete
await fetch(`/api/v1/agent-tasks/${taskId}/complete`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${agentToken}`,
},
body: JSON.stringify({
outputResult: {
filesModified: 20,
errorsFixed: 23,
commitHash: "abc123",
buildStatus: "passing",
},
summary: "All TypeScript strict errors resolved. Build passing.",
}),
});
Glossary
| Term | Definition |
|---|---|
| Agent | Autonomous AI instance (e.g., Claude subagent) that executes tasks |
| Task | Unit of work to be executed by an agent |
| Coordinator | Background service that assigns tasks and monitors agent health |
| Heartbeat | Periodic signal from agent indicating it's alive and working |
| Stale Agent | Agent that has stopped sending heartbeats (assumed dead) |
| Checkpoint | Snapshot of task state allowing resumption after failure |
| Workspace | Tenant isolation boundary (all tasks belong to a workspace) |
| Session | Gateway-managed connection between user and agent |
| Orchestration | Automated coordination of multiple agents working on tasks |
References
- Mosaic Stack Architecture
- Existing Agent/AgentSession Schema
- NestJS Task Scheduling
- Valkey Documentation
- PostgreSQL Advisory Locks
Next Steps:
- Review and approve this design document
- Create GitHub issues for Phase 1 tasks
- Set up development branch: feature/agent-orchestration
- Begin database schema migration
Questions/Feedback: Open an issue in mosaic-stack repo with label orchestration.