From dfef71b66014a91b8f5ba8250401b3434cffee75 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Feb 2026 15:25:55 -0600 Subject: [PATCH] fix(CQ-ORCH-10): Make BullMQ job retention configurable via env vars Replace hardcoded BullMQ job retention values (completed: 100 jobs / 1h, failed: 1000 jobs / 24h) with configurable env vars to prevent memory growth under load. Adds QUEUE_COMPLETED_RETENTION_COUNT, QUEUE_COMPLETED_RETENTION_AGE_S, QUEUE_FAILED_RETENTION_COUNT, and QUEUE_FAILED_RETENTION_AGE_S to orchestrator config. Defaults preserve existing behavior. Co-Authored-By: Claude Opus 4.6 --- apps/orchestrator/.env.example | 8 ++ .../src/config/orchestrator.config.ts | 9 ++ .../src/queue/queue.service.spec.ts | 91 ++++++++++++++++++- apps/orchestrator/src/queue/queue.service.ts | 26 +++++- 4 files changed, 129 insertions(+), 5 deletions(-) diff --git a/apps/orchestrator/.env.example b/apps/orchestrator/.env.example index 5c7eb68..b17fe0d 100644 --- a/apps/orchestrator/.env.example +++ b/apps/orchestrator/.env.example @@ -28,6 +28,14 @@ SANDBOX_ENABLED=true # Health endpoints (/health/*) remain unauthenticated ORCHESTRATOR_API_KEY=REPLACE_WITH_RANDOM_API_KEY_MINIMUM_32_CHARS +# Queue Job Retention +# Controls how many completed/failed jobs BullMQ retains and for how long. +# Reduce these values under high load to limit memory growth. +QUEUE_COMPLETED_RETENTION_COUNT=100 +QUEUE_COMPLETED_RETENTION_AGE_S=3600 +QUEUE_FAILED_RETENTION_COUNT=1000 +QUEUE_FAILED_RETENTION_AGE_S=86400 + # Quality Gates # YOLO mode bypasses all quality gates (default: false) # WARNING: Only enable for development/testing. Not recommended for production. 
diff --git a/apps/orchestrator/src/config/orchestrator.config.ts b/apps/orchestrator/src/config/orchestrator.config.ts index d7c7810..66ef1a4 100644 --- a/apps/orchestrator/src/config/orchestrator.config.ts +++ b/apps/orchestrator/src/config/orchestrator.config.ts @@ -43,4 +43,13 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({ spawner: { maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "20", 10), }, + queue: { + completedRetentionCount: parseInt(process.env.QUEUE_COMPLETED_RETENTION_COUNT ?? "100", 10), + completedRetentionAgeSeconds: parseInt( + process.env.QUEUE_COMPLETED_RETENTION_AGE_S ?? "3600", + 10 + ), + failedRetentionCount: parseInt(process.env.QUEUE_FAILED_RETENTION_COUNT ?? "1000", 10), + failedRetentionAgeSeconds: parseInt(process.env.QUEUE_FAILED_RETENTION_AGE_S ?? "86400", 10), + }, })); diff --git a/apps/orchestrator/src/queue/queue.service.spec.ts b/apps/orchestrator/src/queue/queue.service.spec.ts index 2fcf00f..8174cae 100644 --- a/apps/orchestrator/src/queue/queue.service.spec.ts +++ b/apps/orchestrator/src/queue/queue.service.spec.ts @@ -145,6 +145,49 @@ describe("QueueService", () => { expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.baseDelay", 1000); expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxDelay", 60000); }); + + it("should load retention configuration from ConfigService on init", async () => { + const { Queue, Worker } = await import("bullmq"); + const QueueMock = Queue as unknown as ReturnType<typeof vi.fn>; + const WorkerMock = Worker as unknown as ReturnType<typeof vi.fn>; + + QueueMock.mockImplementation(function (this: unknown) { + return { + add: vi.fn(), + getJobCounts: vi.fn(), + pause: vi.fn(), + resume: vi.fn(), + getJob: vi.fn(), + close: vi.fn(), + }; + } as never); + + WorkerMock.mockImplementation(function (this: unknown) { + return { + on: vi.fn().mockReturnThis(), + close: vi.fn(), + }; + } as never); + + service.onModuleInit(); + 
expect(mockConfigService.get).toHaveBeenCalledWith( + "orchestrator.queue.completedRetentionAgeSeconds", + 3600 + ); + expect(mockConfigService.get).toHaveBeenCalledWith( + "orchestrator.queue.completedRetentionCount", + 100 + ); + expect(mockConfigService.get).toHaveBeenCalledWith( + "orchestrator.queue.failedRetentionAgeSeconds", + 86400 + ); + expect(mockConfigService.get).toHaveBeenCalledWith( + "orchestrator.queue.failedRetentionCount", + 1000 + ); + }); }); describe("retry configuration", () => { @@ -301,7 +344,7 @@ describe("QueueService", () => { }); describe("onModuleInit", () => { - it("should initialize BullMQ queue with correct configuration", async () => { + it("should initialize BullMQ queue with default retention configuration", async () => { await service.onModuleInit(); expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", { @@ -323,6 +366,52 @@ describe("QueueService", () => { }); }); + it("should initialize BullMQ queue with custom retention configuration", async () => { + mockConfigService.get = vi.fn((key: string, defaultValue?: unknown) => { + const config: Record<string, unknown> = { + "orchestrator.valkey.host": "localhost", + "orchestrator.valkey.port": 6379, + "orchestrator.valkey.password": undefined, + "orchestrator.queue.name": "orchestrator-tasks", + "orchestrator.queue.maxRetries": 3, + "orchestrator.queue.baseDelay": 1000, + "orchestrator.queue.maxDelay": 60000, + "orchestrator.queue.concurrency": 5, + "orchestrator.queue.completedRetentionAgeSeconds": 1800, + "orchestrator.queue.completedRetentionCount": 50, + "orchestrator.queue.failedRetentionAgeSeconds": 43200, + "orchestrator.queue.failedRetentionCount": 500, + }; + return config[key] ?? 
defaultValue; + }); + + service = new QueueService( + mockValkeyService as unknown as never, + mockConfigService as unknown as never + ); + + vi.clearAllMocks(); + await service.onModuleInit(); + + expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", { + connection: { + host: "localhost", + port: 6379, + password: undefined, + }, + defaultJobOptions: { + removeOnComplete: { + age: 1800, + count: 50, + }, + removeOnFail: { + age: 43200, + count: 500, + }, + }, + }); + }); + it("should initialize BullMQ worker with correct configuration", async () => { await service.onModuleInit(); diff --git a/apps/orchestrator/src/queue/queue.service.ts b/apps/orchestrator/src/queue/queue.service.ts index b829ca6..4bfc741 100644 --- a/apps/orchestrator/src/queue/queue.service.ts +++ b/apps/orchestrator/src/queue/queue.service.ts @@ -45,17 +45,35 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { password: this.configService.get("orchestrator.valkey.password"), }; + // Read retention config + const completedRetentionAge = this.configService.get( + "orchestrator.queue.completedRetentionAgeSeconds", + 3600 + ); + const completedRetentionCount = this.configService.get( + "orchestrator.queue.completedRetentionCount", + 100 + ); + const failedRetentionAge = this.configService.get( + "orchestrator.queue.failedRetentionAgeSeconds", + 86400 + ); + const failedRetentionCount = this.configService.get( + "orchestrator.queue.failedRetentionCount", + 1000 + ); + // Create queue this.queue = new Queue(this.queueName, { connection, defaultJobOptions: { removeOnComplete: { - age: 3600, // Keep completed jobs for 1 hour - count: 100, // Keep last 100 completed jobs + age: completedRetentionAge, + count: completedRetentionCount, }, removeOnFail: { - age: 86400, // Keep failed jobs for 24 hours - count: 1000, // Keep last 1000 failed jobs + age: failedRetentionAge, + count: failedRetentionCount, }, }, });