Files
stack/apps/gateway/src/federation/client/query-source.service.ts
jason.woltje 838c44086c
Some checks are pending
ci/woodpecker/push/ci Pipeline is pending
ci/woodpecker/push/publish Pipeline is pending
feat(#462): add federation query source routing (#673)
2026-06-24 21:39:45 +00:00

262 lines
8.1 KiB
TypeScript

/**
* QuerySourceService — gateway query source router (FED-M3-09).
*
* Accepts the federation query-layer `source` selector and routes list-style
* reads to local storage, one federated peer, or all active outbound peers.
*
* `source: "all"` is intentionally tolerant of per-peer failures: local data
* and successful peer responses are returned, and the envelope is marked
* `_partial: true`. Local failures still reject because there is no safe local
* fallback and the gateway's own storage is expected to be authoritative.
*/
import { Inject, Injectable, Logger } from '@nestjs/common';
import { and, eq, federationPeers, isNotNull, type Db } from '@mosaicstack/db';
import {
SOURCE_LOCAL,
tagWithSource,
type FederationListResponse,
type SourceTag,
} from '@mosaicstack/types';
import { DB } from '../../database/database.module.js';
import { FederationClientService } from './federation-client.service.js';
/**
 * Source selector understood by the query source router:
 * - `'local'` reads only from the local executor,
 * - `'all'` combines the local executor with every active outbound peer,
 * - `'federated:<host>'` targets the single peer matching `<host>`.
 */
export type QuerySource = 'local' | 'all' | `federated:${string}`;
/** Machine-readable reasons a source selector can be rejected. */
export type QuerySourceErrorCode = 'INVALID_SOURCE' | 'PEER_NOT_FOUND';

/** Constructor payload for {@link QuerySourceError}. */
export interface QuerySourceErrorOptions {
  /** Reason code exposed on the error instance. */
  code: QuerySourceErrorCode;
  /** Human-readable description; becomes the `Error` message. */
  message: string;
  /** The source selector that triggered the error. */
  source: string;
}

/**
 * Error raised when a source selector cannot be routed.
 *
 * Carries the rejection `code` and the offending `source` selector in
 * addition to the regular `Error` message.
 */
export class QuerySourceError extends Error {
  readonly code: QuerySourceErrorCode;
  readonly source: string;

  /**
   * @param opts - Reason code, message and the selector that was rejected.
   */
  constructor(opts: QuerySourceErrorOptions) {
    const { code, message, source } = opts;
    super(message);
    this.name = 'QuerySourceError';
    this.code = code;
    this.source = source;
  }
}
/**
 * Callback that performs the local read. It may resolve either to a full
 * list envelope or to a bare array of items.
 */
export type LocalListExecutor<T extends object> = () => Promise<FederationListResponse<T> | T[]>;
/** Arguments for a source-routed list read. */
export interface QuerySourceListOptions<T extends object> {
/** Which source(s) to read from. */
source: QuerySource;
/** Resource name forwarded to the federation client for peer queries. */
resource: string;
/** Optional request payload forwarded to peers; an empty object is used when omitted. */
request?: Record<string, unknown>;
/** Executor that produces the local result. */
local: LocalListExecutor<T>;
}
/** List envelope whose items each carry a source tag. */
export type QuerySourceListResponse<T extends object> = FederationListResponse<T & SourceTag>;
/** Columns read from `federationPeers` for a peer that can be queried. */
interface OutboundPeer {
/** Peer identifier passed to the federation client. */
id: string;
/** Peer common name; used as the source tag and for host matching. */
commonName: string;
/** Peer endpoint URL; its host and hostname are also used for host matching. */
endpointUrl: string;
}
/** Intermediate form of one source's list result after its items were source-tagged. */
interface TaggedList<T extends object> {
/** Items from the source, each carrying a source tag. */
items: Array<T & SourceTag>;
/** True when the source response was flagged `_partial`. */
partial: boolean;
/** True when the source response was flagged `_truncated` or exposed a `nextCursor`. */
truncated: boolean;
/** Pagination cursor copied from the source response, if any. */
nextCursor?: string;
}
/**
 * Routes list-style reads to the local executor, to one federated peer, or to
 * the local executor plus every active outbound peer.
 */
@Injectable()
export class QuerySourceService {
  private readonly logger = new Logger(QuerySourceService.name);

  constructor(
    @Inject(DB) private readonly db: Db,
    @Inject(FederationClientService) private readonly federationClient: FederationClientService,
  ) {}

  /**
   * Execute a list read against the source named by `options.source`.
   *
   * @param options - Source selector, resource name, optional request payload
   *   and the local executor.
   * @returns A list envelope whose items are tagged with their origin.
   * @throws QuerySourceError `INVALID_SOURCE` when the selector is not
   *   recognised or a `federated:` selector has no host part;
   *   `PEER_NOT_FOUND` when no active outbound peer matches the host.
   */
  async list<T extends object>(
    options: QuerySourceListOptions<T>,
  ): Promise<QuerySourceListResponse<T>> {
    const request = options.request ?? {};
    const { source, resource, local } = options;

    switch (source) {
      case 'local': {
        const localResponse = await this.runLocal(local);
        return this.toResponse(this.tagList(localResponse, SOURCE_LOCAL));
      }
      case 'all':
        return this.listAll(resource, request, local);
      default:
        break;
    }

    const federatedPrefix = 'federated:';
    if (!source.startsWith(federatedPrefix)) {
      throw new QuerySourceError({
        code: 'INVALID_SOURCE',
        message: `Unsupported query source: ${source}`,
        source,
      });
    }

    const host = source.slice(federatedPrefix.length).trim();
    if (host.length === 0) {
      throw new QuerySourceError({
        code: 'INVALID_SOURCE',
        message: 'Federated source must include a host after federated:',
        source,
      });
    }

    const peer = await this.findPeerByHost(host, source);
    const peerResponse = await this.federationClient.list<T>(peer.id, resource, request);
    return this.toResponse(this.tagList(peerResponse, peer.commonName));
  }

  /**
   * Query the local executor and every active outbound peer concurrently and
   * merge the results.
   *
   * A failing peer is logged and skipped, and the merged envelope is then
   * marked partial. A failing local executor rejects the whole call.
   */
  private async listAll<T extends object>(
    resource: string,
    request: Record<string, unknown>,
    local: LocalListExecutor<T>,
  ): Promise<QuerySourceListResponse<T>> {
    const peers = await this.listActiveOutboundPeers();

    // Resolves to null instead of rejecting so one bad peer cannot fail the batch.
    const queryPeer = async (peer: OutboundPeer): Promise<TaggedList<T> | null> => {
      try {
        const peerResponse = await this.federationClient.list<T>(peer.id, resource, request);
        return this.tagList(peerResponse, peer.commonName);
      } catch (error: unknown) {
        const reason = error instanceof Error ? error.message : String(error);
        this.logger.warn(
          `Federated query to peer ${peer.commonName} (${peer.id}) failed; returning partial all-source response: ${reason}`,
        );
        return null;
      }
    };

    // The local read is started first, then the peer reads; all run concurrently.
    const localPending = this.runLocal(local).then((localResponse) =>
      this.tagList(localResponse, SOURCE_LOCAL),
    );
    const peerPending = peers.map(queryPeer);

    const [localTagged, ...peerOutcomes] = await Promise.all([localPending, ...peerPending]);

    const peerTagged: Array<TaggedList<T>> = [];
    for (const outcome of peerOutcomes) {
      if (outcome !== null) {
        peerTagged.push(outcome);
      }
    }

    const anyPeerFailed = peerTagged.length !== peers.length;
    return this.mergeTaggedLists([localTagged, ...peerTagged], anyPeerFailed);
  }

  /** Run the local executor and wrap a bare array result into a list envelope. */
  private async runLocal<T extends object>(
    local: LocalListExecutor<T>,
  ): Promise<FederationListResponse<T>> {
    const result = await local();
    return Array.isArray(result) ? { items: result } : result;
  }

  /**
   * Tag every item of `response` with `source` and capture its completeness
   * flags. A response exposing a `nextCursor` is treated as truncated.
   */
  private tagList<T extends object>(
    response: FederationListResponse<T>,
    source: string,
  ): TaggedList<T> {
    const hasMore = response.nextCursor !== undefined;
    const flaggedTruncated = response._truncated === true;
    return {
      items: tagWithSource(response.items, source),
      partial: response._partial === true,
      truncated: flaggedTruncated || hasMore,
      nextCursor: response.nextCursor,
    };
  }

  /**
   * Concatenate tagged lists in the order given and derive the merged flags.
   *
   * The merged envelope carries no cursor; it is marked partial when a peer
   * failed or any input was partial or had a cursor, and truncated when any
   * input was truncated.
   */
  private mergeTaggedLists<T extends object>(
    lists: Array<TaggedList<T>>,
    peerFailure: boolean,
  ): QuerySourceListResponse<T> {
    const merged: QuerySourceListResponse<T> = {
      items: lists.flatMap((list) => list.items),
    };

    let incomplete = peerFailure;
    let cutShort = false;
    for (const list of lists) {
      if (list.partial || list.nextCursor !== undefined) {
        incomplete = true;
      }
      if (list.truncated) {
        cutShort = true;
      }
    }

    if (incomplete) {
      merged._partial = true;
    }
    if (cutShort) {
      merged._truncated = true;
    }
    return merged;
  }

  /** Convert one tagged list into a response envelope, emitting only the flags that are set. */
  private toResponse<T extends object>(tagged: TaggedList<T>): QuerySourceListResponse<T> {
    const { items, nextCursor, partial, truncated } = tagged;
    const envelope: QuerySourceListResponse<T> = { items };
    if (nextCursor !== undefined) {
      envelope.nextCursor = nextCursor;
    }
    if (partial) {
      envelope._partial = true;
    }
    if (truncated) {
      envelope._truncated = true;
    }
    return envelope;
  }

  /**
   * Find the first active outbound peer whose common name, endpoint host, or
   * endpoint hostname equals `sourceHost` (compared trimmed and lower-cased).
   *
   * @throws QuerySourceError `PEER_NOT_FOUND` when nothing matches.
   */
  private async findPeerByHost(sourceHost: string, source: string): Promise<OutboundPeer> {
    const wanted = normalizeHost(sourceHost);
    const candidates = await this.listActiveOutboundPeers();

    for (const candidate of candidates) {
      if (normalizeHost(candidate.commonName) === wanted) {
        return candidate;
      }
      const endpointMatches = endpointHostKeys(candidate.endpointUrl).some(
        (endpointHost) => normalizeHost(endpointHost) === wanted,
      );
      if (endpointMatches) {
        return candidate;
      }
    }

    throw new QuerySourceError({
      code: 'PEER_NOT_FOUND',
      message: `No active outbound federation peer matches source ${source}`,
      source,
    });
  }

  /**
   * Load peers in state `active` that have both an endpoint URL and a client
   * key, ordered by common name.
   */
  private async listActiveOutboundPeers(): Promise<OutboundPeer[]> {
    const usableOutboundPeer = and(
      eq(federationPeers.state, 'active'),
      isNotNull(federationPeers.endpointUrl),
      isNotNull(federationPeers.clientKeyPem),
    );

    const rows = await this.db
      .select({
        id: federationPeers.id,
        commonName: federationPeers.commonName,
        endpointUrl: federationPeers.endpointUrl,
      })
      .from(federationPeers)
      .where(usableOutboundPeer)
      .orderBy(federationPeers.commonName);

    // Runtime re-check that also narrows the endpointUrl column to string for the type system.
    return rows.filter((row): row is OutboundPeer => typeof row.endpointUrl === 'string');
  }
}
/**
 * Canonicalise a host string for comparison: surrounding whitespace is
 * removed and the result is lower-cased.
 */
function normalizeHost(host: string): string {
  const trimmed = host.trim();
  return trimmed.toLowerCase();
}
/**
 * Derive the host strings a peer endpoint can be addressed by.
 *
 * @param endpointUrl - Endpoint URL to parse.
 * @returns The URL's `host` followed by its `hostname`, without empty strings
 *   or duplicates; an empty array when the URL cannot be parsed.
 */
function endpointHostKeys(endpointUrl: string): string[] {
  let parsed: URL;
  try {
    parsed = new URL(endpointUrl);
  } catch {
    return [];
  }

  const keys: string[] = [];
  for (const candidate of [parsed.host, parsed.hostname]) {
    if (candidate.length > 0 && !keys.includes(candidate)) {
      keys.push(candidate);
    }
  }
  return keys;
}