
Agent Orchestration Layer

Version: 1.0
Status: Design Phase
Author: Mosaic Stack Team
Date: 2025-01-29

Table of Contents

  1. Problem Statement
  2. Architecture Overview
  3. Database Schema
  4. Valkey/Redis Key Patterns
  5. API Endpoints
  6. Coordinator Service Design
  7. Recovery & Resilience Patterns
  8. Implementation Phases
  9. Security Considerations
  10. Monitoring & Alerting
  11. API Integration Examples
  12. Glossary

Problem Statement

Current Limitations

  1. No Persistent Orchestration

    • Agents spawned in sessions have no lifecycle management beyond the session
    • Work continuity is lost when Gateway restarts or sessions end
    • No coordination between agents working on the same project
  2. No Health Monitoring

    • No visibility into agent status after spawn
    • Failed agents leave tasks in limbo
    • No automated recovery from agent failures
  3. No Task Persistence

    • Task state only exists in session memory
    • Users can't track long-running work
    • No audit trail of agent activity
  4. Conversation Interference

    • Main session conversations derail active development work
    • Context switching disrupts agent focus
    • No separation between user chat and background work

Requirements

The Agent Orchestration Layer must provide:

  • Persistent Task Management — Task definitions, state, and history survive restarts
  • Autonomous Coordination — Independent of main session; agents work continuously
  • Health Monitoring — Real-time tracking of agent status and progress
  • Automatic Recovery — Detect failures and resume work intelligently
  • Complete Audit Trail — Every state transition logged for debugging and accountability
  • Multi-Workspace Support — Row-level security and tenant isolation

Architecture Overview

High-Level Design

┌─────────────────────────────────────────────────────────────────┐
│                         API Layer                                │
│  (NestJS Controllers + Guards + Interceptors)                   │
└────────────┬─────────────────────────────────────┬──────────────┘
             │                                     │
             │                                     │
    ┌────────▼────────┐                  ┌────────▼─────────┐
    │  Task Manager   │                  │  Agent Manager   │
    │   Service       │                  │    Service       │
    └────────┬────────┘                  └────────┬─────────┘
             │                                     │
             │         ┌───────────────────┐      │
             └─────────►   Coordinator     ◄──────┘
                       │     Service       │
                       └─────────┬─────────┘
                                 │
            ┌────────────────────┼────────────────────┐
            │                    │                    │
    ┌───────▼────────┐  ┌────────▼────────┐  ┌──────▼──────┐
    │   PostgreSQL   │  │  Valkey/Redis   │  │   Gateway   │
    │  (Persistent)  │  │   (Runtime)     │  │   (Agent    │
    │                │  │                 │  │   Spawner)  │
    └────────────────┘  └─────────────────┘  └─────────────┘

Component Responsibilities

Component      Responsibility
Task Manager   CRUD operations on tasks, state transitions, assignment logic
Agent Manager  Agent lifecycle, health tracking, session management
Coordinator    Heartbeat processing, failure detection, recovery orchestration
PostgreSQL     Persistent storage of tasks, agents, sessions, logs
Valkey/Redis   Runtime state, heartbeats, quick lookups, pub/sub
Gateway        Agent spawning, session management, message routing

Database Schema

Task State Machine

                    ┌──────────────┐
                    │   PENDING    │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
               ┌────┤   ASSIGNED   ├────┐
               │    └──────┬───────┘    │
               │           │            │
        ┌──────▼───────┐   │   ┌────────▼──────┐
        │   RUNNING    │   │   │    PAUSED     │
        └──────┬───────┘   │   └────────┬──────┘
               │           │            │
               │    ┌──────▼───────┐    │
               └────►  COMPLETED   ◄────┘
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │   ARCHIVED   │
                    └──────────────┘

                    ┌──────────────┐
                    │    FAILED    │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │   ABORTED    │────┐
                    └──────────────┘    │
                           │            │
                           └────────────┘
                          (retry → PENDING)
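
The transitions in the diagram can be made executable as a small lookup table that services consult before persisting a status change. The edge set below is one reading of the diagram (ABORTED → PENDING is the retry path described under Recovery & Resilience Patterns; the manual retry endpoint may additionally justify a FAILED → PENDING edge). Treat it as a sketch to validate against the final implementation:

```typescript
// One plausible encoding of the task state machine above. The edge set is
// an interpretation of the diagram, not a confirmed specification.
type TaskStatus =
  | "PENDING" | "ASSIGNED" | "RUNNING" | "PAUSED"
  | "COMPLETED" | "FAILED" | "ABORTED" | "ARCHIVED";

const TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  PENDING: ["ASSIGNED"],
  ASSIGNED: ["RUNNING", "PAUSED", "ABORTED"],
  RUNNING: ["COMPLETED", "PAUSED", "FAILED", "ABORTED"],
  PAUSED: ["RUNNING", "COMPLETED", "ABORTED"],
  COMPLETED: ["ARCHIVED"],
  FAILED: ["ABORTED"],
  ABORTED: ["PENDING"], // retry path: requeue after backoff
  ARCHIVED: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```

Rejecting invalid transitions at this layer keeps the audit trail in agent_task_logs consistent, since every logged previous_status/new_status pair corresponds to a real edge.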

New Tables

agent_tasks

Extends existing task management with orchestration-specific fields.

CREATE TYPE task_orchestration_status AS ENUM (
  'PENDING',      -- Awaiting assignment
  'ASSIGNED',     -- Assigned to an agent
  'RUNNING',      -- Agent actively working
  'PAUSED',       -- User paused
  'COMPLETED',    -- Successfully finished
  'FAILED',       -- Failed with error
  'ABORTED',      -- Terminated (stale agent, manual cancel)
  'ARCHIVED'      -- Historical record
);

CREATE TABLE agent_tasks (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,

  -- Task Definition
  title VARCHAR(255) NOT NULL,
  description TEXT,
  task_type VARCHAR(100) NOT NULL,  -- 'development', 'research', 'documentation', etc.

  -- Status & Priority
  status task_orchestration_status DEFAULT 'PENDING',
  priority INT DEFAULT 5,  -- 1 (low) to 10 (high)

  -- Assignment
  agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,
  session_key VARCHAR(255),  -- Current active session

  -- Progress Tracking
  progress_percent INT DEFAULT 0 CHECK (progress_percent BETWEEN 0 AND 100),
  current_step TEXT,
  estimated_completion_at TIMESTAMPTZ,

  -- Retry Logic
  retry_count INT DEFAULT 0,
  max_retries INT DEFAULT 3,
  retry_backoff_seconds INT DEFAULT 300,  -- 5 minutes
  last_error TEXT,

  -- Dependencies
  depends_on UUID[] DEFAULT ARRAY[]::UUID[],  -- Array of task IDs
  blocks UUID[] DEFAULT ARRAY[]::UUID[],      -- Tasks blocked by this one

  -- Context
  input_context JSONB DEFAULT '{}',   -- Input data/params for agent
  output_result JSONB,                -- Final result from agent
  checkpoint_data JSONB DEFAULT '{}', -- Resumable state

  -- Metadata
  metadata JSONB DEFAULT '{}',
  tags VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[],

  -- Audit
  created_by UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW(),
  assigned_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  failed_at TIMESTAMPTZ
);

CREATE INDEX idx_agent_tasks_workspace ON agent_tasks(workspace_id);
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status) WHERE status IN ('PENDING', 'ASSIGNED', 'RUNNING');
CREATE INDEX idx_agent_tasks_agent ON agent_tasks(agent_id) WHERE agent_id IS NOT NULL;
CREATE INDEX idx_agent_tasks_priority ON agent_tasks(priority DESC, created_at ASC) WHERE status = 'PENDING';
CREATE INDEX idx_agent_tasks_depends_on ON agent_tasks USING gin(depends_on);
CREATE INDEX idx_agent_tasks_tags ON agent_tasks USING gin(tags);

agent_task_logs

Immutable audit trail of all task state transitions and agent actions.

CREATE TYPE task_log_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL');

CREATE TABLE agent_task_logs (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
  task_id UUID NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
  agent_id UUID REFERENCES agents(id) ON DELETE SET NULL,

  -- Log Entry
  level task_log_level DEFAULT 'INFO',
  event VARCHAR(100) NOT NULL,  -- 'state_transition', 'progress_update', 'error', etc.
  message TEXT,
  details JSONB DEFAULT '{}',

  -- State Snapshot
  previous_status task_orchestration_status,
  new_status task_orchestration_status,

  -- Context
  session_key VARCHAR(255),
  stack_trace TEXT,

  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_task_logs_task ON agent_task_logs(task_id, created_at DESC);
CREATE INDEX idx_task_logs_workspace ON agent_task_logs(workspace_id, created_at DESC);
CREATE INDEX idx_task_logs_level ON agent_task_logs(level) WHERE level IN ('ERROR', 'CRITICAL');
CREATE INDEX idx_task_logs_event ON agent_task_logs(event);

agent_heartbeats

Short-lived records for health monitoring (TTL enforced in Valkey).

CREATE TABLE agent_heartbeats (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
  agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,

  -- Health Data
  status VARCHAR(50) NOT NULL,  -- 'healthy', 'degraded', 'stale'
  current_task_id UUID REFERENCES agent_tasks(id) ON DELETE SET NULL,
  progress_percent INT,

  -- Resource Usage
  memory_mb INT,
  cpu_percent INT,

  -- Timing
  last_seen_at TIMESTAMPTZ DEFAULT NOW(),
  next_expected_at TIMESTAMPTZ,

  metadata JSONB DEFAULT '{}'
);

CREATE INDEX idx_heartbeats_agent ON agent_heartbeats(agent_id, last_seen_at DESC);
CREATE INDEX idx_heartbeats_stale ON agent_heartbeats(last_seen_at) WHERE status != 'healthy';

-- Cleanup function (run periodically)
CREATE OR REPLACE FUNCTION cleanup_old_heartbeats()
RETURNS void AS $$
BEGIN
  DELETE FROM agent_heartbeats
  WHERE last_seen_at < NOW() - INTERVAL '1 hour';
END;
$$ LANGUAGE plpgsql;

Updates to Existing Tables

Extend agents table

-- Add orchestration fields to existing agents table
ALTER TABLE agents ADD COLUMN IF NOT EXISTS coordinator_enabled BOOLEAN DEFAULT true;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS max_concurrent_tasks INT DEFAULT 1;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS heartbeat_interval_seconds INT DEFAULT 30;
ALTER TABLE agents ADD COLUMN IF NOT EXISTS stale_threshold_seconds INT DEFAULT 180;  -- 3 minutes
ALTER TABLE agents ADD COLUMN IF NOT EXISTS capabilities VARCHAR(100)[] DEFAULT ARRAY[]::VARCHAR[];

CREATE INDEX idx_agents_coordinator ON agents(coordinator_enabled) WHERE coordinator_enabled = true;

Valkey/Redis Key Patterns

Valkey is used for:

  • Real-time state (fast reads/writes)
  • Pub/Sub messaging (coordination events)
  • Distributed locks (prevent race conditions)
  • Session data (ephemeral context)

Key Naming Convention

{namespace}:{workspace_id}:{entity_type}:{entity_id}:{property}
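
A small helper keeps key construction consistent with this convention (the function name and shape are illustrative, not part of the design):

```typescript
// Illustrative helper for the {namespace}:{workspace_id}:{entity_type}:
// {entity_id}:{property} convention. The trailing property is optional.
function buildKey(
  namespace: string,
  workspaceId: string,
  entityType: string,
  entityId: string,
  property?: string
): string {
  const parts = [namespace, workspaceId, entityType, entityId];
  if (property !== undefined) parts.push(property);
  return parts.join(":");
}
```

Note that several concrete patterns below use shorter forms (e.g. agent:heartbeat:{agent_id}); the full convention applies where workspace scoping is needed.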

Key Patterns

Task Queue

# Pending tasks (sorted set; ZADD scores must be numeric, so encode
# priority and age into one score, e.g. priority * 1e13 + (MAX_TS - created_at))
ZADD tasks:pending:{workspace_id} {score} {task_id}

# Assigned tasks (hash per agent)
HSET agent:tasks:{agent_id} {task_id} {assigned_at}

# Active sessions (set)
SADD tasks:active:{workspace_id} {task_id}:{session_key}

Agent Status

# Agent heartbeat (string with TTL)
SETEX agent:heartbeat:{agent_id} 60 "{\"status\":\"running\",\"task_id\":\"{task_id}\",\"timestamp\":{ts}}"

# Agent status (hash)
HSET agent:status:{agent_id}
  status "running"
  current_task "{task_id}"
  last_heartbeat {timestamp}

# Stale agents (sorted set by last heartbeat)
ZADD coordinator:stale_agents {timestamp} {agent_id}

Coordination

# Coordinator lock (to prevent multiple coordinators)
SET coordinator:lock:{workspace_id} {coordinator_instance_id} NX EX 30

# Task assignment lock
SET task:assign_lock:{task_id} {agent_id} NX EX 5

# Recovery queue (list)
LPUSH coordinator:recovery_queue:{workspace_id} {task_id}

Pub/Sub Channels

# Task events
PUBLISH tasks:events:{workspace_id} "{\"event\":\"task_completed\",\"task_id\":\"{task_id}\"}"

# Agent events
PUBLISH agents:events:{workspace_id} "{\"event\":\"agent_stale\",\"agent_id\":\"{agent_id}\"}"

# Coordinator commands
PUBLISH coordinator:commands:{workspace_id} "{\"command\":\"reassign_task\",\"task_id\":\"{task_id}\"}"

Session Data (TTL: 1 hour)

# Session context (hash with TTL)
HSET session:context:{session_key}
  workspace_id "{workspace_id}"
  agent_id "{agent_id}"
  task_id "{task_id}"
  started_at {timestamp}
EXPIRE session:context:{session_key} 3600

Data Lifecycle

Key Type            TTL   Cleanup Strategy
agent:heartbeat:*   60s   Auto-expire
agent:status:*      None  Delete on agent termination
session:context:*   1h    Auto-expire
tasks:pending:*     None  Remove on assignment
coordinator:lock:*  30s   Auto-expire (renewed by active coordinator)
task:assign_lock:*  5s    Auto-expire after assignment

API Endpoints

Task Management

// Create a new orchestration task
POST /api/v1/agent-tasks
{
  "title": "Fix TypeScript strict errors in U-Connect",
  "description": "...",
  "taskType": "development",
  "priority": 8,
  "inputContext": {
    "repository": "u-connect",
    "branch": "main",
    "scope": ["packages/shared"]
  },
  "dependsOn": [],
  "maxRetries": 3
}

// List tasks (with filtering)
GET /api/v1/agent-tasks?status=RUNNING&minPriority=8&tags=typescript

// Get task details
GET /api/v1/agent-tasks/:id

// Update task (limited fields)
PATCH /api/v1/agent-tasks/:id
{
  "priority": 10,
  "status": "PAUSED"
}

// Cancel task
POST /api/v1/agent-tasks/:id/cancel
{
  "reason": "Requirements changed"
}

// Retry failed task
POST /api/v1/agent-tasks/:id/retry

// Get task logs
GET /api/v1/agent-tasks/:id/logs?level=ERROR&limit=100

Agent Management

// List agents
GET /api/v1/agents?status=WORKING&coordinatorEnabled=true

// Get agent details
GET /api/v1/agents/:id

// Register/update agent
PUT /api/v1/agents/:agentId
{
  "name": "Dev Agent #1",
  "model": "claude-sonnet-4",
  "capabilities": ["typescript", "nestjs", "prisma"],
  "maxConcurrentTasks": 2,
  "heartbeatIntervalSeconds": 30
}

// Get agent health
GET /api/v1/agents/:id/health

// Get agent tasks
GET /api/v1/agents/:id/tasks?status=RUNNING

// Terminate agent
POST /api/v1/agents/:id/terminate
{
  "reason": "Manual shutdown",
  "graceful": true  // Allow task completion
}

Coordination

// Coordinator status (internal/admin only)
GET /api/v1/coordinator/status
{
  "active": true,
  "instanceId": "coord-abc123",
  "workspaces": ["ws-1", "ws-2"],
  "lastHeartbeat": "2025-01-29T14:30:00Z",
  "stats": {
    "pendingTasks": 3,
    "runningTasks": 5,
    "healthyAgents": 8,
    "staleAgents": 1
  }
}

// Force recovery check (admin only)
POST /api/v1/coordinator/recover
{
  "workspaceId": "optional - all if omitted"
}

// Get coordination logs (admin only)
GET /api/v1/coordinator/logs?workspace={id}&level=ERROR

Webhooks (for external integration)

// Task state change webhook
POST {configured_webhook_url}
{
  "event": "task.status_changed",
  "taskId": "task-123",
  "previousStatus": "RUNNING",
  "newStatus": "COMPLETED",
  "workspaceId": "ws-1",
  "timestamp": "2025-01-29T14:30:00Z"
}

Coordinator Service Design

Core Responsibilities

  1. Health Monitoring — Check agent heartbeats, mark stale agents
  2. Task Assignment — Match pending tasks to available agents
  3. Recovery Orchestration — Reassign tasks from failed/stale agents
  4. Dependency Resolution — Ensure tasks wait for dependencies
  5. Resource Management — Enforce agent concurrency limits

Architecture

// Coordinator Service (NestJS)
@Injectable()
export class CoordinatorService {
  private coordinatorLock: string;
  private isRunning: boolean = false;

  constructor(
    private readonly taskManager: TaskManagerService,
    private readonly agentManager: AgentManagerService,
    private readonly valkey: Valkey,
    private readonly prisma: PrismaService,
    private readonly logger: Logger
  ) {}

  // Main coordination loop
  @Cron("*/30 * * * * *") // Every 30 seconds
  async coordinate() {
    if (!(await this.acquireLock())) {
      return; // Another coordinator is active
    }

    try {
      await this.checkAgentHealth();
      await this.assignPendingTasks();
      await this.resolveDependencies();
      await this.recoverFailedTasks();
    } catch (error) {
      this.logger.error("Coordination cycle failed", error);
    } finally {
      await this.releaseLock();
    }
  }

  // Distributed lock to prevent multiple coordinators
  private async acquireLock(): Promise<boolean> {
    const lockKey = `coordinator:lock:global`;
    const result = await this.valkey.set(
      lockKey,
      process.env.HOSTNAME || "coordinator",
      "NX",
      "EX",
      30
    );
    return result === "OK";
  }

  // Check agent heartbeats and mark stale
  private async checkAgentHealth() {
    const agents = await this.agentManager.getCoordinatorManagedAgents();
    const now = Date.now();

    for (const agent of agents) {
      const heartbeatKey = `agent:heartbeat:${agent.id}`;
      const lastHeartbeat = await this.valkey.get(heartbeatKey);

      if (!lastHeartbeat) {
        // No heartbeat - agent is stale
        await this.handleStaleAgent(agent);
      } else {
        const heartbeatData = JSON.parse(lastHeartbeat);
        const age = now - heartbeatData.timestamp;

        if (age > agent.staleThresholdSeconds * 1000) {
          await this.handleStaleAgent(agent);
        }
      }
    }
  }

  // Assign pending tasks to available agents
  private async assignPendingTasks() {
    const workspaces = await this.getActiveWorkspaces();

    for (const workspace of workspaces) {
      const pendingTasks = await this.taskManager.getPendingTasks(workspace.id, {
        orderBy: { priority: "desc", createdAt: "asc" },
      });

      for (const task of pendingTasks) {
        // Check dependencies
        if (!(await this.areDependenciesMet(task))) {
          continue;
        }

        // Find available agent
        const agent = await this.agentManager.findAvailableAgent(
          workspace.id,
          task.taskType,
          task.metadata.requiredCapabilities
        );

        if (agent) {
          await this.assignTask(task, agent);
        }
      }
    }
  }

  // Handle stale agents
  private async handleStaleAgent(agent: Agent) {
    this.logger.warn(`Agent ${agent.id} is stale - recovering tasks`);

    // Mark agent as ERROR
    await this.agentManager.updateAgentStatus(agent.id, AgentStatus.ERROR);

    // Get assigned tasks
    const tasks = await this.taskManager.getTasksForAgent(agent.id);

    for (const task of tasks) {
      await this.recoverTask(task, "agent_stale");
    }
  }

  // Recover a task from failure
  private async recoverTask(task: AgentTask, reason: string) {
    // Log the failure
    await this.taskManager.logTaskEvent(task.id, {
      level: "ERROR",
      event: "task_recovery",
      message: `Task recovery initiated: ${reason}`,
      previousStatus: task.status,
      newStatus: "ABORTED",
    });

    // Check retry limit
    if (task.retryCount >= task.maxRetries) {
      await this.taskManager.updateTask(task.id, {
        status: "FAILED",
        lastError: `Max retries exceeded (${task.retryCount}/${task.maxRetries})`,
        failedAt: new Date(),
      });
      return;
    }

    // Abort current assignment
    await this.taskManager.updateTask(task.id, {
      status: "ABORTED",
      agentId: null,
      sessionKey: null,
      retryCount: task.retryCount + 1,
    });

    // Wait for backoff period before requeuing.
    // NOTE: an in-memory timer is lost if this coordinator restarts; a more
    // robust design persists a retry-after timestamp and checks it each cycle.
    const backoffMs = task.retryBackoffSeconds * 1000 * Math.pow(2, task.retryCount);
    setTimeout(async () => {
      await this.taskManager.updateTask(task.id, {
        status: "PENDING",
      });
    }, backoffMs);
  }

  // Assign task to agent
  private async assignTask(task: AgentTask, agent: Agent) {
    // Acquire assignment lock
    const lockKey = `task:assign_lock:${task.id}`;
    const locked = await this.valkey.set(lockKey, agent.id, "NX", "EX", 5);

    if (!locked) {
      return; // Another coordinator already assigned this task
    }

    try {
      // Update task
      await this.taskManager.updateTask(task.id, {
        status: "ASSIGNED",
        agentId: agent.id,
        assignedAt: new Date(),
      });

      // Spawn agent session via Gateway
      const session = await this.spawnAgentSession(agent, task);

      // Update task with session
      await this.taskManager.updateTask(task.id, {
        sessionKey: session.sessionKey,
        status: "RUNNING",
        startedAt: new Date(),
      });

      // Log assignment
      await this.taskManager.logTaskEvent(task.id, {
        level: "INFO",
        event: "task_assigned",
        message: `Task assigned to agent ${agent.id}`,
        details: { agentId: agent.id, sessionKey: session.sessionKey },
      });
    } finally {
      await this.valkey.del(lockKey);
    }
  }

  // Spawn agent session via Gateway
  private async spawnAgentSession(agent: Agent, task: AgentTask): Promise<AgentSession> {
    // Call Gateway API to spawn subagent with task context
    const response = await fetch(`${process.env.GATEWAY_URL}/api/agents/spawn`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        workspaceId: task.workspaceId,
        agentId: agent.id,
        taskId: task.id,
        label: `task:${task.id}:${task.title.slice(0, 30)}`,
        context: {
          taskTitle: task.title,
          taskDescription: task.description,
          inputContext: task.inputContext,
          checkpointData: task.checkpointData,
        },
      }),
    });

    if (!response.ok) {
      throw new Error(`Gateway spawn failed: HTTP ${response.status}`);
    }

    const data = await response.json();
    return data.session;
  }
}

Coordination State Machine

┌─────────────────────────────────────────────────────────┐
│                   Coordinator Cycle                      │
└─────────────────────────────────────────────────────────┘
                          │
                          ▼
              ┌───────────────────────┐
              │   Acquire Lock (30s)  │
              └───────────┬───────────┘
                          │
                 ┌────────▼────────┐
                 │  Check Health   │
                 │  - Read agents  │
                 │  - Check ttls   │
                 │  - Mark stale   │
                 └────────┬────────┘
                          │
                 ┌────────▼────────┐
                 │ Assign Tasks    │
                 │ - Read pending  │
                 │ - Match agents  │
                 │ - Spawn sessions│
                 └────────┬────────┘
                          │
                 ┌────────▼────────┐
                 │ Resolve Deps    │
                 │ - Check waiting │
                 │ - Unblock ready │
                 └────────┬────────┘
                          │
                 ┌────────▼────────┐
                 │ Recover Failed  │
                 │ - Requeue retry │
                 │ - Log failures  │
                 └────────┬────────┘
                          │
              ┌───────────▼───────────┐
              │   Release Lock        │
              └───────────────────────┘

Recovery & Resilience Patterns

1. Agent Failure Recovery

Scenario: Agent crashes mid-task.

Detection:

  • Heartbeat TTL expires in Valkey
  • Coordinator detects missing heartbeat

Recovery:

  1. Mark agent as ERROR in database
  2. Abort assigned tasks with status = ABORTED
  3. Log failure with stack trace (if available)
  4. Apply exponential backoff: backoff = retryBackoffSeconds * 2^retryCount
  5. Requeue task to PENDING after backoff
  6. Coordinator reassigns on next cycle
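
Step 4 is a pure function of the task's retry fields; with the schema default (retryBackoffSeconds = 300), successive retries wait 300s, 600s, 1200s, and so on:

```typescript
// backoff = retryBackoffSeconds * 2^retryCount (the formula from step 4).
function backoffSeconds(retryBackoffSeconds: number, retryCount: number): number {
  return retryBackoffSeconds * Math.pow(2, retryCount);
}
```

A production version would likely cap the result (e.g. at one hour) so tasks with high retry counts are not parked indefinitely.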

2. Gateway Restart Recovery

Scenario: Gateway restarts, killing all agent sessions.

Detection:

  • All agent heartbeats stop simultaneously
  • Coordinator detects mass stale agents

Recovery:

  1. Coordinator marks all RUNNING tasks as ABORTED
  2. Tasks with checkpointData can resume from last checkpoint
  3. Tasks without checkpoints restart from scratch
  4. Exponential backoff prevents thundering herd

3. Task Dependency Deadlock

Scenario: Task A depends on Task B, which depends on Task A (circular dependency).

Detection:

  • Coordinator builds dependency graph
  • Detects cycles during resolveDependencies()

Recovery:

  1. Log ERROR with cycle details
  2. Mark all tasks in cycle as FAILED with reason dependency_cycle
  3. Notify workspace owner via webhook
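
Cycle detection over the depends_on arrays is a standard depth-first search with "visiting"/"done" marking; a sketch (the graph shape and return type are assumptions, not the designed API):

```typescript
// Returns the task ids forming a dependency cycle, or null if the graph
// is acyclic. dependsOn maps each task id to the ids it depends on.
function findCycle(dependsOn: Map<string, string[]>): string[] | null {
  const state = new Map<string, "visiting" | "done">();
  const stack: string[] = [];

  function visit(id: string): string[] | null {
    if (state.get(id) === "visiting") {
      // Back edge found: the cycle is the stack suffix starting at id.
      return stack.slice(stack.indexOf(id));
    }
    if (state.get(id) === "done") return null;
    state.set(id, "visiting");
    stack.push(id);
    for (const dep of dependsOn.get(id) ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    state.set(id, "done");
    return null;
  }

  for (const id of dependsOn.keys()) {
    const cycle = visit(id);
    if (cycle) return cycle;
  }
  return null;
}
```

The returned id list can be logged directly as the "cycle details" in step 1.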

4. Database Failure

Scenario: PostgreSQL becomes unavailable.

Detection:

  • Prisma query fails with connection error

Recovery:

  1. Coordinator catches error, logs to stderr
  2. Releases lock (allowing failover to another instance)
  3. Retries with exponential backoff: 5s, 10s, 20s, 40s
  4. If database remains down >5 minutes, alert admin

5. Split-Brain Prevention

Scenario: Network partition causes two coordinators to run simultaneously.

Prevention:

  • Distributed lock in Valkey with 30s TTL
  • Coordinators must renew lock every cycle
  • Only one coordinator can hold lock at a time

Detection:

  • Task assigned to multiple agents (conflict detection)

Recovery:

  1. Newer assignment wins (based on assignedAt timestamp)
  2. Cancel older session
  3. Log conflict for investigation
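
Lock renewal must be conditional on still holding the lock; otherwise a paused coordinator could extend a lock another instance has since acquired. The usual pattern is a compare-and-expire script executed atomically on the server. The sketch below assumes an ioredis-style eval signature, which may differ from the client actually in use:

```typescript
// Renew the coordinator lock only if we are still the holder. The Lua
// script runs atomically server-side, avoiding a GET-then-EXPIRE race.
const RENEW_SCRIPT = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("EXPIRE", KEYS[1], ARGV[2])
end
return 0
`;

async function renewLock(
  valkey: { eval: (...args: unknown[]) => Promise<unknown> },
  key: string,
  holderId: string,
  ttlSeconds: number
): Promise<boolean> {
  const result = await valkey.eval(RENEW_SCRIPT, 1, key, holderId, String(ttlSeconds));
  return result === 1;
}
```

Releasing the lock should follow the same pattern (compare-and-delete) so a coordinator never deletes a lock it no longer owns.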

6. Stale Task Timeout

Scenario: Task runs longer than estimatedCompletionAt + grace period.

Detection:

  • Coordinator checks estimatedCompletionAt field
  • If exceeded by >30 minutes, mark as potentially hung

Recovery:

  1. Send warning to agent session (via pub/sub)
  2. If no progress update in 10 minutes, abort task
  3. Log timeout error
  4. Requeue with higher priority
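
The detection rule above reduces to a time comparison against estimatedCompletionAt plus the 30-minute grace period:

```typescript
// A task is "potentially hung" when now exceeds its estimated completion
// time by more than the grace period (30 minutes per the detection rule).
const GRACE_MS = 30 * 60 * 1000;

function isPotentiallyHung(estimatedCompletionAt: Date, now: Date): boolean {
  return now.getTime() - estimatedCompletionAt.getTime() > GRACE_MS;
}
```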

Implementation Phases

Phase 1: Foundation (Week 1-2)

Goal: Basic task and agent models, no coordination yet.

Deliverables:

  • Database schema migration (tables, indexes)
  • Prisma models for AgentTask, AgentTaskLog, AgentHeartbeat
  • Basic CRUD API endpoints for tasks
  • Agent registration API
  • Manual task assignment (no automation)

Testing:

  • Unit tests for task state machine
  • Integration tests for task CRUD
  • Manual testing: create task, assign to agent, complete

Phase 2: Coordination Core (Week 3-4)

Goal: Autonomous coordinator with health monitoring.

Deliverables:

  • CoordinatorService with distributed locking
  • Health monitoring (heartbeat TTL checks)
  • Automatic task assignment to available agents
  • Valkey integration for runtime state
  • Pub/Sub for coordination events

Testing:

  • Unit tests for coordinator logic
  • Integration tests with Valkey
  • Chaos testing: kill agents, verify recovery
  • Load testing: 10+ agents, 50+ tasks

Phase 3: Recovery & Resilience (Week 5)

Goal: Fault-tolerant operation with automatic recovery.

Deliverables:

  • Agent failure detection and task recovery
  • Exponential backoff for retries
  • Checkpoint/resume support for long-running tasks
  • Dependency resolution
  • Deadlock detection

Testing:

  • Fault injection: kill agents, restart Gateway
  • Dependency cycle testing
  • Retry exhaustion testing
  • Split-brain simulation

Phase 4: Observability (Week 6)

Goal: Full visibility into orchestration state.

Deliverables:

  • Coordinator status dashboard
  • Task progress tracking UI
  • Real-time logs API
  • Metrics export (Prometheus format)
  • Webhook integration for external monitoring

Testing:

  • Load testing with metrics collection
  • Dashboard usability testing
  • Webhook reliability testing

Phase 5: Advanced Features (Future)

Goal: Production-grade features.

Deliverables:

  • Task prioritization algorithms (SJF, priority queues)
  • Agent capability matching (skills-based routing)
  • Task batching (group similar tasks)
  • Cost tracking (token usage, compute time)
  • Multi-region coordination (geo-distributed agents)

Security Considerations

Row-Level Security (RLS)

All queries must enforce workspace isolation:

// Example: Only return tasks for user's workspace
async getTasks(userId: string, workspaceId: string) {
  const membership = await this.prisma.workspaceMember.findUnique({
    where: { workspaceId_userId: { workspaceId, userId } }
  });

  if (!membership) {
    throw new ForbiddenException('Not a member of this workspace');
  }

  return this.prisma.agentTask.findMany({
    where: { workspaceId }
  });
}

API Authentication

  • All endpoints require valid session token
  • Coordinator endpoints restricted to ADMIN or OWNER roles
  • Agent heartbeat endpoints use JWT with agent-specific claims

Secrets Management

  • Task inputContext may contain secrets (API keys, passwords)
  • Encrypt sensitive fields at rest using Prisma middleware
  • Never log secrets in task logs
  • Redact secrets in API responses
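
One common approach to the last two points is recursive key-based redaction applied before logging or serializing API responses; the key pattern below is illustrative, not a vetted denylist:

```typescript
// Recursively masks values whose keys look sensitive, e.g. before
// inputContext is written to task logs or returned from the API.
const SENSITIVE_KEY = /(secret|password|token|api[_-]?key|credential)/i;

function redact(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(redact);
  if (value !== null && typeof value === "object") {
    return Object.fromEntries(
      Object.entries(value as Record<string, unknown>).map(([k, v]) =>
        SENSITIVE_KEY.test(k) ? [k, "[REDACTED]"] : [k, redact(v)]
      )
    );
  }
  return value;
}
```

Key-based redaction misses secrets stored under innocuous keys, so encryption at rest (the second bullet) remains the primary control.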

Monitoring & Alerting

Key Metrics

Metric                          Description                        Alert Threshold
coordinator.cycle.duration_ms   Coordination cycle execution time  >5000ms
coordinator.stale_agents.count  Number of stale agents detected    >5
tasks.pending.count             Tasks waiting for assignment       >50
tasks.failed.count              Total failed tasks (last 1h)       >10
tasks.retry.exhausted.count     Tasks exceeding max retries        >0
agents.spawned.count            Agent spawn rate                   >100/min
valkey.connection.errors        Valkey connection failures         >0
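
For the Prometheus export deliverable in Phase 4, these metrics map onto the text exposition format. A minimal renderer is sketched below (dots become underscores, since Prometheus metric names disallow dots); a real exporter built on a client library would also emit # HELP and # TYPE lines:

```typescript
// Minimal Prometheus text-format rendering of the metrics table above.
// Metric names mirror the table; dots are converted to legal underscores.
function renderMetrics(metrics: Record<string, number>): string {
  return Object.entries(metrics)
    .map(([name, value]) => `${name.replace(/\./g, "_")} ${value}`)
    .join("\n");
}
```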

Health Checks

GET /health/coordinator
{
  "status": "healthy",
  "coordinator": {
    "active": true,
    "lastCycle": "2025-01-29T14:30:00Z",
    "cycleCount": 1234
  },
  "database": "connected",
  "valkey": "connected",
  "gateway": "reachable"
}

API Integration Examples

Creating a Task from Main Agent

// Main agent creates a development task
const task = await fetch("/api/v1/agent-tasks", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${sessionToken}`,
  },
  body: JSON.stringify({
    title: "Fix TypeScript strict errors in U-Connect",
    description: "Run tsc --noEmit, fix all errors, commit changes",
    taskType: "development",
    priority: 8,
    inputContext: {
      repository: "u-connect",
      branch: "main",
      commands: ["pnpm install", "pnpm tsc:check"],
    },
    maxRetries: 2,
    estimatedDurationMinutes: 30,
  }),
});

const { id } = await task.json();
console.log(`Task created: ${id}`);

Agent Heartbeat (from spawned subagent)

// Subagent sends heartbeat every 30s
setInterval(async () => {
  await fetch(`/api/v1/agents/${agentId}/heartbeat`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${agentToken}`,
    },
    body: JSON.stringify({
      status: "healthy",
      currentTaskId: taskId,
      progressPercent: 45,
      currentStep: "Running tsc --noEmit",
      memoryMb: 512,
      cpuPercent: 35,
    }),
  });
}, 30000);

Task Progress Update

// Agent updates task progress
await fetch(`/api/v1/agent-tasks/${taskId}/progress`, {
  method: "PATCH",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    progressPercent: 70,
    currentStep: "Fixing type errors in packages/shared",
    checkpointData: {
      filesProcessed: 15,
      errorsFixed: 8,
      remainingFiles: 5,
    },
  }),
});

Task Completion

// Agent marks task complete
await fetch(`/api/v1/agent-tasks/${taskId}/complete`, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${agentToken}`,
  },
  body: JSON.stringify({
    outputResult: {
      filesModified: 20,
      errorsFixed: 23,
      commitHash: "abc123",
      buildStatus: "passing",
    },
    summary: "All TypeScript strict errors resolved. Build passing.",
  }),
});

Glossary

Term           Definition
Agent          Autonomous AI instance (e.g., Claude subagent) that executes tasks
Task           Unit of work to be executed by an agent
Coordinator    Background service that assigns tasks and monitors agent health
Heartbeat      Periodic signal from agent indicating it's alive and working
Stale Agent    Agent that has stopped sending heartbeats (assumed dead)
Checkpoint     Snapshot of task state allowing resumption after failure
Workspace      Tenant isolation boundary (all tasks belong to a workspace)
Session        Gateway-managed connection between user and agent
Orchestration  Automated coordination of multiple agents working on tasks

Next Steps:

  1. Review and approve this design document
  2. Create GitHub issues for Phase 1 tasks
  3. Set up development branch: feature/agent-orchestration
  4. Begin database schema migration

Questions/Feedback: Open an issue in mosaic-stack repo with label orchestration.