feat(#93): implement agent spawn via federation

Implements FED-010: Agent Spawn via Federation feature that enables
spawning and managing Claude agents on remote federated Mosaic Stack
instances via COMMAND message type.

Features:
- Federation agent command types (spawn, status, kill)
- FederationAgentService for handling agent operations
- Integration with orchestrator's agent spawner/lifecycle services
- API endpoints for spawning, querying status, and killing agents
- Full command routing through federation COMMAND infrastructure
- Comprehensive test coverage (12/12 tests passing)

Architecture:
- Hub → Spoke: Spawn agents on remote instances
- Command flow: FederationController → FederationAgentService →
  CommandService → Remote Orchestrator
- Response handling: Remote orchestrator returns agent status/results
- Security: Connection validation, signature verification

Files created:
- apps/api/src/federation/types/federation-agent.types.ts
- apps/api/src/federation/federation-agent.service.ts
- apps/api/src/federation/federation-agent.service.spec.ts

Files modified:
- apps/api/src/federation/command.service.ts (agent command routing)
- apps/api/src/federation/federation.controller.ts (agent endpoints)
- apps/api/src/federation/federation.module.ts (service registration)
- apps/orchestrator/src/api/agents/agents.controller.ts (status endpoint)
- apps/orchestrator/src/api/agents/agents.module.ts (lifecycle integration)

Testing:
- 12/12 tests passing for FederationAgentService
- All command service tests passing
- TypeScript compilation successful
- Linting passed

Refs #93

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Jason Woltje
2026-02-03 14:37:06 -06:00
parent a8c8af21e5
commit 12abdfe81d
405 changed files with 13545 additions and 2153 deletions

View File

@@ -69,8 +69,8 @@ docker compose up -d valkey
### 1. Inject the Service
```typescript
import { Injectable } from '@nestjs/common';
import { ValkeyService } from './valkey/valkey.service';
import { Injectable } from "@nestjs/common";
import { ValkeyService } from "./valkey/valkey.service";
@Injectable()
export class MyService {
@@ -82,11 +82,11 @@ export class MyService {
```typescript
const task = await this.valkeyService.enqueue({
type: 'send-email',
type: "send-email",
data: {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Hello, welcome to Mosaic Stack',
to: "user@example.com",
subject: "Welcome!",
body: "Hello, welcome to Mosaic Stack",
},
});
@@ -102,11 +102,11 @@ const task = await this.valkeyService.dequeue();
if (task) {
console.log(task.status); // 'processing'
try {
// Do work...
await sendEmail(task.data);
// Mark as completed
await this.valkeyService.updateStatus(task.id, {
status: TaskStatus.COMPLETED,
@@ -129,8 +129,8 @@ const status = await this.valkeyService.getStatus(taskId);
if (status) {
console.log(status.status); // 'completed' | 'failed' | 'processing' | 'pending'
console.log(status.data); // Task metadata
console.log(status.error); // Error message if failed
console.log(status.data); // Task metadata
console.log(status.error); // Error message if failed
}
```
@@ -143,7 +143,7 @@ console.log(`${length} tasks in queue`);
// Health check
const healthy = await this.valkeyService.healthCheck();
console.log(`Valkey is ${healthy ? 'healthy' : 'down'}`);
console.log(`Valkey is ${healthy ? "healthy" : "down"}`);
// Clear queue (use with caution!)
await this.valkeyService.clearQueue();
@@ -181,12 +181,12 @@ export class EmailWorker {
private async startWorker() {
while (true) {
const task = await this.valkeyService.dequeue();
if (task) {
await this.processTask(task);
} else {
// No tasks, wait 5 seconds
await new Promise(resolve => setTimeout(resolve, 5000));
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
}
@@ -194,14 +194,14 @@ export class EmailWorker {
private async processTask(task: TaskDto) {
try {
switch (task.type) {
case 'send-email':
case "send-email":
await this.sendEmail(task.data);
break;
case 'generate-report':
case "generate-report":
await this.generateReport(task.data);
break;
}
await this.valkeyService.updateStatus(task.id, {
status: TaskStatus.COMPLETED,
});
@@ -222,10 +222,10 @@ export class EmailWorker {
export class ScheduledTasks {
constructor(private readonly valkeyService: ValkeyService) {}
@Cron('0 0 * * *') // Daily at midnight
@Cron("0 0 * * *") // Daily at midnight
async dailyReport() {
await this.valkeyService.enqueue({
type: 'daily-report',
type: "daily-report",
data: { date: new Date().toISOString() },
});
}
@@ -241,6 +241,7 @@ pnpm test valkey.service.spec.ts
```
Tests cover:
- ✅ Connection and initialization
- ✅ Enqueue operations
- ✅ Dequeue FIFO behavior
@@ -254,9 +255,11 @@ Tests cover:
### ValkeyService Methods
#### `enqueue(task: EnqueueTaskDto): Promise<TaskDto>`
Add a task to the queue.
**Parameters:**
- `task.type` (string): Task type identifier
- `task.data` (object): Task metadata
@@ -265,6 +268,7 @@ Add a task to the queue.
---
#### `dequeue(): Promise<TaskDto | null>`
Get the next task from the queue (FIFO).
**Returns:** Next task with status updated to PROCESSING, or null if queue is empty
@@ -272,9 +276,11 @@ Get the next task from the queue (FIFO).
---
#### `getStatus(taskId: string): Promise<TaskDto | null>`
Retrieve task status and metadata.
**Parameters:**
- `taskId` (string): Task UUID
**Returns:** Task data or null if not found
@@ -282,9 +288,11 @@ Retrieve task status and metadata.
---
#### `updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise<TaskDto | null>`
Update task status and optionally add results or errors.
**Parameters:**
- `taskId` (string): Task UUID
- `update.status` (TaskStatus): New status
- `update.error` (string, optional): Error message for failed tasks
@@ -295,6 +303,7 @@ Update task status and optionally add results or errors.
---
#### `getQueueLength(): Promise<number>`
Get the number of tasks in queue.
**Returns:** Queue length
@@ -302,11 +311,13 @@ Get the number of tasks in queue.
---
#### `clearQueue(): Promise<void>`
Remove all tasks from queue (metadata remains until TTL).
---
#### `healthCheck(): Promise<boolean>`
Verify Valkey connectivity.
**Returns:** true if connected, false otherwise
@@ -314,6 +325,7 @@ Verify Valkey connectivity.
## Migration Notes
If upgrading from BullMQ or another queue system:
1. Task IDs are UUIDs (not incremental)
2. No built-in retry mechanism (implement in worker)
3. No job priorities (strict FIFO)
@@ -329,7 +341,7 @@ For advanced features like retries, priorities, or scheduled jobs, consider wrap
// Check Valkey connectivity
const healthy = await this.valkeyService.healthCheck();
if (!healthy) {
console.error('Valkey is not responding');
console.error("Valkey is not responding");
}
```
@@ -349,6 +361,7 @@ docker exec -it mosaic-valkey valkey-cli DEL mosaic:task:queue
### Debug Logging
The service logs all operations at `info` level. Check application logs for:
- Task enqueue/dequeue operations
- Status updates
- Connection events
@@ -356,6 +369,7 @@ The service logs all operations at `info` level. Check application logs for:
## Future Enhancements
Potential improvements for consideration:
- [ ] Task priorities (weighted queues)
- [ ] Retry mechanism with exponential backoff
- [ ] Delayed/scheduled tasks

View File

@@ -1,10 +1,10 @@
import { Test, TestingModule } from '@nestjs/testing';
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { ValkeyService } from './valkey.service';
import { TaskStatus } from './dto/task.dto';
import { Test, TestingModule } from "@nestjs/testing";
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { ValkeyService } from "./valkey.service";
import { TaskStatus } from "./dto/task.dto";
// Mock ioredis module
vi.mock('ioredis', () => {
vi.mock("ioredis", () => {
// In-memory store for mocked Redis
const store = new Map<string, string>();
const lists = new Map<string, string[]>();
@@ -13,13 +13,13 @@ vi.mock('ioredis', () => {
class MockRedisClient {
// Connection methods
async ping() {
return 'PONG';
return "PONG";
}
async quit() {
return undefined;
}
on() {
return this;
}
@@ -27,9 +27,9 @@ vi.mock('ioredis', () => {
// String operations
async setex(key: string, ttl: number, value: string) {
store.set(key, value);
return 'OK';
return "OK";
}
async get(key: string) {
return store.get(key) || null;
}
@@ -43,7 +43,7 @@ vi.mock('ioredis', () => {
list.push(...values);
return list.length;
}
async lpop(key: string) {
const list = lists.get(key);
if (!list || list.length === 0) {
@@ -51,15 +51,15 @@ vi.mock('ioredis', () => {
}
return list.shift()!;
}
async llen(key: string) {
const list = lists.get(key);
return list ? list.length : 0;
}
async del(...keys: string[]) {
let deleted = 0;
keys.forEach(key => {
keys.forEach((key) => {
if (store.delete(key)) deleted++;
if (lists.delete(key)) deleted++;
});
@@ -78,16 +78,16 @@ vi.mock('ioredis', () => {
};
});
describe('ValkeyService', () => {
describe("ValkeyService", () => {
let service: ValkeyService;
let module: TestingModule;
beforeEach(async () => {
// Clear environment
process.env.VALKEY_URL = 'redis://localhost:6379';
process.env.VALKEY_URL = "redis://localhost:6379";
// Clear the mock store before each test
const Redis = await import('ioredis');
const Redis = await import("ioredis");
(Redis.default as any).__clearStore();
module = await Test.createTestingModule({
@@ -95,7 +95,7 @@ describe('ValkeyService', () => {
}).compile();
service = module.get<ValkeyService>(ValkeyService);
// Initialize the service
await service.onModuleInit();
});
@@ -104,41 +104,41 @@ describe('ValkeyService', () => {
await service.onModuleDestroy();
});
describe('initialization', () => {
it('should be defined', () => {
describe("initialization", () => {
it("should be defined", () => {
expect(service).toBeDefined();
});
it('should connect to Valkey on module init', async () => {
it("should connect to Valkey on module init", async () => {
expect(service).toBeDefined();
const healthCheck = await service.healthCheck();
expect(healthCheck).toBe(true);
});
});
describe('enqueue', () => {
it('should enqueue a task successfully', async () => {
describe("enqueue", () => {
it("should enqueue a task successfully", async () => {
const taskDto = {
type: 'test-task',
data: { message: 'Hello World' },
type: "test-task",
data: { message: "Hello World" },
};
const result = await service.enqueue(taskDto);
expect(result).toBeDefined();
expect(result.id).toBeDefined();
expect(result.type).toBe('test-task');
expect(result.data).toEqual({ message: 'Hello World' });
expect(result.type).toBe("test-task");
expect(result.data).toEqual({ message: "Hello World" });
expect(result.status).toBe(TaskStatus.PENDING);
expect(result.createdAt).toBeDefined();
expect(result.updatedAt).toBeDefined();
});
it('should increment queue length when enqueueing', async () => {
it("should increment queue length when enqueueing", async () => {
const initialLength = await service.getQueueLength();
await service.enqueue({
type: 'task-1',
type: "task-1",
data: {},
});
@@ -147,20 +147,20 @@ describe('ValkeyService', () => {
});
});
describe('dequeue', () => {
it('should return null when queue is empty', async () => {
describe("dequeue", () => {
it("should return null when queue is empty", async () => {
const result = await service.dequeue();
expect(result).toBeNull();
});
it('should dequeue tasks in FIFO order', async () => {
it("should dequeue tasks in FIFO order", async () => {
const task1 = await service.enqueue({
type: 'task-1',
type: "task-1",
data: { order: 1 },
});
const task2 = await service.enqueue({
type: 'task-2',
type: "task-2",
data: { order: 2 },
});
@@ -173,9 +173,9 @@ describe('ValkeyService', () => {
expect(dequeued2?.status).toBe(TaskStatus.PROCESSING);
});
it('should update task status to PROCESSING when dequeued', async () => {
it("should update task status to PROCESSING when dequeued", async () => {
const task = await service.enqueue({
type: 'test-task',
type: "test-task",
data: {},
});
@@ -187,73 +187,73 @@ describe('ValkeyService', () => {
});
});
describe('getStatus', () => {
it('should return null for non-existent task', async () => {
const status = await service.getStatus('non-existent-id');
describe("getStatus", () => {
it("should return null for non-existent task", async () => {
const status = await service.getStatus("non-existent-id");
expect(status).toBeNull();
});
it('should return task status for existing task', async () => {
it("should return task status for existing task", async () => {
const task = await service.enqueue({
type: 'test-task',
data: { key: 'value' },
type: "test-task",
data: { key: "value" },
});
const status = await service.getStatus(task.id);
expect(status).toBeDefined();
expect(status?.id).toBe(task.id);
expect(status?.type).toBe('test-task');
expect(status?.data).toEqual({ key: 'value' });
expect(status?.type).toBe("test-task");
expect(status?.data).toEqual({ key: "value" });
});
});
describe('updateStatus', () => {
it('should update task status to COMPLETED', async () => {
describe("updateStatus", () => {
it("should update task status to COMPLETED", async () => {
const task = await service.enqueue({
type: 'test-task',
type: "test-task",
data: {},
});
const updated = await service.updateStatus(task.id, {
status: TaskStatus.COMPLETED,
result: { output: 'success' },
result: { output: "success" },
});
expect(updated).toBeDefined();
expect(updated?.status).toBe(TaskStatus.COMPLETED);
expect(updated?.completedAt).toBeDefined();
expect(updated?.data).toEqual({ output: 'success' });
expect(updated?.data).toEqual({ output: "success" });
});
it('should update task status to FAILED with error', async () => {
it("should update task status to FAILED with error", async () => {
const task = await service.enqueue({
type: 'test-task',
type: "test-task",
data: {},
});
const updated = await service.updateStatus(task.id, {
status: TaskStatus.FAILED,
error: 'Task failed due to error',
error: "Task failed due to error",
});
expect(updated).toBeDefined();
expect(updated?.status).toBe(TaskStatus.FAILED);
expect(updated?.error).toBe('Task failed due to error');
expect(updated?.error).toBe("Task failed due to error");
expect(updated?.completedAt).toBeDefined();
});
it('should return null when updating non-existent task', async () => {
const updated = await service.updateStatus('non-existent-id', {
it("should return null when updating non-existent task", async () => {
const updated = await service.updateStatus("non-existent-id", {
status: TaskStatus.COMPLETED,
});
expect(updated).toBeNull();
});
it('should preserve existing data when updating status', async () => {
it("should preserve existing data when updating status", async () => {
const task = await service.enqueue({
type: 'test-task',
data: { original: 'data' },
type: "test-task",
data: { original: "data" },
});
await service.updateStatus(task.id, {
@@ -261,28 +261,28 @@ describe('ValkeyService', () => {
});
const status = await service.getStatus(task.id);
expect(status?.data).toEqual({ original: 'data' });
expect(status?.data).toEqual({ original: "data" });
});
});
describe('getQueueLength', () => {
it('should return 0 for empty queue', async () => {
describe("getQueueLength", () => {
it("should return 0 for empty queue", async () => {
const length = await service.getQueueLength();
expect(length).toBe(0);
});
it('should return correct queue length', async () => {
await service.enqueue({ type: 'task-1', data: {} });
await service.enqueue({ type: 'task-2', data: {} });
await service.enqueue({ type: 'task-3', data: {} });
it("should return correct queue length", async () => {
await service.enqueue({ type: "task-1", data: {} });
await service.enqueue({ type: "task-2", data: {} });
await service.enqueue({ type: "task-3", data: {} });
const length = await service.getQueueLength();
expect(length).toBe(3);
});
it('should decrease when tasks are dequeued', async () => {
await service.enqueue({ type: 'task-1', data: {} });
await service.enqueue({ type: 'task-2', data: {} });
it("should decrease when tasks are dequeued", async () => {
await service.enqueue({ type: "task-1", data: {} });
await service.enqueue({ type: "task-2", data: {} });
expect(await service.getQueueLength()).toBe(2);
@@ -294,10 +294,10 @@ describe('ValkeyService', () => {
});
});
describe('clearQueue', () => {
it('should clear all tasks from queue', async () => {
await service.enqueue({ type: 'task-1', data: {} });
await service.enqueue({ type: 'task-2', data: {} });
describe("clearQueue", () => {
it("should clear all tasks from queue", async () => {
await service.enqueue({ type: "task-1", data: {} });
await service.enqueue({ type: "task-2", data: {} });
expect(await service.getQueueLength()).toBe(2);
@@ -306,21 +306,21 @@ describe('ValkeyService', () => {
});
});
describe('healthCheck', () => {
it('should return true when Valkey is healthy', async () => {
describe("healthCheck", () => {
it("should return true when Valkey is healthy", async () => {
const healthy = await service.healthCheck();
expect(healthy).toBe(true);
});
});
describe('integration flow', () => {
it('should handle complete task lifecycle', async () => {
describe("integration flow", () => {
it("should handle complete task lifecycle", async () => {
// 1. Enqueue task
const task = await service.enqueue({
type: 'email-notification',
type: "email-notification",
data: {
to: 'user@example.com',
subject: 'Test Email',
to: "user@example.com",
subject: "Test Email",
},
});
@@ -335,8 +335,8 @@ describe('ValkeyService', () => {
const completedTask = await service.updateStatus(task.id, {
status: TaskStatus.COMPLETED,
result: {
to: 'user@example.com',
subject: 'Test Email',
to: "user@example.com",
subject: "Test Email",
sentAt: new Date().toISOString(),
},
});
@@ -350,11 +350,11 @@ describe('ValkeyService', () => {
expect(finalStatus?.data.sentAt).toBeDefined();
});
it('should handle multiple concurrent tasks', async () => {
it("should handle multiple concurrent tasks", async () => {
const tasks = await Promise.all([
service.enqueue({ type: 'task-1', data: { id: 1 } }),
service.enqueue({ type: 'task-2', data: { id: 2 } }),
service.enqueue({ type: 'task-3', data: { id: 3 } }),
service.enqueue({ type: "task-1", data: { id: 1 } }),
service.enqueue({ type: "task-2", data: { id: 2 } }),
service.enqueue({ type: "task-3", data: { id: 3 } }),
]);
expect(await service.getQueueLength()).toBe(3);