/**
 * FederationClientService — outbound mTLS client for federation requests (FED-M3-08).
 *
 * Dials peer gateways over mTLS using the cert+sealed-key stored in `federation_peers`,
 * invokes federation verbs (list / get / capabilities), and surfaces peer-lookup,
 * configuration, transport, HTTP-status and response-validation failures as typed
 * `FederationClientError` instances. Errors raised by the database query itself are
 * not wrapped and propagate unchanged.
 *
 * ## Error code taxonomy
 *
 * | Code               | When                                                                        |
 * | ------------------ | --------------------------------------------------------------------------- |
 * | PEER_NOT_FOUND     | No row in federation_peers for the given peerId                             |
 * | PEER_INACTIVE      | Peer row exists but state !== 'active'                                      |
 * | PEER_MISCONFIGURED | Active peer missing endpointUrl/clientKeyPem, client key fails to unseal, or STEP_CA_ROOT_CERT_PATH is unset/unreadable |
 * | NETWORK            | undici threw a connection / TLS / timeout error                             |
 * | HTTP_{status}      | Peer returned a non-2xx response other than 403 (e.g. HTTP_404, HTTP_500)   |
 * | FORBIDDEN          | Peer returned 403 — emitted instead of HTTP_403; `error.status` is still 403 |
 * | INVALID_RESPONSE   | 2xx body was not JSON or failed Zod schema validation                       |
 *
 * ## Cache strategy
 *
 * Per-peer `undici.Agent` instances are cached in a `Map` for
 * the lifetime of the service instance. The cache is keyed on peerId (UUID).
 *
 * Cache invalidation:
 * - `flushPeer(peerId)` — removes the entry immediately. M5/M6 MUST call this on
 *   cert rotation or peer revocation events so the next request re-reads the DB and
 *   builds a fresh TLS Agent with the new cert material.
 * - On cache miss: re-reads the DB, checks state === 'active', rebuilds Agent.
 *
 * Cache does NOT auto-expire. The service is expected to be a singleton scoped to the
 * NestJS application lifecycle; flushing on revocation/rotation is the only invalidation
 * path by design (avoids redundant DB round-trips on the hot path).
 */

import { Injectable, Inject, Logger } from '@nestjs/common';
import { readFileSync } from 'node:fs';
import { Agent, fetch as undiciFetch } from 'undici';
import type { Dispatcher } from 'undici';
import { z } from 'zod';
import { type Db, eq, federationPeers } from '@mosaicstack/db';
import {
  FederationListResponseSchema,
  FederationGetResponseSchema,
  FederationCapabilitiesResponseSchema,
  FederationErrorEnvelopeSchema,
  type FederationListResponse,
  type FederationGetResponse,
  type FederationCapabilitiesResponse,
} from '@mosaicstack/types';
import { DB } from '../../database/database.module.js';
import { unsealClientKey } from '../peer-key.util.js';

// ---------------------------------------------------------------------------
// Error taxonomy
// ---------------------------------------------------------------------------

/**
 * Client-side error code set. Distinct from the server-side `FederationErrorCode`
 * (which lives in `@mosaicstack/types`) because the client has additional failure
 * modes (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK) that the
 * server never emits.
 *
 * A 403 response is reported as `FORBIDDEN`, never as `HTTP_403`.
 */
export type FederationClientErrorCode =
  | 'PEER_NOT_FOUND'
  | 'PEER_INACTIVE'
  | 'PEER_MISCONFIGURED'
  | 'NETWORK'
  | 'FORBIDDEN'
  | 'INVALID_RESPONSE'
  | `HTTP_${number}`;

/** Constructor options for `FederationClientError`. */
export interface FederationClientErrorOptions {
  /** HTTP status returned by the peer; supplied for non-2xx responses. */
  status?: number;
  /** Failure category callers can switch on. */
  code: FederationClientErrorCode;
  /** Human-readable description; becomes the `Error.message`. */
  message: string;
  /** Peer the failing call targeted. */
  peerId: string;
  /** Underlying error, preserved for diagnostics. */
  cause?: unknown;
}

/**
 * Thrown by FederationClientService for peer-lookup, configuration, transport,
 * HTTP-status and response-validation failures (database query errors are not
 * wrapped). Callers can dispatch on `error.code` for programmatic handling.
*/
export class FederationClientError extends Error {
  /** HTTP status returned by the peer; set only for non-2xx responses. */
  readonly status?: number;
  /** Failure category callers can switch on. */
  readonly code: FederationClientErrorCode;
  /** Peer the failing call targeted (empty string when not attributable to a peer). */
  readonly peerId: string;
  /** Underlying error (undici, key unseal, fs read, JSON parse), when there is one. */
  readonly cause?: unknown;

  constructor(opts: FederationClientErrorOptions) {
    super(opts.message);
    this.name = 'FederationClientError';
    this.status = opts.status;
    this.code = opts.code;
    this.peerId = opts.peerId;
    this.cause = opts.cause;
  }
}

// ---------------------------------------------------------------------------
// Internal cache types
// ---------------------------------------------------------------------------

/** Everything needed to dial one peer; built once per cache miss. */
interface AgentCacheEntry {
  /** mTLS dispatcher presenting the peer's client cert, trust pinned to the Step-CA root. */
  agent: Agent;
  /** Peer base URL with any trailing slashes removed. */
  endpointUrl: string;
  certPem: string;
  certSerial: string;
}

/** Render an unknown thrown value as a short message for error text. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------

@Injectable()
export class FederationClientService {
  private readonly logger = new Logger(FederationClientService.name);

  /**
   * Per-peer undici Agent cache.
   * Key = peerId (UUID string).
   *
   * Values are either a resolved `AgentCacheEntry` or an in-flight
   * `Promise<AgentCacheEntry>` (promise-cache pattern). Storing the promise
   * prevents duplicate DB lookups and duplicate key-unseal operations when two
   * requests for the same peer arrive before the first build completes.
   *
   * Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
   */
  private readonly cache = new Map<string, AgentCacheEntry | Promise<AgentCacheEntry>>();

  /**
   * Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
   * Used as the trust anchor for peer server certificates so federation TLS is
   * pinned to our PKI, not the public trust store. Lazily loaded on first use
   * so unit tests that don't exercise the agent path can run without the env var.
   */
  private cachedCaPem: string | null = null;

  constructor(@Inject(DB) private readonly db: Db) {}

  // -------------------------------------------------------------------------
  // Public verb API
  // -------------------------------------------------------------------------

  /**
   * Invoke the `list` verb on a remote peer.
   *
   * @param peerId   UUID of the peer row in `federation_peers`.
   * @param resource Resource path, e.g. "tasks" (URL-encoded into the path).
   * @param request  Free-form body sent as JSON in the POST body.
   * @returns Parsed `FederationListResponse` (items are left as `unknown`).
   * @throws FederationClientError on any peer, transport, HTTP or shape failure.
   */
  async list(
    peerId: string,
    resource: string,
    request: Record<string, unknown>,
  ): Promise<FederationListResponse<unknown>> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/list/${encodeURIComponent(resource)}`;
    const body = await this.doPost(peerId, url, agent, request);
    return this.parseWith<FederationListResponse<unknown>>(
      peerId,
      body,
      FederationListResponseSchema(z.unknown()),
    );
  }

  /**
   * Invoke the `get` verb on a remote peer.
   *
   * @param peerId   UUID of the peer row in `federation_peers`.
   * @param resource Resource path, e.g. "tasks" (URL-encoded into the path).
   * @param id       Resource identifier (URL-encoded into the path).
   * @param request  Free-form body sent as JSON in the POST body.
   * @returns Parsed `FederationGetResponse` (item is left as `unknown`).
   * @throws FederationClientError on any peer, transport, HTTP or shape failure.
   */
  async get(
    peerId: string,
    resource: string,
    id: string,
    request: Record<string, unknown>,
  ): Promise<FederationGetResponse<unknown>> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/get/${encodeURIComponent(resource)}/${encodeURIComponent(id)}`;
    const body = await this.doPost(peerId, url, agent, request);
    return this.parseWith<FederationGetResponse<unknown>>(
      peerId,
      body,
      FederationGetResponseSchema(z.unknown()),
    );
  }

  /**
   * Invoke the `capabilities` verb on a remote peer.
   *
   * @param peerId UUID of the peer row in `federation_peers`.
   * @returns Parsed `FederationCapabilitiesResponse`.
   * @throws FederationClientError on any peer, transport, HTTP or shape failure.
   */
  async capabilities(peerId: string): Promise<FederationCapabilitiesResponse> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/capabilities`;
    const body = await this.doGet(peerId, url, agent);
    return this.parseWith<FederationCapabilitiesResponse>(
      peerId,
      body,
      FederationCapabilitiesResponseSchema,
    );
  }

  // -------------------------------------------------------------------------
  // Cache management
  // -------------------------------------------------------------------------

  /**
   * Flush the cached Agent for a specific peer.
   *
   * M5/M6 MUST call this on:
   *   - cert rotation events (so new cert material is picked up)
   *   - peer revocation events (so future requests fail at PEER_INACTIVE)
   *
   * After flushing, the next call to `list`, `get`, or `capabilities` for
   * this peer will re-read the DB and rebuild the Agent.
   *
   * A resolved entry's Agent is destroyed (best effort); undici's `destroy()`
   * aborts requests still running on it, so a call in progress may fail.
   * An in-flight build is NOT destroyed — its callers are still awaiting it —
   * but because its promise no longer occupies the cache slot, `resolveEntry`
   * will hand the finished entry to those callers without re-caching it.
   */
  flushPeer(peerId: string): void {
    const entry = this.cache.get(peerId);
    if (entry === undefined) {
      return;
    }
    this.cache.delete(peerId);

    if (!(entry instanceof Promise)) {
      void entry.agent.destroy().catch(() => {
        // intentionally ignored — destroy errors are not actionable
      });
    }
    this.logger.log(`Cache flushed for peer ${peerId}`);
  }

  // -------------------------------------------------------------------------
  // Internal helpers
  // -------------------------------------------------------------------------

  /**
   * Load and cache the Step-CA root cert PEM from `STEP_CA_ROOT_CERT_PATH`.
   * Throws `FederationClientError` if the env var is unset or the file cannot
   * be read — mTLS to a peer without a pinned trust anchor would silently
   * fall back to the public trust store.
   *
   * @param peerId Peer being dialled, used to attribute the error (defaults to '').
   */
  private loadStepCaRoot(peerId = ''): string {
    if (this.cachedCaPem !== null) {
      return this.cachedCaPem;
    }
    const path = process.env['STEP_CA_ROOT_CERT_PATH'];
    if (!path) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: 'STEP_CA_ROOT_CERT_PATH is not set; refusing to dial peer without pinned CA trust',
        peerId,
      });
    }
    try {
      const pem = readFileSync(path, 'utf8');
      this.cachedCaPem = pem;
      return pem;
    } catch (err) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Failed to read STEP_CA_ROOT_CERT_PATH (${path})`,
        peerId,
        cause: err,
      });
    }
  }

  /**
   * Resolve the cache entry for a peer, reading DB on miss.
   *
   * Uses a promise-cache pattern: concurrent callers for the same uncached
   * `peerId` all `await` the same in-flight `Promise<AgentCacheEntry>` so
   * only one DB lookup and one key-unseal runs per peer per cache miss.
   * On success the promise is replaced with the concrete entry; on rejection
   * it is removed so a transient error does not poison the cache.
   *
   * Both replacements happen only if the slot still holds THIS build's promise.
   * Otherwise a `flushPeer()` issued mid-build would be silently undone by the
   * stale (pre-rotation / pre-revocation) entry, and a rejection could evict a
   * newer build started after the flush.
   *
   * Throws `FederationClientError` with appropriate code if the peer is not
   * found, is inactive, or is missing required fields.
   */
  private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
    const cached = this.cache.get(peerId);
    if (cached !== undefined) {
      return cached; // Promise or concrete entry — both are awaitable
    }

    // Explicit annotation: the callbacks reference `inflight` (they only run
    // asynchronously, after the assignment below has completed).
    const inflight: Promise<AgentCacheEntry> = this.buildEntry(peerId).then(
      (entry) => {
        if (this.cache.get(peerId) === inflight) {
          this.cache.set(peerId, entry); // replace promise with concrete value
        }
        return entry;
      },
      (err: unknown) => {
        if (this.cache.get(peerId) === inflight) {
          this.cache.delete(peerId); // don't poison the cache with a rejected promise
        }
        throw err;
      },
    );
    this.cache.set(peerId, inflight);
    return inflight;
  }

  /**
   * Build the `AgentCacheEntry` for a peer by reading the DB, validating the
   * peer's state, unsealing the private key, and constructing the mTLS Agent.
   *
   * Throws `FederationClientError` with appropriate code if the peer is not
   * found, is inactive, or is missing required fields. Errors thrown by the
   * DB query itself propagate unwrapped.
   */
  private async buildEntry(peerId: string): Promise<AgentCacheEntry> {
    // DB lookup
    const [peer] = await this.db
      .select()
      .from(federationPeers)
      .where(eq(federationPeers.id, peerId))
      .limit(1);

    if (!peer) {
      throw new FederationClientError({
        code: 'PEER_NOT_FOUND',
        message: `Federation peer ${peerId} not found`,
        peerId,
      });
    }

    if (peer.state !== 'active') {
      throw new FederationClientError({
        code: 'PEER_INACTIVE',
        message: `Federation peer ${peerId} is not active (state: ${peer.state})`,
        peerId,
      });
    }

    if (!peer.endpointUrl || !peer.clientKeyPem) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Federation peer ${peerId} is missing endpointUrl or clientKeyPem`,
        peerId,
      });
    }

    // Verb URLs are built as `${endpointUrl}/api/...`; strip trailing slashes so a
    // stored "https://peer.example/" does not yield "//api/..." paths.
    const endpointUrl = peer.endpointUrl.replace(/\/+$/, '');

    // Unseal the private key
    let privateKeyPem: string;
    try {
      privateKeyPem = unsealClientKey(peer.clientKeyPem);
    } catch (err) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Failed to unseal client key for peer ${peerId}`,
        peerId,
        cause: err,
      });
    }

    // Build mTLS agent — pin trust to Step-CA root so we never accept
    // a peer cert signed by a public CA (defense against MITM with a
    // publicly-trusted DV cert for the peer's hostname).
    const agent = new Agent({
      connect: {
        cert: peer.certPem,
        key: privateKeyPem,
        ca: this.loadStepCaRoot(peerId),
        // rejectUnauthorized: true is the undici default for HTTPS
      },
    });

    const entry: AgentCacheEntry = {
      agent,
      endpointUrl,
      certPem: peer.certPem,
      certSerial: peer.certSerial,
    };

    this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`);
    return entry;
  }

  /**
   * Execute a POST request with a JSON body.
   * Returns the parsed response body as an unknown value.
   * Throws `FederationClientError` on network errors and non-2xx responses.
   */
  private async doPost(
    peerId: string,
    url: string,
    agent: Dispatcher,
    body: Record<string, unknown>,
  ): Promise<unknown> {
    return this.doRequest(peerId, url, agent, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    });
  }

  /**
   * Execute a GET request.
   * Returns the parsed response body as an unknown value.
   * Throws `FederationClientError` on network errors and non-2xx responses.
   */
  private async doGet(peerId: string, url: string, agent: Dispatcher): Promise<unknown> {
    return this.doRequest(peerId, url, agent, { method: 'GET' });
  }

  /**
   * Shared transport for every verb: dispatch on the peer's mTLS Agent, map
   * failures onto `FederationClientError`, and JSON-decode 2xx bodies.
   *
   * @throws FederationClientError `NETWORK` when the request fails or a 2xx body
   *   cannot be read; `FORBIDDEN` for 403; `HTTP_{status}` for other non-2xx
   *   responses; `INVALID_RESPONSE` when a 2xx body is not JSON.
   */
  private async doRequest(
    peerId: string,
    url: string,
    agent: Dispatcher,
    init: { method: string; headers?: Record<string, string>; body?: string },
  ): Promise<unknown> {
    let response: Awaited<ReturnType<typeof undiciFetch>>;
    try {
      response = await undiciFetch(url, {
        ...init,
        dispatcher: agent,
      });
    } catch (err) {
      throw new FederationClientError({
        code: 'NETWORK',
        message: `Network error calling peer ${peerId} at ${url}: ${describeError(err)}`,
        peerId,
        cause: err,
      });
    }

    let rawBody: string;
    try {
      rawBody = await response.text();
    } catch (err) {
      if (response.ok) {
        // The success body could not be read (e.g. connection dropped mid-stream).
        // That is a transport failure, not a malformed payload: report NETWORK
        // rather than letting JSON.parse('') surface as INVALID_RESPONSE.
        throw new FederationClientError({
          code: 'NETWORK',
          message: `Network error reading response from peer ${peerId} at ${url}: ${describeError(err)}`,
          peerId,
          cause: err,
        });
      }
      // Error responses only use the body to enrich the message; fall back to generic.
      rawBody = '';
    }

    if (!response.ok) {
      const status = response.status;

      // Attempt to parse as federation error envelope
      let serverMessage = `HTTP ${status}`;
      try {
        const json: unknown = JSON.parse(rawBody);
        const result = FederationErrorEnvelopeSchema.safeParse(json);
        if (result.success) {
          serverMessage = result.data.error.message;
        }
      } catch {
        // Not valid JSON or not a federation envelope — use generic message
      }

      // Specific code for 403 (most actionable for callers); generic HTTP_{n} for others
      const code: FederationClientErrorCode = status === 403 ? 'FORBIDDEN' : `HTTP_${status}`;

      throw new FederationClientError({
        status,
        code,
        message: `Peer ${peerId} returned ${status}: ${serverMessage}`,
        peerId,
      });
    }

    try {
      return JSON.parse(rawBody) as unknown;
    } catch (err) {
      throw new FederationClientError({
        code: 'INVALID_RESPONSE',
        message: `Peer ${peerId} returned non-JSON body`,
        peerId,
        cause: err,
      });
    }
  }

  /**
   * Parse and validate a response body against a Zod schema.
   *
   * For list/get, callers pass the result of `FederationListResponseSchema(z.unknown())`
   * so that the envelope structure is validated without requiring a concrete item schema
   * at the client level. The generic `T` provides compile-time typing.
   *
   * Throws `FederationClientError({ code: 'INVALID_RESPONSE' })` on parse failure.
   */
  private parseWith<T>(peerId: string, body: unknown, schema: z.ZodTypeAny): T {
    const result = schema.safeParse(body);
    if (!result.success) {
      const issues = result.error.issues
        .map((e: z.ZodIssue) => `[${e.path.join('.') || 'root'}] ${e.message}`)
        .join('; ');
      throw new FederationClientError({
        code: 'INVALID_RESPONSE',
        message: `Peer ${peerId} returned invalid response shape: ${issues}`,
        peerId,
      });
    }
    return result.data as T;
  }
}