// File: stack/apps/orchestrator/src/queue/queue.service.spec.ts
//
// Listing metadata from the file browser this was copied from:
// 1182 lines, 35 KiB, TypeScript

import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { QueueService } from "./queue.service";
import type { QueuedTask, TaskProcessingResult } from "./types";
import type { Job } from "bullmq";
// Swap BullMQ's exports for bare spies; the suites below install their own
// constructor behaviour with mockImplementation.
vi.mock("bullmq", () => ({
  Queue: vi.fn(),
  Worker: vi.fn(),
  Job: vi.fn(),
}));
describe("QueueService", () => {
describe("calculateBackoffDelay", () => {
  let service: QueueService;

  beforeEach(() => {
    // Bare collaborators suffice: these tests only call the delay helper.
    const valkeyStub = { updateTaskStatus: vi.fn(), publishEvent: vi.fn() };
    const configStub = {
      // Always answer with whatever default the caller supplied.
      get: vi.fn((_key: string, fallback?: unknown) => fallback),
    };
    service = new QueueService(valkeyStub as unknown as never, configStub as unknown as never);
  });

  it("should calculate exponential backoff delay", () => {
    // [attempt, expected] pairs; expected is 1000 * 2^attempt, all below the cap.
    const expectations: Array<[number, number]> = [
      [1, 2000],
      [2, 4000],
      [3, 8000],
      [4, 16000],
    ];

    for (const [attempt, expected] of expectations) {
      expect(service.calculateBackoffDelay(attempt, 1000, 60000)).toBe(expected);
    }
  });

  it("should cap delay at maxDelay", () => {
    const cap = 60000;

    // Uncapped values would be 1024000ms (attempt 10) and 128000ms (attempt 7).
    for (const attempt of [10, 7]) {
      expect(service.calculateBackoffDelay(attempt, 1000, cap)).toBe(cap);
    }
  });

  it("should handle zero baseDelay", () => {
    expect(service.calculateBackoffDelay(3, 0, 60000)).toBe(0);
  });

  it("should handle attempt 0", () => {
    // 1000 * 2^0 = 1000
    expect(service.calculateBackoffDelay(0, 1000, 60000)).toBe(1000);
  });

  it("should handle large attempt numbers", () => {
    const cap = 100000;
    expect(service.calculateBackoffDelay(20, 1000, cap)).toBe(cap);
  });

  it("should work with different base delays", () => {
    const cap = 100000;
    // [baseDelay, expected] pairs for attempt 2, i.e. baseDelay * 2^2.
    const expectations: Array<[number, number]> = [
      [500, 2000],
      [2000, 8000],
    ];

    for (const [base, expected] of expectations) {
      expect(service.calculateBackoffDelay(2, base, cap)).toBe(expected);
    }
  });
});
describe("validation logic", () => {
  let service: QueueService;
  let mockValkeyService: {
    updateTaskStatus: ReturnType<typeof vi.fn>;
    publishEvent: ReturnType<typeof vi.fn>;
  };
  let mockConfigService: {
    get: ReturnType<typeof vi.fn>;
  };

  beforeEach(() => {
    mockValkeyService = {
      updateTaskStatus: vi.fn().mockResolvedValue(undefined),
      publishEvent: vi.fn().mockResolvedValue(undefined),
    };
    mockConfigService = {
      // Known keys resolve from the table; anything else falls back to the
      // default passed by the caller.
      get: vi.fn((key: string, defaultValue?: unknown) => {
        const config: Record<string, unknown> = {
          "orchestrator.valkey.host": "localhost",
          "orchestrator.valkey.port": 6379,
          "orchestrator.queue.name": "orchestrator-tasks",
          "orchestrator.queue.maxRetries": 3,
          "orchestrator.queue.baseDelay": 1000,
          "orchestrator.queue.maxDelay": 60000,
          "orchestrator.queue.concurrency": 5,
        };
        return config[key] ?? defaultValue;
      }),
    };
    service = new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );
  });

  it("should be defined", () => {
    expect(service).toBeDefined();
    expect(service.calculateBackoffDelay).toBeDefined();
  });

  it("should load configuration from ConfigService", () => {
    // These lookups happen during construction (see beforeEach).
    expect(mockConfigService.get).toHaveBeenCalledWith(
      "orchestrator.queue.name",
      "orchestrator-tasks"
    );
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxRetries", 3);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.baseDelay", 1000);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxDelay", 60000);
  });

  it("should load retention configuration from ConfigService on init", async () => {
    const { Queue, Worker } = await import("bullmq");
    const QueueMock = Queue as unknown as ReturnType<typeof vi.fn>;
    const WorkerMock = Worker as unknown as ReturnType<typeof vi.fn>;

    // Inert constructors so onModuleInit can build its queue and worker.
    QueueMock.mockImplementation(function (this: unknown) {
      return {
        add: vi.fn(),
        getJobCounts: vi.fn(),
        pause: vi.fn(),
        resume: vi.fn(),
        getJob: vi.fn(),
        close: vi.fn(),
      };
    } as never);
    WorkerMock.mockImplementation(function (this: unknown) {
      return {
        on: vi.fn().mockReturnThis(),
        close: vi.fn(),
      };
    } as never);

    // Await initialisation, as the other suites do, so the assertions below
    // cannot run before init has finished and no promise is left floating.
    await service.onModuleInit();

    expect(mockConfigService.get).toHaveBeenCalledWith(
      "orchestrator.queue.completedRetentionAgeSeconds",
      3600
    );
    expect(mockConfigService.get).toHaveBeenCalledWith(
      "orchestrator.queue.completedRetentionCount",
      100
    );
    expect(mockConfigService.get).toHaveBeenCalledWith(
      "orchestrator.queue.failedRetentionAgeSeconds",
      86400
    );
    expect(mockConfigService.get).toHaveBeenCalledWith(
      "orchestrator.queue.failedRetentionCount",
      1000
    );
  });
});
describe("retry configuration", () => {
  it("should use default retry configuration", () => {
    const mockValkeyService = {
      updateTaskStatus: vi.fn(),
      publishEvent: vi.fn(),
    };
    const mockConfigService = {
      // Echo the caller-supplied default for every key.
      get: vi.fn((_key: string, defaultValue?: unknown) => defaultValue),
    };

    // Constructed only for its side effect: the constructor reads the
    // retry settings from the config service.
    new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );

    // Verify defaults were requested
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxRetries", 3);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.baseDelay", 1000);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxDelay", 60000);
  });

  it("should use custom retry configuration from env", () => {
    const mockValkeyService = {
      updateTaskStatus: vi.fn(),
      publishEvent: vi.fn(),
    };
    const mockConfigService = {
      // Override only the three retry settings; everything else uses the default.
      get: vi.fn((key: string, defaultValue?: unknown) => {
        if (key === "orchestrator.queue.maxRetries") return 5;
        if (key === "orchestrator.queue.baseDelay") return 2000;
        if (key === "orchestrator.queue.maxDelay") return 120000;
        return defaultValue;
      }),
    };
    const service = new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );

    // The overridden keys must actually be looked up, otherwise the custom
    // values above could never reach the service.
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxRetries", 3);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.baseDelay", 1000);
    expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxDelay", 120000 / 2);

    // Backoff computed with the custom base delay and cap: 2000 * 2^1.
    const delay1 = service.calculateBackoffDelay(1, 2000, 120000);
    expect(delay1).toBe(4000);
  });
});
describe("Module Lifecycle Integration", () => {
  let service: QueueService;
  let mockValkeyService: {
    updateTaskStatus: ReturnType<typeof vi.fn>;
    publishEvent: ReturnType<typeof vi.fn>;
  };
  let mockConfigService: {
    get: ReturnType<typeof vi.fn>;
  };
  let mockQueue: {
    add: ReturnType<typeof vi.fn>;
    getJobCounts: ReturnType<typeof vi.fn>;
    pause: ReturnType<typeof vi.fn>;
    resume: ReturnType<typeof vi.fn>;
    getJob: ReturnType<typeof vi.fn>;
    close: ReturnType<typeof vi.fn>;
  };
  let mockWorker: {
    on: ReturnType<typeof vi.fn>;
    close: ReturnType<typeof vi.fn>;
  };
  let QueueMock: ReturnType<typeof vi.fn>;
  let WorkerMock: ReturnType<typeof vi.fn>;

  /**
   * Builds a ConfigService.get stub. Keys resolve from the baseline table
   * merged with `overrides`; unknown (or undefined) keys fall back to the
   * default supplied by the caller.
   */
  const createConfigGet = (overrides: Record<string, unknown> = {}) =>
    vi.fn((key: string, defaultValue?: unknown) => {
      const config: Record<string, unknown> = {
        "orchestrator.valkey.host": "localhost",
        "orchestrator.valkey.port": 6379,
        "orchestrator.valkey.password": undefined,
        "orchestrator.queue.name": "orchestrator-tasks",
        "orchestrator.queue.maxRetries": 3,
        "orchestrator.queue.baseDelay": 1000,
        "orchestrator.queue.maxDelay": 60000,
        "orchestrator.queue.concurrency": 5,
        ...overrides,
      };
      return config[key] ?? defaultValue;
    });

  /** Creates a service wired to the current Valkey and config doubles. */
  const createService = (): QueueService =>
    new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );

  beforeEach(async () => {
    mockValkeyService = {
      updateTaskStatus: vi.fn().mockResolvedValue(undefined),
      publishEvent: vi.fn().mockResolvedValue(undefined),
    };
    mockConfigService = { get: createConfigGet() };
    mockQueue = {
      add: vi.fn().mockResolvedValue({ id: "job-123" }),
      getJobCounts: vi.fn().mockResolvedValue({
        waiting: 0,
        active: 0,
        completed: 0,
        failed: 0,
        delayed: 0,
      }),
      pause: vi.fn().mockResolvedValue(undefined),
      resume: vi.fn().mockResolvedValue(undefined),
      getJob: vi.fn().mockResolvedValue(null),
      close: vi.fn().mockResolvedValue(undefined),
    };
    mockWorker = {
      // Return the worker itself so registrations can be chained.
      on: vi.fn(() => mockWorker),
      close: vi.fn().mockResolvedValue(undefined),
    };

    // Get mocked modules
    const { Queue, Worker } = await import("bullmq");
    QueueMock = Queue as unknown as ReturnType<typeof vi.fn>;
    WorkerMock = Worker as unknown as ReturnType<typeof vi.fn>;

    // Constructors hand back the shared doubles so tests can inspect them.
    QueueMock.mockImplementation(function (this: unknown) {
      return mockQueue;
    } as never);
    WorkerMock.mockImplementation(function (this: unknown) {
      return mockWorker;
    } as never);

    service = createService();
  });

  afterEach(() => {
    vi.clearAllMocks();
  });

  describe("onModuleInit", () => {
    it("should initialize BullMQ queue with default retention configuration", async () => {
      await service.onModuleInit();

      expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", {
        connection: {
          host: "localhost",
          port: 6379,
          password: undefined,
        },
        defaultJobOptions: {
          removeOnComplete: {
            age: 3600,
            count: 100,
          },
          removeOnFail: {
            age: 86400,
            count: 1000,
          },
        },
      });
    });

    it("should initialize BullMQ queue with custom retention configuration", async () => {
      mockConfigService.get = createConfigGet({
        "orchestrator.queue.completedRetentionAgeSeconds": 1800,
        "orchestrator.queue.completedRetentionCount": 50,
        "orchestrator.queue.failedRetentionAgeSeconds": 43200,
        "orchestrator.queue.failedRetentionCount": 500,
      });
      service = createService();
      // Drop calls recorded so far so only this init is asserted on.
      vi.clearAllMocks();

      await service.onModuleInit();

      expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", {
        connection: {
          host: "localhost",
          port: 6379,
          password: undefined,
        },
        defaultJobOptions: {
          removeOnComplete: {
            age: 1800,
            count: 50,
          },
          removeOnFail: {
            age: 43200,
            count: 500,
          },
        },
      });
    });

    it("should initialize BullMQ worker with correct configuration", async () => {
      await service.onModuleInit();

      expect(WorkerMock).toHaveBeenCalledWith("orchestrator-tasks", expect.any(Function), {
        connection: {
          host: "localhost",
          port: 6379,
          password: undefined,
        },
        concurrency: 5,
      });
    });

    it("should setup worker event handlers", async () => {
      await service.onModuleInit();

      expect(mockWorker.on).toHaveBeenCalledWith("failed", expect.any(Function));
      expect(mockWorker.on).toHaveBeenCalledWith("completed", expect.any(Function));
    });

    it("should use password if configured", async () => {
      mockConfigService.get = createConfigGet({
        "orchestrator.valkey.password": "secret123",
      });
      service = createService();
      // Drop calls recorded so far so only this init is asserted on.
      vi.clearAllMocks();

      await service.onModuleInit();

      expect(QueueMock).toHaveBeenCalledWith(
        "orchestrator-tasks",
        expect.objectContaining({
          connection: expect.objectContaining({
            password: "secret123",
          }),
        })
      );
    });
  });

  describe("onModuleDestroy", () => {
    it("should close worker and queue", async () => {
      await service.onModuleInit();
      await service.onModuleDestroy();

      expect(mockWorker.close).toHaveBeenCalledOnce();
      expect(mockQueue.close).toHaveBeenCalledOnce();
    });
  });
});
describe("addTask Integration", () => {
  type Spy = ReturnType<typeof vi.fn>;

  let service: QueueService;
  let mockValkeyService: { updateTaskStatus: Spy; publishEvent: Spy };
  let mockConfigService: { get: Spy };
  let mockQueue: Record<"add" | "getJobCounts" | "pause" | "resume" | "getJob" | "close", Spy>;
  let QueueMock: Spy;
  let WorkerMock: Spy;

  /** Task context used by every test; only the work item id varies. */
  const contextFor = (workItem: string) => ({
    repository: "test-repo",
    branch: "main",
    workItems: [workItem],
  });

  beforeEach(async () => {
    mockValkeyService = {
      updateTaskStatus: vi.fn().mockResolvedValue(undefined),
      publishEvent: vi.fn().mockResolvedValue(undefined),
    };

    // Known keys come from this table; anything else uses the caller's default.
    const settings: Record<string, unknown> = {
      "orchestrator.valkey.host": "localhost",
      "orchestrator.valkey.port": 6379,
      "orchestrator.queue.name": "orchestrator-tasks",
      "orchestrator.queue.maxRetries": 3,
      "orchestrator.queue.baseDelay": 1000,
      "orchestrator.queue.maxDelay": 60000,
      "orchestrator.queue.concurrency": 5,
    };
    mockConfigService = {
      get: vi.fn((key: string, defaultValue?: unknown) => settings[key] ?? defaultValue),
    };

    mockQueue = {
      add: vi.fn().mockResolvedValue({ id: "job-123" }),
      getJobCounts: vi.fn().mockResolvedValue({}),
      pause: vi.fn().mockResolvedValue(undefined),
      resume: vi.fn().mockResolvedValue(undefined),
      getJob: vi.fn().mockResolvedValue(null),
      close: vi.fn().mockResolvedValue(undefined),
    };

    const bullmq = await import("bullmq");
    QueueMock = bullmq.Queue as unknown as Spy;
    WorkerMock = bullmq.Worker as unknown as Spy;
    QueueMock.mockImplementation(function (this: unknown) {
      return mockQueue;
    } as never);
    WorkerMock.mockImplementation(function (this: unknown) {
      return {
        on: vi.fn().mockReturnThis(),
        close: vi.fn().mockResolvedValue(undefined),
      };
    } as never);

    service = new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );
    await service.onModuleInit();
  });

  afterEach(() => {
    vi.clearAllMocks();
  });

  it("should add task with default options", async () => {
    const taskId = "task-123";
    const context = contextFor("US-001");

    await service.addTask(taskId, context);

    const expectedPayload = {
      taskId,
      priority: 5,
      retries: 0,
      maxRetries: 3,
      context,
    };
    const expectedJobOptions = {
      priority: 6, // 10 - 5 + 1
      attempts: 4, // 3 + 1
      backoff: { type: "custom" },
      delay: 0,
    };
    expect(mockQueue.add).toHaveBeenCalledWith(taskId, expectedPayload, expectedJobOptions);
  });

  it("should add task with custom priority", async () => {
    const taskId = "task-456";

    await service.addTask(taskId, contextFor("US-002"), { priority: 8 });

    expect(mockQueue.add).toHaveBeenCalledWith(
      taskId,
      expect.objectContaining({ priority: 8 }),
      // 10 - 8 + 1
      expect.objectContaining({ priority: 3 })
    );
  });

  it("should add task with custom maxRetries", async () => {
    const taskId = "task-789";

    await service.addTask(taskId, contextFor("US-003"), { maxRetries: 5 });

    expect(mockQueue.add).toHaveBeenCalledWith(
      taskId,
      expect.objectContaining({ maxRetries: 5 }),
      // 5 + 1
      expect.objectContaining({ attempts: 6 })
    );
  });

  it("should add task with delay", async () => {
    const taskId = "task-delayed";

    await service.addTask(taskId, contextFor("US-004"), { delay: 5000 });

    expect(mockQueue.add).toHaveBeenCalledWith(
      taskId,
      expect.any(Object),
      expect.objectContaining({ delay: 5000 })
    );
  });

  it("should throw error if priority is less than 1", async () => {
    const attempt = service.addTask("task-invalid", contextFor("US-005"), { priority: 0 });

    await expect(attempt).rejects.toThrow("Priority must be between 1 and 10");
  });

  it("should throw error if priority is greater than 10", async () => {
    const attempt = service.addTask("task-invalid", contextFor("US-006"), { priority: 11 });

    await expect(attempt).rejects.toThrow("Priority must be between 1 and 10");
  });

  it("should throw error if maxRetries is negative", async () => {
    const attempt = service.addTask("task-invalid", contextFor("US-007"), { maxRetries: -1 });

    await expect(attempt).rejects.toThrow("maxRetries must be non-negative");
  });

  it("should update Valkey task status to pending", async () => {
    const taskId = "task-status";

    await service.addTask(taskId, contextFor("US-008"));

    expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(taskId, "pending");
  });

  it("should publish task.queued event", async () => {
    const taskId = "task-event";

    await service.addTask(taskId, contextFor("US-009"), { priority: 7 });

    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
      type: "task.queued",
      timestamp: expect.any(String),
      taskId,
      data: { priority: 7 },
    });
  });
});
describe("getStats Integration", () => {
  type Spy = ReturnType<typeof vi.fn>;

  let service: QueueService;
  let mockQueue: Record<"add" | "getJobCounts" | "pause" | "resume" | "getJob" | "close", Spy>;
  let QueueMock: Spy;
  let WorkerMock: Spy;

  beforeEach(async () => {
    const valkeyStub = { updateTaskStatus: vi.fn(), publishEvent: vi.fn() };
    const configStub = {
      // Every lookup resolves to the caller's default.
      get: vi.fn((_key: string, fallback?: unknown) => fallback),
    };

    // Raw BullMQ counts the service is expected to translate.
    const rawCounts = { waiting: 5, active: 2, completed: 10, failed: 1, delayed: 3 };
    mockQueue = {
      add: vi.fn(),
      getJobCounts: vi.fn().mockResolvedValue(rawCounts),
      pause: vi.fn(),
      resume: vi.fn(),
      getJob: vi.fn(),
      close: vi.fn(),
    };

    const bullmq = await import("bullmq");
    QueueMock = bullmq.Queue as unknown as Spy;
    WorkerMock = bullmq.Worker as unknown as Spy;
    QueueMock.mockImplementation(function (this: unknown) {
      return mockQueue;
    } as never);
    WorkerMock.mockImplementation(function (this: unknown) {
      return { on: vi.fn().mockReturnThis(), close: vi.fn() };
    } as never);

    service = new QueueService(valkeyStub as unknown as never, configStub as unknown as never);
    await service.onModuleInit();
  });

  it("should return correct queue statistics", async () => {
    const stats = await service.getStats();

    // BullMQ's "waiting" count is reported as "pending".
    expect(stats).toEqual({ pending: 5, active: 2, completed: 10, failed: 1, delayed: 3 });
  });

  it("should handle zero counts gracefully", async () => {
    // An empty counts object must yield zeros for every field.
    mockQueue.getJobCounts = vi.fn().mockResolvedValue({});

    const stats = await service.getStats();

    expect(stats).toEqual({ pending: 0, active: 0, completed: 0, failed: 0, delayed: 0 });
  });

  it("should call getJobCounts with correct status parameters", async () => {
    await service.getStats();

    const requestedStatuses = ["waiting", "active", "completed", "failed", "delayed"];
    expect(mockQueue.getJobCounts).toHaveBeenCalledWith(...requestedStatuses);
  });
});
describe("Queue Control Integration", () => {
  type Spy = ReturnType<typeof vi.fn>;

  let service: QueueService;
  let mockQueue: Record<"add" | "getJobCounts" | "pause" | "resume" | "getJob" | "close", Spy>;
  let QueueMock: Spy;
  let WorkerMock: Spy;

  beforeEach(async () => {
    const valkeyStub = { updateTaskStatus: vi.fn(), publishEvent: vi.fn() };
    const configStub = {
      // Every lookup resolves to the caller's default.
      get: vi.fn((_key: string, fallback?: unknown) => fallback),
    };

    mockQueue = {
      add: vi.fn(),
      getJobCounts: vi.fn(),
      pause: vi.fn().mockResolvedValue(undefined),
      resume: vi.fn().mockResolvedValue(undefined),
      getJob: vi.fn().mockResolvedValue(null),
      close: vi.fn(),
    };

    const bullmq = await import("bullmq");
    QueueMock = bullmq.Queue as unknown as Spy;
    WorkerMock = bullmq.Worker as unknown as Spy;
    QueueMock.mockImplementation(function (this: unknown) {
      return mockQueue;
    } as never);
    WorkerMock.mockImplementation(function (this: unknown) {
      return { on: vi.fn().mockReturnThis(), close: vi.fn() };
    } as never);

    service = new QueueService(valkeyStub as unknown as never, configStub as unknown as never);
    await service.onModuleInit();
  });

  it("should pause queue", async () => {
    await service.pause();

    expect(mockQueue.pause).toHaveBeenCalledOnce();
  });

  it("should resume queue", async () => {
    await service.resume();

    expect(mockQueue.resume).toHaveBeenCalledOnce();
  });

  it("should remove task from queue when job exists", async () => {
    const remove = vi.fn().mockResolvedValue(undefined);
    mockQueue.getJob = vi.fn().mockResolvedValue({ remove });

    await service.removeTask("task-123");

    expect(mockQueue.getJob).toHaveBeenCalledWith("task-123");
    expect(remove).toHaveBeenCalledOnce();
  });

  it("should handle removeTask when job does not exist", async () => {
    // The lookup finds nothing; removeTask must still resolve.
    mockQueue.getJob = vi.fn().mockResolvedValue(null);

    await expect(service.removeTask("non-existent")).resolves.not.toThrow();
    expect(mockQueue.getJob).toHaveBeenCalledWith("non-existent");
  });
});
describe("Task Processing Integration", () => {
  type WorkerProcessor = (job: Job<QueuedTask>) => Promise<TaskProcessingResult>;
  type WorkerEventHandler = (job?: Job<QueuedTask>, err?: Error) => Promise<void>;

  let service: QueueService;
  let mockValkeyService: {
    updateTaskStatus: ReturnType<typeof vi.fn>;
    publishEvent: ReturnType<typeof vi.fn>;
  };
  // Captured from the mocked Worker constructor / worker.on during onModuleInit.
  let workerProcessFn: WorkerProcessor | null;
  let workerEventHandlers: Record<string, WorkerEventHandler>;
  let QueueMock: ReturnType<typeof vi.fn>;
  let WorkerMock: ReturnType<typeof vi.fn>;

  /**
   * Builds a job double for the given task. `extras` supplies job-level
   * fields (attemptsMade, updateData); `retries` seeds data.retries.
   */
  const buildJob = (
    taskId: string,
    workItem: string,
    extras: Record<string, unknown> = {},
    retries = 0
  ): Job<QueuedTask> =>
    ({
      data: {
        taskId,
        priority: 5,
        retries,
        maxRetries: 3,
        context: {
          repository: "test-repo",
          branch: "main",
          workItems: [workItem],
        },
      },
      ...extras,
    }) as unknown as Job<QueuedTask>;

  /** Runs the processor registered with the Worker; fails loudly if none was captured. */
  const runProcessor = (job: Job<QueuedTask>): Promise<TaskProcessingResult> => {
    if (workerProcessFn === null) {
      throw new Error("Worker processor was not registered during onModuleInit");
    }
    return workerProcessFn(job);
  };

  /** Invokes the handler registered for a worker event; fails loudly if it is missing. */
  const emitWorkerEvent = (event: string, job?: Job<QueuedTask>, err?: Error): Promise<void> => {
    const handler = workerEventHandlers[event];
    if (handler === undefined) {
      throw new Error(`No worker handler registered for "${event}"`);
    }
    return handler(job, err);
  };

  beforeEach(async () => {
    workerProcessFn = null;
    workerEventHandlers = {};
    mockValkeyService = {
      updateTaskStatus: vi.fn().mockResolvedValue(undefined),
      publishEvent: vi.fn().mockResolvedValue(undefined),
    };
    const mockConfigService = {
      // Retry settings come from the table; other keys use the caller's default.
      get: vi.fn((key: string, defaultValue?: unknown) => {
        const config: Record<string, unknown> = {
          "orchestrator.queue.maxRetries": 3,
          "orchestrator.queue.baseDelay": 1000,
          "orchestrator.queue.maxDelay": 60000,
        };
        return config[key] ?? defaultValue;
      }),
    };
    const mockQueue = {
      add: vi.fn(),
      getJobCounts: vi.fn(),
      pause: vi.fn(),
      resume: vi.fn(),
      getJob: vi.fn(),
      close: vi.fn(),
    };
    const mockWorker = {
      // Record each handler so tests can fire worker events by hand.
      on: vi.fn((event: string, handler: WorkerEventHandler) => {
        workerEventHandlers[event] = handler;
        return mockWorker;
      }),
      close: vi.fn(),
    };

    const { Queue, Worker } = await import("bullmq");
    QueueMock = Queue as unknown as ReturnType<typeof vi.fn>;
    WorkerMock = Worker as unknown as ReturnType<typeof vi.fn>;
    QueueMock.mockImplementation(function (this: unknown) {
      return mockQueue;
    } as never);
    // Capture the processor (second constructor argument) for direct invocation.
    WorkerMock.mockImplementation(function (
      this: unknown,
      _name: string,
      processFn: WorkerProcessor
    ) {
      workerProcessFn = processFn;
      return mockWorker;
    } as never);

    service = new QueueService(
      mockValkeyService as unknown as never,
      mockConfigService as unknown as never
    );
    await service.onModuleInit();
  });

  it("should process task successfully", async () => {
    const mockJob = buildJob("task-123", "US-001", {
      attemptsMade: 0,
      updateData: vi.fn(),
    });

    const result = await runProcessor(mockJob);

    // With attemptsMade 0 the reported attempt is 1.
    expect(result).toEqual({
      success: true,
      metadata: { attempt: 1 },
    });
    expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(
      "task-123",
      "executing",
      undefined
    );
    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
      type: "task.executing",
      timestamp: expect.any(String),
      taskId: "task-123",
      agentId: undefined,
      data: { attempt: 1, dispatchedByQueue: true },
    });
  });

  it("should handle task completion", async () => {
    const mockJob = buildJob("task-completed", "US-002");

    await emitWorkerEvent("completed", mockJob);

    expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(
      "task-completed",
      "completed"
    );
    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
      type: "task.completed",
      timestamp: expect.any(String),
      taskId: "task-completed",
    });
  });

  it("should handle task failure", async () => {
    const mockJob = buildJob("task-failed", "US-003");
    const error = new Error("Processing failed");

    await emitWorkerEvent("failed", mockJob, error);

    expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(
      "task-failed",
      "failed",
      undefined,
      "Processing failed"
    );
    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
      type: "task.failed",
      timestamp: expect.any(String),
      taskId: "task-failed",
      error: "Processing failed",
    });
  });

  it("should handle retry on failure", async () => {
    const updateData = vi.fn().mockResolvedValue(undefined);
    const mockJob = buildJob("task-retry", "US-004", {
      attemptsMade: 1,
      updateData,
    });

    // Make the status update fail so the processor takes its retry path.
    const error = new Error("Temporary failure");
    mockValkeyService.updateTaskStatus.mockImplementation(() => {
      throw error;
    });

    await expect(runProcessor(mockJob)).rejects.toThrow("Temporary failure");

    // With attemptsMade 1, the retry counter and reported attempt are both 2.
    expect(updateData).toHaveBeenCalledWith({
      ...mockJob.data,
      retries: 2,
    });
    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith(
      expect.objectContaining({
        type: "task.retry",
        taskId: "task-retry",
        data: expect.objectContaining({
          attempt: 2,
          nextDelay: expect.any(Number),
        }),
      })
    );
  });

  it("should calculate correct backoff delay on retry", async () => {
    const mockJob = buildJob("task-backoff", "US-005", {
      attemptsMade: 2,
      updateData: vi.fn().mockResolvedValue(undefined),
    });
    mockValkeyService.updateTaskStatus.mockImplementation(() => {
      throw new Error("Retry test");
    });

    await expect(runProcessor(mockJob)).rejects.toThrow();

    expect(mockValkeyService.publishEvent).toHaveBeenCalledWith(
      expect.objectContaining({
        type: "task.retry",
        data: expect.objectContaining({
          nextDelay: 8000, // 1000 * 2^3
        }),
      })
    );
  });

  it("should not retry after max retries exceeded", async () => {
    // attemptsMade and data.retries both equal maxRetries (3).
    const mockJob = buildJob(
      "task-max-retry",
      "US-006",
      { attemptsMade: 3, updateData: vi.fn() },
      3
    );
    mockValkeyService.updateTaskStatus.mockImplementation(() => {
      throw new Error("Max retries exceeded");
    });

    await expect(runProcessor(mockJob)).rejects.toThrow();

    // Should not publish retry event
    expect(mockValkeyService.publishEvent).not.toHaveBeenCalledWith(
      expect.objectContaining({
        type: "task.retry",
      })
    );
  });
});
});