From a42f88d64ca3c4d7aa2b58da42fc087f0c68c5a2 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Thu, 5 Feb 2026 18:47:14 -0600 Subject: [PATCH] fix(#338): Add session cleanup on terminal states - Add removeSession and scheduleSessionCleanup methods to AgentSpawnerService - Schedule session cleanup after completed/failed/killed transitions - Default 30 second delay before cleanup to allow status queries - Implement OnModuleDestroy to clean up pending timers - Add forwardRef injection to avoid circular dependency - Add comprehensive tests for cleanup functionality Refs #338 --- .../spawner/agent-lifecycle.service.spec.ts | 98 ++++++++++- .../src/spawner/agent-lifecycle.service.ts | 18 +- .../src/spawner/agent-spawner.service.spec.ts | 155 ++++++++++++++++++ .../src/spawner/agent-spawner.service.ts | 83 +++++++++- 4 files changed, 347 insertions(+), 7 deletions(-) diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts index ad466cc..6b359db 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts @@ -1,5 +1,6 @@ import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { AgentLifecycleService } from "./agent-lifecycle.service"; +import { AgentSpawnerService } from "./agent-spawner.service"; import { ValkeyService } from "../valkey/valkey.service"; import type { AgentState } from "../valkey/types"; @@ -12,6 +13,9 @@ describe("AgentLifecycleService", () => { publishEvent: ReturnType; listAgents: ReturnType; }; + let mockSpawnerService: { + scheduleSessionCleanup: ReturnType; + }; const mockAgentId = "test-agent-123"; const mockTaskId = "test-task-456"; @@ -26,8 +30,15 @@ describe("AgentLifecycleService", () => { listAgents: vi.fn(), }; - // Create service with mock - service = new AgentLifecycleService(mockValkeyService as unknown as ValkeyService); + mockSpawnerService = { + scheduleSessionCleanup: vi.fn(), + }; + + // Create service with mocks + service = new AgentLifecycleService( + mockValkeyService as unknown as ValkeyService, + mockSpawnerService as unknown as AgentSpawnerService + ); }); afterEach(() => { @@ -612,4 +623,87 @@ describe("AgentLifecycleService", () => { ); }); }); + + describe("session cleanup on terminal states", () => { + it("should schedule session cleanup when transitioning to completed", async () => { + const mockState: AgentState = { + agentId: mockAgentId, + status: "running", + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + }; + + mockValkeyService.getAgentState.mockResolvedValue(mockState); + mockValkeyService.updateAgentStatus.mockResolvedValue({ + ...mockState, + status: "completed", + completedAt: "2026-02-02T11:00:00Z", + }); + + await service.transitionToCompleted(mockAgentId); + + expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId); + }); + + it("should schedule session cleanup when transitioning to failed", async () => { + const mockState: AgentState = { + agentId: mockAgentId, + status: "running", + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + }; + const errorMessage = "Runtime error occurred"; + + mockValkeyService.getAgentState.mockResolvedValue(mockState); + mockValkeyService.updateAgentStatus.mockResolvedValue({ + ...mockState, + status: "failed", + error: errorMessage, + completedAt: "2026-02-02T11:00:00Z", + }); + + await service.transitionToFailed(mockAgentId, errorMessage); + + expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId); + }); + + it("should schedule session cleanup when transitioning to killed", async () => { + const mockState: AgentState = { + agentId: mockAgentId, + status: "running", + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + }; + + mockValkeyService.getAgentState.mockResolvedValue(mockState); + mockValkeyService.updateAgentStatus.mockResolvedValue({ + ...mockState, + status: "killed", + completedAt: "2026-02-02T11:00:00Z", + }); + + await service.transitionToKilled(mockAgentId); + + expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId); + }); + + it("should not schedule session cleanup when transitioning to running", async () => { + const mockState: AgentState = { + agentId: mockAgentId, + status: "spawning", + taskId: mockTaskId, + }; + + mockValkeyService.getAgentState.mockResolvedValue(mockState); + mockValkeyService.updateAgentStatus.mockResolvedValue({ + ...mockState, + status: "running", + startedAt: "2026-02-02T10:00:00Z", + }); + + await service.transitionToRunning(mockAgentId); + + expect(mockSpawnerService.scheduleSessionCleanup).not.toHaveBeenCalled(); + }); + }); }); diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts index aa8cbe8..b2fccdc 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts @@ -1,5 +1,6 @@ -import { Injectable, Logger } from "@nestjs/common"; +import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common"; import { ValkeyService } from "../valkey/valkey.service"; +import { AgentSpawnerService } from "./agent-spawner.service"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import { isValidAgentTransition } from "../valkey/types/state.types"; @@ -18,7 +19,11 @@ import { isValidAgentTransition } from "../valkey/types/state.types"; export class AgentLifecycleService { private readonly logger = new Logger(AgentLifecycleService.name); - constructor(private readonly valkeyService: ValkeyService) { + constructor( + private readonly valkeyService: ValkeyService, + @Inject(forwardRef(() => AgentSpawnerService)) + private readonly spawnerService: AgentSpawnerService + ) { this.logger.log("AgentLifecycleService initialized"); } @@ -84,6 +89,9 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.completed", updatedState); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); + this.logger.log(`Agent ${agentId} transitioned to completed`); return updatedState; } @@ -116,6 +124,9 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.failed", updatedState, error); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); + this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); return updatedState; } @@ -147,6 +158,9 @@ export class AgentLifecycleService { // Emit event await this.publishStateChangeEvent("agent.killed", updatedState); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); + this.logger.warn(`Agent ${agentId} transitioned to killed`); return updatedState; } diff --git a/apps/orchestrator/src/spawner/agent-spawner.service.spec.ts b/apps/orchestrator/src/spawner/agent-spawner.service.spec.ts index 8eb2a42..6cc0ff0 100644 --- a/apps/orchestrator/src/spawner/agent-spawner.service.spec.ts +++ b/apps/orchestrator/src/spawner/agent-spawner.service.spec.ts @@ -401,4 +401,159 @@ describe("AgentSpawnerService", () => { } }); }); + + describe("session cleanup", () => { + const createValidRequest = (taskId: string): SpawnAgentRequest => ({ + taskId, + agentType: "worker", + context: { + repository: "https://github.com/test/repo.git", + branch: "main", + workItems: ["Implement feature X"], + }, + }); + + it("should remove session immediately", () => { + const response = service.spawnAgent(createValidRequest("task-1")); + expect(service.getAgentSession(response.agentId)).toBeDefined(); + + const removed = service.removeSession(response.agentId); + + expect(removed).toBe(true); + expect(service.getAgentSession(response.agentId)).toBeUndefined(); + }); + + it("should return false when removing non-existent session", () => { + const removed = service.removeSession("non-existent-id"); + expect(removed).toBe(false); + }); + + it("should schedule session cleanup with delay", async () => { + vi.useFakeTimers(); + + const response = service.spawnAgent(createValidRequest("task-1")); + expect(service.getAgentSession(response.agentId)).toBeDefined(); + + // Schedule cleanup with short delay + service.scheduleSessionCleanup(response.agentId, 100); + + // Session should still exist before delay + expect(service.getAgentSession(response.agentId)).toBeDefined(); + expect(service.getPendingCleanupCount()).toBe(1); + + // Advance timer past the delay + vi.advanceTimersByTime(150); + + // Session should be cleaned up + expect(service.getAgentSession(response.agentId)).toBeUndefined(); + expect(service.getPendingCleanupCount()).toBe(0); + + vi.useRealTimers(); + }); + + it("should replace existing cleanup timer when rescheduled", async () => { + vi.useFakeTimers(); + + const response = service.spawnAgent(createValidRequest("task-1")); + + // Schedule cleanup with 100ms delay + service.scheduleSessionCleanup(response.agentId, 100); + expect(service.getPendingCleanupCount()).toBe(1); + + // Advance by 50ms (halfway) + vi.advanceTimersByTime(50); + expect(service.getAgentSession(response.agentId)).toBeDefined(); + + // Reschedule with 100ms delay (should reset the timer) + service.scheduleSessionCleanup(response.agentId, 100); + expect(service.getPendingCleanupCount()).toBe(1); + + // Advance by 75ms (past original but not new) + vi.advanceTimersByTime(75); + expect(service.getAgentSession(response.agentId)).toBeDefined(); + + // Advance by remaining 25ms + vi.advanceTimersByTime(50); + expect(service.getAgentSession(response.agentId)).toBeUndefined(); + + vi.useRealTimers(); + }); + + it("should clear cleanup timer when session is removed directly", () => { + vi.useFakeTimers(); + + const response = service.spawnAgent(createValidRequest("task-1")); + + // Schedule cleanup + service.scheduleSessionCleanup(response.agentId, 1000); + expect(service.getPendingCleanupCount()).toBe(1); + + // Remove session directly + service.removeSession(response.agentId); + + // Timer should be cleared + expect(service.getPendingCleanupCount()).toBe(0); + + vi.useRealTimers(); + }); + + it("should decrease session count after cleanup", async () => { + vi.useFakeTimers(); + + // Create service with low limit for testing + const limitedConfigService = { + get: vi.fn((key: string) => { + if (key === "orchestrator.claude.apiKey") { + return "test-api-key"; + } + if (key === "orchestrator.spawner.maxConcurrentAgents") { + return 2; + } + return undefined; + }), + } as unknown as ConfigService; + + const limitedService = new AgentSpawnerService(limitedConfigService); + + // Spawn up to the limit + const response1 = limitedService.spawnAgent(createValidRequest("task-1")); + limitedService.spawnAgent(createValidRequest("task-2")); + + // Should be at limit + expect(limitedService.listAgentSessions()).toHaveLength(2); + expect(() => limitedService.spawnAgent(createValidRequest("task-3"))).toThrow(HttpException); + + // Schedule cleanup for first agent + limitedService.scheduleSessionCleanup(response1.agentId, 100); + vi.advanceTimersByTime(150); + + // Should have freed a slot + expect(limitedService.listAgentSessions()).toHaveLength(1); + + // Should be able to spawn another agent now + const response3 = limitedService.spawnAgent(createValidRequest("task-3")); + expect(response3.agentId).toBeDefined(); + + vi.useRealTimers(); + }); + + it("should clear all timers on module destroy", () => { + vi.useFakeTimers(); + + const response1 = service.spawnAgent(createValidRequest("task-1")); + const response2 = service.spawnAgent(createValidRequest("task-2")); + + service.scheduleSessionCleanup(response1.agentId, 1000); + service.scheduleSessionCleanup(response2.agentId, 1000); + + expect(service.getPendingCleanupCount()).toBe(2); + + // Call module destroy + service.onModuleDestroy(); + + expect(service.getPendingCleanupCount()).toBe(0); + + vi.useRealTimers(); + }); + }); }); diff --git a/apps/orchestrator/src/spawner/agent-spawner.service.ts b/apps/orchestrator/src/spawner/agent-spawner.service.ts index fc6f0d4..e3ce4ba 100644 --- a/apps/orchestrator/src/spawner/agent-spawner.service.ts +++ b/apps/orchestrator/src/spawner/agent-spawner.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, HttpException, HttpStatus } from "@nestjs/common"; +import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import Anthropic from "@anthropic-ai/sdk"; import { randomUUID } from "crypto"; @@ -9,15 +9,23 @@ import { AgentType, } from "./types/agent-spawner.types"; +/** + * Default delay in milliseconds before cleaning up sessions after terminal states + * This allows time for status queries before the session is removed + */ +const DEFAULT_SESSION_CLEANUP_DELAY_MS = 30000; // 30 seconds + /** * Service responsible for spawning Claude agents using Anthropic SDK */ @Injectable() -export class AgentSpawnerService { +export class AgentSpawnerService implements OnModuleDestroy { private readonly logger = new Logger(AgentSpawnerService.name); private readonly anthropic: Anthropic; private readonly sessions = new Map(); private readonly maxConcurrentAgents: number; + private readonly sessionCleanupDelayMs: number; + private readonly cleanupTimers = new Map(); constructor(private readonly configService: ConfigService) { const apiKey = this.configService.get("orchestrator.claude.apiKey"); @@ -34,11 +42,27 @@ export class AgentSpawnerService { this.maxConcurrentAgents = this.configService.get("orchestrator.spawner.maxConcurrentAgents") ?? 20; + // Default to 30 seconds if not configured + this.sessionCleanupDelayMs = + this.configService.get("orchestrator.spawner.sessionCleanupDelayMs") ?? + DEFAULT_SESSION_CLEANUP_DELAY_MS; + this.logger.log( - `AgentSpawnerService initialized with Claude SDK (max concurrent agents: ${String(this.maxConcurrentAgents)})` + `AgentSpawnerService initialized with Claude SDK (max concurrent agents: ${String(this.maxConcurrentAgents)}, cleanup delay: ${String(this.sessionCleanupDelayMs)}ms)` ); } + /** + * Clean up all pending cleanup timers on module destroy + */ + onModuleDestroy(): void { + this.cleanupTimers.forEach((timer, agentId) => { + clearTimeout(timer); + this.logger.debug(`Cleared cleanup timer for agent ${agentId}`); + }); + this.cleanupTimers.clear(); + } + /** * Spawn a new agent with the given configuration * @param request Agent spawn request @@ -100,6 +124,59 @@ export class AgentSpawnerService { return Array.from(this.sessions.values()); } + /** + * Remove an agent session from the in-memory map + * @param agentId Unique agent identifier + * @returns true if session was removed, false if not found + */ + removeSession(agentId: string): boolean { + // Clear any pending cleanup timer for this agent + const timer = this.cleanupTimers.get(agentId); + if (timer) { + clearTimeout(timer); + this.cleanupTimers.delete(agentId); + } + + const deleted = this.sessions.delete(agentId); + if (deleted) { + this.logger.log(`Session removed for agent ${agentId}`); + } + return deleted; + } + + /** + * Schedule session cleanup after a delay + * This allows time for status queries before the session is removed + * @param agentId Unique agent identifier + * @param delayMs Optional delay in milliseconds (defaults to configured value) + */ + scheduleSessionCleanup(agentId: string, delayMs?: number): void { + const delay = delayMs ?? this.sessionCleanupDelayMs; + + // Clear any existing timer for this agent + const existingTimer = this.cleanupTimers.get(agentId); + if (existingTimer) { + clearTimeout(existingTimer); + } + + this.logger.debug(`Scheduling session cleanup for agent ${agentId} in ${String(delay)}ms`); + + const timer = setTimeout(() => { + this.removeSession(agentId); + this.cleanupTimers.delete(agentId); + }, delay); + + this.cleanupTimers.set(agentId, timer); + } + + /** + * Get the number of pending cleanup timers (for testing) + * @returns Number of pending cleanup timers + */ + getPendingCleanupCount(): number { + return this.cleanupTimers.size; + } + /** * Check if the concurrent agent limit has been reached * @throws HttpException with 429 Too Many Requests if limit reached