All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
497 lines
15 KiB
TypeScript
497 lines
15 KiB
TypeScript
import {
|
|
Controller,
|
|
Post,
|
|
Get,
|
|
Body,
|
|
Param,
|
|
NotFoundException,
|
|
Logger,
|
|
UsePipes,
|
|
ValidationPipe,
|
|
HttpCode,
|
|
UseGuards,
|
|
ParseUUIDPipe,
|
|
Sse,
|
|
MessageEvent,
|
|
Query,
|
|
Request,
|
|
} from "@nestjs/common";
|
|
import type { AgentConversationMessage } from "@prisma/client";
|
|
import { Throttle } from "@nestjs/throttler";
|
|
import { Observable } from "rxjs";
|
|
import { QueueService } from "../../queue/queue.service";
|
|
import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
|
|
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
|
|
import { KillswitchService } from "../../killswitch/killswitch.service";
|
|
import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
|
|
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
|
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
|
|
import { AgentEventsService } from "./agent-events.service";
|
|
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
|
|
import { AgentMessagesService } from "./agent-messages.service";
|
|
import { AgentControlService } from "./agent-control.service";
|
|
import { AgentTreeService } from "./agent-tree.service";
|
|
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
|
|
import { InjectAgentDto } from "./dto/inject-agent.dto";
|
|
import { PauseAgentDto, ResumeAgentDto } from "./dto/control-agent.dto";
|
|
|
|
/**
|
|
* Controller for agent management endpoints
|
|
*
|
|
* All endpoints require API key authentication via X-API-Key header.
|
|
* Set ORCHESTRATOR_API_KEY environment variable to configure the expected key.
|
|
*
|
|
* Rate limits:
|
|
* - Status endpoints: 200 requests/minute
|
|
* - Spawn/kill endpoints: 10 requests/minute (strict)
|
|
* - Default: 100 requests/minute
|
|
*/
|
|
@Controller("agents")
|
|
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
|
|
export class AgentsController {
|
|
private readonly logger = new Logger(AgentsController.name);
|
|
|
|
constructor(
|
|
private readonly queueService: QueueService,
|
|
private readonly spawnerService: AgentSpawnerService,
|
|
private readonly lifecycleService: AgentLifecycleService,
|
|
private readonly killswitchService: KillswitchService,
|
|
private readonly eventsService: AgentEventsService,
|
|
private readonly messagesService: AgentMessagesService,
|
|
private readonly agentControlService: AgentControlService,
|
|
private readonly agentTreeService: AgentTreeService
|
|
) {}
|
|
|
|
/**
|
|
* Spawn a new agent for the given task
|
|
* @param dto Spawn agent request
|
|
* @returns Agent spawn response with agentId and status
|
|
*/
|
|
@Post("spawn")
|
|
@Throttle({ strict: { limit: 10, ttl: 60000 } })
|
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
|
async spawn(@Body() dto: SpawnAgentDto): Promise<SpawnAgentResponseDto> {
|
|
this.logger.log(`Received spawn request for task: ${dto.taskId}`);
|
|
|
|
try {
|
|
// Validation is handled by:
|
|
// 1. ValidationPipe + DTO decorators at the HTTP layer
|
|
// 2. AgentSpawnerService.validateSpawnRequest for business logic
|
|
|
|
// Spawn agent using spawner service
|
|
const spawnResponse = this.spawnerService.spawnAgent({
|
|
taskId: dto.taskId,
|
|
...(dto.parentAgentId !== undefined ? { parentAgentId: dto.parentAgentId } : {}),
|
|
agentType: dto.agentType,
|
|
context: dto.context,
|
|
});
|
|
|
|
// Persist initial lifecycle state in Valkey.
|
|
await this.lifecycleService.registerSpawnedAgent(spawnResponse.agentId, dto.taskId);
|
|
|
|
// Queue task in Valkey
|
|
await this.queueService.addTask(dto.taskId, dto.context, {
|
|
priority: 5, // Default priority
|
|
});
|
|
|
|
this.logger.log(`Agent spawned successfully: ${spawnResponse.agentId}`);
|
|
|
|
// Return response
|
|
return {
|
|
agentId: spawnResponse.agentId,
|
|
status: "spawning",
|
|
};
|
|
} catch (error) {
|
|
this.logger.error(`Failed to spawn agent: ${String(error)}`);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stream orchestrator events as server-sent events (SSE)
|
|
*/
|
|
@Sse("events")
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
streamEvents(): Observable<MessageEvent> {
|
|
return new Observable<MessageEvent>((subscriber) => {
|
|
let isClosed = false;
|
|
|
|
const unsubscribe = this.eventsService.subscribe((event) => {
|
|
if (!isClosed) {
|
|
subscriber.next({ data: event });
|
|
}
|
|
});
|
|
|
|
void this.eventsService.getInitialSnapshot().then((snapshot) => {
|
|
if (!isClosed) {
|
|
subscriber.next({ data: snapshot });
|
|
}
|
|
});
|
|
|
|
const heartbeat = setInterval(() => {
|
|
if (!isClosed) {
|
|
subscriber.next({ data: this.eventsService.createHeartbeat() });
|
|
}
|
|
}, 15000);
|
|
|
|
return () => {
|
|
isClosed = true;
|
|
clearInterval(heartbeat);
|
|
unsubscribe();
|
|
};
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Return recent orchestrator events for non-streaming consumers.
|
|
*/
|
|
@Get("events/recent")
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
getRecentEvents(@Query("limit") limit?: string): {
|
|
events: ReturnType<AgentEventsService["getRecentEvents"]>;
|
|
} {
|
|
const parsedLimit = Number.parseInt(limit ?? "100", 10);
|
|
return {
|
|
events: this.eventsService.getRecentEvents(Number.isNaN(parsedLimit) ? 100 : parsedLimit),
|
|
};
|
|
}
|
|
|
|
@Get("tree")
|
|
@UseGuards(OrchestratorApiKeyGuard)
|
|
@Throttle({ default: { limit: 200, ttl: 60000 } })
|
|
async getAgentTree(): Promise<AgentTreeResponseDto[]> {
|
|
return this.agentTreeService.getTree();
|
|
}
|
|
|
|
/**
|
|
* List all agents
|
|
* @returns Array of all agent sessions with their status
|
|
*/
|
|
@Get()
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
listAgents(): {
|
|
agentId: string;
|
|
taskId: string;
|
|
status: string;
|
|
agentType: string;
|
|
spawnedAt: string;
|
|
completedAt?: string;
|
|
error?: string;
|
|
}[] {
|
|
this.logger.log("Received request to list all agents");
|
|
|
|
try {
|
|
// Get all sessions from spawner service
|
|
const sessions = this.spawnerService.listAgentSessions();
|
|
|
|
// Map to response format
|
|
const agents = sessions.map((session) => ({
|
|
agentId: session.agentId,
|
|
taskId: session.taskId,
|
|
status: session.state,
|
|
agentType: session.agentType,
|
|
spawnedAt: session.spawnedAt.toISOString(),
|
|
completedAt: session.completedAt?.toISOString(),
|
|
error: session.error,
|
|
}));
|
|
|
|
this.logger.log(`Found ${agents.length.toString()} agents`);
|
|
|
|
return agents;
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Failed to list agents: ${errorMessage}`);
|
|
throw new Error(`Failed to list agents: ${errorMessage}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get paginated message history for an agent.
|
|
*/
|
|
@Get(":agentId/messages")
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
|
async getAgentMessages(
|
|
@Param("agentId", ParseUUIDPipe) agentId: string,
|
|
@Query() query: GetMessagesQueryDto
|
|
): Promise<{
|
|
messages: AgentConversationMessage[];
|
|
total: number;
|
|
}> {
|
|
return this.messagesService.getMessages(agentId, query.limit, query.skip);
|
|
}
|
|
|
|
/**
|
|
* Stream per-agent conversation messages as server-sent events (SSE).
|
|
*/
|
|
@Sse(":agentId/messages/stream")
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
|
|
return new Observable<MessageEvent>((subscriber) => {
|
|
let isClosed = false;
|
|
let lastSeenTimestamp = new Date();
|
|
let lastSeenMessageId: string | null = null;
|
|
|
|
const emitMessage = (message: AgentConversationMessage): void => {
|
|
if (isClosed) {
|
|
return;
|
|
}
|
|
|
|
subscriber.next({
|
|
data: this.toMessageStreamPayload(message),
|
|
});
|
|
|
|
lastSeenTimestamp = message.timestamp;
|
|
lastSeenMessageId = message.id;
|
|
};
|
|
|
|
void this.messagesService
|
|
.getReplayMessages(agentId, 50)
|
|
.then((messages) => {
|
|
if (isClosed) {
|
|
return;
|
|
}
|
|
|
|
messages.forEach((message) => {
|
|
emitMessage(message);
|
|
});
|
|
|
|
if (messages.length === 0) {
|
|
lastSeenTimestamp = new Date();
|
|
lastSeenMessageId = null;
|
|
}
|
|
})
|
|
.catch((error: unknown) => {
|
|
this.logger.error(
|
|
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
|
|
);
|
|
lastSeenTimestamp = new Date();
|
|
lastSeenMessageId = null;
|
|
});
|
|
|
|
const pollInterval = setInterval(() => {
|
|
if (isClosed) {
|
|
return;
|
|
}
|
|
|
|
void this.messagesService
|
|
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
|
|
.then((messages) => {
|
|
if (isClosed || messages.length === 0) {
|
|
return;
|
|
}
|
|
|
|
messages.forEach((message) => {
|
|
emitMessage(message);
|
|
});
|
|
})
|
|
.catch((error: unknown) => {
|
|
this.logger.error(
|
|
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
|
|
);
|
|
});
|
|
}, 1000);
|
|
|
|
const heartbeat = setInterval(() => {
|
|
if (!isClosed) {
|
|
subscriber.next({ data: { type: "heartbeat" } });
|
|
}
|
|
}, 15000);
|
|
|
|
return () => {
|
|
isClosed = true;
|
|
clearInterval(pollInterval);
|
|
clearInterval(heartbeat);
|
|
};
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Get agent status
|
|
* @param agentId Agent ID to query
|
|
* @returns Agent status details
|
|
*/
|
|
@Get(":agentId/status")
|
|
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
|
async getAgentStatus(@Param("agentId", ParseUUIDPipe) agentId: string): Promise<{
|
|
agentId: string;
|
|
taskId: string;
|
|
status: string;
|
|
spawnedAt: string;
|
|
startedAt?: string;
|
|
completedAt?: string;
|
|
error?: string;
|
|
}> {
|
|
this.logger.log(`Received status request for agent: ${agentId}`);
|
|
|
|
try {
|
|
// Try to get from lifecycle service (Valkey)
|
|
const lifecycleState = await this.lifecycleService.getAgentLifecycleState(agentId);
|
|
|
|
if (lifecycleState) {
|
|
return {
|
|
agentId: lifecycleState.agentId,
|
|
taskId: lifecycleState.taskId,
|
|
status: lifecycleState.status,
|
|
spawnedAt: lifecycleState.startedAt ?? new Date().toISOString(),
|
|
startedAt: lifecycleState.startedAt,
|
|
completedAt: lifecycleState.completedAt,
|
|
error: lifecycleState.error,
|
|
};
|
|
}
|
|
|
|
// Fallback to spawner service (in-memory)
|
|
const session = this.spawnerService.getAgentSession(agentId);
|
|
|
|
if (session) {
|
|
return {
|
|
agentId: session.agentId,
|
|
taskId: session.taskId,
|
|
status: session.state,
|
|
spawnedAt: session.spawnedAt.toISOString(),
|
|
completedAt: session.completedAt?.toISOString(),
|
|
error: session.error,
|
|
};
|
|
}
|
|
|
|
throw new NotFoundException(`Agent ${agentId} not found`);
|
|
} catch (error: unknown) {
|
|
if (error instanceof NotFoundException) {
|
|
throw error;
|
|
}
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Failed to get agent status: ${errorMessage}`);
|
|
throw new Error(`Failed to get agent status: ${errorMessage}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Kill a single agent immediately
|
|
* @param agentId Agent ID to kill
|
|
* @returns Success message
|
|
*/
|
|
@Post(":agentId/kill")
|
|
@Throttle({ strict: { limit: 10, ttl: 60000 } })
|
|
@HttpCode(200)
|
|
async killAgent(@Param("agentId", ParseUUIDPipe) agentId: string): Promise<{ message: string }> {
|
|
this.logger.warn(`Received kill request for agent: ${agentId}`);
|
|
|
|
try {
|
|
await this.killswitchService.killAgent(agentId);
|
|
|
|
this.logger.warn(`Agent ${agentId} killed successfully`);
|
|
|
|
return {
|
|
message: `Agent ${agentId} killed successfully`,
|
|
};
|
|
} catch (error) {
|
|
this.logger.error(`Failed to kill agent ${agentId}: ${String(error)}`);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
@Post(":agentId/inject")
|
|
@Throttle({ default: { limit: 10, ttl: 60000 } })
|
|
@HttpCode(200)
|
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
|
async injectAgentMessage(
|
|
@Param("agentId", ParseUUIDPipe) agentId: string,
|
|
@Body() dto: InjectAgentDto,
|
|
@Request() req: { apiKey?: string }
|
|
): Promise<{ message: string }> {
|
|
const operatorId = req.apiKey ?? "operator";
|
|
await this.agentControlService.injectMessage(agentId, operatorId, dto.message);
|
|
|
|
return {
|
|
message: `Message injected into agent ${agentId}`,
|
|
};
|
|
}
|
|
|
|
@Post(":agentId/pause")
|
|
@Throttle({ default: { limit: 10, ttl: 60000 } })
|
|
@HttpCode(200)
|
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
|
async pauseAgent(
|
|
@Param("agentId", ParseUUIDPipe) agentId: string,
|
|
@Body() _dto: PauseAgentDto,
|
|
@Request() req: { apiKey?: string }
|
|
): Promise<{ message: string }> {
|
|
const operatorId = req.apiKey ?? "operator";
|
|
await this.agentControlService.pauseAgent(agentId, operatorId);
|
|
|
|
return {
|
|
message: `Agent ${agentId} paused`,
|
|
};
|
|
}
|
|
|
|
@Post(":agentId/resume")
|
|
@Throttle({ default: { limit: 10, ttl: 60000 } })
|
|
@HttpCode(200)
|
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
|
async resumeAgent(
|
|
@Param("agentId", ParseUUIDPipe) agentId: string,
|
|
@Body() _dto: ResumeAgentDto,
|
|
@Request() req: { apiKey?: string }
|
|
): Promise<{ message: string }> {
|
|
const operatorId = req.apiKey ?? "operator";
|
|
await this.agentControlService.resumeAgent(agentId, operatorId);
|
|
|
|
return {
|
|
message: `Agent ${agentId} resumed`,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Kill all active agents
|
|
* @returns Summary of kill operation
|
|
*/
|
|
@Post("kill-all")
|
|
@Throttle({ strict: { limit: 10, ttl: 60000 } })
|
|
@HttpCode(200)
|
|
async killAllAgents(): Promise<{
|
|
message: string;
|
|
total: number;
|
|
killed: number;
|
|
failed: number;
|
|
errors?: string[];
|
|
}> {
|
|
this.logger.warn("Received kill-all request");
|
|
|
|
try {
|
|
const result = await this.killswitchService.killAllAgents();
|
|
|
|
this.logger.warn(
|
|
`Kill all completed: ${result.killed.toString()} killed, ${result.failed.toString()} failed out of ${result.total.toString()}`
|
|
);
|
|
|
|
return {
|
|
message: `Kill all completed: ${result.killed.toString()} killed, ${result.failed.toString()} failed`,
|
|
...result,
|
|
};
|
|
} catch (error) {
|
|
this.logger.error(`Failed to kill all agents: ${String(error)}`);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
private toMessageStreamPayload(message: AgentConversationMessage): {
|
|
messageId: string;
|
|
sessionId: string;
|
|
role: string;
|
|
content: string;
|
|
provider: string;
|
|
timestamp: string;
|
|
metadata: unknown;
|
|
} {
|
|
return {
|
|
messageId: message.id,
|
|
sessionId: message.sessionId,
|
|
role: message.role,
|
|
content: message.content,
|
|
provider: message.provider,
|
|
timestamp: message.timestamp.toISOString(),
|
|
metadata: message.metadata,
|
|
};
|
|
}
|
|
}
|