Files
stack/apps/orchestrator/src/spawner/agent-lifecycle.service.ts

306 lines
9.7 KiB
TypeScript

import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common";
import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "./agent-spawner.service";
import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types";
import { isValidAgentTransition } from "../valkey/types/state.types";
/**
 * Service responsible for managing agent lifecycle state transitions.
 *
 * Manages state transitions through the agent lifecycle:
 * spawning → running → completed/failed/killed
 *
 * - Enforces valid state transitions using `isValidAgentTransition`
 * - Persists agent state changes to Valkey
 * - Emits pub/sub events on state changes
 * - Tracks agent metadata (startedAt, completedAt, error)
 * - Uses per-agent mutex to prevent TOCTOU race conditions (CQ-ORCH-5)
 * - Always asks the spawner to schedule session cleanup once a terminal
 *   status has been persisted, even if a later step of the transition fails
 */
@Injectable()
export class AgentLifecycleService {
  private readonly logger = new Logger(AgentLifecycleService.name);

  /**
   * Per-agent mutex map to serialize state transitions.
   * Uses promise chaining so concurrent transitions to the same agent
   * are queued and executed sequentially, preventing TOCTOU races
   * where two concurrent requests could both read the same state,
   * both validate it as valid, and both write, causing lost updates.
   */
  private readonly agentLocks = new Map<string, Promise<void>>();

  constructor(
    private readonly valkeyService: ValkeyService,
    @Inject(forwardRef(() => AgentSpawnerService))
    private readonly spawnerService: AgentSpawnerService
  ) {
    this.logger.log("AgentLifecycleService initialized");
  }

  /**
   * Register a newly spawned agent in persistent state and emit spawned event.
   *
   * @param agentId Unique agent identifier
   * @param taskId Identifier of the task the agent was spawned for
   * @returns The agent state read back from Valkey after creation
   * @throws Error if the agent cannot be read back after creation
   */
  async registerSpawnedAgent(agentId: string, taskId: string): Promise<AgentState> {
    await this.valkeyService.createAgent(agentId, taskId);
    // Read the record back so the caller receives the stored state.
    const createdState = await this.getAgentState(agentId);
    const event: AgentEvent = {
      type: "agent.spawned",
      agentId,
      taskId,
      timestamp: new Date().toISOString(),
    };
    await this.valkeyService.publishEvent(event);
    return createdState;
  }

  /**
   * Acquire a per-agent mutex to serialize state transitions.
   * Uses promise chaining: each caller chains onto the previous lock,
   * ensuring transitions for the same agent are strictly sequential.
   * Different agents can transition concurrently without contention.
   *
   * @param agentId Agent to acquire lock for
   * @param fn Critical section to execute while holding the lock
   * @returns Result of the critical section
   */
  private async withAgentLock<T>(agentId: string, fn: () => Promise<T>): Promise<T> {
    const previousLock = this.agentLocks.get(agentId) ?? Promise.resolve();
    let releaseLock!: () => void;
    const currentLock = new Promise<void>((resolve) => {
      releaseLock = resolve;
    });
    // Publish our lock before awaiting so later callers queue behind us.
    this.agentLocks.set(agentId, currentLock);
    try {
      // Lock promises are only ever resolved, so this await cannot reject.
      await previousLock;
      return await fn();
    } finally {
      // Release even when fn() throws so the next queued caller can proceed.
      releaseLock();
      // Clean up the map entry if we are the last in the chain
      if (this.agentLocks.get(agentId) === currentLock) {
        this.agentLocks.delete(agentId);
      }
    }
  }

  /**
   * Transition agent from spawning to running state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToRunning(agentId: string): Promise<AgentState> {
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to running`);
      const currentState = await this.getAgentState(agentId);
      this.validateTransition(currentState.status, "running");
      // Keep an existing startedAt; otherwise stamp the transition time.
      const startedAt = currentState.startedAt ?? new Date().toISOString();
      // Update state in Valkey
      const updatedState = await this.valkeyService.updateAgentStatus(
        agentId,
        "running",
        undefined
      );
      // Ensure startedAt is set if the status update did not provide one
      if (!updatedState.startedAt) {
        updatedState.startedAt = startedAt;
        await this.valkeyService.setAgentState(updatedState);
      }
      // Emit event
      await this.publishStateChangeEvent("agent.running", updatedState);
      this.logger.log(`Agent ${agentId} transitioned to running`);
      return updatedState;
    });
  }

  /**
   * Transition agent to completed state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToCompleted(agentId: string): Promise<AgentState> {
    return this.transitionToTerminal(agentId, "completed", "agent.completed");
  }

  /**
   * Transition agent to failed state with error
   * @param agentId Unique agent identifier
   * @param error Error message
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToFailed(agentId: string, error: string): Promise<AgentState> {
    return this.transitionToTerminal(agentId, "failed", "agent.failed", error);
  }

  /**
   * Transition agent to killed state
   * @param agentId Unique agent identifier
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  async transitionToKilled(agentId: string): Promise<AgentState> {
    return this.transitionToTerminal(agentId, "killed", "agent.killed");
  }

  /**
   * Shared implementation of the completed/failed/killed transitions.
   *
   * Under the per-agent lock it validates the transition, persists the new
   * status, ensures completedAt is set, publishes the matching event and asks
   * the spawner to schedule session cleanup.
   *
   * @param agentId Unique agent identifier
   * @param status Terminal status to move the agent into
   * @param eventType Event type published for this transition
   * @param error Optional error message (only supplied for "failed")
   * @returns Updated agent state
   * @throws Error if agent not found or invalid transition
   */
  private async transitionToTerminal(
    agentId: string,
    status: "completed" | "failed" | "killed",
    eventType: "agent.completed" | "agent.failed" | "agent.killed",
    error?: string
  ): Promise<AgentState> {
    // Only the failed transition includes the error text in its log lines.
    const detail = status === "failed" ? `: ${error}` : "";
    return this.withAgentLock(agentId, async () => {
      this.logger.log(`Transitioning agent ${agentId} to ${status}${detail}`);
      const currentState = await this.getAgentState(agentId);
      this.validateTransition(currentState.status, status);
      // Capture the timestamp before persisting, as the transition moment.
      const completedAt = new Date().toISOString();
      // Update state in Valkey
      const updatedState = await this.valkeyService.updateAgentStatus(agentId, status, error);
      try {
        // Ensure completedAt is set if the status update did not provide one
        if (!updatedState.completedAt) {
          updatedState.completedAt = completedAt;
          await this.valkeyService.setAgentState(updatedState);
        }
        // Emit event
        await this.publishStateChangeEvent(eventType, updatedState, error);
      } finally {
        // The terminal status is already persisted at this point. Schedule
        // cleanup even if the timestamp write or the event publish failed,
        // otherwise a failure here would skip cleanup for this agent.
        this.spawnerService.scheduleSessionCleanup(agentId);
      }
      const message = `Agent ${agentId} transitioned to ${status}${detail}`;
      if (status === "failed") {
        this.logger.error(message);
      } else if (status === "killed") {
        this.logger.warn(message);
      } else {
        this.logger.log(message);
      }
      return updatedState;
    });
  }

  /**
   * Get current agent lifecycle state
   * @param agentId Unique agent identifier
   * @returns Agent state or null if not found
   */
  async getAgentLifecycleState(agentId: string): Promise<AgentState | null> {
    return this.valkeyService.getAgentState(agentId);
  }

  /**
   * List all agent lifecycle states
   * @returns Array of all agent states
   */
  async listAgentLifecycleStates(): Promise<AgentState[]> {
    return this.valkeyService.listAgents();
  }

  /**
   * Get agent state and throw if not found
   * @param agentId Unique agent identifier
   * @returns Agent state
   * @throws Error if agent not found
   */
  private async getAgentState(agentId: string): Promise<AgentState> {
    const state = await this.valkeyService.getAgentState(agentId);
    if (!state) {
      throw new Error(`Agent ${agentId} not found`);
    }
    return state;
  }

  /**
   * Validate state transition is allowed
   * @param from Current state
   * @param to Target state
   * @throws Error if transition is invalid
   */
  private validateTransition(from: AgentStatus, to: AgentStatus): void {
    if (!isValidAgentTransition(from, to)) {
      throw new Error(`Invalid state transition from ${from} to ${to}`);
    }
  }

  /**
   * Publish state change event
   * @param eventType Type of event
   * @param state Updated agent state (supplies agentId and taskId)
   * @param error Optional error message
   */
  private async publishStateChangeEvent(
    eventType: "agent.running" | "agent.completed" | "agent.failed" | "agent.killed",
    state: AgentState,
    error?: string
  ): Promise<void> {
    const event: AgentEvent = {
      type: eventType,
      agentId: state.agentId,
      taskId: state.taskId,
      timestamp: new Date().toISOString(),
      error,
    };
    await this.valkeyService.publishEvent(event);
  }
}