Files
stack/apps/orchestrator/src/valkey/valkey.client.ts
Jason Woltje 3880993b60
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix(SEC-ORCH-28+29): Add Valkey connection timeout + workItems MaxLength
SEC-ORCH-28: Add connectTimeout (5000ms default) and commandTimeout
(3000ms default) to Valkey/Redis client to prevent indefinite connection
hangs. Both are configurable via VALKEY_CONNECT_TIMEOUT_MS and
VALKEY_COMMAND_TIMEOUT_MS environment variables.

SEC-ORCH-29: Add @ArrayMaxSize(50) and @MaxLength(2000) to workItems
in AgentContextDto to prevent memory exhaustion from unbounded input.
Also adds @ArrayMaxSize(20) and @MaxLength(200) to skills array.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 15:19:44 -06:00

381 lines
9.9 KiB
TypeScript

import Redis from "ioredis";
import { ZodError } from "zod";
import type {
TaskState,
AgentState,
TaskStatus,
AgentStatus,
OrchestratorEvent,
EventHandler,
} from "./types";
import { isValidTaskTransition, isValidAgentTransition } from "./types";
import { TaskStateSchema, AgentStateSchema, OrchestratorEventSchema } from "./schemas";
export interface ValkeyClientConfig {
host: string;
port: number;
password?: string;
db?: number;
/** Connection timeout in milliseconds (default: 5000) */
connectTimeout?: number;
/** Command timeout in milliseconds (default: 3000) */
commandTimeout?: number;
logger?: {
error: (message: string, error?: unknown) => void;
};
}
/**
* Error handler for event parsing failures
*/
export type EventErrorHandler = (error: Error, rawMessage: string, channel: string) => void;
/**
* Error thrown when Redis data fails validation
*/
export class ValkeyValidationError extends Error {
constructor(
message: string,
public readonly key: string,
public readonly dataSnippet: string,
public readonly validationError: ZodError
) {
super(message);
this.name = "ValkeyValidationError";
}
}
/**
* Valkey client for state management and pub/sub
*/
export class ValkeyClient {
private readonly client: Redis;
private subscriber?: Redis;
private readonly logger?: {
error: (message: string, error?: unknown) => void;
};
constructor(config: ValkeyClientConfig) {
this.client = new Redis({
host: config.host,
port: config.port,
password: config.password,
db: config.db,
connectTimeout: config.connectTimeout ?? 5000,
commandTimeout: config.commandTimeout ?? 3000,
});
this.logger = config.logger;
}
/**
* Disconnect from Valkey
*/
async disconnect(): Promise<void> {
await this.client.quit();
if (this.subscriber) {
await this.subscriber.quit();
}
}
/**
* Check Valkey connectivity
* @returns true if connection is healthy, false otherwise
*/
async ping(): Promise<boolean> {
try {
await this.client.ping();
return true;
} catch {
return false;
}
}
/**
* Task State Management
*/
async getTaskState(taskId: string): Promise<TaskState | null> {
const key = this.getTaskKey(taskId);
const data = await this.client.get(key);
if (!data) {
return null;
}
return this.parseAndValidateTaskState(key, data);
}
async setTaskState(state: TaskState): Promise<void> {
const key = this.getTaskKey(state.taskId);
await this.client.set(key, JSON.stringify(state));
}
async deleteTaskState(taskId: string): Promise<void> {
const key = this.getTaskKey(taskId);
await this.client.del(key);
}
async updateTaskStatus(
taskId: string,
status: TaskStatus,
agentId?: string,
error?: string
): Promise<TaskState> {
const existing = await this.getTaskState(taskId);
if (!existing) {
throw new Error(`Task ${taskId} not found`);
}
// Validate state transition
if (!isValidTaskTransition(existing.status, status)) {
throw new Error(`Invalid task state transition from ${existing.status} to ${status}`);
}
const updated: TaskState = {
...existing,
status,
agentId: agentId ?? existing.agentId,
updatedAt: new Date().toISOString(),
metadata: {
...existing.metadata,
...(error && { error }),
},
};
await this.setTaskState(updated);
return updated;
}
async listTasks(): Promise<TaskState[]> {
const pattern = "orchestrator:task:*";
const keys = await this.scanKeys(pattern);
if (keys.length === 0) {
return [];
}
// Use MGET for batch retrieval instead of N individual GETs
const values = await this.client.mget(...keys);
const tasks: TaskState[] = [];
for (let i = 0; i < keys.length; i++) {
const data = values[i];
// Handle null values (key deleted between SCAN and MGET)
if (data) {
const task = this.parseAndValidateTaskState(keys[i], data);
tasks.push(task);
}
}
return tasks;
}
/**
* Agent State Management
*/
async getAgentState(agentId: string): Promise<AgentState | null> {
const key = this.getAgentKey(agentId);
const data = await this.client.get(key);
if (!data) {
return null;
}
return this.parseAndValidateAgentState(key, data);
}
async setAgentState(state: AgentState): Promise<void> {
const key = this.getAgentKey(state.agentId);
await this.client.set(key, JSON.stringify(state));
}
async deleteAgentState(agentId: string): Promise<void> {
const key = this.getAgentKey(agentId);
await this.client.del(key);
}
async updateAgentStatus(
agentId: string,
status: AgentStatus,
error?: string
): Promise<AgentState> {
const existing = await this.getAgentState(agentId);
if (!existing) {
throw new Error(`Agent ${agentId} not found`);
}
// Validate state transition
if (!isValidAgentTransition(existing.status, status)) {
throw new Error(`Invalid agent state transition from ${existing.status} to ${status}`);
}
const now = new Date().toISOString();
const updated: AgentState = {
...existing,
status,
...(status === "running" && !existing.startedAt && { startedAt: now }),
...((["completed", "failed", "killed"] as AgentStatus[]).includes(status) && {
completedAt: now,
}),
...(error && { error }),
};
await this.setAgentState(updated);
return updated;
}
async listAgents(): Promise<AgentState[]> {
const pattern = "orchestrator:agent:*";
const keys = await this.scanKeys(pattern);
if (keys.length === 0) {
return [];
}
// Use MGET for batch retrieval instead of N individual GETs
const values = await this.client.mget(...keys);
const agents: AgentState[] = [];
for (let i = 0; i < keys.length; i++) {
const data = values[i];
// Handle null values (key deleted between SCAN and MGET)
if (data) {
const agent = this.parseAndValidateAgentState(keys[i], data);
agents.push(agent);
}
}
return agents;
}
/**
* Event Pub/Sub
*/
async publishEvent(event: OrchestratorEvent): Promise<void> {
const channel = "orchestrator:events";
await this.client.publish(channel, JSON.stringify(event));
}
async subscribeToEvents(handler: EventHandler, errorHandler?: EventErrorHandler): Promise<void> {
this.subscriber ??= this.client.duplicate();
this.subscriber.on("message", (channel: string, message: string) => {
try {
const parsed: unknown = JSON.parse(message);
const event = OrchestratorEventSchema.parse(parsed);
void handler(event);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
// Log the error with context
if (this.logger) {
const snippet = message.length > 100 ? `${message.substring(0, 100)}...` : message;
if (error instanceof ZodError) {
this.logger.error(
`Failed to validate event from channel ${channel}: ${errorObj.message} (data: ${snippet})`,
errorObj
);
} else {
this.logger.error(
`Failed to parse event from channel ${channel}: ${errorObj.message}`,
errorObj
);
}
}
// Invoke error handler if provided
if (errorHandler) {
errorHandler(errorObj, message, channel);
}
}
});
await this.subscriber.subscribe("orchestrator:events");
}
/**
* Private helper methods
*/
/**
* Scan keys using SCAN command (non-blocking alternative to KEYS)
* Uses cursor-based iteration to avoid blocking Redis
*/
private async scanKeys(pattern: string): Promise<string[]> {
const keys: string[] = [];
let cursor = "0";
do {
const [nextCursor, batch] = await this.client.scan(cursor, "MATCH", pattern, "COUNT", 100);
cursor = nextCursor;
keys.push(...batch);
} while (cursor !== "0");
return keys;
}
private getTaskKey(taskId: string): string {
return `orchestrator:task:${taskId}`;
}
private getAgentKey(agentId: string): string {
return `orchestrator:agent:${agentId}`;
}
/**
* Parse and validate task state data from Redis
* @throws ValkeyValidationError if data is invalid
*/
private parseAndValidateTaskState(key: string, data: string): TaskState {
try {
const parsed: unknown = JSON.parse(data);
return TaskStateSchema.parse(parsed);
} catch (error) {
if (error instanceof ZodError) {
const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data;
const validationError = new ValkeyValidationError(
`Invalid task state data at key ${key}: ${error.message}`,
key,
snippet,
error
);
if (this.logger) {
this.logger.error(validationError.message, validationError);
}
throw validationError;
}
throw error;
}
}
/**
* Parse and validate agent state data from Redis
* @throws ValkeyValidationError if data is invalid
*/
private parseAndValidateAgentState(key: string, data: string): AgentState {
try {
const parsed: unknown = JSON.parse(data);
return AgentStateSchema.parse(parsed);
} catch (error) {
if (error instanceof ZodError) {
const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data;
const validationError = new ValkeyValidationError(
`Invalid agent state data at key ${key}: ${error.message}`,
key,
snippet,
error
);
if (this.logger) {
this.logger.error(validationError.message, validationError);
}
throw validationError;
}
throw error;
}
}
}