feat(orchestrator): add SSE events, queue controls, and mosaic rails sync

This commit is contained in:
Jason Woltje
2026-02-17 15:39:15 -06:00
parent 758b2a839b
commit 3258cd4f4d
35 changed files with 1016 additions and 89 deletions

View File

@@ -45,12 +45,21 @@ Monitored via `apps/web/` (Agent Dashboard).
### Agents
| Method | Path | Description |
| ------ | ------------------------- | ---------------------- |
| POST | `/agents/spawn` | Spawn a new agent |
| GET | `/agents/:agentId/status` | Get agent status |
| POST | `/agents/:agentId/kill` | Kill a single agent |
| POST | `/agents/kill-all` | Kill all active agents |
| Method | Path | Description |
| ------ | ------------------------- | ------------------------- |
| POST | `/agents/spawn` | Spawn a new agent |
| GET | `/agents/:agentId/status` | Get agent status |
| POST | `/agents/:agentId/kill` | Kill a single agent |
| POST | `/agents/kill-all` | Kill all active agents |
| GET | `/agents/events` | SSE lifecycle/task events |
### Queue
| Method | Path | Description |
| ------ | --------------- | ---------------------------- |
| GET | `/queue/stats` | Queue depth and worker stats |
| POST | `/queue/pause` | Pause queue processing |
| POST | `/queue/resume` | Resume queue processing |
#### POST /agents/spawn
@@ -176,14 +185,17 @@ pnpm --filter @mosaic/orchestrator lint
Environment variables loaded via `@nestjs/config`. Key variables:
| Variable | Description |
| ------------------- | -------------------------------------- |
| `ORCHESTRATOR_PORT` | HTTP port (default: 3001) |
| `CLAUDE_API_KEY` | Claude API key for agents |
| `VALKEY_HOST` | Valkey/Redis host (default: localhost) |
| `VALKEY_PORT` | Valkey/Redis port (default: 6379) |
| `COORDINATOR_URL` | Quality Coordinator base URL |
| `SANDBOX_ENABLED` | Enable Docker sandbox (true/false) |
| Variable | Description |
| -------------------------------- | -------------------------------------------------- |
| `ORCHESTRATOR_PORT` | HTTP port (default: 3001) |
| `CLAUDE_API_KEY` | Claude API key for agents |
| `VALKEY_HOST` | Valkey/Redis host (default: localhost) |
| `VALKEY_PORT` | Valkey/Redis port (default: 6379) |
| `COORDINATOR_URL` | Quality Coordinator base URL |
| `SANDBOX_ENABLED` | Enable Docker sandbox (true/false) |
| `MAX_CONCURRENT_AGENTS` | Maximum concurrent in-memory sessions (default: 2) |
| `ORCHESTRATOR_QUEUE_CONCURRENCY` | BullMQ worker concurrency (default: 1) |
| `SANDBOX_DEFAULT_MEMORY_MB` | Sandbox memory limit in MB (default: 256) |
## Related Documentation

View File

@@ -0,0 +1,70 @@
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { randomUUID } from "crypto";
import { ValkeyService } from "../../valkey/valkey.service";
import type { EventHandler, OrchestratorEvent } from "../../valkey/types";
type UnsubscribeFn = () => void;
@Injectable()
export class AgentEventsService implements OnModuleInit {
private readonly logger = new Logger(AgentEventsService.name);
private readonly subscribers = new Map<string, EventHandler>();
private connected = false;
constructor(private readonly valkeyService: ValkeyService) {}
async onModuleInit(): Promise<void> {
if (this.connected) return;
await this.valkeyService.subscribeToEvents(
(event) => {
this.subscribers.forEach((handler) => {
void handler(event);
});
},
(error, _raw, channel) => {
this.logger.warn(`Event stream parse/validation warning on ${channel}: ${error.message}`);
}
);
this.connected = true;
this.logger.log("Agent event stream subscription active");
}
subscribe(handler: EventHandler): UnsubscribeFn {
const id = randomUUID();
this.subscribers.set(id, handler);
return () => {
this.subscribers.delete(id);
};
}
async getInitialSnapshot(): Promise<{
type: "stream.snapshot";
timestamp: string;
agents: number;
tasks: number;
}> {
const [agents, tasks] = await Promise.all([
this.valkeyService.listAgents(),
this.valkeyService.listTasks(),
]);
return {
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: agents.length,
tasks: tasks.length,
};
}
createHeartbeat(): OrchestratorEvent {
return {
type: "task.processing",
timestamp: new Date().toISOString(),
data: {
heartbeat: true,
},
};
}
}

View File

@@ -4,6 +4,7 @@ import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -20,6 +21,12 @@ describe("AgentsController - Killswitch Endpoints", () => {
};
let mockLifecycleService: {
getAgentLifecycleState: ReturnType<typeof vi.fn>;
registerSpawnedAgent: ReturnType<typeof vi.fn>;
};
let mockEventsService: {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -38,13 +45,30 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockLifecycleService = {
getAgentLifecycleState: vi.fn(),
registerSpawnedAgent: vi.fn(),
};
mockEventsService = {
subscribe: vi.fn().mockReturnValue(() => {}),
getInitialSnapshot: vi.fn().mockResolvedValue({
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: 0,
tasks: 0,
}),
createHeartbeat: vi.fn().mockReturnValue({
type: "task.processing",
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
};
controller = new AgentsController(
mockQueueService as unknown as QueueService,
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService
);
});

View File

@@ -3,6 +3,7 @@ import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -17,11 +18,17 @@ describe("AgentsController", () => {
};
let lifecycleService: {
getAgentLifecycleState: ReturnType<typeof vi.fn>;
registerSpawnedAgent: ReturnType<typeof vi.fn>;
};
let killswitchService: {
killAgent: ReturnType<typeof vi.fn>;
killAllAgents: ReturnType<typeof vi.fn>;
};
let eventsService: {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -37,6 +44,7 @@ describe("AgentsController", () => {
lifecycleService = {
getAgentLifecycleState: vi.fn(),
registerSpawnedAgent: vi.fn().mockResolvedValue(undefined),
};
killswitchService = {
@@ -44,12 +52,28 @@ describe("AgentsController", () => {
killAllAgents: vi.fn(),
};
eventsService = {
subscribe: vi.fn().mockReturnValue(() => {}),
getInitialSnapshot: vi.fn().mockResolvedValue({
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: 0,
tasks: 0,
}),
createHeartbeat: vi.fn().mockReturnValue({
type: "task.processing",
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService
);
});
@@ -195,6 +219,10 @@ describe("AgentsController", () => {
expect(queueService.addTask).toHaveBeenCalledWith(validRequest.taskId, validRequest.context, {
priority: 5,
});
expect(lifecycleService.registerSpawnedAgent).toHaveBeenCalledWith(
agentId,
validRequest.taskId
);
expect(result).toEqual({
agentId,
status: "spawning",

View File

@@ -11,8 +11,11 @@ import {
HttpCode,
UseGuards,
ParseUUIDPipe,
Sse,
MessageEvent,
} from "@nestjs/common";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
@@ -20,6 +23,7 @@ import { KillswitchService } from "../../killswitch/killswitch.service";
import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
/**
* Controller for agent management endpoints
@@ -41,7 +45,8 @@ export class AgentsController {
private readonly queueService: QueueService,
private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService
) {}
/**
@@ -67,6 +72,9 @@ export class AgentsController {
context: dto.context,
});
// Persist initial lifecycle state in Valkey.
await this.lifecycleService.registerSpawnedAgent(spawnResponse.agentId, dto.taskId);
// Queue task in Valkey
await this.queueService.addTask(dto.taskId, dto.context, {
priority: 5, // Default priority
@@ -85,6 +93,41 @@ export class AgentsController {
}
}
/**
* Stream orchestrator events as server-sent events (SSE)
*/
@Sse("events")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamEvents(): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
const unsubscribe = this.eventsService.subscribe((event) => {
if (!isClosed) {
subscriber.next({ data: event });
}
});
void this.eventsService.getInitialSnapshot().then((snapshot) => {
if (!isClosed) {
subscriber.next({ data: snapshot });
}
});
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: this.eventsService.createHeartbeat() });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(heartbeat);
unsubscribe();
};
});
}
/**
* List all agents
* @returns Array of all agent sessions with their status

View File

@@ -5,10 +5,11 @@ import { SpawnerModule } from "../../spawner/spawner.module";
import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard],
providers: [OrchestratorApiKeyGuard, AgentEventsService],
})
export class AgentsModule {}

View File

@@ -0,0 +1,11 @@
import { Module } from "@nestjs/common";
import { QueueController } from "./queue.controller";
import { QueueModule } from "../../queue/queue.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
@Module({
imports: [QueueModule],
controllers: [QueueController],
providers: [OrchestratorApiKeyGuard],
})
export class QueueApiModule {}

View File

@@ -0,0 +1,65 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { QueueController } from "./queue.controller";
import { QueueService } from "../../queue/queue.service";
describe("QueueController", () => {
let controller: QueueController;
let queueService: {
getStats: ReturnType<typeof vi.fn>;
pause: ReturnType<typeof vi.fn>;
resume: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
queueService = {
getStats: vi.fn(),
pause: vi.fn(),
resume: vi.fn(),
};
controller = new QueueController(queueService as unknown as QueueService);
});
afterEach(() => {
vi.clearAllMocks();
});
it("should return queue stats", async () => {
queueService.getStats.mockResolvedValue({
pending: 5,
active: 1,
completed: 10,
failed: 2,
delayed: 0,
});
const result = await controller.getStats();
expect(queueService.getStats).toHaveBeenCalledOnce();
expect(result).toEqual({
pending: 5,
active: 1,
completed: 10,
failed: 2,
delayed: 0,
});
});
it("should pause queue processing", async () => {
queueService.pause.mockResolvedValue(undefined);
const result = await controller.pause();
expect(queueService.pause).toHaveBeenCalledOnce();
expect(result).toEqual({ message: "Queue processing paused" });
});
it("should resume queue processing", async () => {
queueService.resume.mockResolvedValue(undefined);
const result = await controller.resume();
expect(queueService.resume).toHaveBeenCalledOnce();
expect(result).toEqual({ message: "Queue processing resumed" });
});
});

View File

@@ -0,0 +1,39 @@
import { Controller, Get, HttpCode, Post, UseGuards } from "@nestjs/common";
import { Throttle } from "@nestjs/throttler";
import { QueueService } from "../../queue/queue.service";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
@Controller("queue")
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
export class QueueController {
constructor(private readonly queueService: QueueService) {}
@Get("stats")
@Throttle({ status: { limit: 200, ttl: 60000 } })
async getStats(): Promise<{
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}> {
return this.queueService.getStats();
}
@Post("pause")
@Throttle({ strict: { limit: 10, ttl: 60000 } })
@HttpCode(200)
async pause(): Promise<{ message: string }> {
await this.queueService.pause();
return { message: "Queue processing paused" };
}
@Post("resume")
@Throttle({ strict: { limit: 10, ttl: 60000 } })
@HttpCode(200)
async resume(): Promise<{ message: string }> {
await this.queueService.resume();
return { message: "Queue processing resumed" };
}
}

View File

@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
import { ThrottlerModule } from "@nestjs/throttler";
import { HealthModule } from "./api/health/health.module";
import { AgentsModule } from "./api/agents/agents.module";
import { QueueApiModule } from "./api/queue/queue-api.module";
import { CoordinatorModule } from "./coordinator/coordinator.module";
import { BudgetModule } from "./budget/budget.module";
import { CIModule } from "./ci";
@@ -46,6 +47,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
]),
HealthModule,
AgentsModule,
QueueApiModule,
CoordinatorModule,
BudgetModule,
CIModule,

View File

@@ -157,12 +157,12 @@ describe("orchestratorConfig", () => {
});
describe("spawner config", () => {
it("should use default maxConcurrentAgents of 20 when not set", () => {
it("should use default maxConcurrentAgents of 2 when not set", () => {
delete process.env.MAX_CONCURRENT_AGENTS;
const config = orchestratorConfig();
expect(config.spawner.maxConcurrentAgents).toBe(20);
expect(config.spawner.maxConcurrentAgents).toBe(2);
});
it("should use provided maxConcurrentAgents when MAX_CONCURRENT_AGENTS is set", () => {

View File

@@ -27,7 +27,7 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({
sandbox: {
enabled: process.env.SANDBOX_ENABLED !== "false",
defaultImage: process.env.SANDBOX_DEFAULT_IMAGE ?? "node:20-alpine",
defaultMemoryMB: parseInt(process.env.SANDBOX_DEFAULT_MEMORY_MB ?? "512", 10),
defaultMemoryMB: parseInt(process.env.SANDBOX_DEFAULT_MEMORY_MB ?? "256", 10),
defaultCpuLimit: parseFloat(process.env.SANDBOX_DEFAULT_CPU_LIMIT ?? "1.0"),
networkMode: process.env.SANDBOX_NETWORK_MODE ?? "none",
},
@@ -41,9 +41,15 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({
enabled: process.env.YOLO_MODE === "true",
},
spawner: {
maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "20", 10),
maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "2", 10),
sessionCleanupDelayMs: parseInt(process.env.SESSION_CLEANUP_DELAY_MS ?? "30000", 10),
},
queue: {
name: process.env.ORCHESTRATOR_QUEUE_NAME ?? "orchestrator-tasks",
maxRetries: parseInt(process.env.ORCHESTRATOR_QUEUE_MAX_RETRIES ?? "3", 10),
baseDelay: parseInt(process.env.ORCHESTRATOR_QUEUE_BASE_DELAY_MS ?? "1000", 10),
maxDelay: parseInt(process.env.ORCHESTRATOR_QUEUE_MAX_DELAY_MS ?? "60000", 10),
concurrency: parseInt(process.env.ORCHESTRATOR_QUEUE_CONCURRENCY ?? "1", 10),
completedRetentionCount: parseInt(process.env.QUEUE_COMPLETED_RETENTION_COUNT ?? "100", 10),
completedRetentionAgeSeconds: parseInt(
process.env.QUEUE_COMPLETED_RETENTION_AGE_S ?? "3600",

View File

@@ -2,9 +2,10 @@ import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { QueueService } from "./queue.service";
import { ValkeyModule } from "../valkey/valkey.module";
import { SpawnerModule } from "../spawner/spawner.module";
@Module({
imports: [ConfigModule, ValkeyModule],
imports: [ConfigModule, ValkeyModule, SpawnerModule],
providers: [QueueService],
exports: [QueueService],
})

View File

@@ -991,12 +991,17 @@ describe("QueueService", () => {
success: true,
metadata: { attempt: 1 },
});
expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith("task-123", "executing");
expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(
"task-123",
"executing",
undefined
);
expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
type: "task.processing",
type: "task.executing",
timestamp: expect.any(String),
taskId: "task-123",
data: { attempt: 1 },
agentId: undefined,
data: { attempt: 1, dispatchedByQueue: true },
});
});

View File

@@ -1,7 +1,9 @@
import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Injectable, OnModuleDestroy, OnModuleInit, Optional, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Queue, Worker, Job } from "bullmq";
import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../spawner/agent-lifecycle.service";
import type { TaskContext } from "../valkey/types";
import type {
QueuedTask,
@@ -16,6 +18,7 @@ import type {
*/
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(QueueService.name);
private queue!: Queue<QueuedTask>;
private worker!: Worker<QueuedTask, TaskProcessingResult>;
private readonly queueName: string;
@@ -23,7 +26,9 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
constructor(
private readonly valkeyService: ValkeyService,
private readonly configService: ConfigService
private readonly configService: ConfigService,
@Optional() private readonly spawnerService?: AgentSpawnerService,
@Optional() private readonly lifecycleService?: AgentLifecycleService
) {
this.queueName = this.configService.get<string>(
"orchestrator.queue.name",
@@ -132,6 +137,16 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
context,
};
// Ensure task state exists before queue lifecycle updates.
const getTaskState = (this.valkeyService as Partial<ValkeyService>).getTaskState;
const createTask = (this.valkeyService as Partial<ValkeyService>).createTask;
if (typeof getTaskState === "function" && typeof createTask === "function") {
const existingTask = await getTaskState.call(this.valkeyService, taskId);
if (!existingTask) {
await createTask.call(this.valkeyService, taskId, context);
}
}
// Add to BullMQ queue
await this.queue.add(taskId, queuedTask, {
priority: 10 - priority + 1, // BullMQ: lower number = higher priority, so invert
@@ -214,23 +229,35 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
const { taskId } = job.data;
try {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
const agentId = session?.agentId;
if (agentId) {
if (this.lifecycleService) {
await this.lifecycleService.transitionToRunning(agentId);
}
this.spawnerService?.setSessionState(agentId, "running");
}
// Update task state to executing
await this.valkeyService.updateTaskStatus(taskId, "executing");
await this.valkeyService.updateTaskStatus(taskId, "executing", agentId);
// Publish event
await this.valkeyService.publishEvent({
type: "task.processing",
type: "task.executing",
timestamp: new Date().toISOString(),
taskId,
data: { attempt: job.attemptsMade + 1 },
agentId,
data: {
attempt: job.attemptsMade + 1,
dispatchedByQueue: true,
},
});
// Task processing will be handled by agent spawner
// For now, just mark as processing
return {
success: true,
metadata: {
attempt: job.attemptsMade + 1,
...(agentId && { agentId }),
},
};
} catch (error) {
@@ -270,6 +297,14 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
* Handle task failure
*/
private async handleTaskFailure(taskId: string, error: Error): Promise<void> {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
if (session) {
this.spawnerService?.setSessionState(session.agentId, "failed", error.message, new Date());
if (this.lifecycleService) {
await this.lifecycleService.transitionToFailed(session.agentId, error.message);
}
}
await this.valkeyService.updateTaskStatus(taskId, "failed", undefined, error.message);
await this.valkeyService.publishEvent({
@@ -284,12 +319,25 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
* Handle task completion
*/
private async handleTaskCompletion(taskId: string): Promise<void> {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
if (session) {
this.spawnerService?.setSessionState(session.agentId, "completed", undefined, new Date());
if (this.lifecycleService) {
await this.lifecycleService.transitionToCompleted(session.agentId);
}
} else {
this.logger.warn(
`Queue completed task ${taskId} but no session was found; using queue-only completion state`
);
}
await this.valkeyService.updateTaskStatus(taskId, "completed");
await this.valkeyService.publishEvent({
type: "task.completed",
timestamp: new Date().toISOString(),
taskId,
...(session && { agentId: session.agentId }),
});
}
}

View File

@@ -37,6 +37,24 @@ export class AgentLifecycleService {
this.logger.log("AgentLifecycleService initialized");
}
/**
* Register a newly spawned agent in persistent state and emit spawned event.
*/
async registerSpawnedAgent(agentId: string, taskId: string): Promise<AgentState> {
await this.valkeyService.createAgent(agentId, taskId);
const createdState = await this.getAgentState(agentId);
const event: AgentEvent = {
type: "agent.spawned",
agentId,
taskId,
timestamp: new Date().toISOString(),
};
await this.valkeyService.publishEvent(event);
return createdState;
}
/**
* Acquire a per-agent mutex to serialize state transitions.
* Uses promise chaining: each caller chains onto the previous lock,

View File

@@ -116,6 +116,33 @@ export class AgentSpawnerService implements OnModuleDestroy {
return this.sessions.get(agentId);
}
/**
* Find an active session by task ID.
*/
findAgentSessionByTaskId(taskId: string): AgentSession | undefined {
return Array.from(this.sessions.values()).find((session) => session.taskId === taskId);
}
/**
* Update in-memory session state for visibility in list/status endpoints.
*/
setSessionState(
agentId: string,
state: AgentSession["state"],
error?: string,
completedAt?: Date
): void {
const session = this.sessions.get(agentId);
if (!session) return;
session.state = state;
session.error = error;
if (completedAt) {
session.completedAt = completedAt;
}
this.sessions.set(agentId, session);
}
/**
* List all agent sessions
* @returns Array of all agent sessions

View File

@@ -0,0 +1,50 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(): Promise<Response> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const upstream = await fetch(`${getOrchestratorUrl()}/agents/events`, {
method: "GET",
headers: {
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
if (!upstream.ok || !upstream.body) {
const text = await upstream.text();
return new NextResponse(text || "Failed to connect to orchestrator events stream", {
status: upstream.status || 502,
});
}
return new Response(upstream.body, {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function POST(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/pause`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function POST(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/resume`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/stats`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -2,7 +2,7 @@
* Agent Status Widget - shows running agents
*/
import { useState, useEffect } from "react";
import { useState, useEffect, useCallback } from "react";
import { Bot, Activity, AlertCircle, CheckCircle, Clock } from "lucide-react";
import type { WidgetProps } from "@mosaic/shared";
@@ -21,46 +21,57 @@ export function AgentStatusWidget({ id: _id, config: _config }: WidgetProps): Re
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const fetchAgents = useCallback(async (): Promise<void> => {
setIsLoading(true);
setError(null);
try {
const response = await fetch("/api/orchestrator/agents", {
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
throw new Error(`Failed to fetch agents: ${response.statusText}`);
}
const data = (await response.json()) as Agent[];
setAgents(data);
} catch (err: unknown) {
const errorMessage = err instanceof Error ? err.message : "Unknown error";
console.error("Failed to fetch agents:", errorMessage);
setError(errorMessage);
setAgents([]); // Clear agents on error
} finally {
setIsLoading(false);
}
}, []);
// Fetch agents from orchestrator API
useEffect(() => {
const fetchAgents = async (): Promise<void> => {
setIsLoading(true);
setError(null);
try {
const response = await fetch("/api/orchestrator/agents", {
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
throw new Error(`Failed to fetch agents: ${response.statusText}`);
}
const data = (await response.json()) as Agent[];
setAgents(data);
} catch (err: unknown) {
const errorMessage = err instanceof Error ? err.message : "Unknown error";
console.error("Failed to fetch agents:", errorMessage);
setError(errorMessage);
setAgents([]); // Clear agents on error
} finally {
setIsLoading(false);
}
};
void fetchAgents();
// Refresh every 30 seconds
const interval = setInterval(() => {
void fetchAgents();
}, 30000);
}, 20000);
const eventSource =
typeof EventSource !== "undefined" ? new EventSource("/api/orchestrator/events") : null;
if (eventSource) {
eventSource.onmessage = (): void => {
void fetchAgents();
};
eventSource.onerror = (): void => {
// polling remains fallback
};
}
return (): void => {
clearInterval(interval);
eventSource?.close();
};
}, []);
}, [fetchAgents]);
const getStatusIcon = (status: string): React.JSX.Element => {
const statusLower = status.toLowerCase();

View File

@@ -5,8 +5,8 @@
* including status, elapsed time, and work item details.
*/
import { useState, useEffect } from "react";
import { Activity, CheckCircle, XCircle, Clock, Loader2 } from "lucide-react";
import { useState, useEffect, useCallback } from "react";
import { Activity, CheckCircle, XCircle, Clock, Loader2, Pause, Play } from "lucide-react";
import type { WidgetProps } from "@mosaic/shared";
interface AgentTask {
@@ -19,6 +19,14 @@ interface AgentTask {
error?: string;
}
interface QueueStats {
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}
function getElapsedTime(spawnedAt: string, completedAt?: string): string {
const start = new Date(spawnedAt).getTime();
const end = completedAt ? new Date(completedAt).getTime() : Date.now();
@@ -94,34 +102,84 @@ function getAgentTypeLabel(agentType: string): string {
export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): React.JSX.Element {
const [tasks, setTasks] = useState<AgentTask[]>([]);
const [queueStats, setQueueStats] = useState<QueueStats | null>(null);
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [isQueuePaused, setIsQueuePaused] = useState(false);
const [isActionPending, setIsActionPending] = useState(false);
const fetchTasks = useCallback(async (): Promise<void> => {
try {
const res = await fetch("/api/orchestrator/agents");
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
const data = (await res.json()) as AgentTask[];
setTasks(data);
setError(null);
setIsLoading(false);
} catch {
setError("Unable to reach orchestrator");
setIsLoading(false);
}
}, []);
const fetchQueueStats = useCallback(async (): Promise<void> => {
try {
const res = await fetch("/api/orchestrator/queue/stats");
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
const data = (await res.json()) as QueueStats;
setQueueStats(data);
// Heuristic: active=0 with pending>0 for sustained windows usually means paused.
setIsQueuePaused(data.active === 0 && data.pending > 0);
} catch {
// Keep widget functional even if queue controls are temporarily unavailable.
}
}, []);
const setQueueState = useCallback(
async (action: "pause" | "resume"): Promise<void> => {
setIsActionPending(true);
try {
const res = await fetch(`/api/orchestrator/queue/${action}`, {
method: "POST",
});
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
setIsQueuePaused(action === "pause");
await fetchQueueStats();
} catch {
setError("Unable to control queue state");
} finally {
setIsActionPending(false);
}
},
[fetchQueueStats]
);
useEffect(() => {
const fetchTasks = (): void => {
fetch("/api/orchestrator/agents")
.then((res) => {
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
return res.json() as Promise<AgentTask[]>;
})
.then((data) => {
setTasks(data);
setError(null);
setIsLoading(false);
})
.catch(() => {
setError("Unable to reach orchestrator");
setIsLoading(false);
});
};
void fetchTasks();
void fetchQueueStats();
fetchTasks();
const interval = setInterval(fetchTasks, 15000);
const interval = setInterval(() => {
void fetchTasks();
void fetchQueueStats();
}, 15000);
const eventSource =
typeof EventSource !== "undefined" ? new EventSource("/api/orchestrator/events") : null;
if (eventSource) {
eventSource.onmessage = (): void => {
void fetchTasks();
void fetchQueueStats();
};
eventSource.onerror = (): void => {
// Polling remains the resilience path.
};
}
return (): void => {
clearInterval(interval);
eventSource?.close();
};
}, []);
}, [fetchTasks, fetchQueueStats]);
const stats = {
total: tasks.length,
@@ -151,6 +209,23 @@ export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): R
return (
<div className="flex flex-col h-full space-y-3">
<div className="flex items-center justify-between">
<div className="text-xs text-gray-500 dark:text-gray-400">
Queue: {isQueuePaused ? "Paused" : "Running"}
</div>
<button
type="button"
onClick={(): void => {
void setQueueState(isQueuePaused ? "resume" : "pause");
}}
disabled={isActionPending}
className="inline-flex items-center gap-1 rounded border border-gray-300 dark:border-gray-700 px-2 py-1 text-xs hover:bg-gray-100 dark:hover:bg-gray-800 disabled:opacity-50"
>
{isQueuePaused ? <Play className="w-3 h-3" /> : <Pause className="w-3 h-3" />}
{isQueuePaused ? "Resume" : "Pause"}
</button>
</div>
{/* Summary stats */}
<div className="grid grid-cols-4 gap-1 text-center text-xs">
<div className="bg-gray-50 dark:bg-gray-800 rounded p-2">
@@ -173,6 +248,29 @@ export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): R
</div>
</div>
{queueStats && (
<div className="grid grid-cols-3 gap-1 text-center text-xs">
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.pending}
</div>
<div className="text-gray-500">Queued</div>
</div>
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.active}
</div>
<div className="text-gray-500">Workers</div>
</div>
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.failed}
</div>
<div className="text-gray-500">Failed</div>
</div>
</div>
)}
{/* Task list */}
<div className="flex-1 overflow-auto space-y-2">
{tasks.length === 0 ? (