Files
stack/apps/orchestrator/src/spawner/agent-spawner.service.ts
Jason Woltje 6fd8e85266
All checks were successful
ci/woodpecker/push/infra Pipeline was successful
ci/woodpecker/push/orchestrator Pipeline was successful
fix(orchestrator): make provider-aware Claude key startup requirements
2026-02-17 17:15:42 -06:00

291 lines
9.0 KiB
TypeScript

import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import Anthropic from "@anthropic-ai/sdk";
import { randomUUID } from "crypto";
import {
SpawnAgentRequest,
SpawnAgentResponse,
AgentSession,
AgentType,
} from "./types/agent-spawner.types";
/**
 * Fallback grace period, in milliseconds, between a session reaching a terminal
 * state and its removal. The gap leaves time for status queries to still see
 * the session before it disappears.
 */
const DEFAULT_SESSION_CLEANUP_DELAY_MS = 30 * 1000; // 30 seconds

/**
 * AI provider identifiers this service accepts.
 */
const SUPPORTED_AI_PROVIDERS = [
  "ollama",
  "claude",
  "openai",
] as const;

/** Union of the literal provider names listed in SUPPORTED_AI_PROVIDERS. */
type SupportedAiProvider = (typeof SUPPORTED_AI_PROVIDERS)[number];
/**
 * Tracks agent sessions in memory and enforces spawn limits.
 *
 * The service resolves the configured AI provider at construction time. When
 * the provider is "claude" an Anthropic client is created (and an API key is
 * mandatory); for every other provider no client is created. Spawning currently
 * only registers a session record - the actual SDK integration is not part of
 * this class yet.
 */
@Injectable()
export class AgentSpawnerService implements OnModuleDestroy {
  private readonly logger = new Logger(AgentSpawnerService.name);
  private readonly anthropic: Anthropic | undefined;
  private readonly aiProvider: SupportedAiProvider;
  /** Session records keyed by agentId. */
  private readonly sessions = new Map<string, AgentSession>();
  private readonly maxConcurrentAgents: number;
  private readonly sessionCleanupDelayMs: number;
  /** Pending delayed-removal timers keyed by agentId. */
  private readonly cleanupTimers = new Map<string, NodeJS.Timeout>();

  /**
   * Resolves the AI provider and spawner limits from configuration.
   * @param configService Source of the `orchestrator.*` configuration values
   * @throws Error if the provider is "claude" and no Claude API key is configured
   */
  constructor(private readonly configService: ConfigService) {
    const configuredProvider = this.configService.get<string>("orchestrator.aiProvider");
    this.aiProvider = this.normalizeAiProvider(configuredProvider);
    this.logger.log(`AgentSpawnerService resolved AI provider: ${this.aiProvider}`);

    const apiKey = this.configService.get<string>("orchestrator.claude.apiKey");
    if (this.aiProvider === "claude") {
      // The key is only mandatory when Claude is the active provider.
      if (!apiKey) {
        throw new Error("CLAUDE_API_KEY is required when AI_PROVIDER is set to 'claude'");
      }
      this.logger.log("CLAUDE_API_KEY is configured. Initializing Anthropic client.");
      this.anthropic = new Anthropic({ apiKey });
    } else {
      if (apiKey) {
        this.logger.debug(
          `CLAUDE_API_KEY is set but ignored because AI provider is '${this.aiProvider}'`
        );
      } else {
        this.logger.log(`CLAUDE_API_KEY not required for AI provider '${this.aiProvider}'.`);
      }
      this.anthropic = undefined;
    }

    // Default to 20 if not configured
    this.maxConcurrentAgents =
      this.configService.get<number>("orchestrator.spawner.maxConcurrentAgents") ?? 20;

    // Default to 30 seconds if not configured
    this.sessionCleanupDelayMs =
      this.configService.get<number>("orchestrator.spawner.sessionCleanupDelayMs") ??
      DEFAULT_SESSION_CLEANUP_DELAY_MS;

    this.logger.log(
      `AgentSpawnerService initialized with ${this.aiProvider} AI provider (max concurrent agents: ${String(
        this.maxConcurrentAgents
      )}, cleanup delay: ${String(this.sessionCleanupDelayMs)}ms)`
    );
  }

  /**
   * Map a raw configuration value onto a supported provider name.
   * The value is trimmed and lower-cased; a missing/blank value silently
   * becomes "ollama", an unrecognised value becomes "ollama" with a warning.
   * @param provider Raw provider string from configuration, if any
   * @returns A member of SUPPORTED_AI_PROVIDERS
   */
  private normalizeAiProvider(provider?: string): SupportedAiProvider {
    const normalizedProvider = provider?.trim().toLowerCase();
    if (!normalizedProvider) {
      return "ollama";
    }
    if (!SUPPORTED_AI_PROVIDERS.includes(normalizedProvider as SupportedAiProvider)) {
      this.logger.warn(`Unsupported AI provider '${normalizedProvider}'. Defaulting to 'ollama'.`);
      return "ollama";
    }
    return normalizedProvider as SupportedAiProvider;
  }

  /**
   * Clean up all pending cleanup timers on module destroy.
   * Only the timers are cancelled; session records are left in the map.
   */
  onModuleDestroy(): void {
    this.cleanupTimers.forEach((timer, agentId) => {
      clearTimeout(timer);
      this.logger.debug(`Cleared cleanup timer for agent ${agentId}`);
    });
    this.cleanupTimers.clear();
  }

  /**
   * Spawn a new agent with the given configuration
   * @param request Agent spawn request
   * @returns Agent spawn response with agentId
   * @throws HttpException (429) if the concurrent agent limit has been reached
   * @throws Error if the request fails validation
   */
  spawnAgent(request: SpawnAgentRequest): SpawnAgentResponse {
    this.logger.log(`Spawning agent for task: ${request.taskId}`);

    // Check concurrent agent limit before proceeding
    this.checkConcurrentAgentLimit();

    // Validate request
    this.validateSpawnRequest(request);

    // Generate unique agent ID
    const agentId = randomUUID();
    const spawnedAt = new Date();

    // Create agent session
    const session: AgentSession = {
      agentId,
      taskId: request.taskId,
      agentType: request.agentType,
      state: "spawning",
      context: request.context,
      options: request.options,
      spawnedAt,
    };

    // Store session
    this.sessions.set(agentId, session);
    this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`);

    // NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD)
    // For now, we're just creating the session and tracking it
    return {
      agentId,
      state: "spawning",
      spawnedAt,
    };
  }

  /**
   * Get agent session by agentId
   * @param agentId Unique agent identifier
   * @returns Agent session or undefined if not found
   */
  getAgentSession(agentId: string): AgentSession | undefined {
    return this.sessions.get(agentId);
  }

  /**
   * Find the first tracked session (in insertion order) for a task ID.
   * No state filter is applied: any session still held in memory can match.
   * @param taskId Task identifier to look up
   * @returns The matching session, or undefined if none is tracked
   */
  findAgentSessionByTaskId(taskId: string): AgentSession | undefined {
    return Array.from(this.sessions.values()).find((session) => session.taskId === taskId);
  }

  /**
   * Update in-memory session state for visibility in list/status endpoints.
   * Unknown agent IDs are ignored.
   * @param agentId Unique agent identifier
   * @param state New session state
   * @param error Error text to store; omitting it clears any previous error
   * @param completedAt Completion timestamp; only written when provided
   */
  setSessionState(
    agentId: string,
    state: AgentSession["state"],
    error?: string,
    completedAt?: Date
  ): void {
    const session = this.sessions.get(agentId);
    if (!session) return;

    // The map holds this object by reference, so mutating it in place is
    // sufficient - no re-insert is needed.
    session.state = state;
    session.error = error;
    if (completedAt) {
      session.completedAt = completedAt;
    }
  }

  /**
   * List all agent sessions
   * @returns Array of all agent sessions
   */
  listAgentSessions(): AgentSession[] {
    return Array.from(this.sessions.values());
  }

  /**
   * Remove an agent session from the in-memory map
   * @param agentId Unique agent identifier
   * @returns true if session was removed, false if not found
   */
  removeSession(agentId: string): boolean {
    // Clear any pending cleanup timer for this agent
    const timer = this.cleanupTimers.get(agentId);
    if (timer) {
      clearTimeout(timer);
      this.cleanupTimers.delete(agentId);
    }

    const deleted = this.sessions.delete(agentId);
    if (deleted) {
      this.logger.log(`Session removed for agent ${agentId}`);
    }
    return deleted;
  }

  /**
   * Schedule session cleanup after a delay
   * This allows time for status queries before the session is removed.
   * Scheduling again for the same agent replaces the earlier timer.
   * @param agentId Unique agent identifier
   * @param delayMs Optional delay in milliseconds (defaults to configured value)
   */
  scheduleSessionCleanup(agentId: string, delayMs?: number): void {
    const delay = delayMs ?? this.sessionCleanupDelayMs;

    // Clear any existing timer for this agent
    const existingTimer = this.cleanupTimers.get(agentId);
    if (existingTimer) {
      clearTimeout(existingTimer);
    }

    this.logger.debug(`Scheduling session cleanup for agent ${agentId} in ${String(delay)}ms`);
    const timer = setTimeout(() => {
      this.removeSession(agentId);
      // removeSession already drops the timer entry; this delete is a harmless
      // safeguard that keeps the timer map consistent regardless.
      this.cleanupTimers.delete(agentId);
    }, delay);
    this.cleanupTimers.set(agentId, timer);
  }

  /**
   * Get the number of pending cleanup timers (for testing)
   * @returns Number of pending cleanup timers
   */
  getPendingCleanupCount(): number {
    return this.cleanupTimers.size;
  }

  /**
   * Check if the concurrent agent limit has been reached
   *
   * NOTE(review): the count is the size of the session map, so every tracked
   * session counts toward the limit - including any that are only waiting for
   * their delayed cleanup. Confirm that this is the intended accounting.
   * @throws HttpException with 429 Too Many Requests if limit reached
   */
  private checkConcurrentAgentLimit(): void {
    const currentCount = this.sessions.size;
    if (currentCount >= this.maxConcurrentAgents) {
      this.logger.warn(
        `Maximum concurrent agents limit reached: ${String(currentCount)}/${String(this.maxConcurrentAgents)}`
      );
      throw new HttpException(
        {
          message: `Maximum concurrent agents limit reached (${String(this.maxConcurrentAgents)}). Please wait for existing agents to complete.`,
          currentCount,
          maxLimit: this.maxConcurrentAgents,
        },
        HttpStatus.TOO_MANY_REQUESTS
      );
    }
  }

  /**
   * Validate spawn agent request
   *
   * The checks are deliberately defensive about shape: static types are erased
   * at runtime, so a malformed payload (missing `context`, missing or non-array
   * `workItems`, non-string fields) must yield a validation error rather than
   * an opaque TypeError.
   * @param request Spawn request to validate
   * @throws Error if validation fails
   */
  private validateSpawnRequest(request: SpawnAgentRequest): void {
    if (this.isBlank(request.taskId)) {
      throw new Error("taskId is required");
    }

    const validAgentTypes: AgentType[] = ["worker", "reviewer", "tester"];
    if (!validAgentTypes.includes(request.agentType)) {
      throw new Error(`agentType must be one of: ${validAgentTypes.join(", ")}`);
    }

    // Widen the type on purpose: at runtime `context` may be absent even though
    // the declared type says otherwise.
    const context = request.context as SpawnAgentRequest["context"] | undefined;

    if (this.isBlank(context?.repository)) {
      throw new Error("context.repository is required");
    }
    if (this.isBlank(context?.branch)) {
      throw new Error("context.branch is required");
    }

    const workItems: unknown = context?.workItems;
    if (!Array.isArray(workItems) || workItems.length === 0) {
      throw new Error("context.workItems must not be empty");
    }
  }

  /**
   * Report whether a value is unusable as a required string field.
   * @param value Candidate field value
   * @returns true if the value is not a string or contains only whitespace
   */
  private isBlank(value: unknown): boolean {
    return typeof value !== "string" || value.trim() === "";
  }
}