feat(#462): add federation query source routing
This commit is contained in:
261
apps/gateway/src/federation/client/query-source.service.ts
Normal file
261
apps/gateway/src/federation/client/query-source.service.ts
Normal file
@@ -0,0 +1,261 @@
|
||||
/**
|
||||
* QuerySourceService — gateway query source router (FED-M3-09).
|
||||
*
|
||||
* Accepts the federation query-layer `source` selector and routes list-style
|
||||
* reads to local storage, one federated peer, or all active outbound peers.
|
||||
*
|
||||
* `source: "all"` is intentionally tolerant of per-peer failures: local data
|
||||
* and successful peer responses are returned, and the envelope is marked
|
||||
* `_partial: true`. Local failures still reject because there is no safe local
|
||||
* fallback and the gateway's own storage is expected to be authoritative.
|
||||
*/
|
||||
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { and, eq, federationPeers, isNotNull, type Db } from '@mosaicstack/db';
|
||||
import {
|
||||
SOURCE_LOCAL,
|
||||
tagWithSource,
|
||||
type FederationListResponse,
|
||||
type SourceTag,
|
||||
} from '@mosaicstack/types';
|
||||
import { DB } from '../../database/database.module.js';
|
||||
import { FederationClientService } from './federation-client.service.js';
|
||||
|
||||
export type QuerySource = 'local' | 'all' | `federated:${string}`;
|
||||
|
||||
export type QuerySourceErrorCode = 'INVALID_SOURCE' | 'PEER_NOT_FOUND';
|
||||
|
||||
export interface QuerySourceErrorOptions {
|
||||
code: QuerySourceErrorCode;
|
||||
message: string;
|
||||
source: string;
|
||||
}
|
||||
|
||||
export class QuerySourceError extends Error {
|
||||
readonly code: QuerySourceErrorCode;
|
||||
readonly source: string;
|
||||
|
||||
constructor(opts: QuerySourceErrorOptions) {
|
||||
super(opts.message);
|
||||
this.name = 'QuerySourceError';
|
||||
this.code = opts.code;
|
||||
this.source = opts.source;
|
||||
}
|
||||
}
|
||||
|
||||
export type LocalListExecutor<T extends object> = () => Promise<FederationListResponse<T> | T[]>;
|
||||
|
||||
export interface QuerySourceListOptions<T extends object> {
|
||||
source: QuerySource;
|
||||
resource: string;
|
||||
request?: Record<string, unknown>;
|
||||
local: LocalListExecutor<T>;
|
||||
}
|
||||
|
||||
export type QuerySourceListResponse<T extends object> = FederationListResponse<T & SourceTag>;
|
||||
|
||||
interface OutboundPeer {
|
||||
id: string;
|
||||
commonName: string;
|
||||
endpointUrl: string;
|
||||
}
|
||||
|
||||
interface TaggedList<T extends object> {
|
||||
items: Array<T & SourceTag>;
|
||||
partial: boolean;
|
||||
truncated: boolean;
|
||||
nextCursor?: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class QuerySourceService {
|
||||
private readonly logger = new Logger(QuerySourceService.name);
|
||||
|
||||
constructor(
|
||||
@Inject(DB) private readonly db: Db,
|
||||
@Inject(FederationClientService) private readonly federationClient: FederationClientService,
|
||||
) {}
|
||||
|
||||
async list<T extends object>(
|
||||
options: QuerySourceListOptions<T>,
|
||||
): Promise<QuerySourceListResponse<T>> {
|
||||
const request = options.request ?? {};
|
||||
|
||||
if (options.source === 'local') {
|
||||
const local = await this.runLocal(options.local);
|
||||
return this.toResponse(this.tagList(local, SOURCE_LOCAL));
|
||||
}
|
||||
|
||||
if (options.source === 'all') {
|
||||
return this.listAll(options.resource, request, options.local);
|
||||
}
|
||||
|
||||
if (options.source.startsWith('federated:')) {
|
||||
const host = options.source.slice('federated:'.length).trim();
|
||||
if (!host) {
|
||||
throw new QuerySourceError({
|
||||
code: 'INVALID_SOURCE',
|
||||
message: 'Federated source must include a host after federated:',
|
||||
source: options.source,
|
||||
});
|
||||
}
|
||||
|
||||
const peer = await this.findPeerByHost(host, options.source);
|
||||
const remote = await this.federationClient.list<T>(peer.id, options.resource, request);
|
||||
return this.toResponse(this.tagList(remote, peer.commonName));
|
||||
}
|
||||
|
||||
throw new QuerySourceError({
|
||||
code: 'INVALID_SOURCE',
|
||||
message: `Unsupported query source: ${options.source}`,
|
||||
source: options.source,
|
||||
});
|
||||
}
|
||||
|
||||
private async listAll<T extends object>(
|
||||
resource: string,
|
||||
request: Record<string, unknown>,
|
||||
local: LocalListExecutor<T>,
|
||||
): Promise<QuerySourceListResponse<T>> {
|
||||
const peers = await this.listActiveOutboundPeers();
|
||||
|
||||
const localPromise = this.runLocal(local).then((response) =>
|
||||
this.tagList(response, SOURCE_LOCAL),
|
||||
);
|
||||
const remotePromises = peers.map(async (peer: OutboundPeer): Promise<TaggedList<T> | null> => {
|
||||
try {
|
||||
const response = await this.federationClient.list<T>(peer.id, resource, request);
|
||||
return this.tagList(response, peer.commonName);
|
||||
} catch (error: unknown) {
|
||||
this.logger.warn(
|
||||
`Federated query to peer ${peer.commonName} (${peer.id}) failed; returning partial all-source response: ${
|
||||
error instanceof Error ? error.message : String(error)
|
||||
}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
const [localResult, ...remoteResults] = await Promise.all([localPromise, ...remotePromises]);
|
||||
const successfulRemoteResults = remoteResults.filter(
|
||||
(result: TaggedList<T> | null): result is TaggedList<T> => result !== null,
|
||||
);
|
||||
const allResults = [localResult, ...successfulRemoteResults];
|
||||
const peerFailure = successfulRemoteResults.length !== peers.length;
|
||||
|
||||
return this.mergeTaggedLists(allResults, peerFailure);
|
||||
}
|
||||
|
||||
private async runLocal<T extends object>(
|
||||
local: LocalListExecutor<T>,
|
||||
): Promise<FederationListResponse<T>> {
|
||||
const response = await local();
|
||||
if (Array.isArray(response)) {
|
||||
return { items: response };
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private tagList<T extends object>(
|
||||
response: FederationListResponse<T>,
|
||||
source: string,
|
||||
): TaggedList<T> {
|
||||
return {
|
||||
items: tagWithSource(response.items, source),
|
||||
partial: response._partial === true,
|
||||
truncated: response._truncated === true || response.nextCursor !== undefined,
|
||||
nextCursor: response.nextCursor,
|
||||
};
|
||||
}
|
||||
|
||||
private mergeTaggedLists<T extends object>(
|
||||
lists: Array<TaggedList<T>>,
|
||||
peerFailure: boolean,
|
||||
): QuerySourceListResponse<T> {
|
||||
const items = lists.flatMap((list: TaggedList<T>) => list.items);
|
||||
const partial =
|
||||
peerFailure ||
|
||||
lists.some((list: TaggedList<T>) => list.partial || list.nextCursor !== undefined);
|
||||
const truncated = lists.some((list: TaggedList<T>) => list.truncated);
|
||||
|
||||
const response: QuerySourceListResponse<T> = { items };
|
||||
if (partial) {
|
||||
response._partial = true;
|
||||
}
|
||||
if (truncated) {
|
||||
response._truncated = true;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private toResponse<T extends object>(tagged: TaggedList<T>): QuerySourceListResponse<T> {
|
||||
const response: QuerySourceListResponse<T> = {
|
||||
items: tagged.items,
|
||||
};
|
||||
if (tagged.nextCursor !== undefined) {
|
||||
response.nextCursor = tagged.nextCursor;
|
||||
}
|
||||
if (tagged.partial) {
|
||||
response._partial = true;
|
||||
}
|
||||
if (tagged.truncated) {
|
||||
response._truncated = true;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private async findPeerByHost(sourceHost: string, source: string): Promise<OutboundPeer> {
|
||||
const host = normalizeHost(sourceHost);
|
||||
const peers = await this.listActiveOutboundPeers();
|
||||
const peer = peers.find((candidate: OutboundPeer) => {
|
||||
const commonName = normalizeHost(candidate.commonName);
|
||||
const endpointHosts = endpointHostKeys(candidate.endpointUrl).map((endpointHost: string) =>
|
||||
normalizeHost(endpointHost),
|
||||
);
|
||||
return commonName === host || endpointHosts.includes(host);
|
||||
});
|
||||
|
||||
if (!peer) {
|
||||
throw new QuerySourceError({
|
||||
code: 'PEER_NOT_FOUND',
|
||||
message: `No active outbound federation peer matches source ${source}`,
|
||||
source,
|
||||
});
|
||||
}
|
||||
|
||||
return peer;
|
||||
}
|
||||
|
||||
private async listActiveOutboundPeers(): Promise<OutboundPeer[]> {
|
||||
const rows = await this.db
|
||||
.select({
|
||||
id: federationPeers.id,
|
||||
commonName: federationPeers.commonName,
|
||||
endpointUrl: federationPeers.endpointUrl,
|
||||
})
|
||||
.from(federationPeers)
|
||||
.where(
|
||||
and(
|
||||
eq(federationPeers.state, 'active'),
|
||||
isNotNull(federationPeers.endpointUrl),
|
||||
isNotNull(federationPeers.clientKeyPem),
|
||||
),
|
||||
)
|
||||
.orderBy(federationPeers.commonName);
|
||||
|
||||
return rows.filter((row): row is OutboundPeer => typeof row.endpointUrl === 'string');
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeHost(host: string): string {
|
||||
return host.trim().toLowerCase();
|
||||
}
|
||||
|
||||
function endpointHostKeys(endpointUrl: string): string[] {
|
||||
try {
|
||||
const url = new URL(endpointUrl);
|
||||
return Array.from(new Set([url.host, url.hostname].filter((host: string) => host.length > 0)));
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user