From 6a4f58dc1c2606e8627c231ed2b5a7379459e6a8 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Thu, 5 Feb 2026 15:49:08 -0600 Subject: [PATCH] fix(#337): Replace blocking KEYS command with SCAN in Valkey client - Use SCAN with cursor for non-blocking iteration - Prevents Redis DoS under high key counts - Same API, safer implementation Refs #337 Co-Authored-By: Claude Opus 4.5 --- .../src/valkey/valkey.client.spec.ts | 111 ++++++++++++++++-- apps/orchestrator/src/valkey/valkey.client.ts | 21 +++- 2 files changed, 117 insertions(+), 15 deletions(-) diff --git a/apps/orchestrator/src/valkey/valkey.client.spec.ts b/apps/orchestrator/src/valkey/valkey.client.spec.ts index ad68318..f6e2035 100644 --- a/apps/orchestrator/src/valkey/valkey.client.spec.ts +++ b/apps/orchestrator/src/valkey/valkey.client.spec.ts @@ -12,7 +12,7 @@ const mockRedisInstance = { on: vi.fn(), quit: vi.fn(), duplicate: vi.fn(), - keys: vi.fn(), + scan: vi.fn(), }; // Mock ioredis @@ -153,15 +153,25 @@ describe("ValkeyClient", () => { ); }); - it("should list all task states", async () => { - mockRedis.keys.mockResolvedValue(["orchestrator:task:task-1", "orchestrator:task:task-2"]); + it("should list all task states using SCAN", async () => { + // SCAN returns [cursor, keys] - cursor "0" means complete + mockRedis.scan.mockResolvedValue([ + "0", + ["orchestrator:task:task-1", "orchestrator:task:task-2"], + ]); mockRedis.get .mockResolvedValueOnce(JSON.stringify({ ...mockTaskState, taskId: "task-1" })) .mockResolvedValueOnce(JSON.stringify({ ...mockTaskState, taskId: "task-2" })); const result = await client.listTasks(); - expect(mockRedis.keys).toHaveBeenCalledWith("orchestrator:task:*"); + expect(mockRedis.scan).toHaveBeenCalledWith( + "0", + "MATCH", + "orchestrator:task:*", + "COUNT", + 100 + ); expect(result).toHaveLength(2); expect(result[0].taskId).toBe("task-1"); expect(result[1].taskId).toBe("task-2"); @@ -251,10 +261,11 @@ describe("ValkeyClient", () => { ); }); - it("should list all agent states", async () => { - mockRedis.keys.mockResolvedValue([ - "orchestrator:agent:agent-1", - "orchestrator:agent:agent-2", + it("should list all agent states using SCAN", async () => { + // SCAN returns [cursor, keys] - cursor "0" means complete + mockRedis.scan.mockResolvedValue([ + "0", + ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"], ]); mockRedis.get .mockResolvedValueOnce(JSON.stringify({ ...mockAgentState, agentId: "agent-1" })) @@ -262,7 +273,13 @@ describe("ValkeyClient", () => { const result = await client.listAgents(); - expect(mockRedis.keys).toHaveBeenCalledWith("orchestrator:agent:*"); + expect(mockRedis.scan).toHaveBeenCalledWith( + "0", + "MATCH", + "orchestrator:agent:*", + "COUNT", + 100 + ); expect(result).toHaveLength(2); expect(result[0].agentId).toBe("agent-1"); expect(result[1].agentId).toBe("agent-2"); @@ -462,7 +479,10 @@ describe("ValkeyClient", () => { }); it("should filter out null values in listTasks", async () => { - mockRedis.keys.mockResolvedValue(["orchestrator:task:task-1", "orchestrator:task:task-2"]); + mockRedis.scan.mockResolvedValue([ + "0", + ["orchestrator:task:task-1", "orchestrator:task:task-2"], + ]); mockRedis.get .mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" })) .mockResolvedValueOnce(null); // Simulate deleted task @@ -474,9 +494,9 @@ describe("ValkeyClient", () => { }); it("should filter out null values in listAgents", async () => { - mockRedis.keys.mockResolvedValue([ - "orchestrator:agent:agent-1", - "orchestrator:agent:agent-2", + mockRedis.scan.mockResolvedValue([ + "0", + ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"], ]); mockRedis.get .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" })) @@ -488,4 +508,69 @@ describe("ValkeyClient", () => { expect(result[0].agentId).toBe("agent-1"); }); }); + + describe("SCAN-based iteration (large key sets)", () => { + it("should handle multiple SCAN iterations for tasks", async () => { + // Simulate SCAN returning multiple batches with cursor pagination + mockRedis.scan + .mockResolvedValueOnce(["42", ["orchestrator:task:task-1", "orchestrator:task:task-2"]]) // First batch, cursor 42 + .mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done + + mockRedis.get + .mockResolvedValueOnce(JSON.stringify({ taskId: "task-1", status: "pending" })) + .mockResolvedValueOnce(JSON.stringify({ taskId: "task-2", status: "pending" })) + .mockResolvedValueOnce(JSON.stringify({ taskId: "task-3", status: "pending" })); + + const result = await client.listTasks(); + + expect(mockRedis.scan).toHaveBeenCalledTimes(2); + expect(mockRedis.scan).toHaveBeenNthCalledWith( + 1, + "0", + "MATCH", + "orchestrator:task:*", + "COUNT", + 100 + ); + expect(mockRedis.scan).toHaveBeenNthCalledWith( + 2, + "42", + "MATCH", + "orchestrator:task:*", + "COUNT", + 100 + ); + expect(result).toHaveLength(3); + expect(result.map((t) => t.taskId)).toEqual(["task-1", "task-2", "task-3"]); + }); + + it("should handle multiple SCAN iterations for agents", async () => { + // Simulate SCAN returning multiple batches with cursor pagination + mockRedis.scan + .mockResolvedValueOnce(["99", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"]]) // First batch + .mockResolvedValueOnce(["50", ["orchestrator:agent:agent-3"]]) // Second batch + .mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done + + mockRedis.get + .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-1", status: "running" })) + .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-2", status: "running" })) + .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-3", status: "running" })) + .mockResolvedValueOnce(JSON.stringify({ agentId: "agent-4", status: "running" })); + + const result = await client.listAgents(); + + expect(mockRedis.scan).toHaveBeenCalledTimes(3); + expect(result).toHaveLength(4); + expect(result.map((a) => a.agentId)).toEqual(["agent-1", "agent-2", "agent-3", "agent-4"]); + }); + + it("should handle empty result from SCAN", async () => { + mockRedis.scan.mockResolvedValue(["0", []]); + + const result = await client.listTasks(); + + expect(mockRedis.scan).toHaveBeenCalledTimes(1); + expect(result).toHaveLength(0); + }); + }); }); diff --git a/apps/orchestrator/src/valkey/valkey.client.ts b/apps/orchestrator/src/valkey/valkey.client.ts index 0619774..5d9aeb2 100644 --- a/apps/orchestrator/src/valkey/valkey.client.ts +++ b/apps/orchestrator/src/valkey/valkey.client.ts @@ -113,7 +113,7 @@ export class ValkeyClient { async listTasks(): Promise { const pattern = "orchestrator:task:*"; - const keys = await this.client.keys(pattern); + const keys = await this.scanKeys(pattern); const tasks: TaskState[] = []; for (const key of keys) { @@ -184,7 +184,7 @@ export class ValkeyClient { async listAgents(): Promise { const pattern = "orchestrator:agent:*"; - const keys = await this.client.keys(pattern); + const keys = await this.scanKeys(pattern); const agents: AgentState[] = []; for (const key of keys) { @@ -238,6 +238,23 @@ export class ValkeyClient { * Private helper methods */ + /** + * Scan keys using SCAN command (non-blocking alternative to KEYS) + * Uses cursor-based iteration to avoid blocking Redis + */ + private async scanKeys(pattern: string): Promise { + const keys: string[] = []; + let cursor = "0"; + + do { + const [nextCursor, batch] = await this.client.scan(cursor, "MATCH", pattern, "COUNT", 100); + cursor = nextCursor; + keys.push(...batch); + } while (cursor !== "0"); + + return keys; + } + private getTaskKey(taskId: string): string { return `orchestrator:task:${taskId}`; }