// File listing metadata: TypeScript, 82 lines, 2.6 KiB.
import { mkdtempSync, rmSync } from 'node:fs';
|
|
import { join } from 'node:path';
|
|
import { tmpdir } from 'node:os';
|
|
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
|
|
|
import type { TaskPayload } from '../types.js';
|
|
import { createLocalAdapter } from './local.js';
|
|
|
|
/**
 * Builds a task payload fixture for the queue tests.
 *
 * The given `id` is mirrored into `data.value`, so each fixture can be told
 * apart by either field when comparing dequeued payloads.
 *
 * @param id - Identifier stored in `id` and copied into `data.value`.
 * @returns A payload whose `type` is always `'test'` and whose `createdAt` is
 *   the current time formatted by `Date.prototype.toISOString()`.
 */
function makePayload(id: string): TaskPayload {
  return {
    id,
    type: 'test',
    data: { value: id },
    createdAt: new Date().toISOString(),
  };
}
|
|
|
|
/**
 * Behavioural tests for the adapter returned by `createLocalAdapter`.
 *
 * Every test runs against its own freshly created temporary directory, which
 * is removed afterwards. Adapters are opened through `openAdapter` so that any
 * instance a test did not close itself (including when an assertion throws
 * part-way through) is closed before the directory is deleted.
 */
describe('LocalAdapter', () => {
  type LocalAdapter = ReturnType<typeof createLocalAdapter>;

  /** Per-test scratch directory handed to the adapter as `dataDir`. */
  let dataDir: string;

  /** Adapters created during the current test that have not been closed yet. */
  const openAdapters = new Set<LocalAdapter>();

  /** Creates an adapter on the current `dataDir` and registers it for cleanup. */
  function openAdapter(): LocalAdapter {
    const adapter = createLocalAdapter({ type: 'local', dataDir });
    openAdapters.add(adapter);
    return adapter;
  }

  /**
   * Closes an adapter and drops it from the cleanup set.
   *
   * The adapter is unregistered first so it is never closed a second time by
   * `afterEach`, even if `close()` itself rejects.
   */
  async function closeAdapter(adapter: LocalAdapter): Promise<void> {
    openAdapters.delete(adapter);
    await adapter.close();
  }

  beforeEach(() => {
    dataDir = mkdtempSync(join(tmpdir(), 'mosaic-queue-test-'));
  });

  afterEach(async () => {
    try {
      // Iterate over a snapshot: closeAdapter mutates the set as it goes.
      for (const adapter of [...openAdapters]) {
        await closeAdapter(adapter);
      }
    } finally {
      // Always remove the scratch directory, even if a close() call failed.
      openAdapters.clear();
      rmSync(dataDir, { recursive: true, force: true });
    }
  });

  it('enqueue + dequeue in FIFO order', async () => {
    const adapter = openAdapter();
    const a = makePayload('a');
    const b = makePayload('b');
    const c = makePayload('c');

    await adapter.enqueue('tasks', a);
    await adapter.enqueue('tasks', b);
    await adapter.enqueue('tasks', c);

    expect(await adapter.dequeue('tasks')).toEqual(a);
    expect(await adapter.dequeue('tasks')).toEqual(b);
    expect(await adapter.dequeue('tasks')).toEqual(c);
    // A drained queue yields null rather than throwing.
    expect(await adapter.dequeue('tasks')).toBeNull();
  });

  it('length accuracy', async () => {
    const adapter = openAdapter();

    expect(await adapter.length('q')).toBe(0);
    await adapter.enqueue('q', makePayload('1'));
    await adapter.enqueue('q', makePayload('2'));
    expect(await adapter.length('q')).toBe(2);
    await adapter.dequeue('q');
    expect(await adapter.length('q')).toBe(1);
  });

  it('publish + subscribe delivery', async () => {
    const adapter = openAdapter();
    const received: string[] = [];

    const unsub = adapter.subscribe('chan', (msg) => received.push(msg));
    await adapter.publish('chan', 'hello');
    await adapter.publish('chan', 'world');

    expect(received).toEqual(['hello', 'world']);

    // After unsubscribing, further publishes must not reach the handler.
    unsub();
    await adapter.publish('chan', 'after-unsub');
    expect(received).toEqual(['hello', 'world']);
  });

  it('persistence survives close and re-create', async () => {
    const p1 = makePayload('x');
    const p2 = makePayload('y');

    const adapter1 = openAdapter();
    await adapter1.enqueue('persist-q', p1);
    await adapter1.enqueue('persist-q', p2);
    await closeAdapter(adapter1);

    // A second adapter on the same dataDir must see what the first one wrote.
    const adapter2 = openAdapter();
    expect(await adapter2.length('persist-q')).toBe(2);
    expect(await adapter2.dequeue('persist-q')).toEqual(p1);
    expect(await adapter2.dequeue('persist-q')).toEqual(p2);
    await closeAdapter(adapter2);
  });
});
|