# Agent Orchestration Layer

**Version:** 1.0
**Status:** Design Phase
**Author:** Mosaic Stack Team
**Date:** 2025-01-29

## Table of Contents

1. [Problem Statement](#problem-statement)
2. [Architecture Overview](#architecture-overview)
3. [Database Schema](#database-schema)
4. [Valkey/Redis Key Patterns](#valkeyredis-key-patterns)
5. [API Endpoints](#api-endpoints)
6. [Coordinator Service Design](#coordinator-service-design)
7. [Recovery & Resilience Patterns](#recovery--resilience-patterns)
8. [Implementation Phases](#implementation-phases)

---

## Problem Statement

### Current Limitations

1. **No Persistent Orchestration**
   - Agents spawned in sessions have no lifecycle management beyond the session
   - Work continuity is lost when Gateway restarts or sessions end
   - No coordination between agents working on the same project
2. **No Health Monitoring**
   - No visibility into agent status after spawn
   - Failed agents leave tasks in limbo
   - No automated recovery from agent failures
3. **No Task Persistence**
   - Task state only exists in session memory
   - Users can't track long-running work
   - No audit trail of agent activity
4. **Conversation Interference**
   - Main session conversations derail active development work
   - Context switching disrupts agent focus
   - No separation between user chat and background work

### Requirements

The Agent Orchestration Layer must provide:

- **Persistent Task Management** — Task definitions, state, and history survive restarts
- **Autonomous Coordination** — Independent of main session; agents work continuously
- **Health Monitoring** — Real-time tracking of agent status and progress
- **Automatic Recovery** — Detect failures and resume work intelligently
- **Complete Audit Trail** — Every state transition logged for debugging and accountability
- **Multi-Workspace Support** — Row-level security and tenant isolation

---

## Architecture Overview

### High-Level Design

```
┌─────────────────────────────────────────────────────────────────┐
│                           API Layer                              │
│          (NestJS Controllers + Guards + Interceptors)            │
└────────────┬─────────────────────────────────────┬──────────────┘
             │                                     │
    ┌────────▼────────┐                   ┌────────▼─────────┐
    │  Task Manager   │                   │  Agent Manager   │
    │    Service      │                   │     Service      │
    └────────┬────────┘                   └────────┬─────────┘
             │                                     │
             │        ┌───────────────────┐        │
             └───────►│   Coordinator     │◄───────┘
                      │     Service       │
                      └─────────┬─────────┘
                                │
           ┌────────────────────┼────────────────────┐
           │                    │                    │
  ┌────────▼───────┐   ┌────────▼────────┐   ┌───────▼─────┐
  │   PostgreSQL   │   │  Valkey/Redis   │   │   Gateway   │
  │  (Persistent)  │   │   (Runtime)     │   │   (Agent    │
  │                │   │                 │   │   Spawner)  │
  └────────────────┘   └─────────────────┘   └─────────────┘
```

### Component Responsibilities

| Component         | Responsibility                                                   |
| ----------------- | ---------------------------------------------------------------- |
| **Task Manager**  | CRUD operations on tasks, state transitions, assignment logic    |
| **Agent Manager** | Agent lifecycle, health tracking, session management             |
| **Coordinator**   | Heartbeat processing, failure detection, recovery orchestration  |
| **PostgreSQL**    | Persistent storage of tasks, agents, sessions, logs              |
| **Valkey/Redis**  | Runtime state, heartbeats, quick lookups, pub/sub                |
| **Gateway**       | Agent spawning, session management, message routing              |

---

## Database Schema

### Task State Machine

```
        ┌──────────────┐
        │   PENDING    │
        └──────┬───────┘
               │
        ┌──────▼───────┐
   ┌────┤   ASSIGNED   ├────┐
   │    └──────────────┘    │
   │                        │
┌──▼───────────┐   ┌────────▼──────┐
│   RUNNING    │   │    PAUSED     │
└──────┬───────┘   └────────┬──────┘
       │                    │
       │  ┌──────────────┐  │
       └─►│  COMPLETED   │◄─┘
          └──────┬───────┘
                 │
          ┌──────▼───────┐
          │   ARCHIVED   │
          └──────────────┘

        ┌──────────────┐
        │    FAILED    │
        └──────┬───────┘
               │
        ┌──────▼───────┐
        │   ABORTED    │──── (retry → PENDING)
        └──────────────┘
```

### New Tables

#### `agent_tasks`

Extends existing task management with orchestration-specific fields.

```sql
CREATE TYPE task_orchestration_status AS ENUM (
    'PENDING',    -- Awaiting assignment
    'ASSIGNED',   -- Assigned to an agent
    'RUNNING',    -- Agent actively working
    'PAUSED',     -- User paused
    'COMPLETED',  -- Successfully finished
    'FAILED',     -- Failed with error
    'ABORTED',    -- Terminated (stale agent, manual cancel)
    'ARCHIVED'    -- Historical record
);

CREATE TABLE agent_tasks (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    workspace_id    UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,

    -- Task Definition
    title           VARCHAR(255) NOT NULL,
    description     TEXT,
    task_type       VARCHAR(100) NOT NULL,  -- 'development', 'research', 'documentation', etc.

    -- Status & Priority
    status          task_orchestration_status DEFAULT 'PENDING',
    priority        INT DEFAULT 5,          -- 1 (low) to 10 (high)

    -- Assignment
    agent_id        UUID REFERENCES agents(id) ON DELETE SET NULL,
    session_key     VARCHAR(255),           -- Current active session

    -- Progress Tracking
    progress_percent INT DEFAULT 0 CHECK (progress_percent BETWEEN 0 AND 100),
    current_step    TEXT,
    estimated_completion_at TIMESTAMPTZ,

    -- Retry Logic
    retry_count     INT DEFAULT 0,
    max_retries     INT DEFAULT 3,
    retry_backoff_seconds INT DEFAULT 300,  -- 5 minutes
    last_error      TEXT,

    -- Dependencies
    depends_on      UUID[] DEFAULT ARRAY[]::UUID[],  -- Array of task IDs
    blocks          UUID[] DEFAULT ARRAY[]::UUID[],  -- Tasks blocked by this one

    -- Context
    input_context   JSONB DEFAULT '{}',     -- Input data/params for agent
    output_result   JSONB,                  -- Final result from agent
    checkpoint_data JSONB DEFAULT '{}',     -- Resumable state

    -- Metadata
    metadata        JSONB DEFAULT '{}',
    tags            VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[],

    -- Audit
    created_by      UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    updated_at      TIMESTAMPTZ DEFAULT NOW(),
    assigned_at     TIMESTAMPTZ,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    failed_at       TIMESTAMPTZ
);

CREATE INDEX idx_agent_tasks_workspace ON agent_tasks(workspace_id);
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status)
    WHERE status IN ('PENDING', 'ASSIGNED', 'RUNNING');
CREATE INDEX idx_agent_tasks_agent ON agent_tasks(agent_id)
    WHERE agent_id IS NOT NULL;
CREATE INDEX idx_agent_tasks_priority ON agent_tasks(priority DESC, created_at ASC)
    WHERE status = 'PENDING';
CREATE INDEX idx_agent_tasks_depends_on ON agent_tasks USING gin(depends_on);
CREATE INDEX idx_agent_tasks_tags ON agent_tasks USING gin(tags);
```

#### `agent_task_logs`

Immutable audit trail of all task state transitions and agent actions.
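Each row in this table records a transition that should first be validated against the state machine above. A minimal guard might look like the following sketch — the transition set is one reading of the diagram, and the names are illustrative, not the actual Task Manager implementation:

```typescript
// Hypothetical transition guard derived from the state-machine diagram above.
type TaskStatus =
  | "PENDING" | "ASSIGNED" | "RUNNING" | "PAUSED"
  | "COMPLETED" | "FAILED" | "ABORTED" | "ARCHIVED";

// One possible reading of the diagram; the real service may allow more edges.
const ALLOWED_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ["ASSIGNED"],
  ASSIGNED: ["RUNNING", "PAUSED", "ABORTED"],
  RUNNING: ["PAUSED", "COMPLETED", "FAILED", "ABORTED"],
  PAUSED: ["RUNNING", "COMPLETED", "ABORTED"],
  COMPLETED: ["ARCHIVED"],
  FAILED: ["ABORTED"],
  ABORTED: ["PENDING"], // retry path back to the queue
  ARCHIVED: [],         // terminal
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

The Task Manager would reject (and never log) any update whose `previous_status → new_status` pair fails this check.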
```sql
CREATE TYPE task_log_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL');

CREATE TABLE agent_task_logs (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    workspace_id    UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    task_id         UUID NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
    agent_id        UUID REFERENCES agents(id) ON DELETE SET NULL,

    -- Log Entry
    level           task_log_level DEFAULT 'INFO',
    event           VARCHAR(100) NOT NULL,  -- 'state_transition', 'progress_update', 'error', etc.
    message         TEXT,
    details         JSONB DEFAULT '{}',

    -- State Snapshot
    previous_status task_orchestration_status,
    new_status      task_orchestration_status,

    -- Context
    session_key     VARCHAR(255),
    stack_trace     TEXT,

    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_task_logs_task ON agent_task_logs(task_id, created_at DESC);
CREATE INDEX idx_task_logs_workspace ON agent_task_logs(workspace_id, created_at DESC);
CREATE INDEX idx_task_logs_level ON agent_task_logs(level)
    WHERE level IN ('ERROR', 'CRITICAL');
CREATE INDEX idx_task_logs_event ON agent_task_logs(event);
```

#### `agent_heartbeats`

Short-lived records for health monitoring (TTL enforced in Valkey).
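The `status` column below (`'healthy'`, `'degraded'`, `'stale'`) can be derived from heartbeat age. A sketch of one possible classification rule, using the `heartbeat_interval_seconds` and `stale_threshold_seconds` settings added to the `agents` table later in this document — the "two missed beats" cutoff for `degraded` is an assumption:

```typescript
// Hypothetical health classification from heartbeat age.
type HealthStatus = "healthy" | "degraded" | "stale";

function classifyHeartbeat(
  lastSeenAtMs: number,
  nowMs: number,
  heartbeatIntervalSeconds: number, // expected cadence, e.g. 30
  staleThresholdSeconds: number     // hard cutoff, e.g. 180
): HealthStatus {
  const ageMs = nowMs - lastSeenAtMs;
  if (ageMs > staleThresholdSeconds * 1000) return "stale";
  // more than two missed beats, but under the stale cutoff (assumption)
  if (ageMs > 2 * heartbeatIntervalSeconds * 1000) return "degraded";
  return "healthy";
}
```

With the default settings (30s interval, 180s threshold), an agent silent for 70 seconds would be `degraded` and one silent for over three minutes would be `stale`.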
```sql
CREATE TABLE agent_heartbeats (
    id              UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    workspace_id    UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    agent_id        UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,

    -- Health Data
    status          VARCHAR(50) NOT NULL,  -- 'healthy', 'degraded', 'stale'
    current_task_id UUID REFERENCES agent_tasks(id) ON DELETE SET NULL,
    progress_percent INT,

    -- Resource Usage
    memory_mb       INT,
    cpu_percent     INT,

    -- Timing
    last_seen_at    TIMESTAMPTZ DEFAULT NOW(),
    next_expected_at TIMESTAMPTZ,

    metadata        JSONB DEFAULT '{}'
);

CREATE INDEX idx_heartbeats_agent ON agent_heartbeats(agent_id, last_seen_at DESC);
CREATE INDEX idx_heartbeats_stale ON agent_heartbeats(last_seen_at)
    WHERE status != 'healthy';

-- Cleanup function (run periodically)
CREATE OR REPLACE FUNCTION cleanup_old_heartbeats() RETURNS void AS $$
BEGIN
    DELETE FROM agent_heartbeats
    WHERE last_seen_at < NOW() - INTERVAL '1 hour';
END;
$$ LANGUAGE plpgsql;
```

### Updates to Existing Tables

#### Extend `agents` table

```sql
-- Add orchestration fields to existing agents table
ALTER TABLE agents ADD COLUMN IF NOT EXISTS coordinator_enabled BOOLEAN DEFAULT true;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS max_concurrent_tasks INT DEFAULT 1;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS heartbeat_interval_seconds INT DEFAULT 30;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS stale_threshold_seconds INT DEFAULT 180; -- 3 minutes
ALTER TABLE agents ADD COLUMN IF NOT EXISTS capabilities VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[];

CREATE INDEX idx_agents_coordinator ON agents(coordinator_enabled)
    WHERE coordinator_enabled = true;
```

---

## Valkey/Redis Key Patterns

Valkey is used for:

- **Real-time state** (fast reads/writes)
- **Pub/Sub messaging** (coordination events)
- **Distributed locks** (prevent race conditions)
- **Session data** (ephemeral context)

### Key Naming Convention

```
{namespace}:{workspace_id}:{entity_type}:{entity_id}:{property}
```

### Key Patterns

#### Task Queue

```redis
# Pending tasks (sorted set; ZADD scores must be numeric, so encode
# priority-major / timestamp-minor, e.g. {score} = priority * 1e13 + timestamp)
ZADD tasks:pending:{workspace_id} {score} {task_id}

# Assigned tasks (hash per agent)
HSET agent:tasks:{agent_id} {task_id} {assigned_at}

# Active sessions (set)
SADD tasks:active:{workspace_id} {task_id}:{session_key}
```

#### Agent Status

```redis
# Agent heartbeat (string with TTL)
SETEX agent:heartbeat:{agent_id} 60 "{\"status\":\"running\",\"task_id\":\"{task_id}\",\"timestamp\":{ts}}"

# Agent status (hash)
HSET agent:status:{agent_id} status "running" current_task "{task_id}" last_heartbeat {timestamp}

# Stale agents (sorted set by last heartbeat)
ZADD coordinator:stale_agents {timestamp} {agent_id}
```

#### Coordination

```redis
# Coordinator lock (to prevent multiple coordinators)
SET coordinator:lock:{workspace_id} {coordinator_instance_id} NX EX 30

# Task assignment lock
SET task:assign_lock:{task_id} {agent_id} NX EX 5

# Recovery queue (list)
LPUSH coordinator:recovery_queue:{workspace_id} {task_id}
```

#### Pub/Sub Channels

```redis
# Task events
PUBLISH tasks:events:{workspace_id} "{\"event\":\"task_completed\",\"task_id\":\"{task_id}\"}"

# Agent events
PUBLISH agents:events:{workspace_id} "{\"event\":\"agent_stale\",\"agent_id\":\"{agent_id}\"}"

# Coordinator commands
PUBLISH coordinator:commands:{workspace_id} "{\"command\":\"reassign_task\",\"task_id\":\"{task_id}\"}"
```

#### Session Data (TTL: 1 hour)

```redis
# Session context (hash with TTL)
HSET session:context:{session_key} workspace_id "{workspace_id}" agent_id "{agent_id}" task_id "{task_id}" started_at {timestamp}
EXPIRE session:context:{session_key} 3600
```

### Data Lifecycle

| Key Type             | TTL  | Cleanup Strategy                            |
| -------------------- | ---- | ------------------------------------------- |
| `agent:heartbeat:*`  | 60s  | Auto-expire                                 |
| `agent:status:*`     | None | Delete on agent termination                 |
| `session:context:*`  | 1h   | Auto-expire                                 |
| `tasks:pending:*`    | None | Remove on assignment                        |
| `coordinator:lock:*` | 30s  | Auto-expire (renewed by active coordinator) |
| `task:assign_lock:*` | 5s   | Auto-expire after assignment                |

---

## API Endpoints

### Task Management

```typescript
// Create a new orchestration task
POST /api/v1/agent-tasks
{
  "title": "Fix TypeScript strict errors in U-Connect",
  "description": "...",
  "taskType": "development",
  "priority": 8,
  "inputContext": {
    "repository": "u-connect",
    "branch": "main",
    "scope": ["packages/shared"]
  },
  "dependsOn": [],
  "maxRetries": 3
}

// List tasks (with filtering)
GET /api/v1/agent-tasks?status=RUNNING&priority_gte=8&tags=typescript

// Get task details
GET /api/v1/agent-tasks/:id

// Update task (limited fields)
PATCH /api/v1/agent-tasks/:id
{
  "priority": 10,
  "status": "PAUSED"
}

// Cancel task
POST /api/v1/agent-tasks/:id/cancel
{
  "reason": "Requirements changed"
}

// Retry failed task
POST /api/v1/agent-tasks/:id/retry

// Get task logs
GET /api/v1/agent-tasks/:id/logs?level=ERROR&limit=100
```

### Agent Management

```typescript
// List agents
GET /api/v1/agents?status=WORKING&coordinatorEnabled=true

// Get agent details
GET /api/v1/agents/:id

// Register/update agent
PUT /api/v1/agents/:agentId
{
  "name": "Dev Agent #1",
  "model": "claude-sonnet-4",
  "capabilities": ["typescript", "nestjs", "prisma"],
  "maxConcurrentTasks": 2,
  "heartbeatIntervalSeconds": 30
}

// Get agent health
GET /api/v1/agents/:id/health

// Get agent tasks
GET /api/v1/agents/:id/tasks?status=RUNNING

// Terminate agent
POST /api/v1/agents/:id/terminate
{
  "reason": "Manual shutdown",
  "graceful": true  // Allow task completion
}
```

### Coordination

```typescript
// Coordinator status (internal/admin only)
GET /api/v1/coordinator/status
{
  "active": true,
  "instanceId": "coord-abc123",
  "workspaces": ["ws-1", "ws-2"],
  "lastHeartbeat": "2025-01-29T14:30:00Z",
  "stats": {
    "pendingTasks": 3,
    "runningTasks": 5,
    "healthyAgents": 8,
    "staleAgents": 1
  }
}

// Force recovery check (admin only)
POST /api/v1/coordinator/recover
{
  "workspaceId": "optional - all if omitted"
}

// Get coordination logs (admin only)
GET /api/v1/coordinator/logs?workspace={id}&level=ERROR
```

### Webhooks (for external integration)

```typescript
// Task state change webhook
POST {configured_webhook_url}
{
  "event": "task.status_changed",
  "taskId": "task-123",
  "previousStatus": "RUNNING",
  "newStatus": "COMPLETED",
  "workspaceId": "ws-1",
  "timestamp": "2025-01-29T14:30:00Z"
}
```

---

## Coordinator Service Design

### Core Responsibilities

1. **Health Monitoring** — Check agent heartbeats, mark stale agents
2. **Task Assignment** — Match pending tasks to available agents
3. **Recovery Orchestration** — Reassign tasks from failed/stale agents
4. **Dependency Resolution** — Ensure tasks wait for dependencies
5. **Resource Management** — Enforce agent concurrency limits

### Architecture

```typescript
// Coordinator Service (NestJS)
@Injectable()
export class CoordinatorService {
  private coordinatorLock: string;
  private isRunning: boolean = false;

  constructor(
    private readonly taskManager: TaskManagerService,
    private readonly agentManager: AgentManagerService,
    private readonly valkey: Valkey,
    private readonly prisma: PrismaService,
    private readonly logger: Logger
  ) {}

  // Main coordination loop
  @Cron("*/30 * * * * *") // Every 30 seconds
  async coordinate() {
    if (!(await this.acquireLock())) {
      return; // Another coordinator is active
    }

    try {
      await this.checkAgentHealth();
      await this.assignPendingTasks();
      await this.resolveDependencies();
      await this.recoverFailedTasks();
    } catch (error) {
      this.logger.error("Coordination cycle failed", error);
    } finally {
      await this.releaseLock();
    }
  }

  // Distributed lock to prevent multiple coordinators
  private async acquireLock(): Promise<boolean> {
    const lockKey = `coordinator:lock:global`;
    const result = await this.valkey.set(
      lockKey,
      process.env.HOSTNAME || "coordinator",
      "NX",
      "EX",
      30
    );
    return result === "OK";
  }

  // Check agent heartbeats and mark stale
  private async checkAgentHealth() {
    const agents = await this.agentManager.getCoordinatorManagedAgents();
    const now = Date.now();

    for (const agent of agents) {
      const heartbeatKey = `agent:heartbeat:${agent.id}`;
      const lastHeartbeat = await this.valkey.get(heartbeatKey);

      if (!lastHeartbeat) {
        // No heartbeat - agent is stale
        await this.handleStaleAgent(agent);
      } else {
        const heartbeatData = JSON.parse(lastHeartbeat);
        const age = now - heartbeatData.timestamp;

        if (age > agent.staleThresholdSeconds * 1000) {
          await this.handleStaleAgent(agent);
        }
      }
    }
  }

  // Assign pending tasks to available agents
  private async assignPendingTasks() {
    const workspaces = await this.getActiveWorkspaces();

    for (const workspace of workspaces) {
      const pendingTasks = await this.taskManager.getPendingTasks(workspace.id, {
        orderBy: { priority: "desc", createdAt: "asc" },
      });

      for (const task of pendingTasks) {
        // Check dependencies
        if (!(await this.areDependenciesMet(task))) {
          continue;
        }

        // Find available agent
        const agent = await this.agentManager.findAvailableAgent(
          workspace.id,
          task.taskType,
          task.metadata.requiredCapabilities
        );

        if (agent) {
          await this.assignTask(task, agent);
        }
      }
    }
  }

  // Handle stale agents
  private async handleStaleAgent(agent: Agent) {
    this.logger.warn(`Agent ${agent.id} is stale - recovering tasks`);

    // Mark agent as ERROR
    await this.agentManager.updateAgentStatus(agent.id, AgentStatus.ERROR);

    // Get assigned tasks
    const tasks = await this.taskManager.getTasksForAgent(agent.id);

    for (const task of tasks) {
      await this.recoverTask(task, "agent_stale");
    }
  }

  // Recover a task from failure
  private async recoverTask(task: AgentTask, reason: string) {
    // Log the failure
    await this.taskManager.logTaskEvent(task.id, {
      level: "ERROR",
      event: "task_recovery",
      message: `Task recovery initiated: ${reason}`,
      previousStatus: task.status,
      newStatus: "ABORTED",
    });

    // Check retry limit
    if (task.retryCount >= task.maxRetries) {
      await this.taskManager.updateTask(task.id, {
        status: "FAILED",
        lastError: `Max retries exceeded (${task.retryCount}/${task.maxRetries})`,
        failedAt: new Date(),
      });
      return;
    }

    // Abort current assignment
    await this.taskManager.updateTask(task.id, {
      status: "ABORTED",
      agentId: null,
      sessionKey: null,
      retryCount: task.retryCount + 1,
    });

    // Wait for backoff period before requeuing
    const backoffMs = task.retryBackoffSeconds * 1000 * Math.pow(2, task.retryCount);
    setTimeout(async () => {
      await this.taskManager.updateTask(task.id, {
        status: "PENDING",
      });
    }, backoffMs);
  }

  // Assign task to agent
  private async assignTask(task: AgentTask, agent: Agent) {
    // Acquire assignment lock
    const lockKey = `task:assign_lock:${task.id}`;
    const locked = await this.valkey.set(lockKey, agent.id, "NX", "EX", 5);

    if (!locked) {
      return; // Another coordinator already assigned this task
    }

    try {
      // Update task
      await this.taskManager.updateTask(task.id, {
        status: "ASSIGNED",
        agentId: agent.id,
        assignedAt: new Date(),
      });

      // Spawn agent session via Gateway
      const session = await this.spawnAgentSession(agent, task);

      // Update task with session
      await this.taskManager.updateTask(task.id, {
        sessionKey: session.sessionKey,
        status: "RUNNING",
        startedAt: new Date(),
      });

      // Log assignment
      await this.taskManager.logTaskEvent(task.id, {
        level: "INFO",
        event: "task_assigned",
        message: `Task assigned to agent ${agent.id}`,
        details: { agentId: agent.id, sessionKey: session.sessionKey },
      });
    } finally {
      await this.valkey.del(lockKey);
    }
  }

  // Spawn agent session via Gateway
  private async spawnAgentSession(agent: Agent, task: AgentTask): Promise<{ sessionKey: string }> {
    // Call Gateway API to spawn subagent with task context
    const response = await fetch(`${process.env.GATEWAY_URL}/api/agents/spawn`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        workspaceId: task.workspaceId,
        agentId: agent.id,
        taskId: task.id,
        label: `task:${task.id}:${task.title.slice(0, 30)}`,
        context: {
          taskTitle: task.title,
          taskDescription: task.description,
          inputContext: task.inputContext,
          checkpointData: task.checkpointData,
        },
      }),
    });

    const data = await response.json();
    return data.session;
  }
}
```

### Coordination State Machine

```
┌─────────────────────────────────────────────────────────┐
│                   Coordinator Cycle                     │
└─────────────────────────────────────────────────────────┘
                          │
                          ▼
              ┌───────────────────────┐
              │  Acquire Lock (30s)   │
              └───────────┬───────────┘
                          │
                ┌─────────▼─────────┐
                │   Check Health    │
                │   - Read agents   │
                │   - Check TTLs    │
                │   - Mark stale    │
                └─────────┬─────────┘
                          │
                ┌─────────▼─────────┐
                │   Assign Tasks    │
                │   - Read pending  │
                │   - Match agents  │
                │   - Spawn sessions│
                └─────────┬─────────┘
                          │
                ┌─────────▼─────────┐
                │   Resolve Deps    │
                │   - Check waiting │
                │   - Unblock ready │
                └─────────┬─────────┘
                          │
                ┌─────────▼─────────┐
                │  Recover Failed   │
                │  - Requeue retry  │
                │  - Log failures   │
                └─────────┬─────────┘
                          │
              ┌───────────▼───────────┐
              │     Release Lock      │
              └───────────────────────┘
```

---

## Recovery & Resilience Patterns

### 1. Agent Failure Recovery

**Scenario:** Agent crashes mid-task.

**Detection:**

- Heartbeat TTL expires in Valkey
- Coordinator detects missing heartbeat

**Recovery:**

1. Mark agent as `ERROR` in database
2. Abort assigned tasks with `status = ABORTED`
3. Log failure with stack trace (if available)
4. Apply exponential backoff: `backoff = retryBackoffSeconds * 2^retryCount`
5. Requeue task to `PENDING` after backoff
6. Coordinator reassigns on next cycle

### 2. Gateway Restart Recovery

**Scenario:** Gateway restarts, killing all agent sessions.

**Detection:**

- All agent heartbeats stop simultaneously
- Coordinator detects mass stale agents

**Recovery:**

1. Coordinator marks all `RUNNING` tasks as `ABORTED`
2. Tasks with `checkpointData` can resume from last checkpoint
3. Tasks without checkpoints restart from scratch
4. Exponential backoff prevents thundering herd

### 3. Task Dependency Deadlock

**Scenario:** Task A depends on Task B, which depends on Task A (circular dependency).
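One way such a cycle can be found is a depth-first search over the `depends_on` edges. The sketch below operates on an in-memory map; the function name and shape are illustrative, not the actual `resolveDependencies()` implementation:

```typescript
// Hypothetical DFS cycle detection over task `depends_on` edges.
// `tasks` maps a task id to the ids it depends on (mirroring the
// depends_on UUID[] column); returns one cycle as a list of ids, or null.
function findDependencyCycle(tasks: Map<string, string[]>): string[] | null {
  const visiting = new Set<string>(); // on the current DFS path
  const done = new Set<string>();     // fully explored, cycle-free
  const stack: string[] = [];         // current path, for reporting

  const visit = (id: string): string[] | null => {
    if (done.has(id)) return null;
    if (visiting.has(id)) {
      // back-edge found: the cycle is the path from the repeated id
      return stack.slice(stack.indexOf(id));
    }
    visiting.add(id);
    stack.push(id);
    for (const dep of tasks.get(id) ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    visiting.delete(id);
    done.add(id);
    return null;
  };

  for (const id of tasks.keys()) {
    const cycle = visit(id);
    if (cycle) return cycle;
  }
  return null;
}
```

A non-null result would drive the recovery steps below (log the cycle, fail its members, notify the owner).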
**Detection:**

- Coordinator builds dependency graph
- Detects cycles during `resolveDependencies()`

**Recovery:**

1. Log `ERROR` with cycle details
2. Mark all tasks in cycle as `FAILED` with reason `dependency_cycle`
3. Notify workspace owner via webhook

### 4. Database Failure

**Scenario:** PostgreSQL becomes unavailable.

**Detection:**

- Prisma query fails with connection error

**Recovery:**

1. Coordinator catches error, logs to stderr
2. Releases lock (allowing failover to another instance)
3. Retries with exponential backoff: 5s, 10s, 20s, 40s
4. If database remains down >5 minutes, alert admin

### 5. Split-Brain Prevention

**Scenario:** Network partition causes two coordinators to run simultaneously.

**Prevention:**

- Distributed lock in Valkey with 30s TTL
- Coordinators must renew lock every cycle
- Only one coordinator can hold lock at a time

**Detection:**

- Task assigned to multiple agents (conflict detection)

**Recovery:**

1. Newer assignment wins (based on `assignedAt` timestamp)
2. Cancel older session
3. Log conflict for investigation

### 6. Stale Task Timeout

**Scenario:** Task runs longer than `estimatedCompletionAt + grace period`.

**Detection:**

- Coordinator checks `estimatedCompletionAt` field
- If exceeded by >30 minutes, mark as potentially hung

**Recovery:**

1. Send warning to agent session (via pub/sub)
2. If no progress update in 10 minutes, abort task
3. Log timeout error
4. Requeue with higher priority

---

## Usage Budget Management

**Version:** 1.0
**Date:** 2026-02-04
**Status:** Required for MVP

### Problem Statement

Autonomous agents using Claude Code can consume significant API tokens without proper governance. Without real-time usage tracking and budgeting, projects risk:

1. **Cost overruns** — Agents exceed budget before milestone completion
2. **Service disruption** — Hit API rate limits mid-task
3. **Unpredictable momentum** — Can't estimate project velocity
4. **Budget exhaustion** — Agents consume entire monthly budget in days

### Requirements

The orchestration layer must provide:

- **Real-time usage tracking** — Current token usage across all active agents
- **Budget allocation** — Pre-allocate budgets per task/milestone/project
- **Usage projection** — Estimate remaining work vs remaining budget
- **Throttling decisions** — Pause/slow agents approaching limits
- **Cost optimization** — Route tasks to appropriate model tiers (Haiku/Sonnet/Opus)

### Architecture

```
┌─────────────────────────────────────────────────────────┐
│              Usage Budget Manager Service               │
│  ┌───────────────────┐  ┌────────────────────────────┐  │
│  │   Usage Tracker   │  │     Budget Allocator       │  │
│  │ - Query API       │  │ - Per-task budgets         │  │
│  │ - Real-time sum   │  │ - Per-milestone budgets    │  │
│  │ - Agent rollup    │  │ - Global limits            │  │
│  └────────┬──────────┘  └──────────┬─────────────────┘  │
│           │                        │                    │
│  ┌────────▼────────────────────────▼─────────────────┐  │
│  │               Projection Engine                   │  │
│  │ - Estimate remaining work                         │  │
│  │ - Calculate burn rate                             │  │
│  │ - Predict budget exhaustion                       │  │
│  │ - Recommend throttle/pause                        │  │
│  └───────────────────────────┬───────────────────────┘  │
└──────────────────────────────┼──────────────────────────┘
                               │
                  ┌────────────┼────────────┐
                  │            │            │
          ┌───────▼──────┐ ┌───▼─────┐ ┌────▼──────────┐
          │Queue Manager │ │Agent Mgr│ │  Coordinator  │
          │- Check budget│ │- Spawn  │ │ - Pre-commit  │
          │  before queue│ │  check  │ │   validation  │
          └──────────────┘ └─────────┘ └───────────────┘
```

### Budget Check Points

**1. Task Assignment** (Queue Manager)

```typescript
async canAffordTask(taskId: string): Promise<{
  canProceed: boolean;
  reason?: string;
  recommendation?: string;
}> {
  const task = await getTask(taskId);
  const currentUsage = await usageTracker.getCurrentUsage();
  const budget = await budgetAllocator.getTaskBudget(taskId);

  const projectedCost = estimateTaskCost(task);
  const remaining = budget.limit - currentUsage.total;

  if (projectedCost > remaining) {
    return {
      canProceed: false,
      reason: 'Insufficient budget',
      recommendation: 'Pause or reallocate budget',
    };
  }

  return { canProceed: true };
}
```

**2. Agent Spawn** (Agent Manager)

Before spawning a Claude Code agent, verify budget headroom:

```typescript
async spawnAgent(config: AgentConfig): Promise<Agent> {
  const budgetCheck = await usageBudgetManager.canAffordTask(config.taskId);

  if (!budgetCheck.canProceed) {
    throw new InsufficientBudgetError(budgetCheck.reason);
  }

  // Proceed with spawn
  const agent = await claudeCode.spawn(config);

  // Track agent for usage rollup
  await usageTracker.registerAgent(agent.id, config.taskId);

  return agent;
}
```

**3. Checkpoint Intervals** (Coordinator)

During task execution, periodically verify budget compliance:

```typescript
async checkpointBudgetCompliance(taskId: string): Promise<void> {
  const usage = await usageTracker.getTaskUsage(taskId);
  const budget = await budgetAllocator.getTaskBudget(taskId);

  const percentUsed = (usage.current / budget.allocated) * 100;

  if (percentUsed > 90) {
    await coordinator.sendWarning(taskId, 'Approaching budget limit');
  }

  if (percentUsed > 100) {
    await coordinator.pauseTask(taskId, 'Budget exceeded');
    await notifyUser(taskId, 'Task paused: budget exhausted');
  }
}
```

**4. Pre-commit Validation** (Quality Gates)

Before committing work, verify usage is reasonable:

```typescript
async validateUsageEfficiency(taskId: string): Promise<{
  valid: boolean;
  reason?: string;
  recommendation?: string;
}> {
  const usage = await usageTracker.getTaskUsage(taskId);
  const linesChanged = await git.getChangedLines(taskId);
  const testsAdded = await git.getTestFiles(taskId);

  // Cost per line heuristic (adjust based on learnings)
  const expectedTokensPerLine = 150; // baseline + TDD overhead
  const expectedUsage = linesChanged * expectedTokensPerLine;

  const efficiency = expectedUsage / usage.current;

  if (efficiency < 0.5) {
    return {
      valid: false,
      reason: 'Usage appears inefficient',
      recommendation: 'Review agent logs for token waste',
    };
  }

  return { valid: true };
}
```

### Data Model

#### `usage_budgets` Table

```sql
CREATE TABLE usage_budgets (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id  UUID NOT NULL REFERENCES workspaces(id),

    -- Scope: 'global', 'project', 'milestone', 'task'
    scope         VARCHAR(20) NOT NULL,
    scope_id      VARCHAR(100),  -- project_id, milestone_id, or task_id

    -- Budget limits (tokens)
    allocated     BIGINT NOT NULL,
    consumed      BIGINT NOT NULL DEFAULT 0,
    remaining     BIGINT GENERATED ALWAYS AS (allocated - consumed) STORED,

    -- Tracking
    period_start  TIMESTAMPTZ NOT NULL,
    period_end    TIMESTAMPTZ NOT NULL,

    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_usage_budgets_scope ON usage_budgets(scope, scope_id);
CREATE INDEX idx_usage_budgets_workspace ON usage_budgets(workspace_id);
```

#### `agent_usage_logs` Table

```sql
CREATE TABLE agent_usage_logs (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id      UUID NOT NULL REFERENCES workspaces(id),
    agent_session_id  UUID NOT NULL,
    task_id           UUID REFERENCES agent_tasks(id),

    -- Usage details
    input_tokens      BIGINT NOT NULL,
    output_tokens     BIGINT NOT NULL,
    total_tokens      BIGINT NOT NULL,
    model             VARCHAR(100) NOT NULL,  -- 'claude-sonnet-4', 'claude-haiku-3.5', etc.

    -- Cost tracking
    estimated_cost_usd DECIMAL(10, 6),

    -- Context
    operation         VARCHAR(100),  -- 'task_execution', 'quality_review', etc.

    logged_at         TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_agent_usage_task ON agent_usage_logs(task_id);
CREATE INDEX idx_agent_usage_session ON agent_usage_logs(agent_session_id);
CREATE INDEX idx_agent_usage_workspace ON agent_usage_logs(workspace_id);
```

### Valkey/Redis Keys

```
# Current usage (real-time)
usage:current:{workspace_id} -> HASH {
  "total_tokens": "1234567",
  "total_cost_usd": "12.34",
  "last_updated": "2026-02-04T10:30:00Z"
}

# Per-task usage
usage:task:{task_id} -> HASH {
  "allocated": "50000",
  "consumed": "23456",
  "remaining": "26544",
  "agents": ["agent-1", "agent-2"]
}

# Budget alerts
usage:alerts:{workspace_id} -> LIST [
  {
    "task_id": "task-123",
    "level": "warning",
    "percent_used": 92,
    "message": "Task approaching budget limit"
  }
]
```

### Cost Estimation Formulas

Based on autonomous execution learnings:

```typescript
function estimateTaskCost(task: Task): number {
  const baselineTokens = task.estimatedComplexity * 1000; // tokens per complexity point

  // Overhead factors
  const tddOverhead = 1.2;     // +20% for test writing
  const baselineBuffer = 1.3;  // +30% general buffer
  const phaseBuffer = 1.15;    // +15% phase-specific uncertainty

  const estimated = baselineTokens * tddOverhead * baselineBuffer * phaseBuffer;
  return Math.ceil(estimated);
}
```

### Model Tier Optimization

Route tasks to appropriate model tiers for cost efficiency:

| Model            | Cost/MTok (input) | Cost/MTok (output) | Use Case                             |
| ---------------- | ----------------- | ------------------ | ------------------------------------ |
| Claude Haiku 3.5 | $0.80             | $4.00              | Simple CRUD, boilerplate, linting    |
| Claude Sonnet 4  | $3.00             | $15.00             | Standard development, refactoring    |
| Claude Opus 4    | $15.00            | $75.00             | Complex architecture, critical fixes |

**Routing logic:**

```typescript
function selectModel(task: Task): ModelTier {
  if (task.priority === "critical" || task.complexity > 8) {
    return "opus";
  }
  if (task.type === "boilerplate" || task.estimatedTokens < 10000) {
    return "haiku";
  }
  return "sonnet"; // default
}
```

### Projection Algorithm

Predict budget exhaustion:

```typescript
async function projectBudgetExhaustion(workspaceId: string): Promise<{
  remaining_tokens: number;
  daily_burn_rate: number;
  days_until_exhaustion: number;
  projected_exhaustion_date: Date;
  recommendation: "THROTTLE" | "CONTINUE";
}> {
  const usage = await usageTracker.getCurrentUsage(workspaceId);
  const budget = await budgetAllocator.getGlobalBudget(workspaceId);

  const dailyBurnRate = usage.tokensLast24h;
  const daysRemaining = budget.remaining / dailyBurnRate;

  const exhaustionDate = new Date();
  exhaustionDate.setDate(exhaustionDate.getDate() + Math.ceil(daysRemaining));

  return {
    remaining_tokens: budget.remaining,
    daily_burn_rate: dailyBurnRate,
    days_until_exhaustion: daysRemaining,
    projected_exhaustion_date: exhaustionDate,
    recommendation: daysRemaining < 7 ? "THROTTLE" : "CONTINUE",
  };
}
```

### Implementation Priority

**MVP (M6 Phase 3):**

- ✅ Basic usage tracking (log tokens per task)
- ✅ Simple budget checks (can afford this task?)
- ✅ Alert on budget exceeded

**Post-MVP (M6 Phase 5):**

- Projection engine (when will budget run out?)
- Model tier optimization (Haiku/Sonnet/Opus routing)
- Historical analysis (actual vs estimated)
- Budget reallocation (move budget between projects)

### Success Metrics

- **Budget accuracy**: Estimated vs actual within 20%
- **Cost optimization**: 40%+ savings from model tier routing
- **No surprise exhaustion**: Zero instances of unexpected budget depletion
- **Steady momentum**: Projects maintain velocity without budget interruptions

---

## Implementation Phases

### Phase 1: Foundation (Week 1-2)

**Goal:** Basic task and agent models, no coordination yet.
**Deliverables:**

- [ ] Database schema migration (tables, indexes)
- [ ] Prisma models for `AgentTask`, `AgentTaskLog`, `AgentHeartbeat`
- [ ] Basic CRUD API endpoints for tasks
- [ ] Agent registration API
- [ ] Manual task assignment (no automation)

**Testing:**

- Unit tests for task state machine
- Integration tests for task CRUD
- Manual testing: create task, assign to agent, complete

### Phase 2: Coordination Core (Week 3-4)

**Goal:** Autonomous coordinator with health monitoring.

**Deliverables:**

- [ ] `CoordinatorService` with distributed locking
- [ ] Health monitoring (heartbeat TTL checks)
- [ ] Automatic task assignment to available agents
- [ ] Valkey integration for runtime state
- [ ] Pub/Sub for coordination events

**Testing:**

- Unit tests for coordinator logic
- Integration tests with Valkey
- Chaos testing: kill agents, verify recovery
- Load testing: 10+ agents, 50+ tasks

### Phase 3: Recovery & Resilience (Week 5)

**Goal:** Fault-tolerant operation with automatic recovery.

**Deliverables:**

- [ ] Agent failure detection and task recovery
- [ ] Exponential backoff for retries
- [ ] Checkpoint/resume support for long-running tasks
- [ ] Dependency resolution
- [ ] Deadlock detection

**Testing:**

- Fault injection: kill agents, restart Gateway
- Dependency cycle testing
- Retry exhaustion testing
- Split-brain simulation

### Phase 4: Observability (Week 6)

**Goal:** Full visibility into orchestration state.

**Deliverables:**

- [ ] Coordinator status dashboard
- [ ] Task progress tracking UI
- [ ] Real-time logs API
- [ ] Metrics export (Prometheus format)
- [ ] Webhook integration for external monitoring

**Testing:**

- Load testing with metrics collection
- Dashboard usability testing
- Webhook reliability testing

### Phase 5: Advanced Features (Future)

**Goal:** Production-grade features.
**Deliverables:**

- [ ] Task prioritization algorithms (SJF, priority queues)
- [ ] Agent capability matching (skills-based routing)
- [ ] Task batching (group similar tasks)
- [ ] Cost tracking (token usage, compute time)
- [ ] Multi-region coordination (geo-distributed agents)

---

## Security Considerations

### Row-Level Security (RLS)

All queries must enforce workspace isolation:

```typescript
// Example: Only return tasks for the user's workspace
async getTasks(userId: string, workspaceId: string) {
  const membership = await this.prisma.workspaceMember.findUnique({
    where: { workspaceId_userId: { workspaceId, userId } },
  });

  if (!membership) {
    throw new ForbiddenException('Not a member of this workspace');
  }

  return this.prisma.agentTask.findMany({
    where: { workspaceId },
  });
}
```

### API Authentication

- All endpoints require a valid session token
- Coordinator endpoints are restricted to `ADMIN` or `OWNER` roles
- Agent heartbeat endpoints use JWT with agent-specific claims

### Secrets Management

- Task `inputContext` may contain secrets (API keys, passwords)
- Encrypt sensitive fields at rest using Prisma middleware
- Never log secrets in task logs
- Redact secrets in API responses

---

## Monitoring & Alerting

### Key Metrics

| Metric                           | Description                       | Alert Threshold |
| -------------------------------- | --------------------------------- | --------------- |
| `coordinator.cycle.duration_ms`  | Coordination cycle execution time | >5000ms         |
| `coordinator.stale_agents.count` | Number of stale agents detected   | >5              |
| `tasks.pending.count`            | Tasks waiting for assignment      | >50             |
| `tasks.failed.count`             | Total failed tasks (last 1h)      | >10             |
| `tasks.retry.exhausted.count`    | Tasks exceeding max retries       | >0              |
| `agents.spawned.count`           | Agent spawn rate                  | >100/min        |
| `valkey.connection.errors`       | Valkey connection failures        | >0              |

### Health Checks

```http
GET /health/coordinator

{
  "status": "healthy",
  "coordinator": {
    "active": true,
    "lastCycle": "2025-01-29T14:30:00Z",
    "cycleCount": 1234
  },
  "database": "connected",
  "valkey": "connected",
  "gateway": "reachable"
}
```

---

## API Integration Examples

### Creating a Task from the Main Agent

```typescript
// Main agent creates a development task
const task = await fetch("/api/v1/agent-tasks", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${sessionToken}`,
  },
  body: JSON.stringify({
    title: "Fix TypeScript strict errors in U-Connect",
    description: "Run tsc --noEmit, fix all errors, commit changes",
    taskType: "development",
    priority: 8,
    inputContext: {
      repository: "u-connect",
      branch: "main",
      commands: ["pnpm install", "pnpm tsc:check"],
    },
    maxRetries: 2,
    estimatedDurationMinutes: 30,
  }),
});

const { id } = await task.json();
console.log(`Task created: ${id}`);
```

### Agent Heartbeat (from a Spawned Subagent)

```typescript
// Subagent sends a heartbeat every 30s
setInterval(async () => {
  await fetch(`/api/v1/agents/${agentId}/heartbeat`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${agentToken}`,
    },
    body: JSON.stringify({
      status: "healthy",
      currentTaskId: taskId,
      progressPercent: 45,
      currentStep: "Running tsc --noEmit",
      memoryMb: 512,
      cpuPercent: 35,
    }),
  });
}, 30000);
```

### Task Progress Update

```typescript
// Agent updates task progress
await fetch(`/api/v1/agent-tasks/${taskId}/progress`, {
  method: "PATCH",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    progressPercent: 70,
    currentStep: "Fixing type errors in packages/shared",
    checkpointData: {
      filesProcessed: 15,
      errorsFixed: 8,
      remainingFiles: 5,
    },
  }),
});
```

### Task Completion

```typescript
// Agent marks the task complete
await fetch(`/api/v1/agent-tasks/${taskId}/complete`, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    outputResult: {
      filesModified: 20,
      errorsFixed: 23,
      commitHash: "abc123",
      buildStatus: "passing",
    },
    summary: "All TypeScript strict errors resolved. Build passing.",
  }),
});
```

---

## Glossary

| Term              | Definition                                                          |
| ----------------- | ------------------------------------------------------------------- |
| **Agent**         | Autonomous AI instance (e.g., Claude subagent) that executes tasks  |
| **Task**          | Unit of work to be executed by an agent                             |
| **Coordinator**   | Background service that assigns tasks and monitors agent health     |
| **Heartbeat**     | Periodic signal from an agent indicating it is alive and working    |
| **Stale Agent**   | Agent that has stopped sending heartbeats (assumed dead)            |
| **Checkpoint**    | Snapshot of task state allowing resumption after failure            |
| **Workspace**     | Tenant isolation boundary (all tasks belong to a workspace)         |
| **Session**       | Gateway-managed connection between user and agent                   |
| **Orchestration** | Automated coordination of multiple agents working on tasks          |

---

## References

- [Mosaic Stack Architecture](../3-architecture/README.md)
- [Existing Agent/AgentSession Schema](../../apps/api/prisma/schema.prisma)
- [NestJS Task Scheduling](https://docs.nestjs.com/techniques/task-scheduling)
- [Valkey Documentation](https://valkey.io/docs/)
- [PostgreSQL Advisory Locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS)

---

**Next Steps:**

1. Review and approve this design document
2. Create GitHub issues for Phase 1 tasks
3. Set up the development branch: `feature/agent-orchestration`
4. Begin the database schema migration

**Questions/Feedback:** Open an issue in the `mosaic-stack` repo with the label `orchestration`.