Files
stack/apps/api/src/bullmq/bullmq.service.ts
Jason Woltje e09950f225 feat(#165): Implement BullMQ module setup
Create BullMQ module that shares the existing Valkey connection for job queue processing.

Files Created:
- apps/api/src/bullmq/bullmq.module.ts - Global module configuration
- apps/api/src/bullmq/bullmq.service.ts - Queue management service
- apps/api/src/bullmq/queues.ts - Queue name constants
- apps/api/src/bullmq/index.ts - Barrel exports
- apps/api/src/bullmq/bullmq.service.spec.ts - Unit tests

Files Modified:
- apps/api/src/app.module.ts - Import BullMqModule

Queue Definitions:
- mosaic-jobs (main queue)
- mosaic-jobs-runner (read-only operations)
- mosaic-jobs-weaver (write operations)
- mosaic-jobs-inspector (validation operations)

Implementation:
- Reuses VALKEY_URL from environment (shared connection)
- Follows existing Valkey module patterns
- Includes health check methods
- Proper lifecycle management (init/destroy)
- Queue names use hyphens instead of colons (BullMQ requirement)

Quality Gates:
- Unit tests: 11 passing
- TypeScript: No errors
- ESLint: No violations
- Build: Successful

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:01:25 -06:00

187 lines
4.9 KiB
TypeScript

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import { Queue, QueueOptions } from "bullmq";
import { QUEUE_NAMES, QueueName } from "./queues";
/**
* Health status interface for BullMQ
*
* Shape of the value returned by BullMqService.getHealthStatus().
*/
export interface BullMqHealthStatus {
/** Result of BullMqService.healthCheck(): true when every registered queue's client answered a ping. */
connected: boolean;
/** Value of Queue.count() per queue, keyed by queue name; -1 marks a queue whose count call failed. */
queues: Record<string, number>;
}
/**
 * Connection settings extracted from a Redis/Valkey URL.
 *
 * Only `host` and `port` are always present; the optional fields are set
 * solely when the URL actually carries them, so a plain
 * `redis://host:port` URL yields exactly `{ host, port }`.
 */
interface ValkeyConnectionOptions {
  host: string;
  port: number;
  username?: string;
  password?: string;
  db?: number;
  /** Present (as an empty options object) when the URL scheme is `rediss:`. */
  tls?: Record<string, never>;
}

/**
 * BullMqService - Job queue service using BullMQ with Valkey backend
 *
 * This service provides job queue operations for the Mosaic Component Architecture:
 * - Main queue for general purpose jobs
 * - Runner queue for read-only operations
 * - Weaver queue for write operations
 * - Inspector queue for validation operations
 *
 * Connection settings are read from the VALKEY_URL environment variable
 * (default `redis://localhost:6379`). Each queue is constructed from those
 * settings; no live connection object is handed over from another service.
 */
@Injectable()
export class BullMqService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(BullMqService.name);
  private readonly queues = new Map<string, Queue>();

  /**
   * Create the four Mosaic queues and wait until each one is ready.
   *
   * Rejects if any queue fails to become ready; queues created before the
   * failure stay registered and are closed by onModuleDestroy().
   */
  async onModuleInit(): Promise<void> {
    const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379";
    // The URL may embed credentials, so only a redacted form is logged.
    this.logger.log(`Initializing BullMQ with Valkey at ${this.redactUrl(valkeyUrl)}`);

    const queueOptions: QueueOptions = {
      connection: this.parseRedisUrl(valkeyUrl),
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: "exponential",
          delay: 1000,
        },
        removeOnComplete: {
          age: 3600, // Keep completed jobs for 1 hour
          count: 1000, // Keep last 1000 completed jobs
        },
        removeOnFail: {
          age: 86400, // Keep failed jobs for 24 hours
        },
      },
    };

    // Create all queues (sequentially, so the "Queue created" log order is stable)
    await this.createQueue(QUEUE_NAMES.MAIN, queueOptions);
    await this.createQueue(QUEUE_NAMES.RUNNER, queueOptions);
    await this.createQueue(QUEUE_NAMES.WEAVER, queueOptions);
    await this.createQueue(QUEUE_NAMES.INSPECTOR, queueOptions);

    this.logger.log(`BullMQ initialized with ${this.queues.size.toString()} queues`);
  }

  /**
   * Close every registered queue and empty the registry.
   *
   * A failure to close one queue is logged and does not prevent the
   * remaining queues from being closed.
   */
  async onModuleDestroy(): Promise<void> {
    this.logger.log("Closing BullMQ queues");

    for (const [name, queue] of this.queues.entries()) {
      try {
        await queue.close();
        this.logger.log(`Queue closed: ${name}`);
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        this.logger.error(`Failed to close queue ${name}:`, errorMessage);
      }
    }

    this.queues.clear();
  }

  /**
   * Create a queue with the given name and options, wait until it is ready,
   * and register it under `name`.
   *
   * @throws whatever `waitUntilReady()` rejects with; the half-built queue
   *   is closed first so it does not linger unregistered.
   */
  private async createQueue(name: QueueName, options: QueueOptions): Promise<Queue> {
    const queue = new Queue(name, options);

    try {
      // Wait for queue to be ready
      await queue.waitUntilReady();
    } catch (error) {
      try {
        await queue.close();
      } catch {
        // Ignore close failures so the original readiness error is the one reported.
      }
      throw error;
    }

    this.queues.set(name, queue);
    this.logger.log(`Queue created: ${name}`);
    return queue;
  }

  /**
   * Get a queue by name
   *
   * @returns the registered queue, or `null` when none is registered under `name`
   */
  getQueue(name: QueueName): Queue | null {
    return this.queues.get(name) ?? null;
  }

  /**
   * Get all queues
   *
   * @returns the service's own registry map (not a copy), keyed by queue name
   */
  getQueues(): Map<string, Queue> {
    return this.queues;
  }

  /**
   * Add a job to a queue
   *
   * @param queueName - name of a registered queue
   * @param jobName - job name passed to `Queue.add`
   * @param data - job payload passed to `Queue.add`
   * @param options - optional per-job overrides passed to `Queue.add`
   * @returns the job returned by `Queue.add`
   * @throws Error when no queue is registered under `queueName`
   */
  async addJob(
    queueName: QueueName,
    jobName: string,
    data: unknown,
    options?: {
      priority?: number;
      delay?: number;
      attempts?: number;
    }
  ): ReturnType<Queue["add"]> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue not found: ${queueName}`);
    }

    const job = await queue.add(jobName, data, options);
    this.logger.log(`Job added to ${queueName}: ${jobName} (id: ${job.id ?? "unknown"})`);
    return job;
  }

  /**
   * Health check - verify all queues are connected
   *
   * Pings the client of every registered queue in turn.
   *
   * @returns `true` when every ping succeeds (including the case where no
   *   queue is registered, since there is nothing to fail); `false` as soon
   *   as one ping throws. Never rejects.
   */
  async healthCheck(): Promise<boolean> {
    try {
      for (const queue of this.queues.values()) {
        // Check if queue client is connected
        const client = await queue.client;
        await client.ping();
      }
      return true;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error("BullMQ health check failed:", errorMessage);
      return false;
    }
  }

  /**
   * Get health status with queue counts
   *
   * @returns the healthCheck() result plus `Queue.count()` per queue name;
   *   a queue whose count call throws is reported as -1
   */
  async getHealthStatus(): Promise<BullMqHealthStatus> {
    const connected = await this.healthCheck();
    const queues: Record<string, number> = {};

    for (const [name, queue] of this.queues.entries()) {
      try {
        queues[name] = await queue.count();
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        this.logger.error(`Failed to get count for queue ${name}:`, errorMessage);
        queues[name] = -1;
      }
    }

    return { connected, queues };
  }

  /**
   * Parse Redis URL into connection options
   *
   * Besides host and port (default 6379), carries over the username,
   * password, numeric database index (`/N` path) and TLS (`rediss:` scheme)
   * when the URL specifies them. Falls back to `localhost:6379` with a
   * warning when the URL cannot be parsed.
   */
  private parseRedisUrl(url: string): ValkeyConnectionOptions {
    try {
      const parsed = new URL(url);
      // URL keeps the brackets around IPv6 literals; socket APIs want the bare address.
      const host = parsed.hostname.replace(/^\[(.*)\]$/, "$1");
      const dbSegment = parsed.pathname.replace(/^\//, "");

      return {
        host,
        port: parseInt(parsed.port !== "" ? parsed.port : "6379", 10),
        ...(parsed.username !== ""
          ? { username: this.decodeUrlComponent(parsed.username) }
          : {}),
        ...(parsed.password !== ""
          ? { password: this.decodeUrlComponent(parsed.password) }
          : {}),
        ...(/^\d+$/.test(dbSegment) ? { db: parseInt(dbSegment, 10) } : {}),
        ...(parsed.protocol === "rediss:" ? { tls: {} } : {}),
      };
    } catch {
      this.logger.warn(`Failed to parse Redis URL: ${this.redactUrl(url)}, using defaults`);
      return {
        host: "localhost",
        port: 6379,
      };
    }
  }

  /**
   * Percent-decode a URL userinfo component, returning the input unchanged
   * when it contains a malformed escape sequence.
   */
  private decodeUrlComponent(value: string): string {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  }

  /**
   * Mask the userinfo part of a URL for logging.
   *
   * Works on the raw string (so it also covers unparseable URLs) and masks
   * everything between `://` and the last `@`; a URL without `@` is
   * returned unchanged.
   */
  private redactUrl(url: string): string {
    return url.replace(/:\/\/[\s\S]*@/, "://***@");
  }
}