Files
stack/apps/orchestrator/src/valkey/valkey.service.ts
Jason Woltje 3880993b60
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix(SEC-ORCH-28+29): Add Valkey connection timeout + workItems MaxLength
SEC-ORCH-28: Add connectTimeout (5000ms default) and commandTimeout
(3000ms default) to Valkey/Redis client to prevent indefinite connection
hangs. Both are configurable via VALKEY_CONNECT_TIMEOUT_MS and
VALKEY_COMMAND_TIMEOUT_MS environment variables.

SEC-ORCH-29: Add @ArrayMaxSize(50) and @MaxLength(2000) to workItems
in AgentContextDto to prevent memory exhaustion from unbounded input.
Also adds @ArrayMaxSize(20) and @MaxLength(200) to skills array.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 15:19:44 -06:00

166 lines
4.5 KiB
TypeScript

import { Injectable, OnModuleDestroy, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { ValkeyClient, ValkeyClientConfig, EventErrorHandler } from "./valkey.client";
import type {
TaskState,
AgentState,
TaskStatus,
AgentStatus,
OrchestratorEvent,
EventHandler,
TaskContext,
} from "./types";
/**
* NestJS service for Valkey state management and pub/sub
*/
@Injectable()
export class ValkeyService implements OnModuleDestroy {
private readonly client: ValkeyClient;
private readonly logger = new Logger(ValkeyService.name);
constructor(private readonly configService: ConfigService) {
const config: ValkeyClientConfig = {
host: this.configService.get<string>("orchestrator.valkey.host", "localhost"),
port: this.configService.get<number>("orchestrator.valkey.port", 6379),
connectTimeout: this.configService.get<number>("orchestrator.valkey.connectTimeout", 5000),
commandTimeout: this.configService.get<number>("orchestrator.valkey.commandTimeout", 3000),
logger: {
error: (message: string, error?: unknown) => {
this.logger.error(message, error instanceof Error ? error.stack : String(error));
},
},
};
const password = this.configService.get<string>("orchestrator.valkey.password");
if (password) {
config.password = password;
} else {
// SEC-ORCH-15: Warn when Valkey password is not configured
const nodeEnv = this.configService.get<string>("NODE_ENV", "development");
const isProduction = nodeEnv === "production";
if (isProduction) {
this.logger.warn(
"SECURITY WARNING: VALKEY_PASSWORD is not configured in production environment. " +
"Valkey connections without authentication are insecure. " +
"Set VALKEY_PASSWORD environment variable to secure your Valkey instance."
);
} else {
this.logger.warn(
"VALKEY_PASSWORD is not configured. " +
"Consider setting VALKEY_PASSWORD for secure Valkey connections."
);
}
}
this.client = new ValkeyClient(config);
}
async onModuleDestroy(): Promise<void> {
await this.client.disconnect();
}
/**
* Task State Management
*/
async getTaskState(taskId: string): Promise<TaskState | null> {
return this.client.getTaskState(taskId);
}
async setTaskState(state: TaskState): Promise<void> {
return this.client.setTaskState(state);
}
async deleteTaskState(taskId: string): Promise<void> {
return this.client.deleteTaskState(taskId);
}
async updateTaskStatus(
taskId: string,
status: TaskStatus,
agentId?: string,
error?: string
): Promise<TaskState> {
return this.client.updateTaskStatus(taskId, status, agentId, error);
}
async listTasks(): Promise<TaskState[]> {
return this.client.listTasks();
}
/**
* Agent State Management
*/
async getAgentState(agentId: string): Promise<AgentState | null> {
return this.client.getAgentState(agentId);
}
async setAgentState(state: AgentState): Promise<void> {
return this.client.setAgentState(state);
}
async deleteAgentState(agentId: string): Promise<void> {
return this.client.deleteAgentState(agentId);
}
async updateAgentStatus(
agentId: string,
status: AgentStatus,
error?: string
): Promise<AgentState> {
return this.client.updateAgentStatus(agentId, status, error);
}
async listAgents(): Promise<AgentState[]> {
return this.client.listAgents();
}
/**
* Event Pub/Sub
*/
async publishEvent(event: OrchestratorEvent): Promise<void> {
return this.client.publishEvent(event);
}
async subscribeToEvents(handler: EventHandler, errorHandler?: EventErrorHandler): Promise<void> {
return this.client.subscribeToEvents(handler, errorHandler);
}
/**
* Convenience methods
*/
async createTask(taskId: string, context: TaskContext): Promise<void> {
const now = new Date().toISOString();
const state: TaskState = {
taskId,
status: "pending",
context,
createdAt: now,
updatedAt: now,
};
await this.setTaskState(state);
}
async createAgent(agentId: string, taskId: string): Promise<void> {
const state: AgentState = {
agentId,
status: "spawning",
taskId,
};
await this.setAgentState(state);
}
/**
* Check Valkey connectivity
* @returns true if connection is healthy, false otherwise
*/
async ping(): Promise<boolean> {
return this.client.ping();
}
}