Files
stack/apps/orchestrator/src/api/agents/agents.controller.ts

305 lines
9.0 KiB
TypeScript

import {
Controller,
Post,
Get,
Body,
Param,
NotFoundException,
Logger,
UsePipes,
ValidationPipe,
HttpCode,
UseGuards,
ParseUUIDPipe,
Sse,
MessageEvent,
Query,
} from "@nestjs/common";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
/**
 * Controller for agent management endpoints.
 *
 * All endpoints require API key authentication via the X-API-Key header.
 * Set the ORCHESTRATOR_API_KEY environment variable to configure the expected key.
 *
 * Rate limits:
 * - Status endpoints: 200 requests/minute
 * - Spawn/kill endpoints: 10 requests/minute (strict)
 * - Default: 100 requests/minute
 */
@Controller("agents")
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
export class AgentsController {
  /** Number of events returned by `GET agents/events/recent` when `limit` is missing or unusable. */
  private static readonly DEFAULT_RECENT_EVENTS_LIMIT = 100;

  /** Delay between heartbeat messages on the SSE stream, in milliseconds. */
  private static readonly HEARTBEAT_INTERVAL_MS = 15000;

  private readonly logger = new Logger(AgentsController.name);

  constructor(
    private readonly queueService: QueueService,
    private readonly spawnerService: AgentSpawnerService,
    private readonly lifecycleService: AgentLifecycleService,
    private readonly killswitchService: KillswitchService,
    private readonly eventsService: AgentEventsService
  ) {}

  /**
   * Spawn a new agent for the given task.
   *
   * Steps, in order: spawn via the spawner service, register the initial
   * lifecycle state, then enqueue the task with the default priority.
   *
   * @param dto Spawn agent request (validated and whitelisted by ValidationPipe)
   * @returns Agent spawn response with agentId and status "spawning"
   * @throws Rethrows any error raised by the spawner, lifecycle or queue services
   */
  @Post("spawn")
  @Throttle({ strict: { limit: 10, ttl: 60000 } })
  @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
  async spawn(@Body() dto: SpawnAgentDto): Promise<SpawnAgentResponseDto> {
    this.logger.log(`Received spawn request for task: ${dto.taskId}`);
    try {
      // Validation is handled by:
      // 1. ValidationPipe + DTO decorators at the HTTP layer
      // 2. AgentSpawnerService.validateSpawnRequest for business logic
      const spawnResponse = this.spawnerService.spawnAgent({
        taskId: dto.taskId,
        agentType: dto.agentType,
        context: dto.context,
      });

      // Persist initial lifecycle state in Valkey.
      // NOTE(review): if this or the enqueue below fails, the agent spawned
      // above is not rolled back here — confirm whether cleanup is needed.
      await this.lifecycleService.registerSpawnedAgent(spawnResponse.agentId, dto.taskId);

      // Queue task in Valkey
      await this.queueService.addTask(dto.taskId, dto.context, {
        priority: 5, // Default priority
      });

      this.logger.log(`Agent spawned successfully: ${spawnResponse.agentId}`);
      return {
        agentId: spawnResponse.agentId,
        status: "spawning",
      };
    } catch (error: unknown) {
      this.logger.error(`Failed to spawn agent: ${String(error)}`);
      throw error;
    }
  }

  /**
   * Stream orchestrator events as server-sent events (SSE).
   *
   * Each connection receives live events from the events service, an initial
   * snapshot once it has loaded, and a periodic heartbeat. The snapshot is
   * loaded asynchronously, so it may be delivered after some live events.
   *
   * @returns Observable emitting one SSE message per event, snapshot or heartbeat
   */
  @Sse("events")
  @Throttle({ status: { limit: 200, ttl: 60000 } })
  streamEvents(): Observable<MessageEvent> {
    return new Observable<MessageEvent>((subscriber) => {
      // Guards against emitting after the client has disconnected.
      let isClosed = false;

      const unsubscribe = this.eventsService.subscribe((event) => {
        if (!isClosed) {
          subscriber.next({ data: event });
        }
      });

      // Fire-and-forget: the snapshot is best-effort. A rejection is logged
      // rather than left unhandled, and the live stream stays open.
      void this.eventsService
        .getInitialSnapshot()
        .then((snapshot) => {
          if (!isClosed) {
            subscriber.next({ data: snapshot });
          }
        })
        .catch((error: unknown) => {
          const errorMessage = error instanceof Error ? error.message : String(error);
          this.logger.error(`Failed to load initial event snapshot: ${errorMessage}`);
        });

      const heartbeat = setInterval(() => {
        if (!isClosed) {
          subscriber.next({ data: this.eventsService.createHeartbeat() });
        }
      }, AgentsController.HEARTBEAT_INTERVAL_MS);

      // Teardown: runs when the observable is unsubscribed.
      return () => {
        isClosed = true;
        clearInterval(heartbeat);
        unsubscribe();
      };
    });
  }

  /**
   * Return recent orchestrator events for non-streaming consumers.
   *
   * @param limit Optional `limit` query parameter. Parsed as a base-10 integer;
   *   missing, unparseable, zero or negative values fall back to the default (100).
   * @returns Object wrapping the events returned by the events service
   */
  @Get("events/recent")
  @Throttle({ status: { limit: 200, ttl: 60000 } })
  getRecentEvents(@Query("limit") limit?: string): {
    events: ReturnType<AgentEventsService["getRecentEvents"]>;
  } {
    const fallbackLimit = AgentsController.DEFAULT_RECENT_EVENTS_LIMIT;
    const parsedLimit = limit === undefined ? fallbackLimit : Number.parseInt(limit, 10);
    // A non-positive count is meaningless for "most recent N", so it is treated
    // the same as an unparseable value instead of being forwarded to the service.
    const effectiveLimit = Number.isNaN(parsedLimit) || parsedLimit <= 0 ? fallbackLimit : parsedLimit;
    return {
      events: this.eventsService.getRecentEvents(effectiveLimit),
    };
  }

  /**
   * List all agents.
   *
   * @returns Array of all agent sessions known to the spawner service, with
   *   timestamps serialized as ISO-8601 strings
   * @throws Error wrapping the message of any failure while listing sessions
   */
  @Get()
  @Throttle({ status: { limit: 200, ttl: 60000 } })
  listAgents(): {
    agentId: string;
    taskId: string;
    status: string;
    agentType: string;
    spawnedAt: string;
    completedAt?: string;
    error?: string;
  }[] {
    this.logger.log("Received request to list all agents");
    try {
      // Get all sessions from spawner service
      const sessions = this.spawnerService.listAgentSessions();

      // Map to response format
      const agents = sessions.map((session) => ({
        agentId: session.agentId,
        taskId: session.taskId,
        status: session.state,
        agentType: session.agentType,
        spawnedAt: session.spawnedAt.toISOString(),
        completedAt: session.completedAt?.toISOString(),
        error: session.error,
      }));

      this.logger.log(`Found ${agents.length.toString()} agents`);
      return agents;
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to list agents: ${errorMessage}`);
      throw new Error(`Failed to list agents: ${errorMessage}`);
    }
  }

  /**
   * Get agent status.
   *
   * Looks up the lifecycle service first and falls back to the spawner
   * service's session when no lifecycle state is found.
   *
   * @param agentId Agent ID to query (must be a UUID)
   * @returns Agent status details
   * @throws NotFoundException when neither source knows the agent
   * @throws Error wrapping the message of any other failure
   */
  @Get(":agentId/status")
  @Throttle({ status: { limit: 200, ttl: 60000 } })
  async getAgentStatus(@Param("agentId", ParseUUIDPipe) agentId: string): Promise<{
    agentId: string;
    taskId: string;
    status: string;
    spawnedAt: string;
    startedAt?: string;
    completedAt?: string;
    error?: string;
  }> {
    this.logger.log(`Received status request for agent: ${agentId}`);
    try {
      // Try to get from lifecycle service (Valkey)
      const lifecycleState = await this.lifecycleService.getAgentLifecycleState(agentId);
      if (lifecycleState) {
        return {
          agentId: lifecycleState.agentId,
          taskId: lifecycleState.taskId,
          status: lifecycleState.status,
          // NOTE(review): spawnedAt is taken from startedAt, or the current time
          // when startedAt is unset, so it can change between calls — confirm
          // whether the lifecycle state carries a real spawn timestamp.
          spawnedAt: lifecycleState.startedAt ?? new Date().toISOString(),
          startedAt: lifecycleState.startedAt,
          completedAt: lifecycleState.completedAt,
          error: lifecycleState.error,
        };
      }

      // Fallback to spawner service (in-memory)
      const session = this.spawnerService.getAgentSession(agentId);
      if (session) {
        return {
          agentId: session.agentId,
          taskId: session.taskId,
          status: session.state,
          spawnedAt: session.spawnedAt.toISOString(),
          completedAt: session.completedAt?.toISOString(),
          error: session.error,
        };
      }

      throw new NotFoundException(`Agent ${agentId} not found`);
    } catch (error: unknown) {
      // Let the 404 thrown above propagate unchanged.
      if (error instanceof NotFoundException) {
        throw error;
      }
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to get agent status: ${errorMessage}`);
      throw new Error(`Failed to get agent status: ${errorMessage}`);
    }
  }

  /**
   * Kill a single agent immediately.
   *
   * @param agentId Agent ID to kill (must be a UUID)
   * @returns Success message
   * @throws Rethrows any error raised by the killswitch service
   */
  @Post(":agentId/kill")
  @Throttle({ strict: { limit: 10, ttl: 60000 } })
  @HttpCode(200)
  async killAgent(@Param("agentId", ParseUUIDPipe) agentId: string): Promise<{ message: string }> {
    this.logger.warn(`Received kill request for agent: ${agentId}`);
    try {
      await this.killswitchService.killAgent(agentId);
      this.logger.warn(`Agent ${agentId} killed successfully`);
      return {
        message: `Agent ${agentId} killed successfully`,
      };
    } catch (error: unknown) {
      this.logger.error(`Failed to kill agent ${agentId}: ${String(error)}`);
      throw error;
    }
  }

  /**
   * Kill all active agents.
   *
   * @returns Summary message plus the fields of the killswitch result
   * @throws Rethrows any error raised by the killswitch service
   */
  @Post("kill-all")
  @Throttle({ strict: { limit: 10, ttl: 60000 } })
  @HttpCode(200)
  async killAllAgents(): Promise<{
    message: string;
    total: number;
    killed: number;
    failed: number;
    errors?: string[];
  }> {
    this.logger.warn("Received kill-all request");
    try {
      const result = await this.killswitchService.killAllAgents();
      this.logger.warn(
        `Kill all completed: ${result.killed.toString()} killed, ${result.failed.toString()} failed out of ${result.total.toString()}`
      );
      return {
        message: `Kill all completed: ${result.killed.toString()} killed, ${result.failed.toString()} failed`,
        ...result,
      };
    } catch (error: unknown) {
      this.logger.error(`Failed to kill all agents: ${String(error)}`);
      throw error;
    }
  }
}