From 6552edaa1177bbef75e3f6b06af9a5e06628ceba Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Thu, 5 Feb 2026 15:54:48 -0600 Subject: [PATCH] fix(#337): Add Zod validation for Redis deserialization - Created Zod schemas for TaskState, AgentState, and OrchestratorEvent - Added ValkeyValidationError class for detailed error context - Validate task and agent state data after JSON.parse - Validate events in subscribeToEvents handler - Corrupted/tampered data now rejected with clear errors including: - Key name for context - Data snippet (truncated to 100 chars) - Underlying Zod validation error - Prevents silent propagation of invalid data (SEC-ORCH-6) - Added 20 new tests for validation scenarios Refs #337 Co-Authored-By: Claude Opus 4.5 --- apps/orchestrator/src/valkey/schemas/index.ts | 5 + .../src/valkey/schemas/state.schemas.ts | 123 +++++++ .../src/valkey/valkey.client.spec.ts | 347 +++++++++++++++++- apps/orchestrator/src/valkey/valkey.client.ts | 100 ++++- 4 files changed, 551 insertions(+), 24 deletions(-) create mode 100644 apps/orchestrator/src/valkey/schemas/index.ts create mode 100644 apps/orchestrator/src/valkey/schemas/state.schemas.ts diff --git a/apps/orchestrator/src/valkey/schemas/index.ts b/apps/orchestrator/src/valkey/schemas/index.ts new file mode 100644 index 0000000..4330865 --- /dev/null +++ b/apps/orchestrator/src/valkey/schemas/index.ts @@ -0,0 +1,5 @@ +/** + * Valkey schema exports + */ + +export * from "./state.schemas"; diff --git a/apps/orchestrator/src/valkey/schemas/state.schemas.ts b/apps/orchestrator/src/valkey/schemas/state.schemas.ts new file mode 100644 index 0000000..0274de5 --- /dev/null +++ b/apps/orchestrator/src/valkey/schemas/state.schemas.ts @@ -0,0 +1,123 @@ +/** + * Zod schemas for runtime validation of deserialized Redis data + * + * These schemas validate data after JSON.parse() to prevent + * corrupted or tampered data from propagating silently. + */ + +import { z } from "zod"; + +/** + * Task status enum schema + */ +export const TaskStatusSchema = z.enum(["pending", "assigned", "executing", "completed", "failed"]); + +/** + * Agent status enum schema + */ +export const AgentStatusSchema = z.enum(["spawning", "running", "completed", "failed", "killed"]); + +/** + * Task context schema + */ +export const TaskContextSchema = z.object({ + repository: z.string(), + branch: z.string(), + workItems: z.array(z.string()), + skills: z.array(z.string()).optional(), +}); + +/** + * Task state schema - validates deserialized task data from Redis + */ +export const TaskStateSchema = z.object({ + taskId: z.string(), + status: TaskStatusSchema, + agentId: z.string().optional(), + context: TaskContextSchema, + createdAt: z.string(), + updatedAt: z.string(), + metadata: z.record(z.unknown()).optional(), +}); + +/** + * Agent state schema - validates deserialized agent data from Redis + */ +export const AgentStateSchema = z.object({ + agentId: z.string(), + status: AgentStatusSchema, + taskId: z.string(), + startedAt: z.string().optional(), + completedAt: z.string().optional(), + error: z.string().optional(), + metadata: z.record(z.unknown()).optional(), +}); + +/** + * Event type enum schema + */ +export const EventTypeSchema = z.enum([ + "agent.spawned", + "agent.running", + "agent.completed", + "agent.failed", + "agent.killed", + "agent.cleanup", + "task.assigned", + "task.queued", + "task.processing", + "task.retry", + "task.executing", + "task.completed", + "task.failed", +]); + +/** + * Agent event schema + */ +export const AgentEventSchema = z.object({ + type: z.enum([ + "agent.spawned", + "agent.running", + "agent.completed", + "agent.failed", + "agent.killed", + "agent.cleanup", + ]), + timestamp: z.string(), + agentId: z.string(), + taskId: z.string(), + error: z.string().optional(), + cleanup: z + .object({ + docker: z.boolean(), + worktree: z.boolean(), + state: z.boolean(), + }) + .optional(), +}); + +/** + * Task event schema + */ +export const TaskEventSchema = z.object({ + type: z.enum([ + "task.assigned", + "task.queued", + "task.processing", + "task.retry", + "task.executing", + "task.completed", + "task.failed", + ]), + timestamp: z.string(), + taskId: z.string().optional(), + agentId: z.string().optional(), + error: z.string().optional(), + data: z.record(z.unknown()).optional(), +}); + +/** + * Combined orchestrator event schema (discriminated union) + */ +export const OrchestratorEventSchema = z.union([AgentEventSchema, TaskEventSchema]); diff --git a/apps/orchestrator/src/valkey/valkey.client.spec.ts b/apps/orchestrator/src/valkey/valkey.client.spec.ts index f6e2035..4cb996e 100644 --- a/apps/orchestrator/src/valkey/valkey.client.spec.ts +++ b/apps/orchestrator/src/valkey/valkey.client.spec.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; -import { ValkeyClient } from "./valkey.client"; +import { ValkeyClient, ValkeyValidationError } from "./valkey.client"; import type { TaskState, AgentState, OrchestratorEvent } from "./types"; // Create a shared mock instance that will be used across all tests @@ -479,13 +479,18 @@ describe("ValkeyClient", () => { }); it("should filter out null values in listTasks", async () => { + const validTask = { + taskId: "task-1", + status: "pending", + context: { repository: "repo", branch: "main", workItems: ["item-1"] }, + createdAt: "2026-02-02T10:00:00Z", + updatedAt: "2026-02-02T10:00:00Z", + }; mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:task:task-1", "orchestrator:task:task-2"], ]); - mockRedis.get - .mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" })) - .mockResolvedValueOnce(null); // Simulate deleted task + mockRedis.get.mockResolvedValueOnce(JSON.stringify(validTask)).mockResolvedValueOnce(null); // Simulate deleted task const result = await client.listTasks(); @@ -494,13 +499,16 @@ describe("ValkeyClient", () => { }); it("should filter out null values in listAgents", async () => { + const validAgent = { + agentId: "agent-1", + status: "running", + taskId: "task-1", + }; mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"], ]); - mockRedis.get - .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" })) - .mockResolvedValueOnce(null); // Simulate deleted agent + mockRedis.get.mockResolvedValueOnce(JSON.stringify(validAgent)).mockResolvedValueOnce(null); // Simulate deleted agent const result = await client.listAgents(); @@ -510,6 +518,20 @@ describe("ValkeyClient", () => { }); describe("SCAN-based iteration (large key sets)", () => { + const makeValidTask = (taskId: string): object => ({ + taskId, + status: "pending", + context: { repository: "repo", branch: "main", workItems: ["item-1"] }, + createdAt: "2026-02-02T10:00:00Z", + updatedAt: "2026-02-02T10:00:00Z", + }); + + const makeValidAgent = (agentId: string): object => ({ + agentId, + status: "running", + taskId: "task-1", + }); + it("should handle multiple SCAN iterations for tasks", async () => { // Simulate SCAN returning multiple batches with cursor pagination mockRedis.scan @@ -517,9 +539,9 @@ describe("ValkeyClient", () => { .mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done mockRedis.get - .mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" })) - .mockResolvedValueOnce(JSON.stringify({ taskId: "task-2", status: "pending" })) - .mockResolvedValueOnce(JSON.stringify({ taskId: "task-3", status: "pending" })); + .mockResolvedValueOnce(JSON.stringify(makeValidTask("task-1"))) + .mockResolvedValueOnce(JSON.stringify(makeValidTask("task-2"))) + .mockResolvedValueOnce(JSON.stringify(makeValidTask("task-3"))); const result = await client.listTasks(); @@ -552,10 +574,10 @@ describe("ValkeyClient", () => { .mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done mockRedis.get - .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" })) - .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-2", status: "running" })) - .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-3", status: "running" })) - .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-4", status: "running" })); + .mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-1"))) + .mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-2"))) + .mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-3"))) + .mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-4"))); const result = await client.listAgents(); @@ -573,4 +595,301 @@ describe("ValkeyClient", () => { expect(result).toHaveLength(0); }); }); + + describe("Zod Validation (SEC-ORCH-6)", () => { + describe("Task State Validation", () => { + const validTaskState: TaskState = { + taskId: "task-123", + status: "pending", + context: { + repository: "https://github.com/example/repo", + branch: "main", + workItems: ["item-1"], + }, + createdAt: "2026-02-02T10:00:00Z", + updatedAt: "2026-02-02T10:00:00Z", + }; + + it("should accept valid task state data", async () => { + mockRedis.get.mockResolvedValue(JSON.stringify(validTaskState)); + + const result = await client.getTaskState("task-123"); + + expect(result).toEqual(validTaskState); + }); + + it("should reject task with missing required fields", async () => { + const invalidTask = { taskId: "task-123" }; // Missing status, context, etc. + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); + }); + + it("should reject task with invalid status value", async () => { + const invalidTask = { + ...validTaskState, + status: "invalid-status", // Not a valid TaskStatus + }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); + }); + + it("should reject task with missing context fields", async () => { + const invalidTask = { + ...validTaskState, + context: { repository: "repo" }, // Missing branch and workItems + }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); + }); + + it("should reject corrupted JSON data for task", async () => { + mockRedis.get.mockResolvedValue("not valid json {{{"); + + await expect(client.getTaskState("task-123")).rejects.toThrow(); + }); + + it("should include key name in validation error", async () => { + const invalidTask = { taskId: "task-123" }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + try { + await client.getTaskState("task-123"); + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(ValkeyValidationError); + expect((error as ValkeyValidationError).key).toBe("orchestrator:task:task-123"); + } + }); + + it("should include data snippet in validation error", async () => { + const invalidTask = { taskId: "task-123", invalidField: "x".repeat(200) }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + try { + await client.getTaskState("task-123"); + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(ValkeyValidationError); + const valError = error as ValkeyValidationError; + expect(valError.dataSnippet.length).toBeLessThanOrEqual(103); // 100 chars + "..." + } + }); + + it("should log validation errors with logger", async () => { + const loggerError = vi.fn(); + const clientWithLogger = new ValkeyClient({ + host: "localhost", + port: 6379, + logger: { error: loggerError }, + }); + + const invalidTask = { taskId: "task-123" }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); + + await expect(clientWithLogger.getTaskState("task-123")).rejects.toThrow( + ValkeyValidationError + ); + expect(loggerError).toHaveBeenCalled(); + }); + + it("should reject invalid data in listTasks", async () => { + mockRedis.scan.mockResolvedValue(["0", ["orchestrator:task:task-1"]]); + mockRedis.get.mockResolvedValue(JSON.stringify({ taskId: "task-1" })); // Invalid + + await expect(client.listTasks()).rejects.toThrow(ValkeyValidationError); + }); + }); + + describe("Agent State Validation", () => { + const validAgentState: AgentState = { + agentId: "agent-456", + status: "spawning", + taskId: "task-123", + }; + + it("should accept valid agent state data", async () => { + mockRedis.get.mockResolvedValue(JSON.stringify(validAgentState)); + + const result = await client.getAgentState("agent-456"); + + expect(result).toEqual(validAgentState); + }); + + it("should reject agent with missing required fields", async () => { + const invalidAgent = { agentId: "agent-456" }; // Missing status, taskId + mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); + + await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError); + }); + + it("should reject agent with invalid status value", async () => { + const invalidAgent = { + ...validAgentState, + status: "not-a-status", // Not a valid AgentStatus + }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); + + await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError); + }); + + it("should reject corrupted JSON data for agent", async () => { + mockRedis.get.mockResolvedValue("corrupted data <<<"); + + await expect(client.getAgentState("agent-456")).rejects.toThrow(); + }); + + it("should include key name in agent validation error", async () => { + const invalidAgent = { agentId: "agent-456" }; + mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); + + try { + await client.getAgentState("agent-456"); + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(ValkeyValidationError); + expect((error as ValkeyValidationError).key).toBe("orchestrator:agent:agent-456"); + } + }); + + it("should reject invalid data in listAgents", async () => { + mockRedis.scan.mockResolvedValue(["0", ["orchestrator:agent:agent-1"]]); + mockRedis.get.mockResolvedValue(JSON.stringify({ agentId: "agent-1" })); // Invalid + + await expect(client.listAgents()).rejects.toThrow(ValkeyValidationError); + }); + }); + + describe("Event Validation", () => { + it("should accept valid agent event", async () => { + mockRedis.subscribe.mockResolvedValue(1); + let messageHandler: ((channel: string, message: string) => void) | undefined; + + mockRedis.on.mockImplementation( + (event: string, handler: (channel: string, message: string) => void) => { + if (event === "message") { + messageHandler = handler; + } + return mockRedis; + } + ); + + const handler = vi.fn(); + await client.subscribeToEvents(handler); + + const validEvent: OrchestratorEvent = { + type: "agent.spawned", + agentId: "agent-1", + taskId: "task-1", + timestamp: "2026-02-02T10:00:00Z", + }; + + if (messageHandler) { + messageHandler("orchestrator:events", JSON.stringify(validEvent)); + } + + expect(handler).toHaveBeenCalledWith(validEvent); + }); + + it("should reject event with invalid type", async () => { + mockRedis.subscribe.mockResolvedValue(1); + let messageHandler: ((channel: string, message: string) => void) | undefined; + + mockRedis.on.mockImplementation( + (event: string, handler: (channel: string, message: string) => void) => { + if (event === "message") { + messageHandler = handler; + } + return mockRedis; + } + ); + + const handler = vi.fn(); + const errorHandler = vi.fn(); + await client.subscribeToEvents(handler, errorHandler); + + const invalidEvent = { + type: "invalid.event.type", + agentId: "agent-1", + taskId: "task-1", + timestamp: "2026-02-02T10:00:00Z", + }; + + if (messageHandler) { + messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); + } + + expect(handler).not.toHaveBeenCalled(); + expect(errorHandler).toHaveBeenCalled(); + }); + + it("should reject event with missing required fields", async () => { + mockRedis.subscribe.mockResolvedValue(1); + let messageHandler: ((channel: string, message: string) => void) | undefined; + + mockRedis.on.mockImplementation( + (event: string, handler: (channel: string, message: string) => void) => { + if (event === "message") { + messageHandler = handler; + } + return mockRedis; + } + ); + + const handler = vi.fn(); + const errorHandler = vi.fn(); + await client.subscribeToEvents(handler, errorHandler); + + const invalidEvent = { + type: "agent.spawned", + // Missing agentId, taskId, timestamp + }; + + if (messageHandler) { + messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); + } + + expect(handler).not.toHaveBeenCalled(); + expect(errorHandler).toHaveBeenCalled(); + }); + + it("should log validation errors for events with logger", async () => { + mockRedis.subscribe.mockResolvedValue(1); + let messageHandler: ((channel: string, message: string) => void) | undefined; + + mockRedis.on.mockImplementation( + (event: string, handler: (channel: string, message: string) => void) => { + if (event === "message") { + messageHandler = handler; + } + return mockRedis; + } + ); + + const loggerError = vi.fn(); + const clientWithLogger = new ValkeyClient({ + host: "localhost", + port: 6379, + logger: { error: loggerError }, + }); + mockRedis.duplicate.mockReturnValue(mockRedis); + + await clientWithLogger.subscribeToEvents(vi.fn()); + + const invalidEvent = { type: "invalid.type" }; + + if (messageHandler) { + messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); + } + + expect(loggerError).toHaveBeenCalled(); + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining("Failed to validate event"), + expect.any(Error) + ); + }); + }); + }); }); diff --git a/apps/orchestrator/src/valkey/valkey.client.ts b/apps/orchestrator/src/valkey/valkey.client.ts index 5d9aeb2..81164b0 100644 --- a/apps/orchestrator/src/valkey/valkey.client.ts +++ b/apps/orchestrator/src/valkey/valkey.client.ts @@ -1,4 +1,5 @@ import Redis from "ioredis"; +import { ZodError } from "zod"; import type { TaskState, AgentState, @@ -8,6 +9,7 @@ import type { EventHandler, } from "./types"; import { isValidTaskTransition, isValidAgentTransition } from "./types"; +import { TaskStateSchema, AgentStateSchema, OrchestratorEventSchema } from "./schemas"; export interface ValkeyClientConfig { host: string; @@ -24,6 +26,21 @@ export interface ValkeyClientConfig { */ export type EventErrorHandler = (error: Error, rawMessage: string, channel: string) => void; +/** + * Error thrown when Redis data fails validation + */ +export class ValkeyValidationError extends Error { + constructor( + message: string, + public readonly key: string, + public readonly dataSnippet: string, + public readonly validationError: ZodError + ) { + super(message); + this.name = "ValkeyValidationError"; + } +} + /** * Valkey client for state management and pub/sub */ @@ -66,7 +83,7 @@ export class ValkeyClient { return null; } - return JSON.parse(data) as TaskState; + return this.parseAndValidateTaskState(key, data); } async setTaskState(state: TaskState): Promise { @@ -119,7 +136,8 @@ export class ValkeyClient { for (const key of keys) { const data = await this.client.get(key); if (data) { - tasks.push(JSON.parse(data) as TaskState); + const task = this.parseAndValidateTaskState(key, data); + tasks.push(task); } } @@ -138,7 +156,7 @@ export class ValkeyClient { return null; } - return JSON.parse(data) as AgentState; + return this.parseAndValidateAgentState(key, data); } async setAgentState(state: AgentState): Promise { @@ -190,7 +208,8 @@ export class ValkeyClient { for (const key of keys) { const data = await this.client.get(key); if (data) { - agents.push(JSON.parse(data) as AgentState); + const agent = this.parseAndValidateAgentState(key, data); + agents.push(agent); } } @@ -211,17 +230,26 @@ export class ValkeyClient { this.subscriber.on("message", (channel: string, message: string) => { try { - const event = JSON.parse(message) as OrchestratorEvent; + const parsed: unknown = JSON.parse(message); + const event = OrchestratorEventSchema.parse(parsed); void handler(event); } catch (error) { const errorObj = error instanceof Error ? error : new Error(String(error)); - // Log the error + // Log the error with context if (this.logger) { - this.logger.error( - `Failed to parse event from channel ${channel}: ${errorObj.message}`, - errorObj - ); + const snippet = message.length > 100 ? `${message.substring(0, 100)}...` : message; + if (error instanceof ZodError) { + this.logger.error( + `Failed to validate event from channel ${channel}: ${errorObj.message} (data: ${snippet})`, + errorObj + ); + } else { + this.logger.error( + `Failed to parse event from channel ${channel}: ${errorObj.message}`, + errorObj + ); + } } // Invoke error handler if provided @@ -262,4 +290,56 @@ export class ValkeyClient { private getAgentKey(agentId: string): string { return `orchestrator:agent:${agentId}`; } + + /** + * Parse and validate task state data from Redis + * @throws ValkeyValidationError if data is invalid + */ + private parseAndValidateTaskState(key: string, data: string): TaskState { + try { + const parsed: unknown = JSON.parse(data); + return TaskStateSchema.parse(parsed); + } catch (error) { + if (error instanceof ZodError) { + const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data; + const validationError = new ValkeyValidationError( + `Invalid task state data at key ${key}: ${error.message}`, + key, + snippet, + error + ); + if (this.logger) { + this.logger.error(validationError.message, validationError); + } + throw validationError; + } + throw error; + } + } + + /** + * Parse and validate agent state data from Redis + * @throws ValkeyValidationError if data is invalid + */ + private parseAndValidateAgentState(key: string, data: string): AgentState { + try { + const parsed: unknown = JSON.parse(data); + return AgentStateSchema.parse(parsed); + } catch (error) { + if (error instanceof ZodError) { + const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data; + const validationError = new ValkeyValidationError( + `Invalid agent state data at key ${key}: ${error.message}`, + key, + snippet, + error + ); + if (this.logger) { + this.logger.error(validationError.message, validationError); + } + throw validationError; + } + throw error; + } + } }