import Redis from "ioredis";
import type {
  TaskState,
  AgentState,
  TaskStatus,
  AgentStatus,
  OrchestratorEvent,
  EventHandler,
} from "./types";
import { isValidTaskTransition, isValidAgentTransition } from "./types";

/** Key prefix under which task state records are stored. */
const TASK_KEY_PREFIX = "orchestrator:task:";

/** Key prefix under which agent state records are stored. */
const AGENT_KEY_PREFIX = "orchestrator:agent:";

/** Pub/sub channel used for orchestrator events. */
const EVENTS_CHANNEL = "orchestrator:events";

/** COUNT hint passed to SCAN, and the number of keys fetched per MGET. */
const BATCH_SIZE = 100;

/** Agent statuses that stamp `completedAt` when entered. */
const TERMINAL_AGENT_STATUSES = ["completed", "failed", "killed"] as AgentStatus[];

/**
 * Connection settings for {@link ValkeyClient}.
 */
export interface ValkeyClientConfig {
  host: string;
  port: number;
  password?: string;
  db?: number;
  /** Optional sink for event parsing/handling failures. */
  logger?: {
    error: (message: string, error?: unknown) => void;
  };
}

/**
 * Error handler for event parsing or handling failures.
 *
 * Receives the normalized error, the raw message as delivered on the channel,
 * and the channel name.
 */
export type EventErrorHandler = (error: Error, rawMessage: string, channel: string) => void;

/**
 * Valkey client for state management and pub/sub
 */
export class ValkeyClient {
  private readonly client: Redis;
  private subscriber: Redis | undefined;
  private readonly logger?: ValkeyClientConfig["logger"];

  constructor(config: ValkeyClientConfig) {
    this.client = new Redis({
      host: config.host,
      port: config.port,
      password: config.password,
      db: config.db,
    });
    this.logger = config.logger;
  }

  /**
   * Disconnect from Valkey.
   *
   * The subscriber connection (if one was created) is closed even when
   * closing the main connection fails, and the cached subscriber is cleared
   * so a later `subscribeToEvents` call creates a fresh one.
   */
  async disconnect(): Promise<void> {
    const subscriber = this.subscriber;
    this.subscriber = undefined;

    try {
      await this.client.quit();
    } finally {
      if (subscriber) {
        await subscriber.quit();
      }
    }
  }

  /**
   * Task State Management
   */

  /**
   * Load the stored state of a task.
   *
   * @returns The parsed state, or `null` when no record exists.
   */
  async getTaskState(taskId: string): Promise<TaskState | null> {
    return this.readState<TaskState>(this.getTaskKey(taskId));
  }

  /** Store `state` under the key derived from `state.taskId`, replacing any existing record. */
  async setTaskState(state: TaskState): Promise<void> {
    await this.client.set(this.getTaskKey(state.taskId), JSON.stringify(state));
  }

  /** Delete the stored state of a task. */
  async deleteTaskState(taskId: string): Promise<void> {
    await this.client.del(this.getTaskKey(taskId));
  }

  /**
   * Move a task to a new status.
   *
   * NOTE: this is a read-modify-write sequence without a transaction, so
   * concurrent updates of the same task can overwrite each other.
   *
   * @param agentId - Replaces the stored agent id when provided.
   * @param error - Recorded as `metadata.error` when non-empty.
   * @returns The updated state as written.
   * @throws Error when the task does not exist or the transition is invalid.
   */
  async updateTaskStatus(
    taskId: string,
    status: TaskStatus,
    agentId?: string,
    error?: string
  ): Promise<TaskState> {
    const existing = await this.getTaskState(taskId);
    if (!existing) {
      throw new Error(`Task ${taskId} not found`);
    }

    // Validate state transition
    if (!isValidTaskTransition(existing.status, status)) {
      throw new Error(`Invalid task state transition from ${existing.status} to ${status}`);
    }

    const updated: TaskState = {
      ...existing,
      status,
      agentId: agentId ?? existing.agentId,
      updatedAt: new Date().toISOString(),
      metadata: {
        ...existing.metadata,
        ...(error && { error }),
      },
    };

    await this.setTaskState(updated);
    return updated;
  }

  /** List every stored task state. */
  async listTasks(): Promise<TaskState[]> {
    return this.listStates<TaskState>(`${TASK_KEY_PREFIX}*`);
  }

  /**
   * Agent State Management
   */

  /**
   * Load the stored state of an agent.
   *
   * @returns The parsed state, or `null` when no record exists.
   */
  async getAgentState(agentId: string): Promise<AgentState | null> {
    return this.readState<AgentState>(this.getAgentKey(agentId));
  }

  /** Store `state` under the key derived from `state.agentId`, replacing any existing record. */
  async setAgentState(state: AgentState): Promise<void> {
    await this.client.set(this.getAgentKey(state.agentId), JSON.stringify(state));
  }

  /** Delete the stored state of an agent. */
  async deleteAgentState(agentId: string): Promise<void> {
    await this.client.del(this.getAgentKey(agentId));
  }

  /**
   * Move an agent to a new status.
   *
   * Sets `startedAt` the first time the agent enters `"running"`, and
   * `completedAt` whenever it enters `"completed"`, `"failed"` or `"killed"`.
   *
   * NOTE: this is a read-modify-write sequence without a transaction, so
   * concurrent updates of the same agent can overwrite each other.
   *
   * @param error - Recorded as `error` on the state when non-empty.
   * @returns The updated state as written.
   * @throws Error when the agent does not exist or the transition is invalid.
   */
  async updateAgentStatus(
    agentId: string,
    status: AgentStatus,
    error?: string
  ): Promise<AgentState> {
    const existing = await this.getAgentState(agentId);
    if (!existing) {
      throw new Error(`Agent ${agentId} not found`);
    }

    // Validate state transition
    if (!isValidAgentTransition(existing.status, status)) {
      throw new Error(`Invalid agent state transition from ${existing.status} to ${status}`);
    }

    const now = new Date().toISOString();
    const updated: AgentState = {
      ...existing,
      status,
      ...(status === "running" && !existing.startedAt && { startedAt: now }),
      ...(TERMINAL_AGENT_STATUSES.includes(status) && { completedAt: now }),
      ...(error && { error }),
    };

    await this.setAgentState(updated);
    return updated;
  }

  /** List every stored agent state. */
  async listAgents(): Promise<AgentState[]> {
    return this.listStates<AgentState>(`${AGENT_KEY_PREFIX}*`);
  }

  /**
   * Event Pub/Sub
   */

  /** Publish `event` as JSON on the orchestrator events channel. */
  async publishEvent(event: OrchestratorEvent): Promise<void> {
    await this.client.publish(EVENTS_CHANNEL, JSON.stringify(event));
  }

  /**
   * Subscribe to orchestrator events.
   *
   * A dedicated subscriber connection is created on first use. Each call
   * registers an additional message listener on that connection.
   *
   * Failures are logged through the configured logger and forwarded to
   * `errorHandler`. That covers a message that is not valid JSON, a handler
   * that throws synchronously, and a handler whose returned promise rejects.
   * `errorHandler` itself must not throw.
   *
   * @param handler - Invoked with each parsed event.
   * @param errorHandler - Optional callback for parse/handling failures.
   */
  async subscribeToEvents(handler: EventHandler, errorHandler?: EventErrorHandler): Promise<void> {
    this.subscriber ??= this.client.duplicate();

    this.subscriber.on("message", (channel: string, message: string) => {
      let event: OrchestratorEvent;
      try {
        event = JSON.parse(message) as OrchestratorEvent;
      } catch (error) {
        this.reportEventError("parse", error, message, channel, errorHandler);
        return;
      }

      // Kept separate from parsing so a handler failure is not reported as a
      // parse failure. The handler is still invoked synchronously; a rejected
      // promise from an async handler is caught, not left unhandled.
      try {
        void Promise.resolve(handler(event)).catch((error: unknown) => {
          this.reportEventError("handle", error, message, channel, errorHandler);
        });
      } catch (error) {
        this.reportEventError("handle", error, message, channel, errorHandler);
      }
    });

    await this.subscriber.subscribe(EVENTS_CHANNEL);
  }

  /**
   * Private helper methods
   */

  /** Log an event failure and forward it to the caller-supplied error handler, if any. */
  private reportEventError(
    action: "parse" | "handle",
    error: unknown,
    rawMessage: string,
    channel: string,
    errorHandler?: EventErrorHandler
  ): void {
    const errorObj = error instanceof Error ? error : new Error(String(error));

    this.logger?.error(
      `Failed to ${action} event from channel ${channel}: ${errorObj.message}`,
      errorObj
    );
    errorHandler?.(errorObj, rawMessage, channel);
  }

  /** Read and JSON-parse the value at `key`; `null` when the key is missing or empty. */
  private async readState<T>(key: string): Promise<T | null> {
    const data = await this.client.get(key);
    if (!data) {
      return null;
    }
    // NOTE(review): the parsed value is trusted to match T; no schema validation is done.
    return JSON.parse(data) as T;
  }

  /**
   * Load every record whose key matches `pattern`.
   *
   * Values are fetched with MGET in batches, not one GET per key. Keys that
   * disappear between the scan and the fetch are skipped.
   */
  private async listStates<T>(pattern: string): Promise<T[]> {
    const keys = await this.scanKeys(pattern);
    const states: T[] = [];

    for (let start = 0; start < keys.length; start += BATCH_SIZE) {
      const values = await this.client.mget(keys.slice(start, start + BATCH_SIZE));
      for (const value of values) {
        if (value) {
          states.push(JSON.parse(value) as T);
        }
      }
    }

    return states;
  }

  /**
   * Scan keys using SCAN command (non-blocking alternative to KEYS)
   * Uses cursor-based iteration to avoid blocking Redis
   *
   * SCAN may return the same key more than once during a full iteration, so
   * results are de-duplicated before being returned.
   */
  private async scanKeys(pattern: string): Promise<string[]> {
    const keys = new Set<string>();
    let cursor = "0";

    do {
      const [nextCursor, batch] = await this.client.scan(
        cursor,
        "MATCH",
        pattern,
        "COUNT",
        BATCH_SIZE
      );
      cursor = nextCursor;
      for (const key of batch) {
        keys.add(key);
      }
    } while (cursor !== "0");

    return [...keys];
  }

  private getTaskKey(taskId: string): string {
    return `${TASK_KEY_PREFIX}${taskId}`;
  }

  private getAgentKey(agentId: string): string {
    return `${AGENT_KEY_PREFIX}${agentId}`;
  }
}