# Agent Orchestration Layer

**Version:** 1.0
**Status:** Design Phase
**Author:** Mosaic Stack Team
**Date:** 2025-01-29

## Table of Contents

1. [Problem Statement](#problem-statement)
2. [Architecture Overview](#architecture-overview)
3. [Database Schema](#database-schema)
4. [Valkey/Redis Key Patterns](#valkeyredis-key-patterns)
5. [API Endpoints](#api-endpoints)
6. [Coordinator Service Design](#coordinator-service-design)
7. [Recovery & Resilience Patterns](#recovery--resilience-patterns)
8. [Usage Budget Management](#usage-budget-management)
9. [Implementation Phases](#implementation-phases)

---
## Problem Statement

### Current Limitations

1. **No Persistent Orchestration**
   - Agents spawned in sessions have no lifecycle management beyond the session
   - Work continuity is lost when the Gateway restarts or sessions end
   - No coordination between agents working on the same project

2. **No Health Monitoring**
   - No visibility into agent status after spawn
   - Failed agents leave tasks in limbo
   - No automated recovery from agent failures

3. **No Task Persistence**
   - Task state exists only in session memory
   - Users can't track long-running work
   - No audit trail of agent activity

4. **Conversation Interference**
   - Main-session conversations derail active development work
   - Context switching disrupts agent focus
   - No separation between user chat and background work

### Requirements

The Agent Orchestration Layer must provide:

- **Persistent Task Management** — Task definitions, state, and history survive restarts
- **Autonomous Coordination** — Agents work continuously, independent of the main session
- **Health Monitoring** — Real-time tracking of agent status and progress
- **Automatic Recovery** — Detect failures and resume work intelligently
- **Complete Audit Trail** — Every state transition logged for debugging and accountability
- **Multi-Workspace Support** — Row-level security and tenant isolation

---
## Architecture Overview

### High-Level Design

```
┌─────────────────────────────────────────────────────────────────┐
│                           API Layer                             │
│         (NestJS Controllers + Guards + Interceptors)            │
└────────────┬─────────────────────────────────────┬──────────────┘
             │                                     │
             │                                     │
    ┌────────▼────────┐                  ┌────────▼─────────┐
    │  Task Manager   │                  │  Agent Manager   │
    │    Service      │                  │     Service      │
    └────────┬────────┘                  └────────┬─────────┘
             │                                    │
             │       ┌───────────────────┐        │
             └───────►    Coordinator    ◄────────┘
                     │      Service      │
                     └─────────┬─────────┘
                               │
          ┌────────────────────┼────────────────────┐
          │                    │                    │
  ┌───────▼────────┐  ┌────────▼────────┐  ┌───────▼─────┐
  │   PostgreSQL   │  │  Valkey/Redis   │  │   Gateway   │
  │  (Persistent)  │  │    (Runtime)    │  │   (Agent    │
  │                │  │                 │  │   Spawner)  │
  └────────────────┘  └─────────────────┘  └─────────────┘
```

### Component Responsibilities

| Component         | Responsibility                                                  |
| ----------------- | --------------------------------------------------------------- |
| **Task Manager**  | CRUD operations on tasks, state transitions, assignment logic   |
| **Agent Manager** | Agent lifecycle, health tracking, session management            |
| **Coordinator**   | Heartbeat processing, failure detection, recovery orchestration |
| **PostgreSQL**    | Persistent storage of tasks, agents, sessions, logs             |
| **Valkey/Redis**  | Runtime state, heartbeats, quick lookups, pub/sub               |
| **Gateway**       | Agent spawning, session management, message routing             |

---
## Database Schema

### Task State Machine

```
                ┌──────────────┐
                │   PENDING    │
                └──────┬───────┘
                       │
                ┌──────▼───────┐
           ┌────┤   ASSIGNED   ├────┐
           │    └──────┬───────┘    │
           │           │            │
    ┌──────▼───────┐   │   ┌────────▼──────┐
    │   RUNNING    │   │   │    PAUSED     │
    └──────┬───────┘   │   └────────┬──────┘
           │           │            │
           │    ┌──────▼───────┐    │
           └────►  COMPLETED   ◄────┘
                └──────┬───────┘
                       │
                ┌──────▼───────┐
                │   ARCHIVED   │
                └──────────────┘

                ┌──────────────┐
                │    FAILED    │
                └──────┬───────┘
                       │
                ┌──────▼───────┐
                │   ABORTED    │────┐
                └──────────────┘    │
                       │            │
                       └────────────┘
                  (retry → PENDING)
```
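The transitions above can be encoded as a lookup table so the service layer rejects invalid status updates. A minimal sketch (the type and map are illustrative, not part of the schema; edges such as `RUNNING → FAILED` are assumptions where the diagram is ambiguous):

```typescript
type TaskStatus =
  | "PENDING" | "ASSIGNED" | "RUNNING" | "PAUSED"
  | "COMPLETED" | "FAILED" | "ABORTED" | "ARCHIVED";

// One possible encoding of the state machine's edges
const ALLOWED_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ["ASSIGNED"],
  ASSIGNED: ["RUNNING", "PAUSED", "COMPLETED", "ABORTED"],
  RUNNING: ["COMPLETED", "PAUSED", "FAILED", "ABORTED"],
  PAUSED: ["RUNNING", "COMPLETED", "ABORTED"],
  COMPLETED: ["ARCHIVED"],
  FAILED: ["ABORTED"],
  ABORTED: ["PENDING"], // retry path
  ARCHIVED: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```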

### New Tables

#### `agent_tasks`

Extends the existing task management tables with orchestration-specific fields.

```sql
CREATE TYPE task_orchestration_status AS ENUM (
  'PENDING',    -- Awaiting assignment
  'ASSIGNED',   -- Assigned to an agent
  'RUNNING',    -- Agent actively working
  'PAUSED',     -- Paused by the user
  'COMPLETED',  -- Successfully finished
  'FAILED',     -- Failed with error
  'ABORTED',    -- Terminated (stale agent, manual cancel)
  'ARCHIVED'    -- Historical record
);

CREATE TABLE agent_tasks (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,

  -- Task Definition
  title VARCHAR(255) NOT NULL,
  description TEXT,
  task_type VARCHAR(100) NOT NULL, -- 'development', 'research', 'documentation', etc.

  -- Status & Priority
  status task_orchestration_status DEFAULT 'PENDING',
  priority INT DEFAULT 5, -- 1 (low) to 10 (high)

  -- Assignment
  agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,
  session_key VARCHAR(255), -- Current active session

  -- Progress Tracking
  progress_percent INT DEFAULT 0 CHECK (progress_percent BETWEEN 0 AND 100),
  current_step TEXT,
  estimated_completion_at TIMESTAMPTZ,

  -- Retry Logic
  retry_count INT DEFAULT 0,
  max_retries INT DEFAULT 3,
  retry_backoff_seconds INT DEFAULT 300, -- 5 minutes
  last_error TEXT,

  -- Dependencies
  depends_on UUID[] DEFAULT ARRAY[]::UUID[], -- Array of task IDs
  blocks UUID[] DEFAULT ARRAY[]::UUID[],     -- Tasks blocked by this one

  -- Context
  input_context JSONB DEFAULT '{}',   -- Input data/params for agent
  output_result JSONB,                -- Final result from agent
  checkpoint_data JSONB DEFAULT '{}', -- Resumable state

  -- Metadata
  metadata JSONB DEFAULT '{}',
  tags VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[],

  -- Audit
  created_by UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW(),
  assigned_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  failed_at TIMESTAMPTZ
);

CREATE INDEX idx_agent_tasks_workspace ON agent_tasks(workspace_id);
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status) WHERE status IN ('PENDING', 'ASSIGNED', 'RUNNING');
CREATE INDEX idx_agent_tasks_agent ON agent_tasks(agent_id) WHERE agent_id IS NOT NULL;
CREATE INDEX idx_agent_tasks_priority ON agent_tasks(priority DESC, created_at ASC) WHERE status = 'PENDING';
CREATE INDEX idx_agent_tasks_depends_on ON agent_tasks USING gin(depends_on);
CREATE INDEX idx_agent_tasks_tags ON agent_tasks USING gin(tags);
```
#### `agent_task_logs`

Immutable audit trail of all task state transitions and agent actions.

```sql
CREATE TYPE task_log_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL');

CREATE TABLE agent_task_logs (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
  task_id UUID NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
  agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,

  -- Log Entry
  level task_log_level DEFAULT 'INFO',
  event VARCHAR(100) NOT NULL, -- 'state_transition', 'progress_update', 'error', etc.
  message TEXT,
  details JSONB DEFAULT '{}',

  -- State Snapshot
  previous_status task_orchestration_status,
  new_status task_orchestration_status,

  -- Context
  session_key VARCHAR(255),
  stack_trace TEXT,

  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_task_logs_task ON agent_task_logs(task_id, created_at DESC);
CREATE INDEX idx_task_logs_workspace ON agent_task_logs(workspace_id, created_at DESC);
CREATE INDEX idx_task_logs_level ON agent_task_logs(level) WHERE level IN ('ERROR', 'CRITICAL');
CREATE INDEX idx_task_logs_event ON agent_task_logs(event);
```
#### `agent_heartbeats`

Short-lived records for health monitoring (the authoritative TTL is enforced in Valkey).

```sql
CREATE TABLE agent_heartbeats (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
  agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,

  -- Health Data
  status VARCHAR(50) NOT NULL, -- 'healthy', 'degraded', 'stale'
  current_task_id UUID REFERENCES agent_tasks(id) ON DELETE SET NULL,
  progress_percent INT,

  -- Resource Usage
  memory_mb INT,
  cpu_percent INT,

  -- Timing
  last_seen_at TIMESTAMPTZ DEFAULT NOW(),
  next_expected_at TIMESTAMPTZ,

  metadata JSONB DEFAULT '{}'
);

CREATE INDEX idx_heartbeats_agent ON agent_heartbeats(agent_id, last_seen_at DESC);
CREATE INDEX idx_heartbeats_stale ON agent_heartbeats(last_seen_at) WHERE status != 'healthy';

-- Cleanup function (run periodically)
CREATE OR REPLACE FUNCTION cleanup_old_heartbeats()
RETURNS void AS $$
BEGIN
  DELETE FROM agent_heartbeats
  WHERE last_seen_at < NOW() - INTERVAL '1 hour';
END;
$$ LANGUAGE plpgsql;
```
### Updates to Existing Tables

#### Extend `agents` table

```sql
-- Add orchestration fields to the existing agents table
ALTER TABLE agents ADD COLUMN IF NOT EXISTS coordinator_enabled BOOLEAN DEFAULT true;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS max_concurrent_tasks INT DEFAULT 1;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS heartbeat_interval_seconds INT DEFAULT 30;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS stale_threshold_seconds INT DEFAULT 180; -- 3 minutes
ALTER TABLE agents ADD COLUMN IF NOT EXISTS capabilities VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[];

CREATE INDEX idx_agents_coordinator ON agents(coordinator_enabled) WHERE coordinator_enabled = true;
```

---
## Valkey/Redis Key Patterns

Valkey is used for:

- **Real-time state** (fast reads/writes)
- **Pub/Sub messaging** (coordination events)
- **Distributed locks** (prevent race conditions)
- **Session data** (ephemeral context)

### Key Naming Convention

```
{namespace}:{workspace_id}:{entity_type}:{entity_id}:{property}
```
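A small sketch of how a service might apply this convention; `buildKey` and `pendingScore` are hypothetical helpers, and the score encoding anticipates the pending-task sorted set below, which needs a single numeric score:

```typescript
// Hypothetical helper: join parts per the naming convention above
function buildKey(
  namespace: string,
  workspaceId: string,
  entityType: string,
  entityId: string,
  property: string
): string {
  return [namespace, workspaceId, entityType, entityId, property].join(":");
}

// Hypothetical score for ZADD on the pending queue: higher priority sorts
// first under ascending ZRANGEBYSCORE, ties broken by age (older first).
function pendingScore(priority: number, createdAtMs: number): number {
  return (10 - priority) * 1e13 + createdAtMs;
}
```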

### Key Patterns

#### Task Queue

```redis
# Pending tasks (sorted set; the score must be numeric, so encode priority
# and age into one number, e.g. score = (10 - priority) * 1e13 + created_at_ms)
ZADD tasks:pending:{workspace_id} {score} {task_id}

# Assigned tasks (hash per agent)
HSET agent:tasks:{agent_id} {task_id} {assigned_at}

# Active sessions (set)
SADD tasks:active:{workspace_id} {task_id}:{session_key}
```

#### Agent Status

```redis
# Agent heartbeat (string with TTL)
SETEX agent:heartbeat:{agent_id} 60 "{\"status\":\"running\",\"task_id\":\"{task_id}\",\"timestamp\":{ts}}"

# Agent status (hash)
HSET agent:status:{agent_id} status "running" current_task "{task_id}" last_heartbeat {timestamp}

# Stale agents (sorted set by last heartbeat)
ZADD coordinator:stale_agents {timestamp} {agent_id}
```
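On the agent side, a heartbeat publisher could assemble the `SETEX` payload like this (a sketch; the helper is hypothetical, the field names follow the example above, and the 60s TTL gives roughly 2x headroom over the default 30s interval):

```typescript
// Hypothetical helper: builds the key, TTL, and JSON value an agent would
// write with SETEX on every heartbeat tick.
function heartbeatCommand(
  agentId: string,
  status: "running" | "idle" | "degraded",
  taskId: string | null,
  now: number = Date.now()
): { key: string; ttlSeconds: number; value: string } {
  return {
    key: `agent:heartbeat:${agentId}`,
    ttlSeconds: 60, // matches the SETEX TTL above
    value: JSON.stringify({ status, task_id: taskId, timestamp: now }),
  };
}
```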

#### Coordination

```redis
# Coordinator lock (prevents multiple active coordinators)
SET coordinator:lock:{workspace_id} {coordinator_instance_id} NX EX 30

# Task assignment lock
SET task:assign_lock:{task_id} {agent_id} NX EX 5

# Recovery queue (list)
LPUSH coordinator:recovery_queue:{workspace_id} {task_id}
```
#### Pub/Sub Channels

```redis
# Task events
PUBLISH tasks:events:{workspace_id} "{\"event\":\"task_completed\",\"task_id\":\"{task_id}\"}"

# Agent events
PUBLISH agents:events:{workspace_id} "{\"event\":\"agent_stale\",\"agent_id\":\"{agent_id}\"}"

# Coordinator commands
PUBLISH coordinator:commands:{workspace_id} "{\"command\":\"reassign_task\",\"task_id\":\"{task_id}\"}"
```
#### Session Data (TTL: 1 hour)

```redis
# Session context (hash with TTL)
HSET session:context:{session_key} workspace_id "{workspace_id}" agent_id "{agent_id}" task_id "{task_id}" started_at {timestamp}
EXPIRE session:context:{session_key} 3600
```
### Data Lifecycle

| Key Type             | TTL  | Cleanup Strategy                            |
| -------------------- | ---- | ------------------------------------------- |
| `agent:heartbeat:*`  | 60s  | Auto-expire                                 |
| `agent:status:*`     | None | Delete on agent termination                 |
| `session:context:*`  | 1h   | Auto-expire                                 |
| `tasks:pending:*`    | None | Remove on assignment                        |
| `coordinator:lock:*` | 30s  | Auto-expire (renewed by active coordinator) |
| `task:assign_lock:*` | 5s   | Auto-expire after assignment                |

---
## API Endpoints

### Task Management

```typescript
// Create a new orchestration task
POST /api/v1/agent-tasks
{
  "title": "Fix TypeScript strict errors in U-Connect",
  "description": "...",
  "taskType": "development",
  "priority": 8,
  "inputContext": {
    "repository": "u-connect",
    "branch": "main",
    "scope": ["packages/shared"]
  },
  "dependsOn": [],
  "maxRetries": 3
}

// List tasks (with filtering)
GET /api/v1/agent-tasks?status=RUNNING&minPriority=8&tags=typescript

// Get task details
GET /api/v1/agent-tasks/:id

// Update task (limited fields)
PATCH /api/v1/agent-tasks/:id
{
  "priority": 10,
  "status": "PAUSED"
}

// Cancel task
POST /api/v1/agent-tasks/:id/cancel
{
  "reason": "Requirements changed"
}

// Retry a failed task
POST /api/v1/agent-tasks/:id/retry

// Get task logs
GET /api/v1/agent-tasks/:id/logs?level=ERROR&limit=100
```
### Agent Management

```typescript
// List agents
GET /api/v1/agents?status=WORKING&coordinatorEnabled=true

// Get agent details
GET /api/v1/agents/:id

// Register/update agent
PUT /api/v1/agents/:agentId
{
  "name": "Dev Agent #1",
  "model": "claude-sonnet-4",
  "capabilities": ["typescript", "nestjs", "prisma"],
  "maxConcurrentTasks": 2,
  "heartbeatIntervalSeconds": 30
}

// Get agent health
GET /api/v1/agents/:id/health

// Get agent tasks
GET /api/v1/agents/:id/tasks?status=RUNNING

// Terminate agent
POST /api/v1/agents/:id/terminate
{
  "reason": "Manual shutdown",
  "graceful": true // Allow the current task to complete
}
```
### Coordination

```typescript
// Coordinator status (internal/admin only)
GET /api/v1/coordinator/status
{
  "active": true,
  "instanceId": "coord-abc123",
  "workspaces": ["ws-1", "ws-2"],
  "lastHeartbeat": "2025-01-29T14:30:00Z",
  "stats": {
    "pendingTasks": 3,
    "runningTasks": 5,
    "healthyAgents": 8,
    "staleAgents": 1
  }
}

// Force recovery check (admin only)
POST /api/v1/coordinator/recover
{
  "workspaceId": "optional - all if omitted"
}

// Get coordination logs (admin only)
GET /api/v1/coordinator/logs?workspace={id}&level=ERROR
```
### Webhooks (for external integration)

```typescript
// Task state change webhook
POST {configured_webhook_url}
{
  "event": "task.status_changed",
  "taskId": "task-123",
  "previousStatus": "RUNNING",
  "newStatus": "COMPLETED",
  "workspaceId": "ws-1",
  "timestamp": "2025-01-29T14:30:00Z"
}
```

---
## Coordinator Service Design

### Core Responsibilities

1. **Health Monitoring** — Check agent heartbeats, mark stale agents
2. **Task Assignment** — Match pending tasks to available agents
3. **Recovery Orchestration** — Reassign tasks from failed/stale agents
4. **Dependency Resolution** — Ensure tasks wait for their dependencies
5. **Resource Management** — Enforce agent concurrency limits

### Architecture
```typescript
// Coordinator Service (NestJS)
@Injectable()
export class CoordinatorService {
  private coordinatorLock: string;
  private isRunning = false;

  constructor(
    private readonly taskManager: TaskManagerService,
    private readonly agentManager: AgentManagerService,
    private readonly valkey: Valkey,
    private readonly prisma: PrismaService,
    private readonly logger: Logger
  ) {}

  // Main coordination loop
  @Cron("*/30 * * * * *") // Every 30 seconds
  async coordinate() {
    if (!(await this.acquireLock())) {
      return; // Another coordinator is active
    }

    try {
      await this.checkAgentHealth();
      await this.assignPendingTasks();
      await this.resolveDependencies();
      await this.recoverFailedTasks();
    } catch (error) {
      this.logger.error("Coordination cycle failed", error);
    } finally {
      await this.releaseLock();
    }
  }

  // Distributed lock to prevent multiple coordinators
  private async acquireLock(): Promise<boolean> {
    const lockKey = `coordinator:lock:global`;
    // SET key value EX 30 NX (ioredis takes the expiry before the NX flag)
    const result = await this.valkey.set(
      lockKey,
      process.env.HOSTNAME || "coordinator",
      "EX",
      30,
      "NX"
    );
    return result === "OK";
  }

  // Check agent heartbeats and mark stale agents
  private async checkAgentHealth() {
    const agents = await this.agentManager.getCoordinatorManagedAgents();
    const now = Date.now();

    for (const agent of agents) {
      const heartbeatKey = `agent:heartbeat:${agent.id}`;
      const lastHeartbeat = await this.valkey.get(heartbeatKey);

      if (!lastHeartbeat) {
        // No heartbeat - agent is stale
        await this.handleStaleAgent(agent);
      } else {
        const heartbeatData = JSON.parse(lastHeartbeat);
        const age = now - heartbeatData.timestamp;

        if (age > agent.staleThresholdSeconds * 1000) {
          await this.handleStaleAgent(agent);
        }
      }
    }
  }

  // Assign pending tasks to available agents
  private async assignPendingTasks() {
    const workspaces = await this.getActiveWorkspaces();

    for (const workspace of workspaces) {
      const pendingTasks = await this.taskManager.getPendingTasks(workspace.id, {
        orderBy: { priority: "desc", createdAt: "asc" },
      });

      for (const task of pendingTasks) {
        // Skip tasks whose dependencies are not yet met
        if (!(await this.areDependenciesMet(task))) {
          continue;
        }

        // Find an available agent with matching capabilities
        const agent = await this.agentManager.findAvailableAgent(
          workspace.id,
          task.taskType,
          task.metadata.requiredCapabilities
        );

        if (agent) {
          await this.assignTask(task, agent);
        }
      }
    }
  }

  // Handle stale agents
  private async handleStaleAgent(agent: Agent) {
    this.logger.warn(`Agent ${agent.id} is stale - recovering tasks`);

    // Mark agent as ERROR
    await this.agentManager.updateAgentStatus(agent.id, AgentStatus.ERROR);

    // Recover every task assigned to the stale agent
    const tasks = await this.taskManager.getTasksForAgent(agent.id);

    for (const task of tasks) {
      await this.recoverTask(task, "agent_stale");
    }
  }

  // Recover a task from failure
  private async recoverTask(task: AgentTask, reason: string) {
    // Log the failure
    await this.taskManager.logTaskEvent(task.id, {
      level: "ERROR",
      event: "task_recovery",
      message: `Task recovery initiated: ${reason}`,
      previousStatus: task.status,
      newStatus: "ABORTED",
    });

    // Check retry limit
    if (task.retryCount >= task.maxRetries) {
      await this.taskManager.updateTask(task.id, {
        status: "FAILED",
        lastError: `Max retries exceeded (${task.retryCount}/${task.maxRetries})`,
        failedAt: new Date(),
      });
      return;
    }

    // Abort the current assignment
    await this.taskManager.updateTask(task.id, {
      status: "ABORTED",
      agentId: null,
      sessionKey: null,
      retryCount: task.retryCount + 1,
    });

    // Wait for the backoff period before requeuing.
    // NOTE: an in-process timer is lost if this instance restarts; the
    // recovery pass must also requeue ABORTED tasks whose backoff has elapsed.
    const backoffMs = task.retryBackoffSeconds * 1000 * Math.pow(2, task.retryCount);
    setTimeout(async () => {
      await this.taskManager.updateTask(task.id, {
        status: "PENDING",
      });
    }, backoffMs);
  }

  // Assign task to agent
  private async assignTask(task: AgentTask, agent: Agent) {
    // Acquire assignment lock
    const lockKey = `task:assign_lock:${task.id}`;
    const locked = await this.valkey.set(lockKey, agent.id, "EX", 5, "NX");

    if (!locked) {
      return; // Another coordinator already assigned this task
    }

    try {
      // Update task
      await this.taskManager.updateTask(task.id, {
        status: "ASSIGNED",
        agentId: agent.id,
        assignedAt: new Date(),
      });

      // Spawn agent session via Gateway
      const session = await this.spawnAgentSession(agent, task);

      // Update task with session
      await this.taskManager.updateTask(task.id, {
        sessionKey: session.sessionKey,
        status: "RUNNING",
        startedAt: new Date(),
      });

      // Log assignment
      await this.taskManager.logTaskEvent(task.id, {
        level: "INFO",
        event: "task_assigned",
        message: `Task assigned to agent ${agent.id}`,
        details: { agentId: agent.id, sessionKey: session.sessionKey },
      });
    } finally {
      await this.valkey.del(lockKey);
    }
  }

  // Spawn agent session via Gateway
  private async spawnAgentSession(agent: Agent, task: AgentTask): Promise<AgentSession> {
    // Call the Gateway API to spawn a subagent with task context
    const response = await fetch(`${process.env.GATEWAY_URL}/api/agents/spawn`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        workspaceId: task.workspaceId,
        agentId: agent.id,
        taskId: task.id,
        label: `task:${task.id}:${task.title.slice(0, 30)}`,
        context: {
          taskTitle: task.title,
          taskDescription: task.description,
          inputContext: task.inputContext,
          checkpointData: task.checkpointData,
        },
      }),
    });

    if (!response.ok) {
      throw new Error(`Gateway spawn failed: ${response.status}`);
    }

    const data = await response.json();
    return data.session;
  }
}
```
### Coordination State Machine

```
┌─────────────────────────────────────────────────────────┐
│                   Coordinator Cycle                     │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
                ┌───────────────────────┐
                │  Acquire Lock (30s)   │
                └───────────┬───────────┘
                            │
                   ┌────────▼────────┐
                   │  Check Health   │
                   │ - Read agents   │
                   │ - Check TTLs    │
                   │ - Mark stale    │
                   └────────┬────────┘
                            │
                   ┌────────▼────────┐
                   │  Assign Tasks   │
                   │ - Read pending  │
                   │ - Match agents  │
                   │ - Spawn sessions│
                   └────────┬────────┘
                            │
                   ┌────────▼────────┐
                   │  Resolve Deps   │
                   │ - Check waiting │
                   │ - Unblock ready │
                   └────────┬────────┘
                            │
                   ┌────────▼────────┐
                   │ Recover Failed  │
                   │ - Requeue retry │
                   │ - Log failures  │
                   └────────┬────────┘
                            │
                ┌───────────▼───────────┐
                │     Release Lock      │
                └───────────────────────┘
```

---
## Recovery & Resilience Patterns

### 1. Agent Failure Recovery

**Scenario:** Agent crashes mid-task.

**Detection:**

- Heartbeat TTL expires in Valkey
- Coordinator detects the missing heartbeat

**Recovery:**

1. Mark agent as `ERROR` in the database
2. Abort assigned tasks with `status = ABORTED`
3. Log failure with stack trace (if available)
4. Apply exponential backoff: `backoff = retryBackoffSeconds * 2^retryCount`
5. Requeue task to `PENDING` after backoff
6. Coordinator reassigns on the next cycle
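Steps 4-5 reduce to a small pure function (the cap is an assumption added here for illustration, to keep repeated failures from delaying the requeue indefinitely):

```typescript
// backoff = retryBackoffSeconds * 2^retryCount, capped (assumed) at one hour
function nextBackoffMs(
  retryBackoffSeconds: number,
  retryCount: number,
  capSeconds: number = 3600
): number {
  const raw = retryBackoffSeconds * Math.pow(2, retryCount);
  return Math.min(raw, capSeconds) * 1000;
}
```

With the schema default of `retry_backoff_seconds = 300`, the first three retries wait 5, 10, and 20 minutes.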
### 2. Gateway Restart Recovery

**Scenario:** Gateway restarts, killing all agent sessions.

**Detection:**

- All agent heartbeats stop simultaneously
- Coordinator detects mass-stale agents

**Recovery:**

1. Coordinator marks all `RUNNING` tasks as `ABORTED`
2. Tasks with `checkpointData` resume from their last checkpoint
3. Tasks without checkpoints restart from scratch
4. Exponential backoff prevents a thundering herd
### 3. Task Dependency Deadlock

**Scenario:** Task A depends on Task B, which depends on Task A (circular dependency).

**Detection:**

- Coordinator builds the dependency graph
- Detects cycles during `resolveDependencies()`

**Recovery:**

1. Log `ERROR` with cycle details
2. Mark all tasks in the cycle as `FAILED` with reason `dependency_cycle`
3. Notify the workspace owner via webhook
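The cycle check can be sketched as a depth-first search over the `depends_on` edges (an illustrative implementation, not the service's actual code):

```typescript
// Returns the task IDs forming a cycle, or null if the graph is acyclic.
function findDependencyCycle(dependsOn: Map<string, string[]>): string[] | null {
  const visiting = new Set<string>(); // nodes on the current DFS path
  const done = new Set<string>();     // nodes fully explored
  const stack: string[] = [];

  const visit = (id: string): string[] | null => {
    if (done.has(id)) return null;
    if (visiting.has(id)) return stack.slice(stack.indexOf(id)); // cycle found
    visiting.add(id);
    stack.push(id);
    for (const dep of dependsOn.get(id) ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    visiting.delete(id);
    done.add(id);
    return null;
  };

  for (const id of dependsOn.keys()) {
    const cycle = visit(id);
    if (cycle) return cycle;
  }
  return null;
}
```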
### 4. Database Failure

**Scenario:** PostgreSQL becomes unavailable.

**Detection:**

- Prisma queries fail with connection errors

**Recovery:**

1. Coordinator catches the error and logs to stderr
2. Releases the lock (allowing failover to another instance)
3. Retries with exponential backoff: 5s, 10s, 20s, 40s
4. If the database remains down for more than 5 minutes, alert an admin
### 5. Split-Brain Prevention

**Scenario:** A network partition causes two coordinators to run simultaneously.

**Prevention:**

- Distributed lock in Valkey with a 30s TTL
- Coordinators must renew the lock every cycle
- Only one coordinator can hold the lock at a time

**Detection:**

- Task assigned to multiple agents (conflict detection)

**Recovery:**

1. The newer assignment wins (based on the `assignedAt` timestamp)
2. Cancel the older session
3. Log the conflict for investigation
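The "newer assignment wins" rule is simple to state in code (a sketch with an assumed `Assignment` shape):

```typescript
interface Assignment {
  agentId: string;
  assignedAt: Date;
}

// Keep the most recent assignment; everything else gets cancelled.
function resolveConflict(assignments: Assignment[]): {
  keep: Assignment;
  cancel: Assignment[];
} {
  const sorted = [...assignments].sort(
    (a, b) => b.assignedAt.getTime() - a.assignedAt.getTime()
  );
  return { keep: sorted[0], cancel: sorted.slice(1) };
}
```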
### 6. Stale Task Timeout

**Scenario:** A task runs longer than `estimatedCompletionAt` plus a grace period.

**Detection:**

- Coordinator checks the `estimatedCompletionAt` field
- If exceeded by more than 30 minutes, the task is flagged as potentially hung

**Recovery:**

1. Send a warning to the agent session (via pub/sub)
2. If there is no progress update within 10 minutes, abort the task
3. Log the timeout error
4. Requeue with higher priority

---
## Usage Budget Management

**Version:** 1.0
**Date:** 2026-02-04
**Status:** Required for MVP

### Problem Statement

Autonomous agents using Claude Code can consume significant API tokens without proper governance. Without real-time usage tracking and budgeting, projects risk:

1. **Cost overruns** — Agents exceed budget before milestone completion
2. **Service disruption** — Hitting API rate limits mid-task
3. **Unpredictable velocity** — No reliable way to estimate project momentum against budget
4. **Budget exhaustion** — Agents consume the entire monthly budget in days
### Requirements

The orchestration layer must provide:

- **Real-time usage tracking** — Current token usage across all active agents
- **Budget allocation** — Pre-allocate budgets per task/milestone/project
- **Usage projection** — Estimate remaining work vs. remaining budget
- **Throttling decisions** — Pause or slow agents approaching limits
- **Cost optimization** — Route tasks to appropriate model tiers (Haiku/Sonnet/Opus)
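Tier routing might look like the following sketch; the thresholds and task-type rules are assumptions for illustration, not part of the design:

```typescript
type ModelTier = "haiku" | "sonnet" | "opus";

// Assumed heuristic: cheap/mechanical tasks go to the lowest tier,
// the default is Sonnet, and only explicitly complex work justifies Opus.
function selectModelTier(task: {
  taskType: string;
  estimatedTokens: number;
  requiresDeepReasoning?: boolean;
}): ModelTier {
  if (task.requiresDeepReasoning) return "opus";
  if (task.taskType === "documentation" || task.estimatedTokens < 10_000) {
    return "haiku";
  }
  return "sonnet";
}
```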
### Architecture

```
┌─────────────────────────────────────────────────────────┐
│              Usage Budget Manager Service               │
│  ┌───────────────────┐  ┌────────────────────────────┐  │
│  │   Usage Tracker   │  │      Budget Allocator      │  │
│  │ - Query API       │  │ - Per-task budgets         │  │
│  │ - Real-time sum   │  │ - Per-milestone budgets    │  │
│  │ - Agent rollup    │  │ - Global limits            │  │
│  └────────┬──────────┘  └──────────┬─────────────────┘  │
│           │                        │                    │
│  ┌────────▼────────────────────────▼─────────────────┐  │
│  │                Projection Engine                  │  │
│  │ - Estimate remaining work                         │  │
│  │ - Calculate burn rate                             │  │
│  │ - Predict budget exhaustion                       │  │
│  │ - Recommend throttle/pause                        │  │
│  └───────────────────────────┬───────────────────────┘  │
└──────────────────────────────┼──────────────────────────┘
                               │
                  ┌────────────┼────────────┐
                  │            │            │
          ┌───────▼──────┐  ┌──▼──────┐  ┌──▼────────────┐
          │Queue Manager │  │Agent Mgr│  │  Coordinator  │
          │- Check budget│  │- Spawn  │  │ - Pre-commit  │
          │  before queue│  │  check  │  │   validation  │
          └──────────────┘  └─────────┘  └───────────────┘
```
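The Projection Engine's core arithmetic reduces to a burn-rate calculation over a sliding window (a sketch; the function shape is assumed, not part of the design):

```typescript
// burn rate = tokens consumed per hour over a recent window;
// exhaustion horizon = remaining budget divided by that rate.
function projectExhaustion(
  tokensConsumedInWindow: number,
  windowHours: number,
  remainingTokens: number
): { burnRatePerHour: number; hoursToExhaustion: number } {
  const burnRatePerHour = tokensConsumedInWindow / windowHours;
  const hoursToExhaustion =
    burnRatePerHour > 0 ? remainingTokens / burnRatePerHour : Infinity;
  return { burnRatePerHour, hoursToExhaustion };
}
```

If the projected exhaustion falls before the milestone's due date, the engine would recommend throttling or a pause.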
### Budget Check Points

**1. Task Assignment** (Queue Manager)

```typescript
async canAffordTask(taskId: string): Promise<BudgetDecision> {
  const task = await getTask(taskId);
  const currentUsage = await usageTracker.getCurrentUsage();
  const budget = await budgetAllocator.getTaskBudget(taskId);

  const projectedCost = estimateTaskCost(task);
  const remaining = budget.allocated - currentUsage.total;

  if (projectedCost > remaining) {
    return {
      canProceed: false,
      reason: 'Insufficient budget',
      recommendation: 'Pause or reallocate budget',
    };
  }

  return { canProceed: true };
}
```
**2. Agent Spawn** (Agent Manager)

Before spawning a Claude Code agent, verify budget headroom:

```typescript
async spawnAgent(config: AgentConfig): Promise<Agent> {
  const budgetCheck = await usageBudgetManager.canAffordTask(config.taskId);

  if (!budgetCheck.canProceed) {
    throw new InsufficientBudgetError(budgetCheck.reason);
  }

  // Proceed with spawn
  const agent = await claudeCode.spawn(config);

  // Track agent for usage rollup
  await usageTracker.registerAgent(agent.id, config.taskId);

  return agent;
}
```
**3. Checkpoint Intervals** (Coordinator)

During task execution, periodically verify budget compliance:

```typescript
async checkpointBudgetCompliance(taskId: string): Promise<void> {
  const usage = await usageTracker.getTaskUsage(taskId);
  const budget = await budgetAllocator.getTaskBudget(taskId);

  const percentUsed = (usage.current / budget.allocated) * 100;

  if (percentUsed > 90) {
    await coordinator.sendWarning(taskId, 'Approaching budget limit');
  }

  if (percentUsed > 100) {
    await coordinator.pauseTask(taskId, 'Budget exceeded');
    await notifyUser(taskId, 'Task paused: budget exhausted');
  }
}
```
**4. Pre-commit Validation** (Quality Gates)

Before committing work, verify usage is reasonable:

```typescript
async validateUsageEfficiency(taskId: string): Promise<ValidationResult> {
  const usage = await usageTracker.getTaskUsage(taskId);
  const linesChanged = await git.getChangedLines(taskId);
  const testsAdded = await git.getTestFiles(taskId); // reserved for future heuristics; unused below

  // Cost-per-line heuristic (adjust based on learnings)
  const expectedTokensPerLine = 150; // baseline + TDD overhead
  const expectedUsage = linesChanged * expectedTokensPerLine;

  const efficiency = expectedUsage / usage.current;

  if (efficiency < 0.5) {
    return {
      valid: false,
      reason: 'Usage appears inefficient',
      recommendation: 'Review agent logs for token waste',
    };
  }

  return { valid: true };
}
```

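A hand-worked pass through the heuristic, with illustrative numbers: suppose an agent changed 100 lines while consuming 40,000 tokens.

```typescript
// Worked example of the efficiency heuristic above (numbers illustrative)
const expectedTokensPerLine = 150;
const linesChanged = 100;
const actualTokens = 40_000;

const expectedUsage = linesChanged * expectedTokensPerLine; // 15,000 tokens
const efficiency = expectedUsage / actualTokens; // 0.375

// 0.375 < 0.5, so this commit is flagged for review
const flagged = efficiency < 0.5;
```

In other words, the gate trips when actual consumption exceeds roughly twice the expected budget for the size of the diff.
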
### Data Model

#### `usage_budgets` Table

```sql
CREATE TABLE usage_budgets (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id),

  -- Scope: 'global', 'project', 'milestone', 'task'
  scope VARCHAR(20) NOT NULL,
  scope_id VARCHAR(100), -- project_id, milestone_id, or task_id

  -- Budget limits (tokens)
  allocated BIGINT NOT NULL,
  consumed BIGINT NOT NULL DEFAULT 0,
  remaining BIGINT GENERATED ALWAYS AS (allocated - consumed) STORED,

  -- Tracking
  period_start TIMESTAMPTZ NOT NULL,
  period_end TIMESTAMPTZ NOT NULL,

  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_usage_budgets_scope ON usage_budgets(scope, scope_id);
CREATE INDEX idx_usage_budgets_workspace ON usage_budgets(workspace_id);
```

#### `agent_usage_logs` Table

```sql
CREATE TABLE agent_usage_logs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id),
  agent_session_id UUID NOT NULL,
  task_id UUID REFERENCES agent_tasks(id),

  -- Usage details
  input_tokens BIGINT NOT NULL,
  output_tokens BIGINT NOT NULL,
  total_tokens BIGINT NOT NULL,
  model VARCHAR(100) NOT NULL, -- 'claude-sonnet-4', 'claude-haiku-3.5', etc.

  -- Cost tracking
  estimated_cost_usd DECIMAL(10, 6),

  -- Context
  operation VARCHAR(100), -- 'task_execution', 'quality_review', etc.

  logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_agent_usage_task ON agent_usage_logs(task_id);
CREATE INDEX idx_agent_usage_session ON agent_usage_logs(agent_session_id);
CREATE INDEX idx_agent_usage_workspace ON agent_usage_logs(workspace_id);
```

### Valkey/Redis Keys

```
# Current usage (real-time)
usage:current:{workspace_id} -> HASH
  {
    "total_tokens": "1234567",
    "total_cost_usd": "12.34",
    "last_updated": "2026-02-04T10:30:00Z"
  }

# Per-task usage
usage:task:{task_id} -> HASH
  {
    "allocated": "50000",
    "consumed": "23456",
    "remaining": "26544",
    "agents": ["agent-1", "agent-2"]
  }

# Budget alerts
usage:alerts:{workspace_id} -> LIST
  [
    {
      "task_id": "task-123",
      "level": "warning",
      "percent_used": 92,
      "message": "Task approaching budget limit"
    }
  ]
```

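Because `consumed` and `remaining` live in the same hash, updates should be applied atomically (e.g., inside a `MULTI`/`EXEC` block) so readers never see the two fields out of sync. A sketch that builds the raw commands; client wiring (ioredis, node-redis, etc.) is omitted, and the key layout follows the patterns above:

```typescript
// Builds the Valkey commands recording a usage delta for one task.
// Returned as raw [command, ...args] tuples so any client can feed
// them to its pipeline/multi API and execute them atomically.
type ValkeyCommand = (string | number)[];

function buildUsageUpdate(
  taskId: string,
  workspaceId: string,
  tokens: number,
): ValkeyCommand[] {
  return [
    ["HINCRBY", `usage:task:${taskId}`, "consumed", tokens],
    ["HINCRBY", `usage:task:${taskId}`, "remaining", -tokens],
    ["HINCRBY", `usage:current:${workspaceId}`, "total_tokens", tokens],
    ["HSET", `usage:current:${workspaceId}`, "last_updated", new Date().toISOString()],
  ];
}
```
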
### Cost Estimation Formulas

Based on autonomous execution learnings:

```typescript
function estimateTaskCost(task: Task): number {
  const baselineTokens = task.estimatedComplexity * 1000; // tokens per complexity point

  // Overhead factors
  const tddOverhead = 1.2; // +20% for test writing
  const baselineBuffer = 1.3; // +30% general buffer
  const phaseBuffer = 1.15; // +15% phase-specific uncertainty

  const estimated = baselineTokens * tddOverhead * baselineBuffer * phaseBuffer;

  return Math.ceil(estimated);
}
```

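Worked through for a mid-size task (complexity 5), the three buffers compound to roughly 1.8× the baseline:

```typescript
// estimateTaskCost expanded step by step for complexity = 5
const baseline = 5 * 1000;             // 5,000 tokens
const withTdd = baseline * 1.2;        // 6,000
const withBuffer = withTdd * 1.3;      // 7,800
const withPhase = withBuffer * 1.15;   // 8,970
const estimate = Math.ceil(withPhase); // budget 8,970 tokens for the task
```
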
### Model Tier Optimization

Route tasks to appropriate model tiers for cost efficiency:

| Model            | Cost/MTok (input) | Cost/MTok (output) | Use Case                             |
| ---------------- | ----------------- | ------------------ | ------------------------------------ |
| Claude Haiku 3.5 | $0.80             | $4.00              | Simple CRUD, boilerplate, linting    |
| Claude Sonnet 4  | $3.00             | $15.00             | Standard development, refactoring    |
| Claude Opus 4    | $15.00            | $75.00             | Complex architecture, critical fixes |

**Routing logic:**

```typescript
function selectModel(task: Task): ModelTier {
  if (task.priority === "critical" || task.complexity > 8) {
    return "opus";
  }

  if (task.type === "boilerplate" || task.estimatedTokens < 10000) {
    return "haiku";
  }

  return "sonnet"; // default
}
```

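The table also translates into a simple cost function, useful when filling `estimated_cost_usd` in `agent_usage_logs`. The rates are hard-coded from the table for illustration; in practice they would come from configuration:

```typescript
// USD cost for a request, using the per-MTok rates from the table above.
// Rates hard-coded for illustration only; load from config in practice.
type ModelTier = "haiku" | "sonnet" | "opus";

const RATES: Record<ModelTier, { input: number; output: number }> = {
  haiku: { input: 0.8, output: 4.0 },
  sonnet: { input: 3.0, output: 15.0 },
  opus: { input: 15.0, output: 75.0 },
};

function estimateCostUsd(
  tier: ModelTier,
  inputTokens: number,
  outputTokens: number,
): number {
  const rate = RATES[tier];
  return (inputTokens / 1_000_000) * rate.input + (outputTokens / 1_000_000) * rate.output;
}
```

For example, a task consuming 1M input and 200k output tokens costs about $6.00 on Sonnet versus $30.00 on Opus, which is where the headroom for the routing savings target comes from.
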
### Projection Algorithm

Predict budget exhaustion:

```typescript
async function projectBudgetExhaustion(workspaceId: string): Promise<Projection> {
  const usage = await usageTracker.getCurrentUsage(workspaceId);
  const budget = await budgetAllocator.getGlobalBudget(workspaceId);

  const dailyBurnRate = usage.tokensLast24h;
  // Avoid division by zero when no tokens were consumed in the last 24h
  const daysRemaining = dailyBurnRate > 0 ? budget.remaining / dailyBurnRate : Infinity;

  const exhaustionDate = new Date();
  if (Number.isFinite(daysRemaining)) {
    // Round down: assume exhaustion no later than the projection
    exhaustionDate.setDate(exhaustionDate.getDate() + Math.floor(daysRemaining));
  }

  return {
    remaining_tokens: budget.remaining,
    daily_burn_rate: dailyBurnRate,
    days_until_exhaustion: daysRemaining,
    projected_exhaustion_date: exhaustionDate,
    recommendation: daysRemaining < 7 ? "THROTTLE" : "CONTINUE",
  };
}
```

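The recommendation threshold can be isolated as a pure function so it is testable without Valkey or database access. This is an illustrative refactor, not part of the API above; the zero burn-rate guard is an addition:

```typescript
// Pure extraction of the recommendation logic, plus a guard for the
// case where no tokens were consumed in the last 24h (nothing to project).
function recommendAction(
  remainingTokens: number,
  dailyBurnRate: number,
): "THROTTLE" | "CONTINUE" {
  if (dailyBurnRate <= 0) return "CONTINUE";
  const daysRemaining = remainingTokens / dailyBurnRate;
  return daysRemaining < 7 ? "THROTTLE" : "CONTINUE";
}
```

With 900k tokens left and a 150k/day burn rate, six days remain and the helper returns `THROTTLE`.
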
### Implementation Priority

**MVP (M6 Phase 3):**

- ✅ Basic usage tracking (log tokens per task)
- ✅ Simple budget checks (can afford this task?)
- ✅ Alert on budget exceeded

**Post-MVP (M6 Phase 5):**

- Projection engine (when will budget run out?)
- Model tier optimization (Haiku/Sonnet/Opus routing)
- Historical analysis (actual vs estimated)
- Budget reallocation (move budget between projects)

### Success Metrics

- **Budget accuracy**: Estimated vs actual within 20%
- **Cost optimization**: 40%+ savings from model tier routing
- **No surprise exhaustion**: Zero instances of unexpected budget depletion
- **Steady momentum**: Projects maintain velocity without budget interruptions

---

## Implementation Phases

### Phase 1: Foundation (Week 1-2)

**Goal:** Basic task and agent models, no coordination yet.

**Deliverables:**

- [ ] Database schema migration (tables, indexes)
- [ ] Prisma models for `AgentTask`, `AgentTaskLog`, `AgentHeartbeat`
- [ ] Basic CRUD API endpoints for tasks
- [ ] Agent registration API
- [ ] Manual task assignment (no automation)

**Testing:**

- Unit tests for task state machine
- Integration tests for task CRUD
- Manual testing: create task, assign to agent, complete

### Phase 2: Coordination Core (Week 3-4)

**Goal:** Autonomous coordinator with health monitoring.

**Deliverables:**

- [ ] `CoordinatorService` with distributed locking
- [ ] Health monitoring (heartbeat TTL checks)
- [ ] Automatic task assignment to available agents
- [ ] Valkey integration for runtime state
- [ ] Pub/Sub for coordination events

**Testing:**

- Unit tests for coordinator logic
- Integration tests with Valkey
- Chaos testing: kill agents, verify recovery
- Load testing: 10+ agents, 50+ tasks

### Phase 3: Recovery & Resilience (Week 5)

**Goal:** Fault-tolerant operation with automatic recovery.

**Deliverables:**

- [ ] Agent failure detection and task recovery
- [ ] Exponential backoff for retries
- [ ] Checkpoint/resume support for long-running tasks
- [ ] Dependency resolution
- [ ] Deadlock detection

**Testing:**

- Fault injection: kill agents, restart Gateway
- Dependency cycle testing
- Retry exhaustion testing
- Split-brain simulation

### Phase 4: Observability (Week 6)

**Goal:** Full visibility into orchestration state.

**Deliverables:**

- [ ] Coordinator status dashboard
- [ ] Task progress tracking UI
- [ ] Real-time logs API
- [ ] Metrics export (Prometheus format)
- [ ] Webhook integration for external monitoring

**Testing:**

- Load testing with metrics collection
- Dashboard usability testing
- Webhook reliability testing

### Phase 5: Advanced Features (Future)

**Goal:** Production-grade features.

**Deliverables:**

- [ ] Task prioritization algorithms (SJF, priority queues)
- [ ] Agent capability matching (skills-based routing)
- [ ] Task batching (group similar tasks)
- [ ] Cost tracking (token usage, compute time)
- [ ] Multi-region coordination (geo-distributed agents)

---

## Security Considerations

### Row-Level Security (RLS)

All queries must enforce workspace isolation:

```typescript
// Example: Only return tasks for user's workspace
async getTasks(userId: string, workspaceId: string) {
  const membership = await this.prisma.workspaceMember.findUnique({
    where: { workspaceId_userId: { workspaceId, userId } }
  });

  if (!membership) {
    throw new ForbiddenException('Not a member of this workspace');
  }

  return this.prisma.agentTask.findMany({
    where: { workspaceId }
  });
}
```

### API Authentication

- All endpoints require valid session token
- Coordinator endpoints restricted to `ADMIN` or `OWNER` roles
- Agent heartbeat endpoints use JWT with agent-specific claims

### Secrets Management

- Task `inputContext` may contain secrets (API keys, passwords)
- Encrypt sensitive fields at rest using Prisma middleware
- Never log secrets in task logs
- Redact secrets in API responses

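A minimal sketch of the redaction step for API responses (the key list is an assumption; production code would load it from configuration and pair it with the encryption middleware above):

```typescript
// Recursively masks values whose keys look secret-like before a task's
// inputContext is returned in an API response. The key list is illustrative.
const SECRET_KEYS = ["password", "apikey", "api_key", "token", "secret"];

function redactSecrets(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(redactSecrets);
  if (value !== null && typeof value === "object") {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>).map(([k, v]) =>
        SECRET_KEYS.some((s) => k.toLowerCase().includes(s))
          ? [k, "[REDACTED]"]
          : [k, redactSecrets(v)],
      ),
    );
  }
  return value;
}
```

Matching on key names rather than values keeps the check cheap, at the cost of missing secrets stored under innocuous keys; encryption at rest remains the primary control.
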
---
## Monitoring & Alerting

### Key Metrics

| Metric                           | Description                       | Alert Threshold |
| -------------------------------- | --------------------------------- | --------------- |
| `coordinator.cycle.duration_ms`  | Coordination cycle execution time | >5000ms         |
| `coordinator.stale_agents.count` | Number of stale agents detected   | >5              |
| `tasks.pending.count`            | Tasks waiting for assignment      | >50             |
| `tasks.failed.count`             | Total failed tasks (last 1h)      | >10             |
| `tasks.retry.exhausted.count`    | Tasks exceeding max retries       | >0              |
| `agents.spawned.count`           | Agent spawn rate                  | >100/min        |
| `valkey.connection.errors`       | Valkey connection failures        | >0              |

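These metrics can be exposed in Prometheus text exposition format. A hand-rolled sketch (in practice a client library such as prom-client would be used); the metric names follow the table above, with dots mapped to underscores as Prometheus metric names require:

```typescript
// Renders gauges in Prometheus text exposition format. Dots in the
// metric names from the table are mapped to underscores, since
// Prometheus metric names may not contain dots.
function renderGauges(metrics: Record<string, number>): string {
  return Object.entries(metrics)
    .map(([name, value]) => {
      const promName = name.replace(/\./g, "_");
      return `# TYPE ${promName} gauge\n${promName} ${value}`;
    })
    .join("\n");
}
```

For example, `renderGauges({ "tasks.pending.count": 12 })` yields a `# TYPE` line followed by `tasks_pending_count 12`.
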
### Health Checks

```http
GET /health/coordinator

{
  "status": "healthy",
  "coordinator": {
    "active": true,
    "lastCycle": "2025-01-29T14:30:00Z",
    "cycleCount": 1234
  },
  "database": "connected",
  "valkey": "connected",
  "gateway": "reachable"
}
```

---

## API Integration Examples

### Creating a Task from Main Agent

```typescript
// Main agent creates a development task
const task = await fetch("/api/v1/agent-tasks", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${sessionToken}`,
  },
  body: JSON.stringify({
    title: "Fix TypeScript strict errors in U-Connect",
    description: "Run tsc --noEmit, fix all errors, commit changes",
    taskType: "development",
    priority: 8,
    inputContext: {
      repository: "u-connect",
      branch: "main",
      commands: ["pnpm install", "pnpm tsc:check"],
    },
    maxRetries: 2,
    estimatedDurationMinutes: 30,
  }),
});

const { id } = await task.json();
console.log(`Task created: ${id}`);
```

### Agent Heartbeat (from spawned subagent)

```typescript
// Subagent sends heartbeat every 30s
setInterval(async () => {
  await fetch(`/api/v1/agents/${agentId}/heartbeat`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${agentToken}`,
    },
    body: JSON.stringify({
      status: "healthy",
      currentTaskId: taskId,
      progressPercent: 45,
      currentStep: "Running tsc --noEmit",
      memoryMb: 512,
      cpuPercent: 35,
    }),
  });
}, 30000);
```

### Task Progress Update

```typescript
// Agent updates task progress
await fetch(`/api/v1/agent-tasks/${taskId}/progress`, {
  method: "PATCH",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    progressPercent: 70,
    currentStep: "Fixing type errors in packages/shared",
    checkpointData: {
      filesProcessed: 15,
      errorsFixed: 8,
      remainingFiles: 5,
    },
  }),
});
```

### Task Completion

```typescript
// Agent marks task complete
await fetch(`/api/v1/agent-tasks/${taskId}/complete`, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    outputResult: {
      filesModified: 20,
      errorsFixed: 23,
      commitHash: "abc123",
      buildStatus: "passing",
    },
    summary: "All TypeScript strict errors resolved. Build passing.",
  }),
});
```

---

## Glossary

| Term              | Definition                                                         |
| ----------------- | ------------------------------------------------------------------ |
| **Agent**         | Autonomous AI instance (e.g., Claude subagent) that executes tasks |
| **Task**          | Unit of work to be executed by an agent                            |
| **Coordinator**   | Background service that assigns tasks and monitors agent health    |
| **Heartbeat**     | Periodic signal from agent indicating it's alive and working       |
| **Stale Agent**   | Agent that has stopped sending heartbeats (assumed dead)           |
| **Checkpoint**    | Snapshot of task state allowing resumption after failure           |
| **Workspace**     | Tenant isolation boundary (all tasks belong to a workspace)        |
| **Session**       | Gateway-managed connection between user and agent                  |
| **Orchestration** | Automated coordination of multiple agents working on tasks         |

---

## References

- [Mosaic Stack Architecture](../3-architecture/README.md)
- [Existing Agent/AgentSession Schema](../../apps/api/prisma/schema.prisma)
- [NestJS Task Scheduling](https://docs.nestjs.com/techniques/task-scheduling)
- [Valkey Documentation](https://valkey.io/docs/)
- [PostgreSQL Advisory Locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS)

---

**Next Steps:**

1. Review and approve this design document
2. Create GitHub issues for Phase 1 tasks
3. Set up development branch: `feature/agent-orchestration`
4. Begin database schema migration

**Questions/Feedback:** Open an issue in `mosaic-stack` repo with label `orchestration`.