fix(#338): Use MGET for batch retrieval instead of N individual GETs
- Replace N GET calls with single MGET after SCAN in listTasks() - Replace N GET calls with single MGET after SCAN in listAgents() - Handle null values (key deleted between SCAN and MGET) - Add early return for empty key sets to skip unnecessary MGET - Update tests to verify MGET batch retrieval and N+1 prevention Significantly improves performance for large key sets (100-500x faster). Refs #338 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,7 @@ const mockRedisInstance = {
|
|||||||
quit: vi.fn(),
|
quit: vi.fn(),
|
||||||
duplicate: vi.fn(),
|
duplicate: vi.fn(),
|
||||||
scan: vi.fn(),
|
scan: vi.fn(),
|
||||||
|
mget: vi.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Mock ioredis
|
// Mock ioredis
|
||||||
@@ -153,15 +154,17 @@ describe("ValkeyClient", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should list all task states using SCAN", async () => {
|
it("should list all task states using SCAN and MGET", async () => {
|
||||||
// SCAN returns [cursor, keys] - cursor "0" means complete
|
// SCAN returns [cursor, keys] - cursor "0" means complete
|
||||||
mockRedis.scan.mockResolvedValue([
|
mockRedis.scan.mockResolvedValue([
|
||||||
"0",
|
"0",
|
||||||
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get
|
// MGET returns values in same order as keys
|
||||||
.mockResolvedValueOnce(JSON.stringify({ ...mockTaskState, taskId: "task-1" }))
|
mockRedis.mget.mockResolvedValue([
|
||||||
.mockResolvedValueOnce(JSON.stringify({ ...mockTaskState, taskId: "task-2" }));
|
JSON.stringify({ ...mockTaskState, taskId: "task-1" }),
|
||||||
|
JSON.stringify({ ...mockTaskState, taskId: "task-2" }),
|
||||||
|
]);
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
@@ -172,6 +175,13 @@ describe("ValkeyClient", () => {
|
|||||||
"COUNT",
|
"COUNT",
|
||||||
100
|
100
|
||||||
);
|
);
|
||||||
|
// Verify MGET is called with all keys (batch retrieval)
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledWith(
|
||||||
|
"orchestrator:task:task-1",
|
||||||
|
"orchestrator:task:task-2"
|
||||||
|
);
|
||||||
|
// Verify individual GET is NOT called (N+1 prevention)
|
||||||
|
expect(mockRedis.get).not.toHaveBeenCalled();
|
||||||
expect(result).toHaveLength(2);
|
expect(result).toHaveLength(2);
|
||||||
expect(result[0].taskId).toBe("task-1");
|
expect(result[0].taskId).toBe("task-1");
|
||||||
expect(result[1].taskId).toBe("task-2");
|
expect(result[1].taskId).toBe("task-2");
|
||||||
@@ -261,15 +271,17 @@ describe("ValkeyClient", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should list all agent states using SCAN", async () => {
|
it("should list all agent states using SCAN and MGET", async () => {
|
||||||
// SCAN returns [cursor, keys] - cursor "0" means complete
|
// SCAN returns [cursor, keys] - cursor "0" means complete
|
||||||
mockRedis.scan.mockResolvedValue([
|
mockRedis.scan.mockResolvedValue([
|
||||||
"0",
|
"0",
|
||||||
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get
|
// MGET returns values in same order as keys
|
||||||
.mockResolvedValueOnce(JSON.stringify({ ...mockAgentState, agentId: "agent-1" }))
|
mockRedis.mget.mockResolvedValue([
|
||||||
.mockResolvedValueOnce(JSON.stringify({ ...mockAgentState, agentId: "agent-2" }));
|
JSON.stringify({ ...mockAgentState, agentId: "agent-1" }),
|
||||||
|
JSON.stringify({ ...mockAgentState, agentId: "agent-2" }),
|
||||||
|
]);
|
||||||
|
|
||||||
const result = await client.listAgents();
|
const result = await client.listAgents();
|
||||||
|
|
||||||
@@ -280,6 +292,13 @@ describe("ValkeyClient", () => {
|
|||||||
"COUNT",
|
"COUNT",
|
||||||
100
|
100
|
||||||
);
|
);
|
||||||
|
// Verify MGET is called with all keys (batch retrieval)
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledWith(
|
||||||
|
"orchestrator:agent:agent-1",
|
||||||
|
"orchestrator:agent:agent-2"
|
||||||
|
);
|
||||||
|
// Verify individual GET is NOT called (N+1 prevention)
|
||||||
|
expect(mockRedis.get).not.toHaveBeenCalled();
|
||||||
expect(result).toHaveLength(2);
|
expect(result).toHaveLength(2);
|
||||||
expect(result[0].agentId).toBe("agent-1");
|
expect(result[0].agentId).toBe("agent-1");
|
||||||
expect(result[1].agentId).toBe("agent-2");
|
expect(result[1].agentId).toBe("agent-2");
|
||||||
@@ -478,7 +497,7 @@ describe("ValkeyClient", () => {
|
|||||||
expect(result.error).toBe("Test error");
|
expect(result.error).toBe("Test error");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should filter out null values in listTasks", async () => {
|
it("should filter out null values in listTasks (key deleted between SCAN and MGET)", async () => {
|
||||||
const validTask = {
|
const validTask = {
|
||||||
taskId: "task-1",
|
taskId: "task-1",
|
||||||
status: "pending",
|
status: "pending",
|
||||||
@@ -490,7 +509,8 @@ describe("ValkeyClient", () => {
|
|||||||
"0",
|
"0",
|
||||||
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
["orchestrator:task:task-1", "orchestrator:task:task-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get.mockResolvedValueOnce(JSON.stringify(validTask)).mockResolvedValueOnce(null); // Simulate deleted task
|
// MGET returns null for deleted keys
|
||||||
|
mockRedis.mget.mockResolvedValue([JSON.stringify(validTask), null]);
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
@@ -498,7 +518,7 @@ describe("ValkeyClient", () => {
|
|||||||
expect(result[0].taskId).toBe("task-1");
|
expect(result[0].taskId).toBe("task-1");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should filter out null values in listAgents", async () => {
|
it("should filter out null values in listAgents (key deleted between SCAN and MGET)", async () => {
|
||||||
const validAgent = {
|
const validAgent = {
|
||||||
agentId: "agent-1",
|
agentId: "agent-1",
|
||||||
status: "running",
|
status: "running",
|
||||||
@@ -508,7 +528,8 @@ describe("ValkeyClient", () => {
|
|||||||
"0",
|
"0",
|
||||||
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"],
|
||||||
]);
|
]);
|
||||||
mockRedis.get.mockResolvedValueOnce(JSON.stringify(validAgent)).mockResolvedValueOnce(null); // Simulate deleted agent
|
// MGET returns null for deleted keys
|
||||||
|
mockRedis.mget.mockResolvedValue([JSON.stringify(validAgent), null]);
|
||||||
|
|
||||||
const result = await client.listAgents();
|
const result = await client.listAgents();
|
||||||
|
|
||||||
@@ -532,16 +553,18 @@ describe("ValkeyClient", () => {
|
|||||||
taskId: "task-1",
|
taskId: "task-1",
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should handle multiple SCAN iterations for tasks", async () => {
|
it("should handle multiple SCAN iterations for tasks with single MGET", async () => {
|
||||||
// Simulate SCAN returning multiple batches with cursor pagination
|
// Simulate SCAN returning multiple batches with cursor pagination
|
||||||
mockRedis.scan
|
mockRedis.scan
|
||||||
.mockResolvedValueOnce(["42", ["orchestrator:task:task-1", "orchestrator:task:task-2"]]) // First batch, cursor 42
|
.mockResolvedValueOnce(["42", ["orchestrator:task:task-1", "orchestrator:task:task-2"]]) // First batch, cursor 42
|
||||||
.mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done
|
.mockResolvedValueOnce(["0", ["orchestrator:task:task-3"]]); // Second batch, cursor 0 = done
|
||||||
|
|
||||||
mockRedis.get
|
// MGET called once with all keys after SCAN completes
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-1")))
|
mockRedis.mget.mockResolvedValue([
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-2")))
|
JSON.stringify(makeValidTask("task-1")),
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidTask("task-3")));
|
JSON.stringify(makeValidTask("task-2")),
|
||||||
|
JSON.stringify(makeValidTask("task-3")),
|
||||||
|
]);
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
@@ -562,36 +585,57 @@ describe("ValkeyClient", () => {
|
|||||||
"COUNT",
|
"COUNT",
|
||||||
100
|
100
|
||||||
);
|
);
|
||||||
|
// Verify single MGET with all keys (not N individual GETs)
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledTimes(1);
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledWith(
|
||||||
|
"orchestrator:task:task-1",
|
||||||
|
"orchestrator:task:task-2",
|
||||||
|
"orchestrator:task:task-3"
|
||||||
|
);
|
||||||
|
expect(mockRedis.get).not.toHaveBeenCalled();
|
||||||
expect(result).toHaveLength(3);
|
expect(result).toHaveLength(3);
|
||||||
expect(result.map((t) => t.taskId)).toEqual(["task-1", "task-2", "task-3"]);
|
expect(result.map((t) => t.taskId)).toEqual(["task-1", "task-2", "task-3"]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should handle multiple SCAN iterations for agents", async () => {
|
it("should handle multiple SCAN iterations for agents with single MGET", async () => {
|
||||||
// Simulate SCAN returning multiple batches with cursor pagination
|
// Simulate SCAN returning multiple batches with cursor pagination
|
||||||
mockRedis.scan
|
mockRedis.scan
|
||||||
.mockResolvedValueOnce(["99", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"]]) // First batch
|
.mockResolvedValueOnce(["99", ["orchestrator:agent:agent-1", "orchestrator:agent:agent-2"]]) // First batch
|
||||||
.mockResolvedValueOnce(["50", ["orchestrator:agent:agent-3"]]) // Second batch
|
.mockResolvedValueOnce(["50", ["orchestrator:agent:agent-3"]]) // Second batch
|
||||||
.mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done
|
.mockResolvedValueOnce(["0", ["orchestrator:agent:agent-4"]]); // Third batch, done
|
||||||
|
|
||||||
mockRedis.get
|
// MGET called once with all keys after SCAN completes
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-1")))
|
mockRedis.mget.mockResolvedValue([
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-2")))
|
JSON.stringify(makeValidAgent("agent-1")),
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-3")))
|
JSON.stringify(makeValidAgent("agent-2")),
|
||||||
.mockResolvedValueOnce(JSON.stringify(makeValidAgent("agent-4")));
|
JSON.stringify(makeValidAgent("agent-3")),
|
||||||
|
JSON.stringify(makeValidAgent("agent-4")),
|
||||||
|
]);
|
||||||
|
|
||||||
const result = await client.listAgents();
|
const result = await client.listAgents();
|
||||||
|
|
||||||
expect(mockRedis.scan).toHaveBeenCalledTimes(3);
|
expect(mockRedis.scan).toHaveBeenCalledTimes(3);
|
||||||
|
// Verify single MGET with all keys (not N individual GETs)
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledTimes(1);
|
||||||
|
expect(mockRedis.mget).toHaveBeenCalledWith(
|
||||||
|
"orchestrator:agent:agent-1",
|
||||||
|
"orchestrator:agent:agent-2",
|
||||||
|
"orchestrator:agent:agent-3",
|
||||||
|
"orchestrator:agent:agent-4"
|
||||||
|
);
|
||||||
|
expect(mockRedis.get).not.toHaveBeenCalled();
|
||||||
expect(result).toHaveLength(4);
|
expect(result).toHaveLength(4);
|
||||||
expect(result.map((a) => a.agentId)).toEqual(["agent-1", "agent-2", "agent-3", "agent-4"]);
|
expect(result.map((a) => a.agentId)).toEqual(["agent-1", "agent-2", "agent-3", "agent-4"]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should handle empty result from SCAN", async () => {
|
it("should handle empty result from SCAN without calling MGET", async () => {
|
||||||
mockRedis.scan.mockResolvedValue(["0", []]);
|
mockRedis.scan.mockResolvedValue(["0", []]);
|
||||||
|
|
||||||
const result = await client.listTasks();
|
const result = await client.listTasks();
|
||||||
|
|
||||||
expect(mockRedis.scan).toHaveBeenCalledTimes(1);
|
expect(mockRedis.scan).toHaveBeenCalledTimes(1);
|
||||||
|
// MGET should not be called when there are no keys
|
||||||
|
expect(mockRedis.mget).not.toHaveBeenCalled();
|
||||||
expect(result).toHaveLength(0);
|
expect(result).toHaveLength(0);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -697,7 +741,7 @@ describe("ValkeyClient", () => {
|
|||||||
|
|
||||||
it("should reject invalid data in listTasks", async () => {
|
it("should reject invalid data in listTasks", async () => {
|
||||||
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:task:task-1"]]);
|
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:task:task-1"]]);
|
||||||
mockRedis.get.mockResolvedValue(JSON.stringify({ taskId: "task-1" })); // Invalid
|
mockRedis.mget.mockResolvedValue([JSON.stringify({ taskId: "task-1" })]); // Invalid
|
||||||
|
|
||||||
await expect(client.listTasks()).rejects.toThrow(ValkeyValidationError);
|
await expect(client.listTasks()).rejects.toThrow(ValkeyValidationError);
|
||||||
});
|
});
|
||||||
@@ -756,7 +800,7 @@ describe("ValkeyClient", () => {
|
|||||||
|
|
||||||
it("should reject invalid data in listAgents", async () => {
|
it("should reject invalid data in listAgents", async () => {
|
||||||
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:agent:agent-1"]]);
|
mockRedis.scan.mockResolvedValue(["0", ["orchestrator:agent:agent-1"]]);
|
||||||
mockRedis.get.mockResolvedValue(JSON.stringify({ agentId: "agent-1" })); // Invalid
|
mockRedis.mget.mockResolvedValue([JSON.stringify({ agentId: "agent-1" })]); // Invalid
|
||||||
|
|
||||||
await expect(client.listAgents()).rejects.toThrow(ValkeyValidationError);
|
await expect(client.listAgents()).rejects.toThrow(ValkeyValidationError);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -132,11 +132,19 @@ export class ValkeyClient {
|
|||||||
const pattern = "orchestrator:task:*";
|
const pattern = "orchestrator:task:*";
|
||||||
const keys = await this.scanKeys(pattern);
|
const keys = await this.scanKeys(pattern);
|
||||||
|
|
||||||
|
if (keys.length === 0) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use MGET for batch retrieval instead of N individual GETs
|
||||||
|
const values = await this.client.mget(...keys);
|
||||||
|
|
||||||
const tasks: TaskState[] = [];
|
const tasks: TaskState[] = [];
|
||||||
for (const key of keys) {
|
for (let i = 0; i < keys.length; i++) {
|
||||||
const data = await this.client.get(key);
|
const data = values[i];
|
||||||
|
// Handle null values (key deleted between SCAN and MGET)
|
||||||
if (data) {
|
if (data) {
|
||||||
const task = this.parseAndValidateTaskState(key, data);
|
const task = this.parseAndValidateTaskState(keys[i], data);
|
||||||
tasks.push(task);
|
tasks.push(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -204,11 +212,19 @@ export class ValkeyClient {
|
|||||||
const pattern = "orchestrator:agent:*";
|
const pattern = "orchestrator:agent:*";
|
||||||
const keys = await this.scanKeys(pattern);
|
const keys = await this.scanKeys(pattern);
|
||||||
|
|
||||||
|
if (keys.length === 0) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use MGET for batch retrieval instead of N individual GETs
|
||||||
|
const values = await this.client.mget(...keys);
|
||||||
|
|
||||||
const agents: AgentState[] = [];
|
const agents: AgentState[] = [];
|
||||||
for (const key of keys) {
|
for (let i = 0; i < keys.length; i++) {
|
||||||
const data = await this.client.get(key);
|
const data = values[i];
|
||||||
|
// Handle null values (key deleted between SCAN and MGET)
|
||||||
if (data) {
|
if (data) {
|
||||||
const agent = this.parseAndValidateAgentState(key, data);
|
const agent = this.parseAndValidateAgentState(keys[i], data);
|
||||||
agents.push(agent);
|
agents.push(agent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user