Files
stack/packages/queue/src/adapters/bullmq.ts
2026-04-02 20:44:10 -05:00

51 lines
1.5 KiB
TypeScript

import Redis from 'ioredis';
import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js';
/** Fallback connection URL, used when neither `config.url` nor the `VALKEY_URL` environment variable is defined. */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
/**
 * Creates a {@link QueueAdapter} named `'bullmq'` that is backed by an ioredis
 * connection.
 *
 * The connection URL is resolved in this order: `config.url`, the `VALKEY_URL`
 * environment variable, then `DEFAULT_VALKEY_URL`.
 *
 * NOTE(review): despite the adapter name, the code here issues plain list and
 * pub/sub commands through ioredis; nothing in this function uses the BullMQ
 * library itself.
 *
 * @param config - Adapter configuration; `config.type` must be `'bullmq'`.
 * @returns An adapter exposing queue (list) and pub/sub operations.
 * @throws Error if `config.type` is not `'bullmq'`.
 */
export function createBullMQAdapter(config: QueueConfig): QueueAdapter {
  if (config.type !== 'bullmq') {
    throw new Error(`Expected config type "bullmq", got "${config.type}"`);
  }

  const url = config.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  const redis = new Redis(url, { maxRetriesPerRequest: 3 });

  // Every subscribe() call opens its own connection. They are tracked here so
  // that close() can release any the caller never unsubscribed; otherwise
  // those sockets would stay open after the adapter is closed.
  const subscribers = new Set<ReturnType<typeof redis.duplicate>>();

  return {
    name: 'bullmq',

    /** Serializes `payload` as JSON and pushes it onto the head of the list `queueName`. */
    async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
      await redis.lpush(queueName, JSON.stringify(payload));
    },

    /**
     * Pops one item from the tail of the list `queueName` and parses it as JSON.
     *
     * @returns The parsed payload, or `null` when nothing was popped.
     * @throws SyntaxError if the popped item is not valid JSON. The item has
     *   already been removed from the list at that point.
     */
    async dequeue(queueName: string): Promise<TaskPayload | null> {
      const item = await redis.rpop(queueName);
      if (!item) return null;
      // The parsed value is not validated; it is trusted to be a TaskPayload.
      return JSON.parse(item) as TaskPayload;
    },

    /** Returns the current number of items in the list `queueName`. */
    async length(queueName: string): Promise<number> {
      return redis.llen(queueName);
    },

    /** Publishes `message` on `channel` using the primary connection. */
    async publish(channel: string, message: string): Promise<void> {
      await redis.publish(channel, message);
    },

    /**
     * Subscribes `handler` to `channel` on a dedicated duplicate connection.
     *
     * @returns A function that unsubscribes and disconnects that connection.
     *   It is safe to call more than once, and is a no-op after `close()`.
     */
    subscribe(channel: string, handler: (message: string) => void): () => void {
      const sub = redis.duplicate();
      subscribers.add(sub);

      // NOTE(review): this method returns synchronously, so a failed SUBSCRIBE
      // cannot be reported to the caller; it is deliberately best-effort.
      sub.subscribe(channel).catch(() => {});
      sub.on('message', (_ch: string, msg: string) => handler(msg));

      return () => {
        // delete() returns false if this subscriber was already released,
        // either by an earlier call or by close().
        if (!subscribers.delete(sub)) return;
        sub.unsubscribe(channel).catch(() => {});
        sub.disconnect();
      };
    },

    /** Disconnects all outstanding subscriber connections, then quits the primary connection. */
    async close(): Promise<void> {
      for (const sub of subscribers) {
        sub.disconnect();
      }
      subscribers.clear();
      await redis.quit();
    },
  };
}