fix(CQ-ORCH-10): Make BullMQ job retention configurable via env vars
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

Replace hardcoded BullMQ job retention values (completed: 100 jobs / 1h,
failed: 1000 jobs / 24h) with configurable env vars to prevent memory
growth under load. Adds QUEUE_COMPLETED_RETENTION_COUNT,
QUEUE_COMPLETED_RETENTION_AGE_S, QUEUE_FAILED_RETENTION_COUNT, and
QUEUE_FAILED_RETENTION_AGE_S to orchestrator config. Defaults preserve
existing behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jason Woltje
2026-02-06 15:25:55 -06:00
parent 6934d9261c
commit dfef71b660
4 changed files with 129 additions and 5 deletions

View File

@@ -28,6 +28,14 @@ SANDBOX_ENABLED=true
# Health endpoints (/health/*) remain unauthenticated
ORCHESTRATOR_API_KEY=REPLACE_WITH_RANDOM_API_KEY_MINIMUM_32_CHARS
# Queue Job Retention
# Controls how many completed/failed jobs BullMQ retains and for how long.
# Reduce these values under high load to limit memory growth.
QUEUE_COMPLETED_RETENTION_COUNT=100
QUEUE_COMPLETED_RETENTION_AGE_S=3600
QUEUE_FAILED_RETENTION_COUNT=1000
QUEUE_FAILED_RETENTION_AGE_S=86400
# Quality Gates
# YOLO mode bypasses all quality gates (default: false)
# WARNING: Only enable for development/testing. Not recommended for production.

View File

@@ -43,4 +43,13 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({
spawner: {
maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "20", 10),
},
queue: {
completedRetentionCount: parseInt(process.env.QUEUE_COMPLETED_RETENTION_COUNT ?? "100", 10),
completedRetentionAgeSeconds: parseInt(
process.env.QUEUE_COMPLETED_RETENTION_AGE_S ?? "3600",
10
),
failedRetentionCount: parseInt(process.env.QUEUE_FAILED_RETENTION_COUNT ?? "1000", 10),
failedRetentionAgeSeconds: parseInt(process.env.QUEUE_FAILED_RETENTION_AGE_S ?? "86400", 10),
},
}));

View File

@@ -145,6 +145,49 @@ describe("QueueService", () => {
expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.baseDelay", 1000);
expect(mockConfigService.get).toHaveBeenCalledWith("orchestrator.queue.maxDelay", 60000);
});
it("should load retention configuration from ConfigService on init", async () => {
const { Queue, Worker } = await import("bullmq");
const QueueMock = Queue as unknown as ReturnType<typeof vi.fn>;
const WorkerMock = Worker as unknown as ReturnType<typeof vi.fn>;
QueueMock.mockImplementation(function (this: unknown) {
return {
add: vi.fn(),
getJobCounts: vi.fn(),
pause: vi.fn(),
resume: vi.fn(),
getJob: vi.fn(),
close: vi.fn(),
};
} as never);
WorkerMock.mockImplementation(function (this: unknown) {
return {
on: vi.fn().mockReturnThis(),
close: vi.fn(),
};
} as never);
service.onModuleInit();
expect(mockConfigService.get).toHaveBeenCalledWith(
"orchestrator.queue.completedRetentionAgeSeconds",
3600
);
expect(mockConfigService.get).toHaveBeenCalledWith(
"orchestrator.queue.completedRetentionCount",
100
);
expect(mockConfigService.get).toHaveBeenCalledWith(
"orchestrator.queue.failedRetentionAgeSeconds",
86400
);
expect(mockConfigService.get).toHaveBeenCalledWith(
"orchestrator.queue.failedRetentionCount",
1000
);
});
});
describe("retry configuration", () => {
@@ -301,7 +344,7 @@ describe("QueueService", () => {
});
describe("onModuleInit", () => {
it("should initialize BullMQ queue with correct configuration", async () => {
it("should initialize BullMQ queue with default retention configuration", async () => {
await service.onModuleInit();
expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", {
@@ -323,6 +366,52 @@ describe("QueueService", () => {
});
});
it("should initialize BullMQ queue with custom retention configuration", async () => {
mockConfigService.get = vi.fn((key: string, defaultValue?: unknown) => {
const config: Record<string, unknown> = {
"orchestrator.valkey.host": "localhost",
"orchestrator.valkey.port": 6379,
"orchestrator.valkey.password": undefined,
"orchestrator.queue.name": "orchestrator-tasks",
"orchestrator.queue.maxRetries": 3,
"orchestrator.queue.baseDelay": 1000,
"orchestrator.queue.maxDelay": 60000,
"orchestrator.queue.concurrency": 5,
"orchestrator.queue.completedRetentionAgeSeconds": 1800,
"orchestrator.queue.completedRetentionCount": 50,
"orchestrator.queue.failedRetentionAgeSeconds": 43200,
"orchestrator.queue.failedRetentionCount": 500,
};
return config[key] ?? defaultValue;
});
service = new QueueService(
mockValkeyService as unknown as never,
mockConfigService as unknown as never
);
vi.clearAllMocks();
await service.onModuleInit();
expect(QueueMock).toHaveBeenCalledWith("orchestrator-tasks", {
connection: {
host: "localhost",
port: 6379,
password: undefined,
},
defaultJobOptions: {
removeOnComplete: {
age: 1800,
count: 50,
},
removeOnFail: {
age: 43200,
count: 500,
},
},
});
});
it("should initialize BullMQ worker with correct configuration", async () => {
await service.onModuleInit();

View File

@@ -45,17 +45,35 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
password: this.configService.get<string>("orchestrator.valkey.password"),
};
// Read retention config
const completedRetentionAge = this.configService.get<number>(
"orchestrator.queue.completedRetentionAgeSeconds",
3600
);
const completedRetentionCount = this.configService.get<number>(
"orchestrator.queue.completedRetentionCount",
100
);
const failedRetentionAge = this.configService.get<number>(
"orchestrator.queue.failedRetentionAgeSeconds",
86400
);
const failedRetentionCount = this.configService.get<number>(
"orchestrator.queue.failedRetentionCount",
1000
);
// Create queue
this.queue = new Queue<QueuedTask>(this.queueName, {
connection,
defaultJobOptions: {
removeOnComplete: {
age: 3600, // Keep completed jobs for 1 hour
count: 100, // Keep last 100 completed jobs
age: completedRetentionAge,
count: completedRetentionCount,
},
removeOnFail: {
age: 86400, // Keep failed jobs for 24 hours
count: 1000, // Keep last 1000 failed jobs
age: failedRetentionAge,
count: failedRetentionCount,
},
},
});