refactor(queue): wrap ioredis as bullmq adapter behind QueueAdapter interface
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
50
packages/queue/src/adapters/bullmq.ts
Normal file
50
packages/queue/src/adapters/bullmq.ts
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import Redis from 'ioredis';
|
||||||
|
|
||||||
|
import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js';
|
||||||
|
|
||||||
|
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
|
||||||
|
|
||||||
|
export function createBullMQAdapter(config: QueueConfig): QueueAdapter {
|
||||||
|
if (config.type !== 'bullmq') {
|
||||||
|
throw new Error(`Expected config type "bullmq", got "${config.type}"`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const url = config.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
|
||||||
|
const redis = new Redis(url, { maxRetriesPerRequest: 3 });
|
||||||
|
|
||||||
|
return {
|
||||||
|
name: 'bullmq',
|
||||||
|
|
||||||
|
async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
|
||||||
|
await redis.lpush(queueName, JSON.stringify(payload));
|
||||||
|
},
|
||||||
|
|
||||||
|
async dequeue(queueName: string): Promise<TaskPayload | null> {
|
||||||
|
const item = await redis.rpop(queueName);
|
||||||
|
if (!item) return null;
|
||||||
|
return JSON.parse(item) as TaskPayload;
|
||||||
|
},
|
||||||
|
|
||||||
|
async length(queueName: string): Promise<number> {
|
||||||
|
return redis.llen(queueName);
|
||||||
|
},
|
||||||
|
|
||||||
|
async publish(channel: string, message: string): Promise<void> {
|
||||||
|
await redis.publish(channel, message);
|
||||||
|
},
|
||||||
|
|
||||||
|
subscribe(channel: string, handler: (message: string) => void): () => void {
|
||||||
|
const sub = redis.duplicate();
|
||||||
|
sub.subscribe(channel).catch(() => {});
|
||||||
|
sub.on('message', (_ch: string, msg: string) => handler(msg));
|
||||||
|
return () => {
|
||||||
|
sub.unsubscribe(channel).catch(() => {});
|
||||||
|
sub.disconnect();
|
||||||
|
};
|
||||||
|
},
|
||||||
|
|
||||||
|
async close(): Promise<void> {
|
||||||
|
await redis.quit();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -9,3 +9,9 @@ export {
|
|||||||
|
|
||||||
export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js';
|
export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js';
|
||||||
export { createQueueAdapter, registerQueueAdapter } from './factory.js';
|
export { createQueueAdapter, registerQueueAdapter } from './factory.js';
|
||||||
|
export { createBullMQAdapter } from './adapters/bullmq.js';
|
||||||
|
|
||||||
|
import { registerQueueAdapter } from './factory.js';
|
||||||
|
import { createBullMQAdapter } from './adapters/bullmq.js';
|
||||||
|
|
||||||
|
registerQueueAdapter('bullmq', createBullMQAdapter);
|
||||||
|
|||||||
Reference in New Issue
Block a user