diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts index 6b359db..f4b815b 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts @@ -706,4 +706,233 @@ describe("AgentLifecycleService", () => { expect(mockSpawnerService.scheduleSessionCleanup).not.toHaveBeenCalled(); }); }); + + describe("TOCTOU race prevention (CQ-ORCH-5)", () => { + it("should serialize concurrent transitions to the same agent", async () => { + const executionOrder: string[] = []; + + // Simulate state that changes after first transition completes + let currentStatus: "spawning" | "running" | "completed" = "spawning"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + // Simulate delay to allow interleaving if lock is broken + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + currentStatus = status as "spawning" | "running" | "completed"; + executionOrder.push(`updated-to-${status}`); + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + ...(status === "completed" && { completedAt: "2026-02-02T11:00:00Z" }), + } as AgentState; + } + ); + + // Launch both transitions concurrently + const [result1, result2] = await Promise.allSettled([ + service.transitionToRunning(mockAgentId), + service.transitionToCompleted(mockAgentId), + ]); + + // First should succeed (spawning -> running) + expect(result1.status).toBe("fulfilled"); + + // Second should also succeed (running -> completed) because the lock + // serializes them: first one completes, updates state to running, + // then second reads the updated state and transitions to completed + expect(result2.status).toBe("fulfilled"); + + // Verify they executed in order, not interleaved + expect(executionOrder).toEqual(["updated-to-running", "updated-to-completed"]); + }); + + it("should reject second concurrent transition if first makes it invalid", async () => { + let currentStatus: "running" | "completed" | "killed" = "running"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + currentStatus = status as "running" | "completed" | "killed"; + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + completedAt: "2026-02-02T11:00:00Z", + } as AgentState; + } + ); + + // Both try to transition from running to a terminal state concurrently + const [result1, result2] = await Promise.allSettled([ + service.transitionToCompleted(mockAgentId), + service.transitionToKilled(mockAgentId), + ]); + + // First should succeed (running -> completed) + expect(result1.status).toBe("fulfilled"); + + // Second should fail because after first completes, + // agent is in "completed" state which cannot transition to "killed" + expect(result2.status).toBe("rejected"); + if (result2.status === "rejected") { + expect(result2.reason).toBeInstanceOf(Error); + expect((result2.reason as Error).message).toContain("Invalid state transition"); + } + }); + + it("should allow concurrent transitions to different agents", async () => { + const agent1Id = "agent-1"; + const agent2Id = "agent-2"; + const executionOrder: string[] = []; + + mockValkeyService.getAgentState.mockImplementation(async (agentId: string) => { + return { + agentId, + status: "spawning", + taskId: `task-for-${agentId}`, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (agentId: string, status: string) => { + executionOrder.push(`${agentId}-start`); + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + executionOrder.push(`${agentId}-end`); + return { + agentId, + status, + taskId: `task-for-${agentId}`, + startedAt: "2026-02-02T10:00:00Z", + } as AgentState; + } + ); + + // Both should run concurrently since they target different agents + const [result1, result2] = await Promise.allSettled([ + service.transitionToRunning(agent1Id), + service.transitionToRunning(agent2Id), + ]); + + expect(result1.status).toBe("fulfilled"); + expect(result2.status).toBe("fulfilled"); + + // Both should start before either finishes (concurrent, not serialized) + // The execution order should show interleaving + expect(executionOrder).toContain("agent-1-start"); + expect(executionOrder).toContain("agent-2-start"); + }); + + it("should release lock even when transition throws an error", async () => { + let callCount = 0; + + mockValkeyService.getAgentState.mockImplementation(async () => { + callCount++; + if (callCount === 1) { + // First call: throw error + return null; + } + // Second call: return valid state + return { + agentId: mockAgentId, + status: "spawning", + taskId: mockTaskId, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockResolvedValue({ + agentId: mockAgentId, + status: "running", + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + }); + + // First transition should fail (agent not found) + await expect(service.transitionToRunning(mockAgentId)).rejects.toThrow( + `Agent ${mockAgentId} not found` + ); + + // Second transition should succeed (lock was released despite error) + const result = await service.transitionToRunning(mockAgentId); + expect(result.status).toBe("running"); + }); + + it("should handle three concurrent transitions sequentially for same agent", async () => { + const executionOrder: string[] = []; + let currentStatus: "spawning" | "running" | "completed" | "failed" = "spawning"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + ...(currentStatus !== "spawning" && { startedAt: "2026-02-02T10:00:00Z" }), + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + executionOrder.push(`update-${status}`); + await new Promise((resolve) => { + setTimeout(resolve, 5); + }); + currentStatus = status as "spawning" | "running" | "completed" | "failed"; + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + ...(["completed", "failed"].includes(status) && { + completedAt: "2026-02-02T11:00:00Z", + }), + } as AgentState; + } + ); + + // Launch three transitions at once: spawning->running->completed, plus a failed attempt + const [r1, r2, r3] = await Promise.allSettled([ + service.transitionToRunning(mockAgentId), + service.transitionToCompleted(mockAgentId), + service.transitionToFailed(mockAgentId, "late error"), + ]); + + // First: spawning -> running (succeeds) + expect(r1.status).toBe("fulfilled"); + // Second: running -> completed (succeeds, serialized after first) + expect(r2.status).toBe("fulfilled"); + // Third: completed -> failed (fails, completed is terminal) + expect(r3.status).toBe("rejected"); + + // Verify sequential execution + expect(executionOrder[0]).toBe("update-running"); + expect(executionOrder[1]).toBe("update-completed"); + // Third never gets to update because validation fails + expect(executionOrder).toHaveLength(2); + }); + }); }); diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts index b2fccdc..942cb08 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts @@ -14,11 +14,21 @@ import { isValidAgentTransition } from "../valkey/types/state.types"; * - Persists agent state changes to Valkey * - Emits pub/sub events on state changes * - Tracks agent metadata (startedAt, completedAt, error) + * - Uses per-agent mutex to prevent TOCTOU race conditions (CQ-ORCH-5) */ @Injectable() export class AgentLifecycleService { private readonly logger = new Logger(AgentLifecycleService.name); + /** + * Per-agent mutex map to serialize state transitions. + * Uses promise chaining so concurrent transitions to the same agent + * are queued and executed sequentially, preventing TOCTOU races + * where two concurrent requests could both read the same state, + * both validate it as valid, and both write, causing lost updates. + */ + private readonly agentLocks = new Map>(); + constructor( private readonly valkeyService: ValkeyService, @Inject(forwardRef(() => AgentSpawnerService)) @@ -27,6 +37,37 @@ export class AgentLifecycleService { this.logger.log("AgentLifecycleService initialized"); } + /** + * Acquire a per-agent mutex to serialize state transitions. + * Uses promise chaining: each caller chains onto the previous lock, + * ensuring transitions for the same agent are strictly sequential. + * Different agents can transition concurrently without contention. + * + * @param agentId Agent to acquire lock for + * @param fn Critical section to execute while holding the lock + * @returns Result of the critical section + */ + private async withAgentLock(agentId: string, fn: () => Promise): Promise { + const previousLock = this.agentLocks.get(agentId) ?? Promise.resolve(); + + let releaseLock!: () => void; + const currentLock = new Promise((resolve) => { + releaseLock = resolve; + }); + this.agentLocks.set(agentId, currentLock); + + try { + await previousLock; + return await fn(); + } finally { + releaseLock(); + // Clean up the map entry if we are the last in the chain + if (this.agentLocks.get(agentId) === currentLock) { + this.agentLocks.delete(agentId); + } + } + } + /** * Transition agent from spawning to running state * @param agentId Unique agent identifier @@ -34,28 +75,34 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToRunning(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to running`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to running`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "running"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "running"); - // Set startedAt timestamp if not already set - const startedAt = currentState.startedAt ?? new Date().toISOString(); + // Set startedAt timestamp if not already set + const startedAt = currentState.startedAt ?? new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "running", undefined); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus( + agentId, + "running", + undefined + ); - // Ensure startedAt is set - if (!updatedState.startedAt) { - updatedState.startedAt = startedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure startedAt is set + if (!updatedState.startedAt) { + updatedState.startedAt = startedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.running", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.running", updatedState); - this.logger.log(`Agent ${agentId} transitioned to running`); - return updatedState; + this.logger.log(`Agent ${agentId} transitioned to running`); + return updatedState; + }); } /** @@ -65,35 +112,37 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToCompleted(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to completed`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to completed`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "completed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "completed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus( - agentId, - "completed", - undefined - ); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus( + agentId, + "completed", + undefined + ); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.completed", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.completed", updatedState); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.log(`Agent ${agentId} transitioned to completed`); - return updatedState; + this.logger.log(`Agent ${agentId} transitioned to completed`); + return updatedState; + }); } /** @@ -104,31 +153,33 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToFailed(agentId: string, error: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to failed: ${error}`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to failed: ${error}`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "failed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "failed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "failed", error); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus(agentId, "failed", error); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.failed", updatedState, error); + // Emit event + await this.publishStateChangeEvent("agent.failed", updatedState, error); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); - return updatedState; + this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); + return updatedState; + }); } /** @@ -138,31 +189,33 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToKilled(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to killed`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to killed`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "killed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "killed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "killed", undefined); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus(agentId, "killed", undefined); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.killed", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.killed", updatedState); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.warn(`Agent ${agentId} transitioned to killed`); - return updatedState; + this.logger.warn(`Agent ${agentId} transitioned to killed`); + return updatedState; + }); } /**