import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common";
import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "./agent-spawner.service";
import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types";
import { isValidAgentTransition } from "../valkey/types/state.types";

/** Statuses that end an agent's lifecycle and trigger session cleanup. */
type TerminalAgentStatus = "completed" | "failed" | "killed";

/** Event types published by the lifecycle service on a state change. */
type StateChangeEventType = "agent.running" | "agent.completed" | "agent.failed" | "agent.killed";

/** Maps each terminal status to the pub/sub event announcing it. */
const TERMINAL_EVENT_TYPES: Readonly<Record<TerminalAgentStatus, StateChangeEventType>> = {
  completed: "agent.completed",
  failed: "agent.failed",
  killed: "agent.killed",
};

/**
 * Service responsible for managing agent lifecycle state transitions
 *
 * Manages state transitions through the agent lifecycle:
 * spawning → running → completed/failed/killed
 *
 * - Enforces valid state transitions using the state machine (`isValidAgentTransition`)
 * - Persists agent state changes to Valkey
 * - Emits pub/sub events on state changes
 * - Tracks agent metadata (startedAt, completedAt, error)
 * - Uses per-agent mutex to prevent TOCTOU race conditions (CQ-ORCH-5)
 *
 * NOTE: the mutex is an in-memory Map, so it only serializes transitions issued
 * through this service instance. It does not coordinate with other processes
 * that may write the same agent state to Valkey.
 */
@Injectable()
export class AgentLifecycleService {
  private readonly logger = new Logger(AgentLifecycleService.name);

  /**
   * Per-agent mutex map to serialize state transitions.
   * Uses promise chaining so concurrent transitions to the same agent
   * are queued and executed sequentially, preventing TOCTOU races
   * where two concurrent requests could both read the same state,
   * both validate it as valid, and both write, causing lost updates.
   *
   * The value is the "release" promise of the most recent lock holder/waiter.
   */
  private readonly agentLocks = new Map<string, Promise<void>>();

  constructor(
    private readonly valkeyService: ValkeyService,
    @Inject(forwardRef(() => AgentSpawnerService))
    private readonly spawnerService: AgentSpawnerService
  ) {
    this.logger.log("AgentLifecycleService initialized");
  }

  /**
   * Register a newly spawned agent in persistent state and emit an `agent.spawned` event.
   *
   * @param agentId Unique agent identifier
   * @param taskId Task the agent was spawned for
   * @returns The agent state as read back from Valkey after creation
   * @throws Error if the agent cannot be read back after creation
   */
  async registerSpawnedAgent(agentId: string, taskId: string): Promise<AgentState> {
    await this.valkeyService.createAgent(agentId, taskId);
    const createdState = await this.getAgentState(agentId);

    const event: AgentEvent = {
      type: "agent.spawned",
      agentId,
      taskId,
      timestamp: new Date().toISOString(),
    };
    await this.valkeyService.publishEvent(event);

    return createdState;
  }

  /**
   * Acquire a per-agent mutex to serialize state transitions.
   * Uses promise chaining: each caller chains onto the previous lock,
   * ensuring transitions for the same agent are strictly sequential.
   * Different agents can transition concurrently without contention.
   *
   * @param agentId Agent to acquire lock for
   * @param fn Critical section to execute while holding the lock
   * @returns Result of the critical section
   */
  private async withAgentLock<T>(agentId: string, fn: () => Promise<T>): Promise<T> {
    const previousLock = this.agentLocks.get(agentId) ?? Promise.resolve();

    let releaseLock!: () => void;
    const currentLock = new Promise<void>((resolve) => {
      releaseLock = resolve;
    });
    // Register ourselves before awaiting so later callers queue behind us.
    this.agentLocks.set(agentId, currentLock);

    try {
      await previousLock;
      return await fn();
    } finally {
      releaseLock();
      // Clean up the map entry if we are the last in the chain
      if (this.agentLocks.get(agentId) === currentLock) {
        this.agentLocks.delete(agentId);
      }
    }
  }

  /**
   * Transition agent from spawning to running state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToRunning(agentId: string): Promise<AgentState> {
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to running`);

      const currentState = await this.getAgentState(agentId);
      this.validateTransition(currentState.status, "running");

      // Preserve an existing startedAt; otherwise stamp it now
      const startedAt = currentState.startedAt ?? new Date().toISOString();

      const updatedState = await this.valkeyService.updateAgentStatus(
        agentId,
        "running",
        undefined
      );

      // Backfill startedAt if the status update did not set it
      if (!updatedState.startedAt) {
        updatedState.startedAt = startedAt;
        await this.valkeyService.setAgentState(updatedState);
      }

      await this.publishStateChangeEvent("agent.running", updatedState);

      this.logger.log(`Agent ${agentId} transitioned to running`);
      return updatedState;
    });
  }

  /**
   * Transition agent to completed state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToCompleted(agentId: string): Promise<AgentState> {
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to completed`);
      const updatedState = await this.applyTerminalTransition(agentId, "completed");
      this.logger.log(`Agent ${agentId} transitioned to completed`);
      return updatedState;
    });
  }

  /**
   * Transition agent to failed state with error
   * @param agentId Unique agent identifier
   * @param error Error message (persisted on the agent state and included in the event)
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToFailed(agentId: string, error: string): Promise<AgentState> {
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to failed: ${error}`);
      const updatedState = await this.applyTerminalTransition(agentId, "failed", error);
      this.logger.error(`Agent ${agentId} transitioned to failed: ${error}`);
      return updatedState;
    });
  }

  /**
   * Transition agent to killed state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToKilled(agentId: string): Promise<AgentState> {
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to killed`);
      const updatedState = await this.applyTerminalTransition(agentId, "killed");
      this.logger.warn(`Agent ${agentId} transitioned to killed`);
      return updatedState;
    });
  }

  /**
   * Get current agent lifecycle state
   * @param agentId Unique agent identifier
   * @returns Agent state or null if not found
   */
  async getAgentLifecycleState(agentId: string): Promise<AgentState | null> {
    return this.valkeyService.getAgentState(agentId);
  }

  /**
   * List all agent lifecycle states
   * @returns Array of all agent states
   */
  async listAgentLifecycleStates(): Promise<AgentState[]> {
    return this.valkeyService.listAgents();
  }

  /**
   * Shared implementation of every transition into a terminal status.
   * Must be called while holding the agent's lock (see `withAgentLock`).
   *
   * Validates the transition and persists the new status (and `error`, if given).
   * Backfills `completedAt` if the status update did not set it. Then publishes
   * the matching state-change event and schedules session cleanup.
   *
   * @param agentId Unique agent identifier
   * @param status Terminal status to move into
   * @param error Optional error message (only used for `failed`)
   * @returns Updated agent state
   * @throws Error if agent not found, invalid transition, or persistence/publish fails
   */
  private async applyTerminalTransition(
    agentId: string,
    status: TerminalAgentStatus,
    error?: string
  ): Promise<AgentState> {
    const currentState = await this.getAgentState(agentId);
    this.validateTransition(currentState.status, status);

    const completedAt = new Date().toISOString();

    const updatedState = await this.valkeyService.updateAgentStatus(agentId, status, error);

    // Backfill completedAt if the status update did not set it
    if (!updatedState.completedAt) {
      updatedState.completedAt = completedAt;
      await this.valkeyService.setAgentState(updatedState);
    }

    try {
      await this.publishStateChangeEvent(TERMINAL_EVENT_TYPES[status], updatedState, error);
    } finally {
      // By now the agent is persisted in a terminal state. If publishing fails,
      // a retry of the same transition would presumably be rejected by the state
      // machine. So cleanup must be scheduled regardless, or the session leaks.
      this.spawnerService.scheduleSessionCleanup(agentId);
    }

    return updatedState;
  }

  /**
   * Get agent state and throw if not found
   * @param agentId Unique agent identifier
   * @returns Agent state
   * @throws Error if agent not found
   */
  private async getAgentState(agentId: string): Promise<AgentState> {
    const state = await this.valkeyService.getAgentState(agentId);
    if (!state) {
      throw new Error(`Agent ${agentId} not found`);
    }
    return state;
  }

  /**
   * Validate state transition is allowed
   * @param from Current state
   * @param to Target state
   * @throws Error if transition is invalid
   */
  private validateTransition(from: AgentStatus, to: AgentStatus): void {
    if (!isValidAgentTransition(from, to)) {
      throw new Error(`Invalid state transition from ${from} to ${to}`);
    }
  }

  /**
   * Publish state change event
   * @param eventType Type of event
   * @param state Updated agent state (supplies agentId and taskId)
   * @param error Optional error message
   */
  private async publishStateChangeEvent(
    eventType: StateChangeEventType,
    state: AgentState,
    error?: string
  ): Promise<void> {
    const event: AgentEvent = {
      type: eventType,
      agentId: state.agentId,
      taskId: state.taskId,
      timestamp: new Date().toISOString(),
      error,
    };
    await this.valkeyService.publishEvent(event);
  }
}