feat(queue): implement local adapter with JSON persistence
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
81
packages/queue/src/adapters/local.test.ts
Normal file
81
packages/queue/src/adapters/local.test.ts
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
import { mkdtempSync, rmSync } from 'node:fs';
|
||||||
|
import { join } from 'node:path';
|
||||||
|
import { tmpdir } from 'node:os';
|
||||||
|
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||||
|
|
||||||
|
import type { TaskPayload } from '../types.js';
|
||||||
|
import { createLocalAdapter } from './local.js';
|
||||||
|
|
||||||
|
function makePayload(id: string): TaskPayload {
|
||||||
|
return { id, type: 'test', data: { value: id }, createdAt: new Date().toISOString() };
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('LocalAdapter', () => {
|
||||||
|
let dataDir: string;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
dataDir = mkdtempSync(join(tmpdir(), 'mosaic-queue-test-'));
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
rmSync(dataDir, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('enqueue + dequeue in FIFO order', async () => {
|
||||||
|
const adapter = createLocalAdapter({ type: 'local', dataDir });
|
||||||
|
const a = makePayload('a');
|
||||||
|
const b = makePayload('b');
|
||||||
|
const c = makePayload('c');
|
||||||
|
|
||||||
|
await adapter.enqueue('tasks', a);
|
||||||
|
await adapter.enqueue('tasks', b);
|
||||||
|
await adapter.enqueue('tasks', c);
|
||||||
|
|
||||||
|
expect(await adapter.dequeue('tasks')).toEqual(a);
|
||||||
|
expect(await adapter.dequeue('tasks')).toEqual(b);
|
||||||
|
expect(await adapter.dequeue('tasks')).toEqual(c);
|
||||||
|
expect(await adapter.dequeue('tasks')).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('length accuracy', async () => {
|
||||||
|
const adapter = createLocalAdapter({ type: 'local', dataDir });
|
||||||
|
|
||||||
|
expect(await adapter.length('q')).toBe(0);
|
||||||
|
await adapter.enqueue('q', makePayload('1'));
|
||||||
|
await adapter.enqueue('q', makePayload('2'));
|
||||||
|
expect(await adapter.length('q')).toBe(2);
|
||||||
|
await adapter.dequeue('q');
|
||||||
|
expect(await adapter.length('q')).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('publish + subscribe delivery', async () => {
|
||||||
|
const adapter = createLocalAdapter({ type: 'local', dataDir });
|
||||||
|
const received: string[] = [];
|
||||||
|
|
||||||
|
const unsub = adapter.subscribe('chan', (msg) => received.push(msg));
|
||||||
|
await adapter.publish('chan', 'hello');
|
||||||
|
await adapter.publish('chan', 'world');
|
||||||
|
|
||||||
|
expect(received).toEqual(['hello', 'world']);
|
||||||
|
|
||||||
|
unsub();
|
||||||
|
await adapter.publish('chan', 'after-unsub');
|
||||||
|
expect(received).toEqual(['hello', 'world']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('persistence survives close and re-create', async () => {
|
||||||
|
const p1 = makePayload('x');
|
||||||
|
const p2 = makePayload('y');
|
||||||
|
|
||||||
|
const adapter1 = createLocalAdapter({ type: 'local', dataDir });
|
||||||
|
await adapter1.enqueue('persist-q', p1);
|
||||||
|
await adapter1.enqueue('persist-q', p2);
|
||||||
|
await adapter1.close();
|
||||||
|
|
||||||
|
const adapter2 = createLocalAdapter({ type: 'local', dataDir });
|
||||||
|
expect(await adapter2.length('persist-q')).toBe(2);
|
||||||
|
expect(await adapter2.dequeue('persist-q')).toEqual(p1);
|
||||||
|
expect(await adapter2.dequeue('persist-q')).toEqual(p2);
|
||||||
|
await adapter2.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
87
packages/queue/src/adapters/local.ts
Normal file
87
packages/queue/src/adapters/local.ts
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
import { mkdirSync, readFileSync, readdirSync, writeFileSync } from 'node:fs';
|
||||||
|
import { join } from 'node:path';
|
||||||
|
import { EventEmitter } from 'node:events';
|
||||||
|
|
||||||
|
import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js';
|
||||||
|
|
||||||
|
const DEFAULT_DATA_DIR = '.mosaic/queue';
|
||||||
|
|
||||||
|
export function createLocalAdapter(config: QueueConfig): QueueAdapter {
|
||||||
|
if (config.type !== 'local') {
|
||||||
|
throw new Error(`Expected config type "local", got "${config.type}"`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const dataDir = config.dataDir ?? DEFAULT_DATA_DIR;
|
||||||
|
const queues = new Map<string, TaskPayload[]>();
|
||||||
|
const emitter = new EventEmitter();
|
||||||
|
|
||||||
|
mkdirSync(dataDir, { recursive: true });
|
||||||
|
|
||||||
|
// Load existing JSON files on startup
|
||||||
|
for (const file of readdirSync(dataDir)) {
|
||||||
|
if (!file.endsWith('.json')) continue;
|
||||||
|
const queueName = file.slice(0, -5);
|
||||||
|
try {
|
||||||
|
const raw = readFileSync(join(dataDir, file), 'utf-8');
|
||||||
|
const items = JSON.parse(raw) as TaskPayload[];
|
||||||
|
if (Array.isArray(items)) {
|
||||||
|
queues.set(queueName, items);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Ignore corrupt files
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function persist(queueName: string): void {
|
||||||
|
const items = queues.get(queueName) ?? [];
|
||||||
|
writeFileSync(join(dataDir, `${queueName}.json`), JSON.stringify(items), 'utf-8');
|
||||||
|
}
|
||||||
|
|
||||||
|
function getQueue(queueName: string): TaskPayload[] {
|
||||||
|
let q = queues.get(queueName);
|
||||||
|
if (!q) {
|
||||||
|
q = [];
|
||||||
|
queues.set(queueName, q);
|
||||||
|
}
|
||||||
|
return q;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
name: 'local',
|
||||||
|
|
||||||
|
async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
|
||||||
|
getQueue(queueName).push(payload);
|
||||||
|
persist(queueName);
|
||||||
|
},
|
||||||
|
|
||||||
|
async dequeue(queueName: string): Promise<TaskPayload | null> {
|
||||||
|
const q = getQueue(queueName);
|
||||||
|
const item = q.shift() ?? null;
|
||||||
|
persist(queueName);
|
||||||
|
return item;
|
||||||
|
},
|
||||||
|
|
||||||
|
async length(queueName: string): Promise<number> {
|
||||||
|
return getQueue(queueName).length;
|
||||||
|
},
|
||||||
|
|
||||||
|
async publish(channel: string, message: string): Promise<void> {
|
||||||
|
emitter.emit(channel, message);
|
||||||
|
},
|
||||||
|
|
||||||
|
subscribe(channel: string, handler: (message: string) => void): () => void {
|
||||||
|
emitter.on(channel, handler);
|
||||||
|
return () => {
|
||||||
|
emitter.off(channel, handler);
|
||||||
|
};
|
||||||
|
},
|
||||||
|
|
||||||
|
async close(): Promise<void> {
|
||||||
|
for (const queueName of queues.keys()) {
|
||||||
|
persist(queueName);
|
||||||
|
}
|
||||||
|
queues.clear();
|
||||||
|
emitter.removeAllListeners();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -10,8 +10,11 @@ export {
|
|||||||
export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js';
|
export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js';
|
||||||
export { createQueueAdapter, registerQueueAdapter } from './factory.js';
|
export { createQueueAdapter, registerQueueAdapter } from './factory.js';
|
||||||
export { createBullMQAdapter } from './adapters/bullmq.js';
|
export { createBullMQAdapter } from './adapters/bullmq.js';
|
||||||
|
export { createLocalAdapter } from './adapters/local.js';
|
||||||
|
|
||||||
import { registerQueueAdapter } from './factory.js';
|
import { registerQueueAdapter } from './factory.js';
|
||||||
import { createBullMQAdapter } from './adapters/bullmq.js';
|
import { createBullMQAdapter } from './adapters/bullmq.js';
|
||||||
|
import { createLocalAdapter } from './adapters/local.js';
|
||||||
|
|
||||||
registerQueueAdapter('bullmq', createBullMQAdapter);
|
registerQueueAdapter('bullmq', createBullMQAdapter);
|
||||||
|
registerQueueAdapter('local', createLocalAdapter);
|
||||||
|
|||||||
Reference in New Issue
Block a user