feat(#165): Implement BullMQ module setup
Create BullMQ module that shares the existing Valkey connection for job queue processing. Files Created: - apps/api/src/bullmq/bullmq.module.ts - Global module configuration - apps/api/src/bullmq/bullmq.service.ts - Queue management service - apps/api/src/bullmq/queues.ts - Queue name constants - apps/api/src/bullmq/index.ts - Barrel exports - apps/api/src/bullmq/bullmq.service.spec.ts - Unit tests Files Modified: - apps/api/src/app.module.ts - Import BullMqModule Queue Definitions: - mosaic-jobs (main queue) - mosaic-jobs-runner (read-only operations) - mosaic-jobs-weaver (write operations) - mosaic-jobs-inspector (validation operations) Implementation: - Reuses VALKEY_URL from environment (shared connection) - Follows existing Valkey module patterns - Includes health check methods - Proper lifecycle management (init/destroy) - Queue names use hyphens instead of colons (BullMQ requirement) Quality Gates: - Unit tests: 11 passing - TypeScript: No errors - ESLint: No violations - Build: Successful Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
186
apps/api/src/bullmq/bullmq.service.ts
Normal file
186
apps/api/src/bullmq/bullmq.service.ts
Normal file
@@ -0,0 +1,186 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
|
||||
import { Queue, QueueOptions } from "bullmq";
|
||||
import { QUEUE_NAMES, QueueName } from "./queues";
|
||||
|
||||
/**
|
||||
* Health status interface for BullMQ
|
||||
*/
|
||||
export interface BullMqHealthStatus {
|
||||
connected: boolean;
|
||||
queues: Record<string, number>;
|
||||
}
|
||||
|
||||
/**
|
||||
* BullMqService - Job queue service using BullMQ with Valkey backend
|
||||
*
|
||||
* This service provides job queue operations for the Mosaic Component Architecture:
|
||||
* - Main queue for general purpose jobs
|
||||
* - Runner queue for read-only operations
|
||||
* - Weaver queue for write operations
|
||||
* - Inspector queue for validation operations
|
||||
*
|
||||
* Shares the same Valkey connection used by ValkeyService (VALKEY_URL).
|
||||
*/
|
||||
@Injectable()
|
||||
export class BullMqService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(BullMqService.name);
|
||||
private readonly queues = new Map<string, Queue>();
|
||||
|
||||
async onModuleInit(): Promise<void> {
|
||||
const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379";
|
||||
|
||||
this.logger.log(`Initializing BullMQ with Valkey at ${valkeyUrl}`);
|
||||
|
||||
// Parse Redis URL for connection options
|
||||
const connectionOptions = this.parseRedisUrl(valkeyUrl);
|
||||
|
||||
const queueOptions: QueueOptions = {
|
||||
connection: connectionOptions,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 1000,
|
||||
},
|
||||
removeOnComplete: {
|
||||
age: 3600, // Keep completed jobs for 1 hour
|
||||
count: 1000, // Keep last 1000 completed jobs
|
||||
},
|
||||
removeOnFail: {
|
||||
age: 86400, // Keep failed jobs for 24 hours
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Create all queues
|
||||
await this.createQueue(QUEUE_NAMES.MAIN, queueOptions);
|
||||
await this.createQueue(QUEUE_NAMES.RUNNER, queueOptions);
|
||||
await this.createQueue(QUEUE_NAMES.WEAVER, queueOptions);
|
||||
await this.createQueue(QUEUE_NAMES.INSPECTOR, queueOptions);
|
||||
|
||||
this.logger.log(`BullMQ initialized with ${this.queues.size.toString()} queues`);
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
this.logger.log("Closing BullMQ queues");
|
||||
|
||||
for (const [name, queue] of this.queues.entries()) {
|
||||
await queue.close();
|
||||
this.logger.log(`Queue closed: ${name}`);
|
||||
}
|
||||
|
||||
this.queues.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a queue with the given name and options
|
||||
*/
|
||||
private async createQueue(name: QueueName, options: QueueOptions): Promise<Queue> {
|
||||
const queue = new Queue(name, options);
|
||||
|
||||
// Wait for queue to be ready
|
||||
await queue.waitUntilReady();
|
||||
|
||||
this.queues.set(name, queue);
|
||||
this.logger.log(`Queue created: ${name}`);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a queue by name
|
||||
*/
|
||||
getQueue(name: QueueName): Queue | null {
|
||||
return this.queues.get(name) ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all queues
|
||||
*/
|
||||
getQueues(): Map<string, Queue> {
|
||||
return this.queues;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a job to a queue
|
||||
*/
|
||||
async addJob(
|
||||
queueName: QueueName,
|
||||
jobName: string,
|
||||
data: unknown,
|
||||
options?: {
|
||||
priority?: number;
|
||||
delay?: number;
|
||||
attempts?: number;
|
||||
}
|
||||
): Promise<ReturnType<Queue["add"]>> {
|
||||
const queue = this.queues.get(queueName);
|
||||
|
||||
if (!queue) {
|
||||
throw new Error(`Queue not found: ${queueName}`);
|
||||
}
|
||||
|
||||
const job = await queue.add(jobName, data, options);
|
||||
this.logger.log(`Job added to ${queueName}: ${jobName} (id: ${job.id ?? "unknown"})`);
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Health check - verify all queues are connected
|
||||
*/
|
||||
async healthCheck(): Promise<boolean> {
|
||||
try {
|
||||
for (const queue of this.queues.values()) {
|
||||
// Check if queue client is connected
|
||||
const client = await queue.client;
|
||||
await client.ping();
|
||||
}
|
||||
return true;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error("BullMQ health check failed:", errorMessage);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get health status with queue counts
|
||||
*/
|
||||
async getHealthStatus(): Promise<BullMqHealthStatus> {
|
||||
const connected = await this.healthCheck();
|
||||
const queues: Record<string, number> = {};
|
||||
|
||||
for (const [name, queue] of this.queues.entries()) {
|
||||
try {
|
||||
const count = await queue.count();
|
||||
queues[name] = count;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error(`Failed to get count for queue ${name}:`, errorMessage);
|
||||
queues[name] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
return { connected, queues };
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse Redis URL into connection options
|
||||
*/
|
||||
private parseRedisUrl(url: string): { host: string; port: number } {
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
return {
|
||||
host: parsed.hostname,
|
||||
port: parseInt(parsed.port || "6379", 10),
|
||||
};
|
||||
} catch {
|
||||
this.logger.warn(`Failed to parse Redis URL: ${url}, using defaults`);
|
||||
return {
|
||||
host: "localhost",
|
||||
port: 6379,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user