diff --git a/packages/queue/src/adapters/bullmq.ts b/packages/queue/src/adapters/bullmq.ts new file mode 100644 index 0000000..6ccab95 --- /dev/null +++ b/packages/queue/src/adapters/bullmq.ts @@ -0,0 +1,50 @@ +import Redis from 'ioredis'; + +import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js'; + +const DEFAULT_VALKEY_URL = 'redis://localhost:6380'; + +export function createBullMQAdapter(config: QueueConfig): QueueAdapter { + if (config.type !== 'bullmq') { + throw new Error(`Expected config type "bullmq", got "${config.type}"`); + } + + const url = config.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL; + const redis = new Redis(url, { maxRetriesPerRequest: 3 }); + + return { + name: 'bullmq', + + async enqueue(queueName: string, payload: TaskPayload): Promise { + await redis.lpush(queueName, JSON.stringify(payload)); + }, + + async dequeue(queueName: string): Promise { + const item = await redis.rpop(queueName); + if (!item) return null; + return JSON.parse(item) as TaskPayload; + }, + + async length(queueName: string): Promise { + return redis.llen(queueName); + }, + + async publish(channel: string, message: string): Promise { + await redis.publish(channel, message); + }, + + subscribe(channel: string, handler: (message: string) => void): () => void { + const sub = redis.duplicate(); + sub.subscribe(channel).catch(() => {}); + sub.on('message', (_ch: string, msg: string) => handler(msg)); + return () => { + sub.unsubscribe(channel).catch(() => {}); + sub.disconnect(); + }; + }, + + async close(): Promise { + await redis.quit(); + }, + }; +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 4fb76c4..6c63dfa 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -9,3 +9,9 @@ export { export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js'; export { createQueueAdapter, registerQueueAdapter } from './factory.js'; +export { createBullMQAdapter } from './adapters/bullmq.js'; + +import { registerQueueAdapter } from './factory.js'; +import { createBullMQAdapter } from './adapters/bullmq.js'; + +registerQueueAdapter('bullmq', createBullMQAdapter);