import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; import { ValkeyClient, ValkeyValidationError } from "./valkey.client"; import type { TaskState, AgentState, OrchestratorEvent } from "./types"; // Create a shared mock instance that will be used across all tests const mockRedisInstance = { get: vi.fn(), set: vi.fn(), del: vi.fn(), publish: vi.fn(), subscribe: vi.fn(), on: vi.fn(), quit: vi.fn(), duplicate: vi.fn(), scan: vi.fn(), mget: vi.fn(), }; // Capture constructor arguments for verification let lastRedisConstructorArgs: unknown[] = []; // Mock ioredis vi.mock("ioredis", () => { return { default: class { constructor(...args: unknown[]) { lastRedisConstructorArgs = args; return mockRedisInstance; } }, }; }); describe("ValkeyClient", () => { let client: ValkeyClient; let mockRedis: typeof mockRedisInstance; beforeEach(() => { // Reset all mocks vi.clearAllMocks(); // Create client instance client = new ValkeyClient({ host: "localhost", port: 6379, }); // Reference the mock instance mockRedis = mockRedisInstance; // Mock duplicate to return another mock client mockRedis.duplicate.mockReturnValue(mockRedis); }); afterEach(() => { vi.clearAllMocks(); }); describe("Connection Management", () => { it("should pass default timeout options to Redis when not configured", () => { new ValkeyClient({ host: "localhost", port: 6379 }); const options = lastRedisConstructorArgs[0] as Record; expect(options.connectTimeout).toBe(5000); expect(options.commandTimeout).toBe(3000); }); it("should pass custom timeout options to Redis when configured", () => { new ValkeyClient({ host: "localhost", port: 6379, connectTimeout: 10000, commandTimeout: 8000, }); const options = lastRedisConstructorArgs[0] as Record; expect(options.connectTimeout).toBe(10000); expect(options.commandTimeout).toBe(8000); }); it("should disconnect on close", async () => { mockRedis.quit.mockResolvedValue("OK"); await client.disconnect(); expect(mockRedis.quit).toHaveBeenCalled(); }); it("should disconnect subscriber if it exists", async () => { mockRedis.quit.mockResolvedValue("OK"); mockRedis.subscribe.mockResolvedValue(1); // Create subscriber await client.subscribeToEvents(vi.fn()); await client.disconnect(); // Should call quit twice (main client and subscriber) expect(mockRedis.quit).toHaveBeenCalledTimes(2); }); }); describe("Task State Management", () => { const mockTaskState: TaskState = { taskId: "task-123", status: "pending", context: { repository: "https://github.com/example/repo", branch: "main", workItems: ["item-1"], }, createdAt: "2026-02-02T10:00:00Z", updatedAt: "2026-02-02T10:00:00Z", }; it("should get task state", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(mockTaskState)); const result = await client.getTaskState("task-123"); expect(mockRedis.get).toHaveBeenCalledWith("orchestrator:task:task-123"); expect(result).toEqual(mockTaskState); }); it("should return null for non-existent task", async () => { mockRedis.get.mockResolvedValue(null); const result = await client.getTaskState("task-999"); expect(result).toBeNull(); }); it("should set task state", async () => { mockRedis.set.mockResolvedValue("OK"); await client.setTaskState(mockTaskState); expect(mockRedis.set).toHaveBeenCalledWith( "orchestrator:task:task-123", JSON.stringify(mockTaskState) ); }); it("should delete task state", async () => { mockRedis.del.mockResolvedValue(1); await client.deleteTaskState("task-123"); expect(mockRedis.del).toHaveBeenCalledWith("orchestrator:task:task-123"); }); it("should update task status", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(mockTaskState)); mockRedis.set.mockResolvedValue("OK"); const result = await client.updateTaskStatus("task-123", "assigned", "agent-456"); expect(mockRedis.get).toHaveBeenCalledWith("orchestrator:task:task-123"); expect(mockRedis.set).toHaveBeenCalled(); expect(result?.status).toBe("assigned"); expect(result?.agentId).toBe("agent-456"); expect(result?.updatedAt).toBeDefined(); }); it("should throw error when updating non-existent task", async () => { mockRedis.get.mockResolvedValue(null); await expect(client.updateTaskStatus("task-999", "assigned")).rejects.toThrow( "Task task-999 not found" ); }); it("should throw error for invalid task status transition", async () => { const completedTask = { ...mockTaskState, status: "completed" as const }; mockRedis.get.mockResolvedValue(JSON.stringify(completedTask)); await expect(client.updateTaskStatus("task-123", "assigned")).rejects.toThrow( "Invalid task state transition from completed to assigned" ); }); it("should list all task states using SCAN and MGET", async () => { // SCAN returns [cursor, keys] - cursor "0" means complete mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:task:task-1", "orchestrator:task:task-2"], ]); // MGET returns values in same order as keys mockRedis.mget.mockResolvedValue([ JSON.stringify({ ...mockTaskState, taskId: "task-1" }), JSON.stringify({ ...mockTaskState, taskId: "task-2" }), ]); const result = await client.listTasks(); expect(mockRedis.scan).toHaveBeenCalledWith( "0", "MATCH", "orchestrator:task:*", "COUNT", 100 ); // Verify MGET is called with all keys (batch retrieval) expect(mockRedis.mget).toHaveBeenCalledWith( "orchestrator:task:task-1", "orchestrator:task:task-2" ); // Verify individual GET is NOT called (N+1 prevention) expect(mockRedis.get).not.toHaveBeenCalled(); expect(result).toHaveLength(2); expect(result[0].taskId).toBe("task-1"); expect(result[1].taskId).toBe("task-2"); }); }); describe("Agent State Management", () => { const mockAgentState: AgentState = { agentId: "agent-456", status: "spawning", taskId: "task-123", }; it("should get agent state", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(mockAgentState)); const result = await client.getAgentState("agent-456"); expect(mockRedis.get).toHaveBeenCalledWith("orchestrator:agent:agent-456"); expect(result).toEqual(mockAgentState); }); it("should return null for non-existent agent", async () => { mockRedis.get.mockResolvedValue(null); const result = await client.getAgentState("agent-999"); expect(result).toBeNull(); }); it("should set agent state", async () => { mockRedis.set.mockResolvedValue("OK"); await client.setAgentState(mockAgentState); expect(mockRedis.set).toHaveBeenCalledWith( "orchestrator:agent:agent-456", JSON.stringify(mockAgentState) ); }); it("should delete agent state", async () => { mockRedis.del.mockResolvedValue(1); await client.deleteAgentState("agent-456"); expect(mockRedis.del).toHaveBeenCalledWith("orchestrator:agent:agent-456"); }); it("should update agent status", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(mockAgentState)); mockRedis.set.mockResolvedValue("OK"); const result = await client.updateAgentStatus("agent-456", "running"); expect(mockRedis.get).toHaveBeenCalledWith("orchestrator:agent:agent-456"); expect(mockRedis.set).toHaveBeenCalled(); expect(result?.status).toBe("running"); expect(result?.startedAt).toBeDefined(); }); it("should set completedAt when status is completed", async () => { const runningAgent = { ...mockAgentState, status: "running" as const }; mockRedis.get.mockResolvedValue(JSON.stringify(runningAgent)); mockRedis.set.mockResolvedValue("OK"); const result = await client.updateAgentStatus("agent-456", "completed"); expect(result?.status).toBe("completed"); expect(result?.completedAt).toBeDefined(); }); it("should throw error when updating non-existent agent", async () => { mockRedis.get.mockResolvedValue(null); await expect(client.updateAgentStatus("agent-999", "running")).rejects.toThrow( "Agent agent-999 not found" ); }); it("should throw error for invalid agent status transition", async () => { const completedAgent = { ...mockAgentState, status: "completed" as const }; mockRedis.get.mockResolvedValue(JSON.stringify(completedAgent)); await expect(client.updateAgentStatus("agent-456", "running")).rejects.toThrow( "Invalid agent state transition from completed to running" ); }); it("should list all agent states using SCAN and MGET", async () => { // SCAN returns [cursor, keys] - cursor "0" means complete mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"], ]); // MGET returns values in same order as keys mockRedis.mget.mockResolvedValue([ JSON.stringify({ ...mockAgentState, agentId: "agent-1" }), JSON.stringify({ ...mockAgentState, agentId: "agent-2" }), ]); const result = await client.listAgents(); expect(mockRedis.scan).toHaveBeenCalledWith( "0", "MATCH", "orchestrator:agent:*", "COUNT", 100 ); // Verify MGET is called with all keys (batch retrieval) expect(mockRedis.mget).toHaveBeenCalledWith( "orchestrator:agent:agent-1", "orchestrator:agent:agent-2" ); // Verify individual GET is NOT called (N+1 prevention) expect(mockRedis.get).not.toHaveBeenCalled(); expect(result).toHaveLength(2); expect(result[0].agentId).toBe("agent-1"); expect(result[1].agentId).toBe("agent-2"); }); }); describe("Event Pub/Sub", () => { const mockEvent: OrchestratorEvent = { type: "agent.spawned", agentId: "agent-456", taskId: "task-123", timestamp: "2026-02-02T10:00:00Z", }; it("should publish events", async () => { mockRedis.publish.mockResolvedValue(1); await client.publishEvent(mockEvent); expect(mockRedis.publish).toHaveBeenCalledWith( "orchestrator:events", JSON.stringify(mockEvent) ); }); it("should subscribe to events", async () => { mockRedis.subscribe.mockResolvedValue(1); const handler = vi.fn(); await client.subscribeToEvents(handler); expect(mockRedis.duplicate).toHaveBeenCalled(); expect(mockRedis.subscribe).toHaveBeenCalledWith("orchestrator:events"); }); it("should call handler when event is received", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); await client.subscribeToEvents(handler); // Simulate receiving a message if (messageHandler) { messageHandler("orchestrator:events", JSON.stringify(mockEvent)); } expect(handler).toHaveBeenCalledWith(mockEvent); }); it("should handle invalid JSON in events gracefully with logger", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); const loggerError = vi.fn(); // Create client with logger const clientWithLogger = new ValkeyClient({ host: "localhost", port: 6379, logger: { error: loggerError }, }); // Mock duplicate for new client mockRedis.duplicate.mockReturnValue(mockRedis); await clientWithLogger.subscribeToEvents(handler); // Simulate receiving invalid JSON if (messageHandler) { messageHandler("orchestrator:events", "invalid json"); } expect(handler).not.toHaveBeenCalled(); expect(loggerError).toHaveBeenCalled(); expect(loggerError).toHaveBeenCalledWith( expect.stringContaining("Failed to parse event from channel orchestrator:events"), expect.any(Error) ); }); it("should invoke error handler when provided", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); const errorHandler = vi.fn(); await client.subscribeToEvents(handler, errorHandler); // Simulate receiving invalid JSON if (messageHandler) { messageHandler("orchestrator:events", "invalid json"); } expect(handler).not.toHaveBeenCalled(); expect(errorHandler).toHaveBeenCalledWith( expect.any(Error), "invalid json", "orchestrator:events" ); }); it("should handle errors without logger or error handler", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); await client.subscribeToEvents(handler); // Should not throw when neither logger nor error handler is provided expect(() => { if (messageHandler) { messageHandler("orchestrator:events", "invalid json"); } }).not.toThrow(); expect(handler).not.toHaveBeenCalled(); }); }); describe("Edge Cases", () => { it("should handle task updates with error parameter", async () => { const taskState: TaskState = { taskId: "task-123", status: "pending", context: { repository: "https://github.com/example/repo", branch: "main", workItems: ["item-1"], }, createdAt: "2026-02-02T10:00:00Z", updatedAt: "2026-02-02T10:00:00Z", }; mockRedis.get.mockResolvedValue(JSON.stringify(taskState)); mockRedis.set.mockResolvedValue("OK"); const result = await client.updateTaskStatus("task-123", "failed", undefined, "Test error"); expect(result.status).toBe("failed"); expect(result.metadata?.error).toBe("Test error"); }); it("should handle agent updates with error parameter", async () => { const agentState: AgentState = { agentId: "agent-456", status: "running", taskId: "task-123", }; mockRedis.get.mockResolvedValue(JSON.stringify(agentState)); mockRedis.set.mockResolvedValue("OK"); const result = await client.updateAgentStatus("agent-456", "failed", "Test error"); expect(result.status).toBe("failed"); expect(result.error).toBe("Test error"); }); it("should filter out null values in listTasks (key deleted between SCAN and MGET)", async () => { const validTask = { taskId: "task-1", status: "pending", context: { repository: "repo", branch: "main", workItems: ["item-1"] }, createdAt: "2026-02-02T10:00:00Z", updatedAt: "2026-02-02T10:00:00Z", }; mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:task:task-1", "orchestrator:task:task-2"], ]); // MGET returns null for deleted keys mockRedis.mget.mockResolvedValue([JSON.stringify(validTask), null]); const result = await client.listTasks(); expect(result).toHaveLength(1); expect(result[0].taskId).toBe("task-1"); }); it("should filter out null values in listAgents (key deleted between SCAN and MGET)", async () => { const validAgent = { agentId: "agent-1", status: "running", taskId: "task-1", }; mockRedis.scan.mockResolvedValue([ "0", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"], ]); // MGET returns null for deleted keys mockRedis.mget.mockResolvedValue([JSON.stringify(validAgent), null]); const result = await client.listAgents(); expect(result).toHaveLength(1); expect(result[0].agentId).toBe("agent-1"); }); }); describe("SCAN-based iteration (large key sets)", () => { const makeValidTask = (taskId: string): object => ({ taskId, status: "pending", context: { repository: "repo", branch: "main", workItems: ["item-1"] }, createdAt: "2026-02-02T10:00:00Z", updatedAt: "2026-02-02T10:00:00Z", }); const makeValidAgent = (agentId: string): object => ({ agentId, status: "running", taskId: "task-1", }); it("should handle multiple SCAN iterations for tasks with single MGET", async () => { // Simulate SCAN returning multiple batches with cursor pagination mockRedis.scan .mockResolvedValueOnce(["42", ["orchestrator:task:task-1", "orchestrator:task:task-2"]]) // First batch, cursor 42 .mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done // MGET called once with all keys after SCAN completes mockRedis.mget.mockResolvedValue([ JSON.stringify(makeValidTask("task-1")), JSON.stringify(makeValidTask("task-2")), JSON.stringify(makeValidTask("task-3")), ]); const result = await client.listTasks(); expect(mockRedis.scan).toHaveBeenCalledTimes(2); expect(mockRedis.scan).toHaveBeenNthCalledWith( 1, "0", "MATCH", "orchestrator:task:*", "COUNT", 100 ); expect(mockRedis.scan).toHaveBeenNthCalledWith( 2, "42", "MATCH", "orchestrator:task:*", "COUNT", 100 ); // Verify single MGET with all keys (not N individual GETs) expect(mockRedis.mget).toHaveBeenCalledTimes(1); expect(mockRedis.mget).toHaveBeenCalledWith( "orchestrator:task:task-1", "orchestrator:task:task-2", "orchestrator:task:task-3" ); expect(mockRedis.get).not.toHaveBeenCalled(); expect(result).toHaveLength(3); expect(result.map((t) => t.taskId)).toEqual(["task-1", "task-2", "task-3"]); }); it("should handle multiple SCAN iterations for agents with single MGET", async () => { // Simulate SCAN returning multiple batches with cursor pagination mockRedis.scan .mockResolvedValueOnce(["99", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"]]) // First batch .mockResolvedValueOnce(["50", ["orchestrator:agent:agent-3"]]) // Second batch .mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done // MGET called once with all keys after SCAN completes mockRedis.mget.mockResolvedValue([ JSON.stringify(makeValidAgent("agent-1")), JSON.stringify(makeValidAgent("agent-2")), JSON.stringify(makeValidAgent("agent-3")), JSON.stringify(makeValidAgent("agent-4")), ]); const result = await client.listAgents(); expect(mockRedis.scan).toHaveBeenCalledTimes(3); // Verify single MGET with all keys (not N individual GETs) expect(mockRedis.mget).toHaveBeenCalledTimes(1); expect(mockRedis.mget).toHaveBeenCalledWith( "orchestrator:agent:agent-1", "orchestrator:agent:agent-2", "orchestrator:agent:agent-3", "orchestrator:agent:agent-4" ); expect(mockRedis.get).not.toHaveBeenCalled(); expect(result).toHaveLength(4); expect(result.map((a) => a.agentId)).toEqual(["agent-1", "agent-2", "agent-3", "agent-4"]); }); it("should handle empty result from SCAN without calling MGET", async () => { mockRedis.scan.mockResolvedValue(["0", []]); const result = await client.listTasks(); expect(mockRedis.scan).toHaveBeenCalledTimes(1); // MGET should not be called when there are no keys expect(mockRedis.mget).not.toHaveBeenCalled(); expect(result).toHaveLength(0); }); }); describe("Zod Validation (SEC-ORCH-6)", () => { describe("Task State Validation", () => { const validTaskState: TaskState = { taskId: "task-123", status: "pending", context: { repository: "https://github.com/example/repo", branch: "main", workItems: ["item-1"], }, createdAt: "2026-02-02T10:00:00Z", updatedAt: "2026-02-02T10:00:00Z", }; it("should accept valid task state data", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(validTaskState)); const result = await client.getTaskState("task-123"); expect(result).toEqual(validTaskState); }); it("should reject task with missing required fields", async () => { const invalidTask = { taskId: "task-123" }; // Missing status, context, etc. mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); }); it("should reject task with invalid status value", async () => { const invalidTask = { ...validTaskState, status: "invalid-status", // Not a valid TaskStatus }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); }); it("should reject task with missing context fields", async () => { const invalidTask = { ...validTaskState, context: { repository: "repo" }, // Missing branch and workItems }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); await expect(client.getTaskState("task-123")).rejects.toThrow(ValkeyValidationError); }); it("should reject corrupted JSON data for task", async () => { mockRedis.get.mockResolvedValue("not valid json {{{"); await expect(client.getTaskState("task-123")).rejects.toThrow(); }); it("should include key name in validation error", async () => { const invalidTask = { taskId: "task-123" }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); try { await client.getTaskState("task-123"); expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(ValkeyValidationError); expect((error as ValkeyValidationError).key).toBe("orchestrator:task:task-123"); } }); it("should include data snippet in validation error", async () => { const invalidTask = { taskId: "task-123", invalidField: "x".repeat(200) }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); try { await client.getTaskState("task-123"); expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(ValkeyValidationError); const valError = error as ValkeyValidationError; expect(valError.dataSnippet.length).toBeLessThanOrEqual(103); // 100 chars + "..." } }); it("should log validation errors with logger", async () => { const loggerError = vi.fn(); const clientWithLogger = new ValkeyClient({ host: "localhost", port: 6379, logger: { error: loggerError }, }); const invalidTask = { taskId: "task-123" }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidTask)); await expect(clientWithLogger.getTaskState("task-123")).rejects.toThrow( ValkeyValidationError ); expect(loggerError).toHaveBeenCalled(); }); it("should reject invalid data in listTasks", async () => { mockRedis.scan.mockResolvedValue(["0", ["orchestrator:task:task-1"]]); mockRedis.mget.mockResolvedValue([JSON.stringify({ taskId: "task-1" })]); // Invalid await expect(client.listTasks()).rejects.toThrow(ValkeyValidationError); }); }); describe("Agent State Validation", () => { const validAgentState: AgentState = { agentId: "agent-456", status: "spawning", taskId: "task-123", }; it("should accept valid agent state data", async () => { mockRedis.get.mockResolvedValue(JSON.stringify(validAgentState)); const result = await client.getAgentState("agent-456"); expect(result).toEqual(validAgentState); }); it("should reject agent with missing required fields", async () => { const invalidAgent = { agentId: "agent-456" }; // Missing status, taskId mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError); }); it("should reject agent with invalid status value", async () => { const invalidAgent = { ...validAgentState, status: "not-a-status", // Not a valid AgentStatus }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); await expect(client.getAgentState("agent-456")).rejects.toThrow(ValkeyValidationError); }); it("should reject corrupted JSON data for agent", async () => { mockRedis.get.mockResolvedValue("corrupted data <<<"); await expect(client.getAgentState("agent-456")).rejects.toThrow(); }); it("should include key name in agent validation error", async () => { const invalidAgent = { agentId: "agent-456" }; mockRedis.get.mockResolvedValue(JSON.stringify(invalidAgent)); try { await client.getAgentState("agent-456"); expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(ValkeyValidationError); expect((error as ValkeyValidationError).key).toBe("orchestrator:agent:agent-456"); } }); it("should reject invalid data in listAgents", async () => { mockRedis.scan.mockResolvedValue(["0", ["orchestrator:agent:agent-1"]]); mockRedis.mget.mockResolvedValue([JSON.stringify({ agentId: "agent-1" })]); // Invalid await expect(client.listAgents()).rejects.toThrow(ValkeyValidationError); }); }); describe("Event Validation", () => { it("should accept valid agent event", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); await client.subscribeToEvents(handler); const validEvent: OrchestratorEvent = { type: "agent.spawned", agentId: "agent-1", taskId: "task-1", timestamp: "2026-02-02T10:00:00Z", }; if (messageHandler) { messageHandler("orchestrator:events", JSON.stringify(validEvent)); } expect(handler).toHaveBeenCalledWith(validEvent); }); it("should reject event with invalid type", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); const errorHandler = vi.fn(); await client.subscribeToEvents(handler, errorHandler); const invalidEvent = { type: "invalid.event.type", agentId: "agent-1", taskId: "task-1", timestamp: "2026-02-02T10:00:00Z", }; if (messageHandler) { messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); } expect(handler).not.toHaveBeenCalled(); expect(errorHandler).toHaveBeenCalled(); }); it("should reject event with missing required fields", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const handler = vi.fn(); const errorHandler = vi.fn(); await client.subscribeToEvents(handler, errorHandler); const invalidEvent = { type: "agent.spawned", // Missing agentId, taskId, timestamp }; if (messageHandler) { messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); } expect(handler).not.toHaveBeenCalled(); expect(errorHandler).toHaveBeenCalled(); }); it("should log validation errors for events with logger", async () => { mockRedis.subscribe.mockResolvedValue(1); let messageHandler: ((channel: string, message: string) => void) | undefined; mockRedis.on.mockImplementation( (event: string, handler: (channel: string, message: string) => void) => { if (event === "message") { messageHandler = handler; } return mockRedis; } ); const loggerError = vi.fn(); const clientWithLogger = new ValkeyClient({ host: "localhost", port: 6379, logger: { error: loggerError }, }); mockRedis.duplicate.mockReturnValue(mockRedis); await clientWithLogger.subscribeToEvents(vi.fn()); const invalidEvent = { type: "invalid.type" }; if (messageHandler) { messageHandler("orchestrator:events", JSON.stringify(invalidEvent)); } expect(loggerError).toHaveBeenCalled(); expect(loggerError).toHaveBeenCalledWith( expect.stringContaining("Failed to validate event"), expect.any(Error) ); }); }); }); });