feat(#462): add federation query source routing (#673)
Some checks are pending
ci/woodpecker/push/ci Pipeline is pending
ci/woodpecker/push/publish Pipeline is pending

This commit was merged in pull request #673.
This commit is contained in:
2026-06-24 21:39:45 +00:00
parent 3eeed04e17
commit 838c44086c
5 changed files with 596 additions and 1 deletions

View File

@@ -0,0 +1,255 @@
import 'reflect-metadata';
import { describe, expect, it, vi } from 'vitest';
import type { Db } from '@mosaicstack/db';
import type { FederationListResponse } from '@mosaicstack/types';
import {
FederationClientError,
type FederationClientService,
} from '../federation-client.service.js';
import { type QuerySourceError, QuerySourceService } from '../query-source.service.js';
interface TestRow {
id: string;
title: string;
}
interface PeerRow {
id: string;
commonName: string;
endpointUrl: string | null;
clientKeyPem: string | null;
state: 'active' | 'pending' | 'suspended' | 'revoked';
}
const LOCAL_ROWS: TestRow[] = [
{ id: 'local-1', title: 'Local One' },
{ id: 'local-2', title: 'Local Two' },
];
const PEER_A: PeerRow = {
id: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
commonName: 'peer-a',
endpointUrl: 'https://peer-a.example.com',
clientKeyPem: 'sealed-key-a',
state: 'active',
};
const PEER_B: PeerRow = {
id: 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
commonName: 'peer-b',
endpointUrl: 'https://peer-b.example.com',
clientKeyPem: 'sealed-key-b',
state: 'active',
};
const PEER_LOCALHOST: PeerRow = {
id: 'cccccccc-cccc-cccc-cccc-cccccccccccc',
commonName: 'peer-localhost',
endpointUrl: 'https://localhost:3001',
clientKeyPem: 'sealed-key-c',
state: 'active',
};
function makeDb(activePeers: PeerRow[]): Db {
const orderBy = vi.fn().mockResolvedValue(activePeers);
const where = vi.fn().mockReturnValue({ orderBy });
const from = vi.fn().mockReturnValue({ where });
const select = vi.fn().mockReturnValue({ from });
return {
select,
insert: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
transaction: vi.fn(),
} as unknown as Db;
}
function makeFederationClient(
list: (
peerId: string,
resource: string,
request: Record<string, unknown>,
) => Promise<FederationListResponse<TestRow>>,
): FederationClientService {
return {
list: list as unknown as FederationClientService['list'],
} as FederationClientService;
}
function makeLocalResponse(rows: TestRow[] = LOCAL_ROWS): Promise<FederationListResponse<TestRow>> {
return Promise.resolve({ items: rows });
}
describe('QuerySourceService', () => {
it('routes source="local" to the local executor and tags rows as local', async () => {
const list = vi.fn(async (): Promise<FederationListResponse<TestRow>> => ({ items: [] }));
const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'local',
resource: 'tasks',
request: { cursor: 'ignored-for-local-test' },
local: () => makeLocalResponse(),
});
expect(result).toEqual({
items: [
{ id: 'local-1', title: 'Local One', _source: 'local' },
{ id: 'local-2', title: 'Local Two', _source: 'local' },
],
});
expect(list).not.toHaveBeenCalled();
});
it('routes source="federated:<host>" to the matching active peer and tags rows with peer commonName', async () => {
const list = vi.fn(
async (): Promise<FederationListResponse<TestRow>> => ({
items: [{ id: 'remote-1', title: 'Remote One' }],
}),
);
const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'federated:peer-b.example.com',
resource: 'tasks',
request: { status: 'open' },
local: () => makeLocalResponse(),
});
expect(result).toEqual({
items: [{ id: 'remote-1', title: 'Remote One', _source: 'peer-b' }],
});
expect(list).toHaveBeenCalledWith(PEER_B.id, 'tasks', { status: 'open' });
});
it('matches federated hosts by endpoint host including non-default port', async () => {
const list = vi.fn(
async (): Promise<FederationListResponse<TestRow>> => ({
items: [{ id: 'remote-port', title: 'Remote Port' }],
}),
);
const service = new QuerySourceService(makeDb([PEER_LOCALHOST]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'federated:localhost:3001',
resource: 'tasks',
request: {},
local: () => makeLocalResponse(),
});
expect(result).toEqual({
items: [{ id: 'remote-port', title: 'Remote Port', _source: 'peer-localhost' }],
});
expect(list).toHaveBeenCalledWith(PEER_LOCALHOST.id, 'tasks', {});
});
it('fans out source="all" to local plus every active outbound peer in parallel and merges tagged rows', async () => {
const callOrder: string[] = [];
const list = vi.fn(async (peerId: string): Promise<FederationListResponse<TestRow>> => {
callOrder.push(`remote-start:${peerId}`);
await Promise.resolve();
return {
items: [{ id: `remote-${peerId.slice(0, 1)}`, title: `Remote ${peerId.slice(0, 1)}` }],
};
});
const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'all',
resource: 'tasks',
request: { limit: 25 },
local: async () => {
callOrder.push('local-start');
await Promise.resolve();
return { items: [{ id: 'local-1', title: 'Local One' }] };
},
});
expect(result).toEqual({
items: [
{ id: 'local-1', title: 'Local One', _source: 'local' },
{ id: 'remote-a', title: 'Remote a', _source: 'peer-a' },
{ id: 'remote-b', title: 'Remote b', _source: 'peer-b' },
],
});
expect(list).toHaveBeenCalledTimes(2);
expect(callOrder).toEqual([
'local-start',
`remote-start:${PEER_A.id}`,
`remote-start:${PEER_B.id}`,
]);
});
it('marks source="all" as partial and truncated when any subquery returns a cursor', async () => {
const list = vi.fn(
async (): Promise<FederationListResponse<TestRow>> => ({
items: [{ id: 'remote-a', title: 'Remote A' }],
nextCursor: 'remote-next',
}),
);
const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'all',
resource: 'tasks',
request: {},
local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]),
});
expect(result).toEqual({
items: [
{ id: 'local-1', title: 'Local One', _source: 'local' },
{ id: 'remote-a', title: 'Remote A', _source: 'peer-a' },
],
_partial: true,
_truncated: true,
});
});
it('returns _partial=true for source="all" when one peer fails without dropping successful sources', async () => {
const list = vi.fn(async (peerId: string): Promise<FederationListResponse<TestRow>> => {
if (peerId === PEER_B.id) {
throw new FederationClientError({
code: 'NETWORK',
message: 'peer unavailable',
peerId,
});
}
return { items: [{ id: 'remote-a', title: 'Remote A' }] };
});
const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list));
const result = await service.list<TestRow>({
source: 'all',
resource: 'tasks',
request: {},
local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]),
});
expect(result).toEqual({
items: [
{ id: 'local-1', title: 'Local One', _source: 'local' },
{ id: 'remote-a', title: 'Remote A', _source: 'peer-a' },
],
_partial: true,
});
});
it('throws QuerySourceError when a federated host does not match an active outbound peer', async () => {
const list = vi.fn(async (): Promise<FederationListResponse<TestRow>> => ({ items: [] }));
const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list));
await expect(
service.list<TestRow>({
source: 'federated:missing.example.com',
resource: 'tasks',
request: {},
local: () => makeLocalResponse(),
}),
).rejects.toMatchObject({
name: 'QuerySourceError',
code: 'PEER_NOT_FOUND',
} satisfies Partial<QuerySourceError>);
});
});