From 2b356f6ca2bb5842813a376fc9bbae20eebad01e Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Feb 2026 14:02:40 -0600 Subject: [PATCH] fix(CQ-ORCH-5): Fix TOCTOU race in agent state transitions Add per-agent mutex using promise chaining to serialize state transitions for the same agent. This prevents the Time-of-Check-Time-of-Use race condition where two concurrent requests could both read the current state, both validate it as valid for transition, and both write, causing one to overwrite the other's transition. The mutex uses a Map> with promise chaining so that: - Concurrent transitions to the same agent are queued and executed sequentially - Different agents can still transition concurrently without contention - The lock is always released even if the transition throws an error Co-Authored-By: Claude Opus 4.6 --- .../spawner/agent-lifecycle.service.spec.ts | 229 ++++++++++++++++++ .../src/spawner/agent-lifecycle.service.ts | 201 +++++++++------ 2 files changed, 356 insertions(+), 74 deletions(-) diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts index 6b359db..f4b815b 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.spec.ts @@ -706,4 +706,233 @@ describe("AgentLifecycleService", () => { expect(mockSpawnerService.scheduleSessionCleanup).not.toHaveBeenCalled(); }); }); + + describe("TOCTOU race prevention (CQ-ORCH-5)", () => { + it("should serialize concurrent transitions to the same agent", async () => { + const executionOrder: string[] = []; + + // Simulate state that changes after first transition completes + let currentStatus: "spawning" | "running" | "completed" = "spawning"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + // Simulate delay to allow interleaving if lock is broken + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + currentStatus = status as "spawning" | "running" | "completed"; + executionOrder.push(`updated-to-${status}`); + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + ...(status === "completed" && { completedAt: "2026-02-02T11:00:00Z" }), + } as AgentState; + } + ); + + // Launch both transitions concurrently + const [result1, result2] = await Promise.allSettled([ + service.transitionToRunning(mockAgentId), + service.transitionToCompleted(mockAgentId), + ]); + + // First should succeed (spawning -> running) + expect(result1.status).toBe("fulfilled"); + + // Second should also succeed (running -> completed) because the lock + // serializes them: first one completes, updates state to running, + // then second reads the updated state and transitions to completed + expect(result2.status).toBe("fulfilled"); + + // Verify they executed in order, not interleaved + expect(executionOrder).toEqual(["updated-to-running", "updated-to-completed"]); + }); + + it("should reject second concurrent transition if first makes it invalid", async () => { + let currentStatus: "running" | "completed" | "killed" = "running"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + currentStatus = status as "running" | "completed" | "killed"; + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + completedAt: "2026-02-02T11:00:00Z", + } as AgentState; + } + ); + + // Both try to transition from running to a terminal state concurrently + const [result1, result2] = await Promise.allSettled([ + service.transitionToCompleted(mockAgentId), + service.transitionToKilled(mockAgentId), + ]); + + // First should succeed (running -> completed) + expect(result1.status).toBe("fulfilled"); + + // Second should fail because after first completes, + // agent is in "completed" state which cannot transition to "killed" + expect(result2.status).toBe("rejected"); + if (result2.status === "rejected") { + expect(result2.reason).toBeInstanceOf(Error); + expect((result2.reason as Error).message).toContain("Invalid state transition"); + } + }); + + it("should allow concurrent transitions to different agents", async () => { + const agent1Id = "agent-1"; + const agent2Id = "agent-2"; + const executionOrder: string[] = []; + + mockValkeyService.getAgentState.mockImplementation(async (agentId: string) => { + return { + agentId, + status: "spawning", + taskId: `task-for-${agentId}`, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (agentId: string, status: string) => { + executionOrder.push(`${agentId}-start`); + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + executionOrder.push(`${agentId}-end`); + return { + agentId, + status, + taskId: `task-for-${agentId}`, + startedAt: "2026-02-02T10:00:00Z", + } as AgentState; + } + ); + + // Both should run concurrently since they target different agents + const [result1, result2] = await Promise.allSettled([ + service.transitionToRunning(agent1Id), + service.transitionToRunning(agent2Id), + ]); + + expect(result1.status).toBe("fulfilled"); + expect(result2.status).toBe("fulfilled"); + + // Both should start before either finishes (concurrent, not serialized) + // The execution order should show interleaving + expect(executionOrder).toContain("agent-1-start"); + expect(executionOrder).toContain("agent-2-start"); + }); + + it("should release lock even when transition throws an error", async () => { + let callCount = 0; + + mockValkeyService.getAgentState.mockImplementation(async () => { + callCount++; + if (callCount === 1) { + // First call: throw error + return null; + } + // Second call: return valid state + return { + agentId: mockAgentId, + status: "spawning", + taskId: mockTaskId, + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockResolvedValue({ + agentId: mockAgentId, + status: "running", + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + }); + + // First transition should fail (agent not found) + await expect(service.transitionToRunning(mockAgentId)).rejects.toThrow( + `Agent ${mockAgentId} not found` + ); + + // Second transition should succeed (lock was released despite error) + const result = await service.transitionToRunning(mockAgentId); + expect(result.status).toBe("running"); + }); + + it("should handle three concurrent transitions sequentially for same agent", async () => { + const executionOrder: string[] = []; + let currentStatus: "spawning" | "running" | "completed" | "failed" = "spawning"; + + mockValkeyService.getAgentState.mockImplementation(async () => { + return { + agentId: mockAgentId, + status: currentStatus, + taskId: mockTaskId, + ...(currentStatus !== "spawning" && { startedAt: "2026-02-02T10:00:00Z" }), + } as AgentState; + }); + + mockValkeyService.updateAgentStatus.mockImplementation( + async (_agentId: string, status: string) => { + executionOrder.push(`update-${status}`); + await new Promise((resolve) => { + setTimeout(resolve, 5); + }); + currentStatus = status as "spawning" | "running" | "completed" | "failed"; + return { + agentId: mockAgentId, + status, + taskId: mockTaskId, + startedAt: "2026-02-02T10:00:00Z", + ...(["completed", "failed"].includes(status) && { + completedAt: "2026-02-02T11:00:00Z", + }), + } as AgentState; + } + ); + + // Launch three transitions at once: spawning->running->completed, plus a failed attempt + const [r1, r2, r3] = await Promise.allSettled([ + service.transitionToRunning(mockAgentId), + service.transitionToCompleted(mockAgentId), + service.transitionToFailed(mockAgentId, "late error"), + ]); + + // First: spawning -> running (succeeds) + expect(r1.status).toBe("fulfilled"); + // Second: running -> completed (succeeds, serialized after first) + expect(r2.status).toBe("fulfilled"); + // Third: completed -> failed (fails, completed is terminal) + expect(r3.status).toBe("rejected"); + + // Verify sequential execution + expect(executionOrder[0]).toBe("update-running"); + expect(executionOrder[1]).toBe("update-completed"); + // Third never gets to update because validation fails + expect(executionOrder).toHaveLength(2); + }); + }); }); diff --git a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts index b2fccdc..942cb08 100644 --- a/apps/orchestrator/src/spawner/agent-lifecycle.service.ts +++ b/apps/orchestrator/src/spawner/agent-lifecycle.service.ts @@ -14,11 +14,21 @@ import { isValidAgentTransition } from "../valkey/types/state.types"; * - Persists agent state changes to Valkey * - Emits pub/sub events on state changes * - Tracks agent metadata (startedAt, completedAt, error) + * - Uses per-agent mutex to prevent TOCTOU race conditions (CQ-ORCH-5) */ @Injectable() export class AgentLifecycleService { private readonly logger = new Logger(AgentLifecycleService.name); + /** + * Per-agent mutex map to serialize state transitions. + * Uses promise chaining so concurrent transitions to the same agent + * are queued and executed sequentially, preventing TOCTOU races + * where two concurrent requests could both read the same state, + * both validate it as valid, and both write, causing lost updates. + */ + private readonly agentLocks = new Map>(); + constructor( private readonly valkeyService: ValkeyService, @Inject(forwardRef(() => AgentSpawnerService)) @@ -27,6 +37,37 @@ export class AgentLifecycleService { this.logger.log("AgentLifecycleService initialized"); } + /** + * Acquire a per-agent mutex to serialize state transitions. + * Uses promise chaining: each caller chains onto the previous lock, + * ensuring transitions for the same agent are strictly sequential. + * Different agents can transition concurrently without contention. + * + * @param agentId Agent to acquire lock for + * @param fn Critical section to execute while holding the lock + * @returns Result of the critical section + */ + private async withAgentLock(agentId: string, fn: () => Promise): Promise { + const previousLock = this.agentLocks.get(agentId) ?? Promise.resolve(); + + let releaseLock!: () => void; + const currentLock = new Promise((resolve) => { + releaseLock = resolve; + }); + this.agentLocks.set(agentId, currentLock); + + try { + await previousLock; + return await fn(); + } finally { + releaseLock(); + // Clean up the map entry if we are the last in the chain + if (this.agentLocks.get(agentId) === currentLock) { + this.agentLocks.delete(agentId); + } + } + } + /** * Transition agent from spawning to running state * @param agentId Unique agent identifier @@ -34,28 +75,34 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToRunning(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to running`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to running`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "running"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "running"); - // Set startedAt timestamp if not already set - const startedAt = currentState.startedAt ?? new Date().toISOString(); + // Set startedAt timestamp if not already set + const startedAt = currentState.startedAt ?? new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "running", undefined); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus( + agentId, + "running", + undefined + ); - // Ensure startedAt is set - if (!updatedState.startedAt) { - updatedState.startedAt = startedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure startedAt is set + if (!updatedState.startedAt) { + updatedState.startedAt = startedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.running", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.running", updatedState); - this.logger.log(`Agent ${agentId} transitioned to running`); - return updatedState; + this.logger.log(`Agent ${agentId} transitioned to running`); + return updatedState; + }); } /** @@ -65,35 +112,37 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToCompleted(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to completed`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to completed`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "completed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "completed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus( - agentId, - "completed", - undefined - ); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus( + agentId, + "completed", + undefined + ); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.completed", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.completed", updatedState); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.log(`Agent ${agentId} transitioned to completed`); - return updatedState; + this.logger.log(`Agent ${agentId} transitioned to completed`); + return updatedState; + }); } /** @@ -104,31 +153,33 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToFailed(agentId: string, error: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to failed: ${error}`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to failed: ${error}`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "failed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "failed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "failed", error); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus(agentId, "failed", error); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.failed", updatedState, error); + // Emit event + await this.publishStateChangeEvent("agent.failed", updatedState, error); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); - return updatedState; + this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`); + return updatedState; + }); } /** @@ -138,31 +189,33 @@ export class AgentLifecycleService { * @throws Error if agent not found or invalid transition */ async transitionToKilled(agentId: string): Promise { - this.logger.log(`Transitioning agent ${agentId} to killed`); + return this.withAgentLock(agentId, async () => { + this.logger.log(`Transitioning agent ${agentId} to killed`); - const currentState = await this.getAgentState(agentId); - this.validateTransition(currentState.status, "killed"); + const currentState = await this.getAgentState(agentId); + this.validateTransition(currentState.status, "killed"); - // Set completedAt timestamp - const completedAt = new Date().toISOString(); + // Set completedAt timestamp + const completedAt = new Date().toISOString(); - // Update state in Valkey - const updatedState = await this.valkeyService.updateAgentStatus(agentId, "killed", undefined); + // Update state in Valkey + const updatedState = await this.valkeyService.updateAgentStatus(agentId, "killed", undefined); - // Ensure completedAt is set - if (!updatedState.completedAt) { - updatedState.completedAt = completedAt; - await this.valkeyService.setAgentState(updatedState); - } + // Ensure completedAt is set + if (!updatedState.completedAt) { + updatedState.completedAt = completedAt; + await this.valkeyService.setAgentState(updatedState); + } - // Emit event - await this.publishStateChangeEvent("agent.killed", updatedState); + // Emit event + await this.publishStateChangeEvent("agent.killed", updatedState); - // Schedule session cleanup - this.spawnerService.scheduleSessionCleanup(agentId); + // Schedule session cleanup + this.spawnerService.scheduleSessionCleanup(agentId); - this.logger.warn(`Agent ${agentId} transitioned to killed`); - return updatedState; + this.logger.warn(`Agent ${agentId} transitioned to killed`); + return updatedState; + }); } /**