/** * QuerySourceService — gateway query source router (FED-M3-09). * * Accepts the federation query-layer `source` selector and routes list-style * reads to local storage, one federated peer, or all active outbound peers. * * `source: "all"` is intentionally tolerant of per-peer failures: local data * and successful peer responses are returned, and the envelope is marked * `_partial: true`. Local failures still reject because there is no safe local * fallback and the gateway's own storage is expected to be authoritative. */ import { Inject, Injectable, Logger } from '@nestjs/common'; import { and, eq, federationPeers, isNotNull, type Db } from '@mosaicstack/db'; import { SOURCE_LOCAL, tagWithSource, type FederationListResponse, type SourceTag, } from '@mosaicstack/types'; import { DB } from '../../database/database.module.js'; import { FederationClientService } from './federation-client.service.js'; export type QuerySource = 'local' | 'all' | `federated:${string}`; export type QuerySourceErrorCode = 'INVALID_SOURCE' | 'PEER_NOT_FOUND'; export interface QuerySourceErrorOptions { code: QuerySourceErrorCode; message: string; source: string; } export class QuerySourceError extends Error { readonly code: QuerySourceErrorCode; readonly source: string; constructor(opts: QuerySourceErrorOptions) { super(opts.message); this.name = 'QuerySourceError'; this.code = opts.code; this.source = opts.source; } } export type LocalListExecutor = () => Promise | T[]>; export interface QuerySourceListOptions { source: QuerySource; resource: string; request?: Record; local: LocalListExecutor; } export type QuerySourceListResponse = FederationListResponse; interface OutboundPeer { id: string; commonName: string; endpointUrl: string; } interface TaggedList { items: Array; partial: boolean; truncated: boolean; nextCursor?: string; } @Injectable() export class QuerySourceService { private readonly logger = new Logger(QuerySourceService.name); constructor( @Inject(DB) private readonly db: Db, @Inject(FederationClientService) private readonly federationClient: FederationClientService, ) {} async list( options: QuerySourceListOptions, ): Promise> { const request = options.request ?? {}; if (options.source === 'local') { const local = await this.runLocal(options.local); return this.toResponse(this.tagList(local, SOURCE_LOCAL)); } if (options.source === 'all') { return this.listAll(options.resource, request, options.local); } if (options.source.startsWith('federated:')) { const host = options.source.slice('federated:'.length).trim(); if (!host) { throw new QuerySourceError({ code: 'INVALID_SOURCE', message: 'Federated source must include a host after federated:', source: options.source, }); } const peer = await this.findPeerByHost(host, options.source); const remote = await this.federationClient.list(peer.id, options.resource, request); return this.toResponse(this.tagList(remote, peer.commonName)); } throw new QuerySourceError({ code: 'INVALID_SOURCE', message: `Unsupported query source: ${options.source}`, source: options.source, }); } private async listAll( resource: string, request: Record, local: LocalListExecutor, ): Promise> { const peers = await this.listActiveOutboundPeers(); const localPromise = this.runLocal(local).then((response) => this.tagList(response, SOURCE_LOCAL), ); const remotePromises = peers.map(async (peer: OutboundPeer): Promise | null> => { try { const response = await this.federationClient.list(peer.id, resource, request); return this.tagList(response, peer.commonName); } catch (error: unknown) { this.logger.warn( `Federated query to peer ${peer.commonName} (${peer.id}) failed; returning partial all-source response: ${ error instanceof Error ? error.message : String(error) }`, ); return null; } }); const [localResult, ...remoteResults] = await Promise.all([localPromise, ...remotePromises]); const successfulRemoteResults = remoteResults.filter( (result: TaggedList | null): result is TaggedList => result !== null, ); const allResults = [localResult, ...successfulRemoteResults]; const peerFailure = successfulRemoteResults.length !== peers.length; return this.mergeTaggedLists(allResults, peerFailure); } private async runLocal( local: LocalListExecutor, ): Promise> { const response = await local(); if (Array.isArray(response)) { return { items: response }; } return response; } private tagList( response: FederationListResponse, source: string, ): TaggedList { return { items: tagWithSource(response.items, source), partial: response._partial === true, truncated: response._truncated === true || response.nextCursor !== undefined, nextCursor: response.nextCursor, }; } private mergeTaggedLists( lists: Array>, peerFailure: boolean, ): QuerySourceListResponse { const items = lists.flatMap((list: TaggedList) => list.items); const partial = peerFailure || lists.some((list: TaggedList) => list.partial || list.nextCursor !== undefined); const truncated = lists.some((list: TaggedList) => list.truncated); const response: QuerySourceListResponse = { items }; if (partial) { response._partial = true; } if (truncated) { response._truncated = true; } return response; } private toResponse(tagged: TaggedList): QuerySourceListResponse { const response: QuerySourceListResponse = { items: tagged.items, }; if (tagged.nextCursor !== undefined) { response.nextCursor = tagged.nextCursor; } if (tagged.partial) { response._partial = true; } if (tagged.truncated) { response._truncated = true; } return response; } private async findPeerByHost(sourceHost: string, source: string): Promise { const host = normalizeHost(sourceHost); const peers = await this.listActiveOutboundPeers(); const peer = peers.find((candidate: OutboundPeer) => { const commonName = normalizeHost(candidate.commonName); const endpointHosts = endpointHostKeys(candidate.endpointUrl).map((endpointHost: string) => normalizeHost(endpointHost), ); return commonName === host || endpointHosts.includes(host); }); if (!peer) { throw new QuerySourceError({ code: 'PEER_NOT_FOUND', message: `No active outbound federation peer matches source ${source}`, source, }); } return peer; } private async listActiveOutboundPeers(): Promise { const rows = await this.db .select({ id: federationPeers.id, commonName: federationPeers.commonName, endpointUrl: federationPeers.endpointUrl, }) .from(federationPeers) .where( and( eq(federationPeers.state, 'active'), isNotNull(federationPeers.endpointUrl), isNotNull(federationPeers.clientKeyPem), ), ) .orderBy(federationPeers.commonName); return rows.filter((row): row is OutboundPeer => typeof row.endpointUrl === 'string'); } } function normalizeHost(host: string): string { return host.trim().toLowerCase(); } function endpointHostKeys(endpointUrl: string): string[] { try { const url = new URL(endpointUrl); return Array.from(new Set([url.host, url.hostname].filter((host: string) => host.length > 0))); } catch { return []; } }