fix(#337): Add Zod validation for Redis deserialization
- Created Zod schemas for TaskState, AgentState, and OrchestratorEvent - Added ValkeyValidationError class for detailed error context - Validate task and agent state data after JSON.parse - Validate events in subscribeToEvents handler - Corrupted/tampered data now rejected with clear errors including: - Key name for context - Data snippet (truncated to 100 chars) - Underlying Zod validation error - Prevents silent propagation of invalid data (SEC-ORCH-6) - Added 20 new tests for validation scenarios Refs #337 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
5
apps/orchestrator/src/valkey/schemas/index.ts
Normal file
5
apps/orchestrator/src/valkey/schemas/index.ts
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
/**
|
||||||
|
* Valkey schema exports
|
||||||
|
*/
|
||||||
|
|
||||||
|
export * from "./state.schemas";
|
||||||
123
apps/orchestrator/src/valkey/schemas/state.schemas.ts
Normal file
123
apps/orchestrator/src/valkey/schemas/state.schemas.ts
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
/**
|
||||||
|
* Zod schemas for runtime validation of deserialized Redis data
|
||||||
|
*
|
||||||
|
* These schemas validate data after JSON.parse() to prevent
|
||||||
|
* corrupted or tampered data from propagating silently.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { z } from "zod";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task status enum schema
|
||||||
|
*/
|
||||||
|
export const TaskStatusSchema = z.enum(["pending", "assigned", "executing", "completed", "failed"]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Agent status enum schema
|
||||||
|
*/
|
||||||
|
export const AgentStatusSchema = z.enum(["spawning", "running", "completed", "failed", "killed"]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task context schema
|
||||||
|
*/
|
||||||
|
export const TaskContextSchema = z.object({
|
||||||
|
repository: z.string(),
|
||||||
|
branch: z.string(),
|
||||||
|
workItems: z.array(z.string()),
|
||||||
|
skills: z.array(z.string()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task state schema - validates deserialized task data from Redis
|
||||||
|
*/
|
||||||
|
export const TaskStateSchema = z.object({
|
||||||
|
taskId: z.string(),
|
||||||
|
status: TaskStatusSchema,
|
||||||
|
agentId: z.string().optional(),
|
||||||
|
context: TaskContextSchema,
|
||||||
|
createdAt: z.string(),
|
||||||
|
updatedAt: z.string(),
|
||||||
|
metadata: z.record(z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Agent state schema - validates deserialized agent data from Redis
|
||||||
|
*/
|
||||||
|
export const AgentStateSchema = z.object({
|
||||||
|
agentId: z.string(),
|
||||||
|
status: AgentStatusSchema,
|
||||||
|
taskId: z.string(),
|
||||||
|
startedAt: z.string().optional(),
|
||||||
|
completedAt: z.string().optional(),
|
||||||
|
error: z.string().optional(),
|
||||||
|
metadata: z.record(z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event type enum schema
|
||||||
|
*/
|
||||||
|
export const EventTypeSchema = z.enum([
|
||||||
|
"agent.spawned",
|
||||||
|
"agent.running",
|
||||||
|
"agent.completed",
|
||||||
|
"agent.failed",
|
||||||
|
"agent.killed",
|
||||||
|
"agent.cleanup",
|
||||||
|
"task.assigned",
|
||||||
|
"task.queued",
|
||||||
|
"task.processing",
|
||||||
|
"task.retry",
|
||||||
|
"task.executing",
|
||||||
|
"task.completed",
|
||||||
|
"task.failed",
|
||||||
|
]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Agent event schema
|
||||||
|
*/
|
||||||
|
export const AgentEventSchema = z.object({
|
||||||
|
type: z.enum([
|
||||||
|
"agent.spawned",
|
||||||
|
"agent.running",
|
||||||
|
"agent.completed",
|
||||||
|
"agent.failed",
|
||||||
|
"agent.killed",
|
||||||
|
"agent.cleanup",
|
||||||
|
]),
|
||||||
|
timestamp: z.string(),
|
||||||
|
agentId: z.string(),
|
||||||
|
taskId: z.string(),
|
||||||
|
error: z.string().optional(),
|
||||||
|
cleanup: z
|
||||||
|
.object({
|
||||||
|
docker: z.boolean(),
|
||||||
|
worktree: z.boolean(),
|
||||||
|
state: z.boolean(),
|
||||||
|
})
|
||||||
|
.optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Task event schema
|
||||||
|
*/
|
||||||
|
export const TaskEventSchema = z.object({
|
||||||
|
type: z.enum([
|
||||||
|
"task.assigned",
|
||||||
|
"task.queued",
|
||||||
|
"task.processing",
|
||||||
|
"task.retry",
|
||||||
|
"task.executing",
|
||||||
|
"task.completed",
|
||||||
|
"task.failed",
|
||||||
|
]),
|
||||||
|
timestamp: z.string(),
|
||||||
|
taskId: z.string().optional(),
|
||||||
|
agentId: z.string().optional(),
|
||||||
|
error: z.string().optional(),
|
||||||
|
data: z.record(z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combined orchestrator event schema (discriminated union)
|
||||||
|
*/
|
||||||
|
export const OrchestratorEventSchema = z.union([AgentEventSchema, TaskEventSchema]);
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
|
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
|
||||||
import { ValkeyClient } from "./valkey.client";
|
import { ValkeyClient, ValkeyValidationError } from "./valkey.client";
|
||||||
import type { TaskState, AgentState, OrchestratorEvent } from "./types";
|
import type { TaskState, AgentState, OrchestratorEvent } from "./types";
|
||||||
|
|
||||||
// Create a shared mock instance that will be used across all tests
|
// Create a shared mock instance that will be used across all tests
|
||||||
@@ -479,13 +479,18 @@ describe("ValkeyClient", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("should filter out null values in listTasks", async () => {
|
it("should filter out null values in listTasks", async () => {
|
||||||
|
const validTask = {
|
||||||
|
taskId: "task-1",
|
||||||
|
status: "pending",
|
||||||
|
context: { repository: "repo", branch: "main", workItems: ["item-1"] },
|
||||||
|
createdAt: "2026-02-02T10:00:00Z",
|
||||||
|
updatedAt: "2026-02-02T10:00:00Z",
|
||||||
|
};
|
||||||
mockRedis.scan.mockResolvedValue([
|
mockRedis.scan.mockResolvedValue([
|
||||||
"0",
|
"0",
|
||||||
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get
|
mockRedis.get.mockResolvedValueOnce(JSON.stringify(validTask)).mockResolvedValueOnce(null); // Simulate deleted task
|
||||||
.mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" }))
|
|
||||||
.mockResolvedValueOnce(null); // Simulate deleted task
|
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
@@ -494,13 +499,16 @@ describe("ValkeyClient", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("should filter out null values in listAgents", async () => {
|
it("should filter out null values in listAgents", async () => {
|
||||||
|
const validAgent = {
|
||||||
|
agentId: "agent-1",
|
||||||
|
status: "running",
|
||||||
|
taskId: "task-1",
|
||||||
|
};
|
||||||
mockRedis.scan.mockResolvedValue([
|
mockRedis.scan.mockResolvedValue([
|
||||||
"0",
|
"0",
|
||||||
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get
|
mockRedis.get.mockResolvedValueOnce(JSON.stringify(validAgent)).mockResolvedValueOnce(null); // Simulate deleted agent
|
||||||
.mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" }))
|
|
||||||
.mockResolvedValueOnce(null); // Simulate deleted agent
|
|
||||||
|
|
||||||
const result = await client.listAgents();
|
const result = await client.listAgents();
|
||||||
|
|
||||||
@@ -510,6 +518,20 @@ describe("ValkeyClient", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("SCAN-based iteration (large key sets)", () => {
|
describe("SCAN-based iteration (large key sets)", () => {
|
||||||
|
const makeValidTask = (taskId: string): object => ({
|
||||||
|
taskId,
|
||||||
|
status: "pending",
|
||||||
|
context: { repository: "repo", branch: "main", workItems: ["item-1"] },
|
||||||
|
createdAt: "2026-02-02T10:00:00Z",
|
||||||
|
updatedAt: "2026-02-02T10:00:00Z",
|
||||||
|
});
|
||||||
|
|
||||||
|
const makeValidAgent = (agentId: string): object => ({
|
||||||
|
agentId,
|
||||||
|
status: "running",
|
||||||
|
taskId: "task-1",
|
||||||
|
});
|
||||||
|
|
||||||
it("should handle multiple SCAN iterations for tasks", async () => {
|
it("should handle multiple SCAN iterations for tasks", async () => {
|
||||||
// Simulate SCAN returning multiple batches with cursor pagination
|
// Simulate SCAN returning multiple batches with cursor pagination
|
||||||
mockRedis.scan
|
mockRedis.scan
|
||||||
@@ -517,9 +539,9 @@ describe("ValkeyClient", () => {
|
|||||||
.mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done
|
.mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done
|
||||||
|
|
||||||
mockRedis.get
|
mockRedis.get
|
||||||
.mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" }))
|
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-1")))
|
||||||
.mockResolvedValueOnce(JSON.stringify({ taskId: "task-2", status: "pending" }))
|
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-2")))
|
||||||
.mockResolvedValueOnce(JSON.stringify({ taskId: "task-3", status: "pending" }));
|
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-3")));
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
@@ -552,10 +574,10 @@ describe("ValkeyClient", () => {
|
|||||||
.mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done
|
.mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done
|
||||||
|
|
||||||
mockRedis.get
|
mockRedis.get
|
||||||
.mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" }))
|
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-1")))
|
||||||
.mockResolvedValueOnce(JSON.stringify({ agentId: "agent-2", status: "running" }))
|
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-2")))
|
||||||
.mockResolvedValueOnce(JSON.stringify({ agentId: "agent-3", status: "running" }))
|
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-3")))
|
||||||
.mockResolvedValueOnce(JSON.stringify({ agentId: "agent-4", status: "running" }));
|
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-4")));
|
||||||
|
|
||||||
const result = await client.listAgents();
|
const result = await client.listAgents();
|
||||||
|
|
||||||
@@ -573,4 +595,301 @@ describe("ValkeyClient", () => {
|
|||||||
expect(result).toHaveLength(0);
|
expect(result).toHaveLength(0);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("Zod Validation (SEC-ORCH-6)", () => {
|
||||||
|
describe("Task State Validation", () => {
|
||||||
|
const validTaskState: TaskState = {
|
||||||
|
taskId: "task-123",
|
||||||
|
status: "pending",
|
||||||
|
context: {
|
||||||
|
repository: "https://github.com/example/repo",
|
||||||
|
branch: "main",
|
||||||
|
workItems: ["item-1"],
|
||||||
|
},
|
||||||
|
createdAt: "2026-02-02T10:00:00Z",
|
||||||
|
updatedAt: "2026-02-02T10:00:00Z",
|
||||||
|
};
|
||||||
|
|
||||||
|
it("should accept valid task state data", async () => {
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(validTaskState));
|
||||||
|
|
||||||
|
const result = await client.getTaskState("task-123");
|
||||||
|
|
||||||
|
expect(result).toEqual(validTaskState);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject task with missing required fields", async () => {
|
||||||
|
const invalidTask = { taskId: "task-123" }; // Missing status, context, etc.
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject task with invalid status value", async () => {
|
||||||
|
const invalidTask = {
|
||||||
|
...validTaskState,
|
||||||
|
status: "invalid-status", // Not a valid TaskStatus
|
||||||
|
};
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject task with missing context fields", async () => {
|
||||||
|
const invalidTask = {
|
||||||
|
...validTaskState,
|
||||||
|
context: { repository: "repo" }, // Missing branch and workItems
|
||||||
|
};
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject corrupted JSON data for task", async () => {
|
||||||
|
mockRedis.get.mockResolvedValue("not valid json {{{");
|
||||||
|
|
||||||
|
await expect(client.getTaskState("task-123")).rejects.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should include key name in validation error", async () => {
|
||||||
|
const invalidTask = { taskId: "task-123" };
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.getTaskState("task-123");
|
||||||
|
expect.fail("Should have thrown");
|
||||||
|
} catch (error) {
|
||||||
|
expect(error).toBeInstanceOf(ValkeyValidationError);
|
||||||
|
expect((error as ValkeyValidationError).key).toBe("orchestrator:task:task-123");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should include data snippet in validation error", async () => {
|
||||||
|
const invalidTask = { taskId: "task-123", invalidField: "x".repeat(200) };
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.getTaskState("task-123");
|
||||||
|
expect.fail("Should have thrown");
|
||||||
|
} catch (error) {
|
||||||
|
expect(error).toBeInstanceOf(ValkeyValidationError);
|
||||||
|
const valError = error as ValkeyValidationError;
|
||||||
|
expect(valError.dataSnippet.length).toBeLessThanOrEqual(103); // 100 chars + "..."
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should log validation errors with logger", async () => {
|
||||||
|
const loggerError = vi.fn();
|
||||||
|
const clientWithLogger = new ValkeyClient({
|
||||||
|
host: "localhost",
|
||||||
|
port: 6379,
|
||||||
|
logger: { error: loggerError },
|
||||||
|
});
|
||||||
|
|
||||||
|
const invalidTask = { taskId: "task-123" };
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask));
|
||||||
|
|
||||||
|
await expect(clientWithLogger.getTaskState("task-123")).rejects.toThrow(
|
||||||
|
ValkeyValidationError
|
||||||
|
);
|
||||||
|
expect(loggerError).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject invalid data in listTasks", async () => {
|
||||||
|
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:task:task-1"]]);
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify({ taskId: "task-1" })); // Invalid
|
||||||
|
|
||||||
|
await expect(client.listTasks()).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Agent State Validation", () => {
|
||||||
|
const validAgentState: AgentState = {
|
||||||
|
agentId: "agent-456",
|
||||||
|
status: "spawning",
|
||||||
|
taskId: "task-123",
|
||||||
|
};
|
||||||
|
|
||||||
|
it("should accept valid agent state data", async () => {
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(validAgentState));
|
||||||
|
|
||||||
|
const result = await client.getAgentState("agent-456");
|
||||||
|
|
||||||
|
expect(result).toEqual(validAgentState);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject agent with missing required fields", async () => {
|
||||||
|
const invalidAgent = { agentId: "agent-456" }; // Missing status, taskId
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent));
|
||||||
|
|
||||||
|
await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject agent with invalid status value", async () => {
|
||||||
|
const invalidAgent = {
|
||||||
|
...validAgentState,
|
||||||
|
status: "not-a-status", // Not a valid AgentStatus
|
||||||
|
};
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent));
|
||||||
|
|
||||||
|
await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject corrupted JSON data for agent", async () => {
|
||||||
|
mockRedis.get.mockResolvedValue("corrupted data <<<");
|
||||||
|
|
||||||
|
await expect(client.getAgentState("agent-456")).rejects.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should include key name in agent validation error", async () => {
|
||||||
|
const invalidAgent = { agentId: "agent-456" };
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.getAgentState("agent-456");
|
||||||
|
expect.fail("Should have thrown");
|
||||||
|
} catch (error) {
|
||||||
|
expect(error).toBeInstanceOf(ValkeyValidationError);
|
||||||
|
expect((error as ValkeyValidationError).key).toBe("orchestrator:agent:agent-456");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject invalid data in listAgents", async () => {
|
||||||
|
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:agent:agent-1"]]);
|
||||||
|
mockRedis.get.mockResolvedValue(JSON.stringify({ agentId: "agent-1" })); // Invalid
|
||||||
|
|
||||||
|
await expect(client.listAgents()).rejects.toThrow(ValkeyValidationError);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Event Validation", () => {
|
||||||
|
it("should accept valid agent event", async () => {
|
||||||
|
mockRedis.subscribe.mockResolvedValue(1);
|
||||||
|
let messageHandler: ((channel: string, message: string) => void) | undefined;
|
||||||
|
|
||||||
|
mockRedis.on.mockImplementation(
|
||||||
|
(event: string, handler: (channel: string, message: string) => void) => {
|
||||||
|
if (event === "message") {
|
||||||
|
messageHandler = handler;
|
||||||
|
}
|
||||||
|
return mockRedis;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const handler = vi.fn();
|
||||||
|
await client.subscribeToEvents(handler);
|
||||||
|
|
||||||
|
const validEvent: OrchestratorEvent = {
|
||||||
|
type: "agent.spawned",
|
||||||
|
agentId: "agent-1",
|
||||||
|
taskId: "task-1",
|
||||||
|
timestamp: "2026-02-02T10:00:00Z",
|
||||||
|
};
|
||||||
|
|
||||||
|
if (messageHandler) {
|
||||||
|
messageHandler("orchestrator:events", JSON.stringify(validEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(handler).toHaveBeenCalledWith(validEvent);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject event with invalid type", async () => {
|
||||||
|
mockRedis.subscribe.mockResolvedValue(1);
|
||||||
|
let messageHandler: ((channel: string, message: string) => void) | undefined;
|
||||||
|
|
||||||
|
mockRedis.on.mockImplementation(
|
||||||
|
(event: string, handler: (channel: string, message: string) => void) => {
|
||||||
|
if (event === "message") {
|
||||||
|
messageHandler = handler;
|
||||||
|
}
|
||||||
|
return mockRedis;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const handler = vi.fn();
|
||||||
|
const errorHandler = vi.fn();
|
||||||
|
await client.subscribeToEvents(handler, errorHandler);
|
||||||
|
|
||||||
|
const invalidEvent = {
|
||||||
|
type: "invalid.event.type",
|
||||||
|
agentId: "agent-1",
|
||||||
|
taskId: "task-1",
|
||||||
|
timestamp: "2026-02-02T10:00:00Z",
|
||||||
|
};
|
||||||
|
|
||||||
|
if (messageHandler) {
|
||||||
|
messageHandler("orchestrator:events", JSON.stringify(invalidEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(handler).not.toHaveBeenCalled();
|
||||||
|
expect(errorHandler).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should reject event with missing required fields", async () => {
|
||||||
|
mockRedis.subscribe.mockResolvedValue(1);
|
||||||
|
let messageHandler: ((channel: string, message: string) => void) | undefined;
|
||||||
|
|
||||||
|
mockRedis.on.mockImplementation(
|
||||||
|
(event: string, handler: (channel: string, message: string) => void) => {
|
||||||
|
if (event === "message") {
|
||||||
|
messageHandler = handler;
|
||||||
|
}
|
||||||
|
return mockRedis;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const handler = vi.fn();
|
||||||
|
const errorHandler = vi.fn();
|
||||||
|
await client.subscribeToEvents(handler, errorHandler);
|
||||||
|
|
||||||
|
const invalidEvent = {
|
||||||
|
type: "agent.spawned",
|
||||||
|
// Missing agentId, taskId, timestamp
|
||||||
|
};
|
||||||
|
|
||||||
|
if (messageHandler) {
|
||||||
|
messageHandler("orchestrator:events", JSON.stringify(invalidEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(handler).not.toHaveBeenCalled();
|
||||||
|
expect(errorHandler).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should log validation errors for events with logger", async () => {
|
||||||
|
mockRedis.subscribe.mockResolvedValue(1);
|
||||||
|
let messageHandler: ((channel: string, message: string) => void) | undefined;
|
||||||
|
|
||||||
|
mockRedis.on.mockImplementation(
|
||||||
|
(event: string, handler: (channel: string, message: string) => void) => {
|
||||||
|
if (event === "message") {
|
||||||
|
messageHandler = handler;
|
||||||
|
}
|
||||||
|
return mockRedis;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const loggerError = vi.fn();
|
||||||
|
const clientWithLogger = new ValkeyClient({
|
||||||
|
host: "localhost",
|
||||||
|
port: 6379,
|
||||||
|
logger: { error: loggerError },
|
||||||
|
});
|
||||||
|
mockRedis.duplicate.mockReturnValue(mockRedis);
|
||||||
|
|
||||||
|
await clientWithLogger.subscribeToEvents(vi.fn());
|
||||||
|
|
||||||
|
const invalidEvent = { type: "invalid.type" };
|
||||||
|
|
||||||
|
if (messageHandler) {
|
||||||
|
messageHandler("orchestrator:events", JSON.stringify(invalidEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(loggerError).toHaveBeenCalled();
|
||||||
|
expect(loggerError).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Failed to validate event"),
|
||||||
|
expect.any(Error)
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
|
import { ZodError } from "zod";
|
||||||
import type {
|
import type {
|
||||||
TaskState,
|
TaskState,
|
||||||
AgentState,
|
AgentState,
|
||||||
@@ -8,6 +9,7 @@ import type {
|
|||||||
EventHandler,
|
EventHandler,
|
||||||
} from "./types";
|
} from "./types";
|
||||||
import { isValidTaskTransition, isValidAgentTransition } from "./types";
|
import { isValidTaskTransition, isValidAgentTransition } from "./types";
|
||||||
|
import { TaskStateSchema, AgentStateSchema, OrchestratorEventSchema } from "./schemas";
|
||||||
|
|
||||||
export interface ValkeyClientConfig {
|
export interface ValkeyClientConfig {
|
||||||
host: string;
|
host: string;
|
||||||
@@ -24,6 +26,21 @@ export interface ValkeyClientConfig {
|
|||||||
*/
|
*/
|
||||||
export type EventErrorHandler = (error: Error, rawMessage: string, channel: string) => void;
|
export type EventErrorHandler = (error: Error, rawMessage: string, channel: string) => void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Error thrown when Redis data fails validation
|
||||||
|
*/
|
||||||
|
export class ValkeyValidationError extends Error {
|
||||||
|
constructor(
|
||||||
|
message: string,
|
||||||
|
public readonly key: string,
|
||||||
|
public readonly dataSnippet: string,
|
||||||
|
public readonly validationError: ZodError
|
||||||
|
) {
|
||||||
|
super(message);
|
||||||
|
this.name = "ValkeyValidationError";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Valkey client for state management and pub/sub
|
* Valkey client for state management and pub/sub
|
||||||
*/
|
*/
|
||||||
@@ -66,7 +83,7 @@ export class ValkeyClient {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return JSON.parse(data) as TaskState;
|
return this.parseAndValidateTaskState(key, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async setTaskState(state: TaskState): Promise<void> {
|
async setTaskState(state: TaskState): Promise<void> {
|
||||||
@@ -119,7 +136,8 @@ export class ValkeyClient {
|
|||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
const data = await this.client.get(key);
|
const data = await this.client.get(key);
|
||||||
if (data) {
|
if (data) {
|
||||||
tasks.push(JSON.parse(data) as TaskState);
|
const task = this.parseAndValidateTaskState(key, data);
|
||||||
|
tasks.push(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +156,7 @@ export class ValkeyClient {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return JSON.parse(data) as AgentState;
|
return this.parseAndValidateAgentState(key, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async setAgentState(state: AgentState): Promise<void> {
|
async setAgentState(state: AgentState): Promise<void> {
|
||||||
@@ -190,7 +208,8 @@ export class ValkeyClient {
|
|||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
const data = await this.client.get(key);
|
const data = await this.client.get(key);
|
||||||
if (data) {
|
if (data) {
|
||||||
agents.push(JSON.parse(data) as AgentState);
|
const agent = this.parseAndValidateAgentState(key, data);
|
||||||
|
agents.push(agent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -211,17 +230,26 @@ export class ValkeyClient {
|
|||||||
|
|
||||||
this.subscriber.on("message", (channel: string, message: string) => {
|
this.subscriber.on("message", (channel: string, message: string) => {
|
||||||
try {
|
try {
|
||||||
const event = JSON.parse(message) as OrchestratorEvent;
|
const parsed: unknown = JSON.parse(message);
|
||||||
|
const event = OrchestratorEventSchema.parse(parsed);
|
||||||
void handler(event);
|
void handler(event);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorObj = error instanceof Error ? error : new Error(String(error));
|
const errorObj = error instanceof Error ? error : new Error(String(error));
|
||||||
|
|
||||||
// Log the error
|
// Log the error with context
|
||||||
if (this.logger) {
|
if (this.logger) {
|
||||||
this.logger.error(
|
const snippet = message.length > 100 ? `${message.substring(0, 100)}...` : message;
|
||||||
`Failed to parse event from channel ${channel}: ${errorObj.message}`,
|
if (error instanceof ZodError) {
|
||||||
errorObj
|
this.logger.error(
|
||||||
);
|
`Failed to validate event from channel ${channel}: ${errorObj.message} (data: ${snippet})`,
|
||||||
|
errorObj
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to parse event from channel ${channel}: ${errorObj.message}`,
|
||||||
|
errorObj
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invoke error handler if provided
|
// Invoke error handler if provided
|
||||||
@@ -262,4 +290,56 @@ export class ValkeyClient {
|
|||||||
private getAgentKey(agentId: string): string {
|
private getAgentKey(agentId: string): string {
|
||||||
return `orchestrator:agent:${agentId}`;
|
return `orchestrator:agent:${agentId}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse and validate task state data from Redis
|
||||||
|
* @throws ValkeyValidationError if data is invalid
|
||||||
|
*/
|
||||||
|
private parseAndValidateTaskState(key: string, data: string): TaskState {
|
||||||
|
try {
|
||||||
|
const parsed: unknown = JSON.parse(data);
|
||||||
|
return TaskStateSchema.parse(parsed);
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof ZodError) {
|
||||||
|
const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data;
|
||||||
|
const validationError = new ValkeyValidationError(
|
||||||
|
`Invalid task state data at key ${key}: ${error.message}`,
|
||||||
|
key,
|
||||||
|
snippet,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
if (this.logger) {
|
||||||
|
this.logger.error(validationError.message, validationError);
|
||||||
|
}
|
||||||
|
throw validationError;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse and validate agent state data from Redis
|
||||||
|
* @throws ValkeyValidationError if data is invalid
|
||||||
|
*/
|
||||||
|
private parseAndValidateAgentState(key: string, data: string): AgentState {
|
||||||
|
try {
|
||||||
|
const parsed: unknown = JSON.parse(data);
|
||||||
|
return AgentStateSchema.parse(parsed);
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof ZodError) {
|
||||||
|
const snippet = data.length > 100 ? `${data.substring(0, 100)}...` : data;
|
||||||
|
const validationError = new ValkeyValidationError(
|
||||||
|
`Invalid agent state data at key ${key}: ${error.message}`,
|
||||||
|
key,
|
||||||
|
snippet,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
if (this.logger) {
|
||||||
|
this.logger.error(validationError.message, validationError);
|
||||||
|
}
|
||||||
|
throw validationError;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user