fix(#338): Add session cleanup on terminal states

- Add removeSession and scheduleSessionCleanup methods to AgentSpawnerService
- Schedule session cleanup after completed/failed/killed transitions
- Default 30 second delay before cleanup to allow status queries
- Implement OnModuleDestroy to clean up pending timers
- Add forwardRef injection to avoid circular dependency
- Add comprehensive tests for cleanup functionality

Refs #338
This commit is contained in:
Jason Woltje
2026-02-05 18:47:14 -06:00
parent 8d57191a91
commit a42f88d64c
4 changed files with 347 additions and 7 deletions

View File

@@ -1,5 +1,6 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentLifecycleService } from "./agent-lifecycle.service"; import { AgentLifecycleService } from "./agent-lifecycle.service";
import { AgentSpawnerService } from "./agent-spawner.service";
import { ValkeyService } from "../valkey/valkey.service"; import { ValkeyService } from "../valkey/valkey.service";
import type { AgentState } from "../valkey/types"; import type { AgentState } from "../valkey/types";
@@ -12,6 +13,9 @@ describe("AgentLifecycleService", () => {
publishEvent: ReturnType<typeof vi.fn>; publishEvent: ReturnType<typeof vi.fn>;
listAgents: ReturnType<typeof vi.fn>; listAgents: ReturnType<typeof vi.fn>;
}; };
let mockSpawnerService: {
scheduleSessionCleanup: ReturnType<typeof vi.fn>;
};
const mockAgentId = "test-agent-123"; const mockAgentId = "test-agent-123";
const mockTaskId = "test-task-456"; const mockTaskId = "test-task-456";
@@ -26,8 +30,15 @@ describe("AgentLifecycleService", () => {
listAgents: vi.fn(), listAgents: vi.fn(),
}; };
// Create service with mock mockSpawnerService = {
service = new AgentLifecycleService(mockValkeyService as unknown as ValkeyService); scheduleSessionCleanup: vi.fn(),
};
// Create service with mocks
service = new AgentLifecycleService(
mockValkeyService as unknown as ValkeyService,
mockSpawnerService as unknown as AgentSpawnerService
);
}); });
afterEach(() => { afterEach(() => {
@@ -612,4 +623,87 @@ describe("AgentLifecycleService", () => {
); );
}); });
}); });
describe("session cleanup on terminal states", () => {
it("should schedule session cleanup when transitioning to completed", async () => {
const mockState: AgentState = {
agentId: mockAgentId,
status: "running",
taskId: mockTaskId,
startedAt: "2026-02-02T10:00:00Z",
};
mockValkeyService.getAgentState.mockResolvedValue(mockState);
mockValkeyService.updateAgentStatus.mockResolvedValue({
...mockState,
status: "completed",
completedAt: "2026-02-02T11:00:00Z",
});
await service.transitionToCompleted(mockAgentId);
expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId);
});
it("should schedule session cleanup when transitioning to failed", async () => {
const mockState: AgentState = {
agentId: mockAgentId,
status: "running",
taskId: mockTaskId,
startedAt: "2026-02-02T10:00:00Z",
};
const errorMessage = "Runtime error occurred";
mockValkeyService.getAgentState.mockResolvedValue(mockState);
mockValkeyService.updateAgentStatus.mockResolvedValue({
...mockState,
status: "failed",
error: errorMessage,
completedAt: "2026-02-02T11:00:00Z",
});
await service.transitionToFailed(mockAgentId, errorMessage);
expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId);
});
it("should schedule session cleanup when transitioning to killed", async () => {
const mockState: AgentState = {
agentId: mockAgentId,
status: "running",
taskId: mockTaskId,
startedAt: "2026-02-02T10:00:00Z",
};
mockValkeyService.getAgentState.mockResolvedValue(mockState);
mockValkeyService.updateAgentStatus.mockResolvedValue({
...mockState,
status: "killed",
completedAt: "2026-02-02T11:00:00Z",
});
await service.transitionToKilled(mockAgentId);
expect(mockSpawnerService.scheduleSessionCleanup).toHaveBeenCalledWith(mockAgentId);
});
it("should not schedule session cleanup when transitioning to running", async () => {
const mockState: AgentState = {
agentId: mockAgentId,
status: "spawning",
taskId: mockTaskId,
};
mockValkeyService.getAgentState.mockResolvedValue(mockState);
mockValkeyService.updateAgentStatus.mockResolvedValue({
...mockState,
status: "running",
startedAt: "2026-02-02T10:00:00Z",
});
await service.transitionToRunning(mockAgentId);
expect(mockSpawnerService.scheduleSessionCleanup).not.toHaveBeenCalled();
});
});
}); });

View File

@@ -1,5 +1,6 @@
import { Injectable, Logger } from "@nestjs/common"; import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common";
import { ValkeyService } from "../valkey/valkey.service"; import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "./agent-spawner.service";
import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types";
import { isValidAgentTransition } from "../valkey/types/state.types"; import { isValidAgentTransition } from "../valkey/types/state.types";
@@ -18,7 +19,11 @@ import { isValidAgentTransition } from "../valkey/types/state.types";
export class AgentLifecycleService { export class AgentLifecycleService {
private readonly logger = new Logger(AgentLifecycleService.name); private readonly logger = new Logger(AgentLifecycleService.name);
constructor(private readonly valkeyService: ValkeyService) { constructor(
private readonly valkeyService: ValkeyService,
@Inject(forwardRef(() => AgentSpawnerService))
private readonly spawnerService: AgentSpawnerService
) {
this.logger.log("AgentLifecycleService initialized"); this.logger.log("AgentLifecycleService initialized");
} }
@@ -84,6 +89,9 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.completed", updatedState); await this.publishStateChangeEvent("agent.completed", updatedState);
// Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId);
this.logger.log(`Agent ${agentId} transitioned to completed`); this.logger.log(`Agent ${agentId} transitioned to completed`);
return updatedState; return updatedState;
} }
@@ -116,6 +124,9 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.failed", updatedState, error); await this.publishStateChangeEvent("agent.failed", updatedState, error);
// Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId);
this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`);
return updatedState; return updatedState;
} }
@@ -147,6 +158,9 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.killed", updatedState); await this.publishStateChangeEvent("agent.killed", updatedState);
// Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId);
this.logger.warn(`Agent ${agentId} transitioned to killed`); this.logger.warn(`Agent ${agentId} transitioned to killed`);
return updatedState; return updatedState;
} }

View File

@@ -401,4 +401,159 @@ describe("AgentSpawnerService", () => {
} }
}); });
}); });
describe("session cleanup", () => {
const createValidRequest = (taskId: string): SpawnAgentRequest => ({
taskId,
agentType: "worker",
context: {
repository: "https://github.com/test/repo.git",
branch: "main",
workItems: ["Implement feature X"],
},
});
it("should remove session immediately", () => {
const response = service.spawnAgent(createValidRequest("task-1"));
expect(service.getAgentSession(response.agentId)).toBeDefined();
const removed = service.removeSession(response.agentId);
expect(removed).toBe(true);
expect(service.getAgentSession(response.agentId)).toBeUndefined();
});
it("should return false when removing non-existent session", () => {
const removed = service.removeSession("non-existent-id");
expect(removed).toBe(false);
});
it("should schedule session cleanup with delay", async () => {
vi.useFakeTimers();
const response = service.spawnAgent(createValidRequest("task-1"));
expect(service.getAgentSession(response.agentId)).toBeDefined();
// Schedule cleanup with short delay
service.scheduleSessionCleanup(response.agentId, 100);
// Session should still exist before delay
expect(service.getAgentSession(response.agentId)).toBeDefined();
expect(service.getPendingCleanupCount()).toBe(1);
// Advance timer past the delay
vi.advanceTimersByTime(150);
// Session should be cleaned up
expect(service.getAgentSession(response.agentId)).toBeUndefined();
expect(service.getPendingCleanupCount()).toBe(0);
vi.useRealTimers();
});
it("should replace existing cleanup timer when rescheduled", async () => {
vi.useFakeTimers();
const response = service.spawnAgent(createValidRequest("task-1"));
// Schedule cleanup with 100ms delay
service.scheduleSessionCleanup(response.agentId, 100);
expect(service.getPendingCleanupCount()).toBe(1);
// Advance by 50ms (halfway)
vi.advanceTimersByTime(50);
expect(service.getAgentSession(response.agentId)).toBeDefined();
// Reschedule with 100ms delay (should reset the timer)
service.scheduleSessionCleanup(response.agentId, 100);
expect(service.getPendingCleanupCount()).toBe(1);
// Advance by 75ms (past original but not new)
vi.advanceTimersByTime(75);
expect(service.getAgentSession(response.agentId)).toBeDefined();
// Advance by remaining 25ms
vi.advanceTimersByTime(50);
expect(service.getAgentSession(response.agentId)).toBeUndefined();
vi.useRealTimers();
});
it("should clear cleanup timer when session is removed directly", () => {
vi.useFakeTimers();
const response = service.spawnAgent(createValidRequest("task-1"));
// Schedule cleanup
service.scheduleSessionCleanup(response.agentId, 1000);
expect(service.getPendingCleanupCount()).toBe(1);
// Remove session directly
service.removeSession(response.agentId);
// Timer should be cleared
expect(service.getPendingCleanupCount()).toBe(0);
vi.useRealTimers();
});
it("should decrease session count after cleanup", async () => {
vi.useFakeTimers();
// Create service with low limit for testing
const limitedConfigService = {
get: vi.fn((key: string) => {
if (key === "orchestrator.claude.apiKey") {
return "test-api-key";
}
if (key === "orchestrator.spawner.maxConcurrentAgents") {
return 2;
}
return undefined;
}),
} as unknown as ConfigService;
const limitedService = new AgentSpawnerService(limitedConfigService);
// Spawn up to the limit
const response1 = limitedService.spawnAgent(createValidRequest("task-1"));
limitedService.spawnAgent(createValidRequest("task-2"));
// Should be at limit
expect(limitedService.listAgentSessions()).toHaveLength(2);
expect(() => limitedService.spawnAgent(createValidRequest("task-3"))).toThrow(HttpException);
// Schedule cleanup for first agent
limitedService.scheduleSessionCleanup(response1.agentId, 100);
vi.advanceTimersByTime(150);
// Should have freed a slot
expect(limitedService.listAgentSessions()).toHaveLength(1);
// Should be able to spawn another agent now
const response3 = limitedService.spawnAgent(createValidRequest("task-3"));
expect(response3.agentId).toBeDefined();
vi.useRealTimers();
});
it("should clear all timers on module destroy", () => {
vi.useFakeTimers();
const response1 = service.spawnAgent(createValidRequest("task-1"));
const response2 = service.spawnAgent(createValidRequest("task-2"));
service.scheduleSessionCleanup(response1.agentId, 1000);
service.scheduleSessionCleanup(response2.agentId, 1000);
expect(service.getPendingCleanupCount()).toBe(2);
// Call module destroy
service.onModuleDestroy();
expect(service.getPendingCleanupCount()).toBe(0);
vi.useRealTimers();
});
});
}); });

View File

@@ -1,4 +1,4 @@
import { Injectable, Logger, HttpException, HttpStatus } from "@nestjs/common"; import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import Anthropic from "@anthropic-ai/sdk"; import Anthropic from "@anthropic-ai/sdk";
import { randomUUID } from "crypto"; import { randomUUID } from "crypto";
@@ -9,15 +9,23 @@ import {
AgentType, AgentType,
} from "./types/agent-spawner.types"; } from "./types/agent-spawner.types";
/**
* Default delay in milliseconds before cleaning up sessions after terminal states
* This allows time for status queries before the session is removed
*/
const DEFAULT_SESSION_CLEANUP_DELAY_MS = 30000; // 30 seconds
/** /**
* Service responsible for spawning Claude agents using Anthropic SDK * Service responsible for spawning Claude agents using Anthropic SDK
*/ */
@Injectable() @Injectable()
export class AgentSpawnerService { export class AgentSpawnerService implements OnModuleDestroy {
private readonly logger = new Logger(AgentSpawnerService.name); private readonly logger = new Logger(AgentSpawnerService.name);
private readonly anthropic: Anthropic; private readonly anthropic: Anthropic;
private readonly sessions = new Map<string, AgentSession>(); private readonly sessions = new Map<string, AgentSession>();
private readonly maxConcurrentAgents: number; private readonly maxConcurrentAgents: number;
private readonly sessionCleanupDelayMs: number;
private readonly cleanupTimers = new Map<string, NodeJS.Timeout>();
constructor(private readonly configService: ConfigService) { constructor(private readonly configService: ConfigService) {
const apiKey = this.configService.get<string>("orchestrator.claude.apiKey"); const apiKey = this.configService.get<string>("orchestrator.claude.apiKey");
@@ -34,11 +42,27 @@ export class AgentSpawnerService {
this.maxConcurrentAgents = this.maxConcurrentAgents =
this.configService.get<number>("orchestrator.spawner.maxConcurrentAgents") ?? 20; this.configService.get<number>("orchestrator.spawner.maxConcurrentAgents") ?? 20;
// Default to 30 seconds if not configured
this.sessionCleanupDelayMs =
this.configService.get<number>("orchestrator.spawner.sessionCleanupDelayMs") ??
DEFAULT_SESSION_CLEANUP_DELAY_MS;
this.logger.log( this.logger.log(
`AgentSpawnerService initialized with Claude SDK (max concurrent agents: ${String(this.maxConcurrentAgents)})` `AgentSpawnerService initialized with Claude SDK (max concurrent agents: ${String(this.maxConcurrentAgents)}, cleanup delay: ${String(this.sessionCleanupDelayMs)}ms)`
); );
} }
/**
* Clean up all pending cleanup timers on module destroy
*/
onModuleDestroy(): void {
this.cleanupTimers.forEach((timer, agentId) => {
clearTimeout(timer);
this.logger.debug(`Cleared cleanup timer for agent ${agentId}`);
});
this.cleanupTimers.clear();
}
/** /**
* Spawn a new agent with the given configuration * Spawn a new agent with the given configuration
* @param request Agent spawn request * @param request Agent spawn request
@@ -100,6 +124,59 @@ export class AgentSpawnerService {
return Array.from(this.sessions.values()); return Array.from(this.sessions.values());
} }
/**
* Remove an agent session from the in-memory map
* @param agentId Unique agent identifier
* @returns true if session was removed, false if not found
*/
removeSession(agentId: string): boolean {
// Clear any pending cleanup timer for this agent
const timer = this.cleanupTimers.get(agentId);
if (timer) {
clearTimeout(timer);
this.cleanupTimers.delete(agentId);
}
const deleted = this.sessions.delete(agentId);
if (deleted) {
this.logger.log(`Session removed for agent ${agentId}`);
}
return deleted;
}
/**
* Schedule session cleanup after a delay
* This allows time for status queries before the session is removed
* @param agentId Unique agent identifier
* @param delayMs Optional delay in milliseconds (defaults to configured value)
*/
scheduleSessionCleanup(agentId: string, delayMs?: number): void {
const delay = delayMs ?? this.sessionCleanupDelayMs;
// Clear any existing timer for this agent
const existingTimer = this.cleanupTimers.get(agentId);
if (existingTimer) {
clearTimeout(existingTimer);
}
this.logger.debug(`Scheduling session cleanup for agent ${agentId} in ${String(delay)}ms`);
const timer = setTimeout(() => {
this.removeSession(agentId);
this.cleanupTimers.delete(agentId);
}, delay);
this.cleanupTimers.set(agentId, timer);
}
/**
* Get the number of pending cleanup timers (for testing)
* @returns Number of pending cleanup timers
*/
getPendingCleanupCount(): number {
return this.cleanupTimers.size;
}
/** /**
* Check if the concurrent agent limit has been reached * Check if the concurrent agent limit has been reached
* @throws HttpException with 429 Too Many Requests if limit reached * @throws HttpException with 429 Too Many Requests if limit reached