diff --git a/packages/queue/src/adapters/local.test.ts b/packages/queue/src/adapters/local.test.ts new file mode 100644 index 0000000..c5f97f6 --- /dev/null +++ b/packages/queue/src/adapters/local.test.ts @@ -0,0 +1,81 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import type { TaskPayload } from '../types.js'; +import { createLocalAdapter } from './local.js'; + +function makePayload(id: string): TaskPayload { + return { id, type: 'test', data: { value: id }, createdAt: new Date().toISOString() }; +} + +describe('LocalAdapter', () => { + let dataDir: string; + + beforeEach(() => { + dataDir = mkdtempSync(join(tmpdir(), 'mosaic-queue-test-')); + }); + + afterEach(() => { + rmSync(dataDir, { recursive: true, force: true }); + }); + + it('enqueue + dequeue in FIFO order', async () => { + const adapter = createLocalAdapter({ type: 'local', dataDir }); + const a = makePayload('a'); + const b = makePayload('b'); + const c = makePayload('c'); + + await adapter.enqueue('tasks', a); + await adapter.enqueue('tasks', b); + await adapter.enqueue('tasks', c); + + expect(await adapter.dequeue('tasks')).toEqual(a); + expect(await adapter.dequeue('tasks')).toEqual(b); + expect(await adapter.dequeue('tasks')).toEqual(c); + expect(await adapter.dequeue('tasks')).toBeNull(); + }); + + it('length accuracy', async () => { + const adapter = createLocalAdapter({ type: 'local', dataDir }); + + expect(await adapter.length('q')).toBe(0); + await adapter.enqueue('q', makePayload('1')); + await adapter.enqueue('q', makePayload('2')); + expect(await adapter.length('q')).toBe(2); + await adapter.dequeue('q'); + expect(await adapter.length('q')).toBe(1); + }); + + it('publish + subscribe delivery', async () => { + const adapter = createLocalAdapter({ type: 'local', dataDir }); + const received: string[] = []; + + const unsub = adapter.subscribe('chan', (msg) => received.push(msg)); + await adapter.publish('chan', 'hello'); + await adapter.publish('chan', 'world'); + + expect(received).toEqual(['hello', 'world']); + + unsub(); + await adapter.publish('chan', 'after-unsub'); + expect(received).toEqual(['hello', 'world']); + }); + + it('persistence survives close and re-create', async () => { + const p1 = makePayload('x'); + const p2 = makePayload('y'); + + const adapter1 = createLocalAdapter({ type: 'local', dataDir }); + await adapter1.enqueue('persist-q', p1); + await adapter1.enqueue('persist-q', p2); + await adapter1.close(); + + const adapter2 = createLocalAdapter({ type: 'local', dataDir }); + expect(await adapter2.length('persist-q')).toBe(2); + expect(await adapter2.dequeue('persist-q')).toEqual(p1); + expect(await adapter2.dequeue('persist-q')).toEqual(p2); + await adapter2.close(); + }); +}); diff --git a/packages/queue/src/adapters/local.ts b/packages/queue/src/adapters/local.ts new file mode 100644 index 0000000..88c0309 --- /dev/null +++ b/packages/queue/src/adapters/local.ts @@ -0,0 +1,87 @@ +import { mkdirSync, readFileSync, readdirSync, writeFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { EventEmitter } from 'node:events'; + +import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js'; + +const DEFAULT_DATA_DIR = '.mosaic/queue'; + +export function createLocalAdapter(config: QueueConfig): QueueAdapter { + if (config.type !== 'local') { + throw new Error(`Expected config type "local", got "${config.type}"`); + } + + const dataDir = config.dataDir ?? DEFAULT_DATA_DIR; + const queues = new Map(); + const emitter = new EventEmitter(); + + mkdirSync(dataDir, { recursive: true }); + + // Load existing JSON files on startup + for (const file of readdirSync(dataDir)) { + if (!file.endsWith('.json')) continue; + const queueName = file.slice(0, -5); + try { + const raw = readFileSync(join(dataDir, file), 'utf-8'); + const items = JSON.parse(raw) as TaskPayload[]; + if (Array.isArray(items)) { + queues.set(queueName, items); + } + } catch { + // Ignore corrupt files + } + } + + function persist(queueName: string): void { + const items = queues.get(queueName) ?? []; + writeFileSync(join(dataDir, `${queueName}.json`), JSON.stringify(items), 'utf-8'); + } + + function getQueue(queueName: string): TaskPayload[] { + let q = queues.get(queueName); + if (!q) { + q = []; + queues.set(queueName, q); + } + return q; + } + + return { + name: 'local', + + async enqueue(queueName: string, payload: TaskPayload): Promise { + getQueue(queueName).push(payload); + persist(queueName); + }, + + async dequeue(queueName: string): Promise { + const q = getQueue(queueName); + const item = q.shift() ?? null; + persist(queueName); + return item; + }, + + async length(queueName: string): Promise { + return getQueue(queueName).length; + }, + + async publish(channel: string, message: string): Promise { + emitter.emit(channel, message); + }, + + subscribe(channel: string, handler: (message: string) => void): () => void { + emitter.on(channel, handler); + return () => { + emitter.off(channel, handler); + }; + }, + + async close(): Promise { + for (const queueName of queues.keys()) { + persist(queueName); + } + queues.clear(); + emitter.removeAllListeners(); + }, + }; +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 6c63dfa..0aa4089 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -10,8 +10,11 @@ export { export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js'; export { createQueueAdapter, registerQueueAdapter } from './factory.js'; export { createBullMQAdapter } from './adapters/bullmq.js'; +export { createLocalAdapter } from './adapters/local.js'; import { registerQueueAdapter } from './factory.js'; import { createBullMQAdapter } from './adapters/bullmq.js'; +import { createLocalAdapter } from './adapters/local.js'; registerQueueAdapter('bullmq', createBullMQAdapter); +registerQueueAdapter('local', createLocalAdapter);