Files
stack/apps/orchestrator/src/spawner/agent-spawner.service.ts
Jason Woltje a42f88d64c fix(#338): Add session cleanup on terminal states
- Add removeSession and scheduleSessionCleanup methods to AgentSpawnerService
- Schedule session cleanup after completed/failed/killed transitions
- Default 30 second delay before cleanup to allow status queries
- Implement OnModuleDestroy to clean up pending timers
- Add forwardRef injection to avoid circular dependency
- Add comprehensive tests for cleanup functionality

Refs #338
2026-02-05 18:47:14 -06:00

229 lines
7.0 KiB
TypeScript

import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import Anthropic from "@anthropic-ai/sdk";
import { randomUUID } from "crypto";
import {
SpawnAgentRequest,
SpawnAgentResponse,
AgentSession,
AgentType,
} from "./types/agent-spawner.types";
/**
* Default delay in milliseconds before cleaning up sessions after terminal states
* This allows time for status queries before the session is removed
*/
const DEFAULT_SESSION_CLEANUP_DELAY_MS = 30000; // 30 seconds
/**
* Service responsible for spawning Claude agents using Anthropic SDK
*/
@Injectable()
export class AgentSpawnerService implements OnModuleDestroy {
private readonly logger = new Logger(AgentSpawnerService.name);
private readonly anthropic: Anthropic;
private readonly sessions = new Map<string, AgentSession>();
private readonly maxConcurrentAgents: number;
private readonly sessionCleanupDelayMs: number;
private readonly cleanupTimers = new Map<string, NodeJS.Timeout>();
constructor(private readonly configService: ConfigService) {
const apiKey = this.configService.get<string>("orchestrator.claude.apiKey");
if (!apiKey) {
throw new Error("CLAUDE_API_KEY is not configured");
}
this.anthropic = new Anthropic({
apiKey,
});
// Default to 20 if not configured
this.maxConcurrentAgents =
this.configService.get<number>("orchestrator.spawner.maxConcurrentAgents") ?? 20;
// Default to 30 seconds if not configured
this.sessionCleanupDelayMs =
this.configService.get<number>("orchestrator.spawner.sessionCleanupDelayMs") ??
DEFAULT_SESSION_CLEANUP_DELAY_MS;
this.logger.log(
`AgentSpawnerService initialized with Claude SDK (max concurrent agents: ${String(this.maxConcurrentAgents)}, cleanup delay: ${String(this.sessionCleanupDelayMs)}ms)`
);
}
/**
* Clean up all pending cleanup timers on module destroy
*/
onModuleDestroy(): void {
this.cleanupTimers.forEach((timer, agentId) => {
clearTimeout(timer);
this.logger.debug(`Cleared cleanup timer for agent ${agentId}`);
});
this.cleanupTimers.clear();
}
/**
* Spawn a new agent with the given configuration
* @param request Agent spawn request
* @returns Agent spawn response with agentId
*/
spawnAgent(request: SpawnAgentRequest): SpawnAgentResponse {
this.logger.log(`Spawning agent for task: ${request.taskId}`);
// Check concurrent agent limit before proceeding
this.checkConcurrentAgentLimit();
// Validate request
this.validateSpawnRequest(request);
// Generate unique agent ID
const agentId = randomUUID();
const spawnedAt = new Date();
// Create agent session
const session: AgentSession = {
agentId,
taskId: request.taskId,
agentType: request.agentType,
state: "spawning",
context: request.context,
options: request.options,
spawnedAt,
};
// Store session
this.sessions.set(agentId, session);
this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`);
// NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD)
// For now, we're just creating the session and tracking it
return {
agentId,
state: "spawning",
spawnedAt,
};
}
/**
* Get agent session by agentId
* @param agentId Unique agent identifier
* @returns Agent session or undefined if not found
*/
getAgentSession(agentId: string): AgentSession | undefined {
return this.sessions.get(agentId);
}
/**
* List all agent sessions
* @returns Array of all agent sessions
*/
listAgentSessions(): AgentSession[] {
return Array.from(this.sessions.values());
}
/**
* Remove an agent session from the in-memory map
* @param agentId Unique agent identifier
* @returns true if session was removed, false if not found
*/
removeSession(agentId: string): boolean {
// Clear any pending cleanup timer for this agent
const timer = this.cleanupTimers.get(agentId);
if (timer) {
clearTimeout(timer);
this.cleanupTimers.delete(agentId);
}
const deleted = this.sessions.delete(agentId);
if (deleted) {
this.logger.log(`Session removed for agent ${agentId}`);
}
return deleted;
}
/**
* Schedule session cleanup after a delay
* This allows time for status queries before the session is removed
* @param agentId Unique agent identifier
* @param delayMs Optional delay in milliseconds (defaults to configured value)
*/
scheduleSessionCleanup(agentId: string, delayMs?: number): void {
const delay = delayMs ?? this.sessionCleanupDelayMs;
// Clear any existing timer for this agent
const existingTimer = this.cleanupTimers.get(agentId);
if (existingTimer) {
clearTimeout(existingTimer);
}
this.logger.debug(`Scheduling session cleanup for agent ${agentId} in ${String(delay)}ms`);
const timer = setTimeout(() => {
this.removeSession(agentId);
this.cleanupTimers.delete(agentId);
}, delay);
this.cleanupTimers.set(agentId, timer);
}
/**
* Get the number of pending cleanup timers (for testing)
* @returns Number of pending cleanup timers
*/
getPendingCleanupCount(): number {
return this.cleanupTimers.size;
}
/**
* Check if the concurrent agent limit has been reached
* @throws HttpException with 429 Too Many Requests if limit reached
*/
private checkConcurrentAgentLimit(): void {
const currentCount = this.sessions.size;
if (currentCount >= this.maxConcurrentAgents) {
this.logger.warn(
`Maximum concurrent agents limit reached: ${String(currentCount)}/${String(this.maxConcurrentAgents)}`
);
throw new HttpException(
{
message: `Maximum concurrent agents limit reached (${String(this.maxConcurrentAgents)}). Please wait for existing agents to complete.`,
currentCount,
maxLimit: this.maxConcurrentAgents,
},
HttpStatus.TOO_MANY_REQUESTS
);
}
}
/**
* Validate spawn agent request
* @param request Spawn request to validate
* @throws Error if validation fails
*/
private validateSpawnRequest(request: SpawnAgentRequest): void {
if (!request.taskId || request.taskId.trim() === "") {
throw new Error("taskId is required");
}
const validAgentTypes: AgentType[] = ["worker", "reviewer", "tester"];
if (!validAgentTypes.includes(request.agentType)) {
throw new Error(`agentType must be one of: ${validAgentTypes.join(", ")}`);
}
if (!request.context.repository || request.context.repository.trim() === "") {
throw new Error("context.repository is required");
}
if (!request.context.branch || request.context.branch.trim() === "") {
throw new Error("context.branch is required");
}
if (request.context.workItems.length === 0) {
throw new Error("context.workItems must not be empty");
}
}
}