Files
stack/apps/api/src/valkey/valkey.service.spec.ts
Jason Woltje 22446acd8a fix(CQ-API-4): Remove Redis event listeners in onModuleDestroy
Add removeAllListeners() call before quit() to prevent memory leaks
from lingering event listeners on the Redis client.

Also update test mock to include removeAllListeners method.

Refs #339

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 19:16:37 -06:00

378 lines
10 KiB
TypeScript

import { Test, TestingModule } from "@nestjs/testing";
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { ValkeyService } from "./valkey.service";
import { TaskStatus } from "./dto/task.dto";
// Replace the ioredis module with an in-memory stand-in so this suite never talks to a real server.
vi.mock("ioredis", () => {
  // Backing storage shared by every MockRedisClient instance created in this file.
  const store = new Map<string, string>(); // plain string keys (SETEX / GET)
  const lists = new Map<string, string[]>(); // list keys (RPUSH / LPOP / LLEN)

  /**
   * Minimal in-memory substitute for the ioredis client.
   * Only the methods defined below are implemented.
   */
  class MockRedisClient {
    /**
     * Test helper: empties both in-memory maps.
     * Declared as a static so it is reachable as `MockRedisClient.__clearStore()`
     * without having to bolt it onto the class through an `any` cast.
     */
    static __clearStore(): void {
      store.clear();
      lists.clear();
    }

    // ---- Connection methods ----

    /** Always answers "PONG". */
    async ping() {
      return "PONG";
    }

    /** No-op shutdown; resolves to undefined. */
    async quit() {
      return undefined;
    }

    /** Listener registration is ignored; returns the client for chaining. */
    on() {
      return this;
    }

    /** Listener removal is ignored; returns the client for chaining. */
    removeAllListeners() {
      return this;
    }

    // ---- String operations ----

    /** Stores `value` under `key`. The `ttl` argument is accepted but not enforced by this mock. */
    async setex(key: string, ttl: number, value: string) {
      store.set(key, value);
      return "OK";
    }

    /**
     * Returns the stored string, or null when the key is absent.
     * Uses `??` rather than `||` so a stored empty string is returned as-is
     * instead of being reported as a missing key.
     */
    async get(key: string) {
      return store.get(key) ?? null;
    }

    // ---- List operations ----

    /** Appends `values` to the tail of the list at `key` (creating it if needed) and returns the new length. */
    async rpush(key: string, ...values: string[]) {
      const list = lists.get(key) ?? [];
      lists.set(key, list);
      list.push(...values);
      return list.length;
    }

    /** Removes and returns the head of the list at `key`, or null when the list is missing or empty. */
    async lpop(key: string) {
      return lists.get(key)?.shift() ?? null;
    }

    /** Returns the length of the list at `key`, or 0 when there is no such list. */
    async llen(key: string) {
      return lists.get(key)?.length ?? 0;
    }

    /**
     * Deletes each key from both maps and returns the number of entries removed.
     * A key present in both maps counts twice.
     */
    async del(...keys: string[]) {
      let deleted = 0;
      for (const key of keys) {
        if (store.delete(key)) deleted++;
        if (lists.delete(key)) deleted++;
      }
      return deleted;
    }
  }

  return {
    default: MockRedisClient,
  };
});
describe("ValkeyService", () => {
let service: ValkeyService;
let module: TestingModule;
beforeEach(async () => {
  // Set VALKEY_URL to a fixed value (presumably read by ValkeyService during init — not visible here).
  process.env.VALKEY_URL = "redis://localhost:6379";

  // Wipe the mocked in-memory Redis state so nothing leaks from one test into the next.
  const { default: MockedRedis } = await import("ioredis");
  (MockedRedis as any).__clearStore();

  // Build a fresh testing module containing only the service under test.
  const builder = Test.createTestingModule({
    providers: [ValkeyService],
  });
  module = await builder.compile();
  service = module.get<ValkeyService>(ValkeyService);

  // Run the service's init hook explicitly before each test.
  await service.onModuleInit();
});
// Tear the service down after every test; the returned promise is awaited by the test runner.
afterEach(() => service.onModuleDestroy());
// Sanity checks that the service was constructed and initialised by the shared setup.
describe("initialization", () => {
  it("should be defined", () => {
    expect(service).toBeDefined();
  });

  it("should connect to Valkey on module init", async () => {
    expect(service).toBeDefined();

    // A health check resolving to true is the signal used here for a working connection.
    await expect(service.healthCheck()).resolves.toBe(true);
  });
});
// Covers the shape of the task returned by enqueue() and its effect on the queue length.
describe("enqueue", () => {
  it("should enqueue a task successfully", async () => {
    const created = await service.enqueue({
      type: "test-task",
      data: { message: "Hello World" },
    });

    // The returned task carries an id, echoes the input, and starts out PENDING with timestamps.
    expect(created).toBeDefined();
    expect(created.id).toBeDefined();
    expect(created.type).toBe("test-task");
    expect(created.data).toEqual({ message: "Hello World" });
    expect(created.status).toBe(TaskStatus.PENDING);
    expect(created.createdAt).toBeDefined();
    expect(created.updatedAt).toBeDefined();
  });

  it("should increment queue length when enqueueing", async () => {
    const before = await service.getQueueLength();

    await service.enqueue({ type: "task-1", data: {} });

    const after = await service.getQueueLength();
    expect(after).toBe(before + 1);
  });
});
// Covers empty-queue behaviour, FIFO ordering, and the PROCESSING transition on dequeue().
describe("dequeue", () => {
  it("should return null when queue is empty", async () => {
    const nothing = await service.dequeue();
    expect(nothing).toBeNull();
  });

  it("should dequeue tasks in FIFO order", async () => {
    const first = await service.enqueue({ type: "task-1", data: { order: 1 } });
    const second = await service.enqueue({ type: "task-2", data: { order: 2 } });

    // The oldest task comes out first and is already marked PROCESSING.
    const head = await service.dequeue();
    expect(head?.id).toBe(first.id);
    expect(head?.status).toBe(TaskStatus.PROCESSING);

    const next = await service.dequeue();
    expect(next?.id).toBe(second.id);
    expect(next?.status).toBe(TaskStatus.PROCESSING);
  });

  it("should update task status to PROCESSING when dequeued", async () => {
    const queued = await service.enqueue({ type: "test-task", data: {} });

    const picked = await service.dequeue();
    expect(picked?.status).toBe(TaskStatus.PROCESSING);

    // The persisted record must show the same transition as the returned object.
    const persisted = await service.getStatus(queued.id);
    expect(persisted?.status).toBe(TaskStatus.PROCESSING);
  });
});
// Covers lookup of task records by id via getStatus().
describe("getStatus", () => {
  it("should return null for non-existent task", async () => {
    const status = await service.getStatus("non-existent-id");
    expect(status).toBeNull();
  });

  it("should return task status for existing task", async () => {
    const task = await service.enqueue({
      type: "test-task",
      data: { key: "value" },
    });

    const status = await service.getStatus(task.id);

    // getStatus() reports a missing task as null (see the test above), and
    // toBeDefined() accepts null, so assert non-null explicitly.
    expect(status).not.toBeNull();
    expect(status?.id).toBe(task.id);
    expect(status?.type).toBe("test-task");
    expect(status?.data).toEqual({ key: "value" });
  });
});
// Covers status transitions applied through updateStatus().
describe("updateStatus", () => {
  it("should update task status to COMPLETED", async () => {
    const task = await service.enqueue({
      type: "test-task",
      data: {},
    });

    const updated = await service.updateStatus(task.id, {
      status: TaskStatus.COMPLETED,
      result: { output: "success" },
    });

    // updateStatus() reports a missing task as null (see the non-existent-task
    // test below); toBeDefined() would accept null, so assert non-null instead.
    expect(updated).not.toBeNull();
    expect(updated?.status).toBe(TaskStatus.COMPLETED);
    expect(updated?.completedAt).toBeDefined();
    // The supplied result is expected to show up in the task's data.
    expect(updated?.data).toEqual({ output: "success" });
  });

  it("should update task status to FAILED with error", async () => {
    const task = await service.enqueue({
      type: "test-task",
      data: {},
    });

    const updated = await service.updateStatus(task.id, {
      status: TaskStatus.FAILED,
      error: "Task failed due to error",
    });

    // Same reasoning as above: null is the "not found" value, so rule it out explicitly.
    expect(updated).not.toBeNull();
    expect(updated?.status).toBe(TaskStatus.FAILED);
    expect(updated?.error).toBe("Task failed due to error");
    expect(updated?.completedAt).toBeDefined();
  });

  it("should return null when updating non-existent task", async () => {
    const updated = await service.updateStatus("non-existent-id", {
      status: TaskStatus.COMPLETED,
    });

    expect(updated).toBeNull();
  });

  it("should preserve existing data when updating status", async () => {
    const task = await service.enqueue({
      type: "test-task",
      data: { original: "data" },
    });

    // A status-only update must leave the stored payload untouched.
    await service.updateStatus(task.id, {
      status: TaskStatus.PROCESSING,
    });

    const status = await service.getStatus(task.id);
    expect(status?.data).toEqual({ original: "data" });
  });
});
// Covers the queue length reported while tasks are added and removed.
describe("getQueueLength", () => {
  it("should return 0 for empty queue", async () => {
    const size = await service.getQueueLength();
    expect(size).toBe(0);
  });

  it("should return correct queue length", async () => {
    // Enqueue three tasks one after another.
    for (const type of ["task-1", "task-2", "task-3"]) {
      await service.enqueue({ type, data: {} });
    }

    const size = await service.getQueueLength();
    expect(size).toBe(3);
  });

  it("should decrease when tasks are dequeued", async () => {
    await service.enqueue({ type: "task-1", data: {} });
    await service.enqueue({ type: "task-2", data: {} });
    const afterEnqueue = await service.getQueueLength();
    expect(afterEnqueue).toBe(2);

    // Each dequeue should shrink the queue by exactly one.
    await service.dequeue();
    const afterFirstPop = await service.getQueueLength();
    expect(afterFirstPop).toBe(1);

    await service.dequeue();
    const afterSecondPop = await service.getQueueLength();
    expect(afterSecondPop).toBe(0);
  });
});
// Covers emptying the queue in a single call.
describe("clearQueue", () => {
  it("should clear all tasks from queue", async () => {
    await service.enqueue({ type: "task-1", data: {} });
    await service.enqueue({ type: "task-2", data: {} });
    const sizeBefore = await service.getQueueLength();
    expect(sizeBefore).toBe(2);

    await service.clearQueue();

    const sizeAfter = await service.getQueueLength();
    expect(sizeAfter).toBe(0);
  });
});
// Covers the healthy path of healthCheck() against the mocked client.
describe("healthCheck", () => {
  it("should return true when Valkey is healthy", async () => {
    await expect(service.healthCheck()).resolves.toBe(true);
  });
});
// End-to-end scenarios that chain several service calls together.
describe("integration flow", () => {
  it("should handle complete task lifecycle", async () => {
    // Step 1: a producer enqueues the task.
    const queued = await service.enqueue({
      type: "email-notification",
      data: {
        to: "user@example.com",
        subject: "Test Email",
      },
    });
    expect(queued.status).toBe(TaskStatus.PENDING);

    // Step 2: a worker picks the task up.
    const inFlight = await service.dequeue();
    expect(inFlight?.id).toBe(queued.id);
    expect(inFlight?.status).toBe(TaskStatus.PROCESSING);

    // Step 3: the worker reports completion together with a result payload.
    const finished = await service.updateStatus(queued.id, {
      status: TaskStatus.COMPLETED,
      result: {
        to: "user@example.com",
        subject: "Test Email",
        sentAt: new Date().toISOString(),
      },
    });
    expect(finished?.status).toBe(TaskStatus.COMPLETED);
    expect(finished?.completedAt).toBeDefined();

    // Step 4: the stored record reflects the final state, including the result field.
    const stored = await service.getStatus(queued.id);
    expect(stored?.status).toBe(TaskStatus.COMPLETED);
    expect(stored?.data.sentAt).toBeDefined();
  });

  it("should handle multiple concurrent tasks", async () => {
    // Start all three enqueue calls before awaiting any of them.
    const [first, second, third] = await Promise.all([
      service.enqueue({ type: "task-1", data: { id: 1 } }),
      service.enqueue({ type: "task-2", data: { id: 2 } }),
      service.enqueue({ type: "task-3", data: { id: 3 } }),
    ]);
    const sizeAfterEnqueue = await service.getQueueLength();
    expect(sizeAfterEnqueue).toBe(3);

    // Drain the queue, then check that tasks came out in submission order.
    const out1 = await service.dequeue();
    const out2 = await service.dequeue();
    const out3 = await service.dequeue();
    expect(out1?.id).toBe(first.id);
    expect(out2?.id).toBe(second.id);
    expect(out3?.id).toBe(third.id);

    const sizeAfterDrain = await service.getQueueLength();
    expect(sizeAfterDrain).toBe(0);
  });
});
});