import 'reflect-metadata'; import { describe, expect, it, vi } from 'vitest'; import type { Db } from '@mosaicstack/db'; import type { FederationListResponse } from '@mosaicstack/types'; import { FederationClientError, type FederationClientService, } from '../federation-client.service.js'; import { type QuerySourceError, QuerySourceService } from '../query-source.service.js'; interface TestRow { id: string; title: string; } interface PeerRow { id: string; commonName: string; endpointUrl: string | null; clientKeyPem: string | null; state: 'active' | 'pending' | 'suspended' | 'revoked'; } const LOCAL_ROWS: TestRow[] = [ { id: 'local-1', title: 'Local One' }, { id: 'local-2', title: 'Local Two' }, ]; const PEER_A: PeerRow = { id: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', commonName: 'peer-a', endpointUrl: 'https://peer-a.example.com', clientKeyPem: 'sealed-key-a', state: 'active', }; const PEER_B: PeerRow = { id: 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', commonName: 'peer-b', endpointUrl: 'https://peer-b.example.com', clientKeyPem: 'sealed-key-b', state: 'active', }; const PEER_LOCALHOST: PeerRow = { id: 'cccccccc-cccc-cccc-cccc-cccccccccccc', commonName: 'peer-localhost', endpointUrl: 'https://localhost:3001', clientKeyPem: 'sealed-key-c', state: 'active', }; function makeDb(activePeers: PeerRow[]): Db { const orderBy = vi.fn().mockResolvedValue(activePeers); const where = vi.fn().mockReturnValue({ orderBy }); const from = vi.fn().mockReturnValue({ where }); const select = vi.fn().mockReturnValue({ from }); return { select, insert: vi.fn(), update: vi.fn(), delete: vi.fn(), transaction: vi.fn(), } as unknown as Db; } function makeFederationClient( list: ( peerId: string, resource: string, request: Record, ) => Promise>, ): FederationClientService { return { list: list as unknown as FederationClientService['list'], } as FederationClientService; } function makeLocalResponse(rows: TestRow[] = LOCAL_ROWS): Promise> { return Promise.resolve({ items: rows }); } describe('QuerySourceService', () => { it('routes source="local" to the local executor and tags rows as local', async () => { const list = vi.fn(async (): Promise> => ({ items: [] })); const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); const result = await service.list({ source: 'local', resource: 'tasks', request: { cursor: 'ignored-for-local-test' }, local: () => makeLocalResponse(), }); expect(result).toEqual({ items: [ { id: 'local-1', title: 'Local One', _source: 'local' }, { id: 'local-2', title: 'Local Two', _source: 'local' }, ], }); expect(list).not.toHaveBeenCalled(); }); it('routes source="federated:" to the matching active peer and tags rows with peer commonName', async () => { const list = vi.fn( async (): Promise> => ({ items: [{ id: 'remote-1', title: 'Remote One' }], }), ); const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); const result = await service.list({ source: 'federated:peer-b.example.com', resource: 'tasks', request: { status: 'open' }, local: () => makeLocalResponse(), }); expect(result).toEqual({ items: [{ id: 'remote-1', title: 'Remote One', _source: 'peer-b' }], }); expect(list).toHaveBeenCalledWith(PEER_B.id, 'tasks', { status: 'open' }); }); it('matches federated hosts by endpoint host including non-default port', async () => { const list = vi.fn( async (): Promise> => ({ items: [{ id: 'remote-port', title: 'Remote Port' }], }), ); const service = new QuerySourceService(makeDb([PEER_LOCALHOST]), makeFederationClient(list)); const result = await service.list({ source: 'federated:localhost:3001', resource: 'tasks', request: {}, local: () => makeLocalResponse(), }); expect(result).toEqual({ items: [{ id: 'remote-port', title: 'Remote Port', _source: 'peer-localhost' }], }); expect(list).toHaveBeenCalledWith(PEER_LOCALHOST.id, 'tasks', {}); }); it('fans out source="all" to local plus every active outbound peer in parallel and merges tagged rows', async () => { const callOrder: string[] = []; const list = vi.fn(async (peerId: string): Promise> => { callOrder.push(`remote-start:${peerId}`); await Promise.resolve(); return { items: [{ id: `remote-${peerId.slice(0, 1)}`, title: `Remote ${peerId.slice(0, 1)}` }], }; }); const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); const result = await service.list({ source: 'all', resource: 'tasks', request: { limit: 25 }, local: async () => { callOrder.push('local-start'); await Promise.resolve(); return { items: [{ id: 'local-1', title: 'Local One' }] }; }, }); expect(result).toEqual({ items: [ { id: 'local-1', title: 'Local One', _source: 'local' }, { id: 'remote-a', title: 'Remote a', _source: 'peer-a' }, { id: 'remote-b', title: 'Remote b', _source: 'peer-b' }, ], }); expect(list).toHaveBeenCalledTimes(2); expect(callOrder).toEqual([ 'local-start', `remote-start:${PEER_A.id}`, `remote-start:${PEER_B.id}`, ]); }); it('marks source="all" as partial and truncated when any subquery returns a cursor', async () => { const list = vi.fn( async (): Promise> => ({ items: [{ id: 'remote-a', title: 'Remote A' }], nextCursor: 'remote-next', }), ); const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); const result = await service.list({ source: 'all', resource: 'tasks', request: {}, local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]), }); expect(result).toEqual({ items: [ { id: 'local-1', title: 'Local One', _source: 'local' }, { id: 'remote-a', title: 'Remote A', _source: 'peer-a' }, ], _partial: true, _truncated: true, }); }); it('returns _partial=true for source="all" when one peer fails without dropping successful sources', async () => { const list = vi.fn(async (peerId: string): Promise> => { if (peerId === PEER_B.id) { throw new FederationClientError({ code: 'NETWORK', message: 'peer unavailable', peerId, }); } return { items: [{ id: 'remote-a', title: 'Remote A' }] }; }); const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); const result = await service.list({ source: 'all', resource: 'tasks', request: {}, local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]), }); expect(result).toEqual({ items: [ { id: 'local-1', title: 'Local One', _source: 'local' }, { id: 'remote-a', title: 'Remote A', _source: 'peer-a' }, ], _partial: true, }); }); it('throws QuerySourceError when a federated host does not match an active outbound peer', async () => { const list = vi.fn(async (): Promise> => ({ items: [] })); const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); await expect( service.list({ source: 'federated:missing.example.com', resource: 'tasks', request: {}, local: () => makeLocalResponse(), }), ).rejects.toMatchObject({ name: 'QuerySourceError', code: 'PEER_NOT_FOUND', } satisfies Partial); }); });