import { mkdirSync, readFileSync, readdirSync, writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { EventEmitter } from 'node:events'; import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js'; const DEFAULT_DATA_DIR = '.mosaic/queue'; export function createLocalAdapter(config: QueueConfig): QueueAdapter { if (config.type !== 'local') { throw new Error(`Expected config type "local", got "${config.type}"`); } const dataDir = config.dataDir ?? DEFAULT_DATA_DIR; const queues = new Map(); const emitter = new EventEmitter(); mkdirSync(dataDir, { recursive: true }); // Load existing JSON files on startup for (const file of readdirSync(dataDir)) { if (!file.endsWith('.json')) continue; const queueName = file.slice(0, -5); try { const raw = readFileSync(join(dataDir, file), 'utf-8'); const items = JSON.parse(raw) as TaskPayload[]; if (Array.isArray(items)) { queues.set(queueName, items); } } catch { // Ignore corrupt files } } function persist(queueName: string): void { const items = queues.get(queueName) ?? []; writeFileSync(join(dataDir, `${queueName}.json`), JSON.stringify(items), 'utf-8'); } function getQueue(queueName: string): TaskPayload[] { let q = queues.get(queueName); if (!q) { q = []; queues.set(queueName, q); } return q; } return { name: 'local', async enqueue(queueName: string, payload: TaskPayload): Promise { getQueue(queueName).push(payload); persist(queueName); }, async dequeue(queueName: string): Promise { const q = getQueue(queueName); const item = q.shift() ?? null; persist(queueName); return item; }, async length(queueName: string): Promise { return getQueue(queueName).length; }, async publish(channel: string, message: string): Promise { emitter.emit(channel, message); }, subscribe(channel: string, handler: (message: string) => void): () => void { emitter.on(channel, handler); return () => { emitter.off(channel, handler); }; }, async close(): Promise { for (const queueName of queues.keys()) { persist(queueName); } queues.clear(); emitter.removeAllListeners(); }, }; }