- HIGH-A: resolveEntry now uses promise-cache pattern so concurrent callers serialize on a single in-flight build, eliminating duplicate key material in heap and duplicate DB round-trips - HIGH-B: flushPeer destroys the evicted undici Agent so stale TLS connections close on cert rotation - MED-C: add regression test for PEER_MISCONFIGURED when STEP_CA_ROOT_CERT_PATH is unset Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
501 lines
17 KiB
TypeScript
501 lines
17 KiB
TypeScript
/**
|
|
* FederationClientService — outbound mTLS client for federation requests (FED-M3-08).
|
|
*
|
|
* Dials peer gateways over mTLS using the cert+sealed-key stored in `federation_peers`,
|
|
* invokes federation verbs (list / get / capabilities), and surfaces all failure modes
|
|
* as typed `FederationClientError` instances.
|
|
*
|
|
* ## Error code taxonomy
|
|
*
|
|
* | Code | When |
|
|
* | ------------------ | ------------------------------------------------------------- |
|
|
* | PEER_NOT_FOUND | No row in federation_peers for the given peerId |
|
|
* | PEER_INACTIVE | Peer row exists but state !== 'active' |
|
|
* | PEER_MISCONFIGURED | Active peer row lacks endpointUrl/clientKeyPem, its client key fails to unseal, or STEP_CA_ROOT_CERT_PATH is unset/unreadable |
|
|
* | NETWORK | undici threw a connection / TLS / timeout error |
|
|
* | HTTP_{status} | Peer returned any other non-2xx response (e.g. HTTP_404, HTTP_500) |
|
|
* | FORBIDDEN | Peer returned 403 (emitted instead of HTTP_403; `status` is still 403) |
|
|
* | INVALID_RESPONSE | 2xx response body was not valid JSON or failed Zod schema validation |
|
|
*
|
|
* ## Cache strategy
|
|
*
|
|
* Per-peer `undici.Agent` instances are cached in a `Map<peerId, AgentCacheEntry>` for
|
|
* the lifetime of the service instance. The cache is keyed on peerId (UUID).
|
|
*
|
|
* Cache invalidation:
|
|
* - `flushPeer(peerId)` — removes the entry immediately and destroys its cached Agent. M5/M6 MUST call this on
|
|
* cert rotation or peer revocation events so the next request re-reads the DB and
|
|
* builds a fresh TLS Agent with the new cert material.
|
|
* - On cache miss: re-reads the DB, checks state === 'active', rebuilds Agent.
|
|
*
|
|
* Cache does NOT auto-expire. The service is expected to be a singleton scoped to the
|
|
* NestJS application lifecycle; flushing on revocation/rotation is the only invalidation
|
|
* path by design (avoids redundant DB round-trips on the hot path).
|
|
*/
|
|
|
|
import { Injectable, Inject, Logger } from '@nestjs/common';
|
|
import { readFileSync } from 'node:fs';
|
|
import { Agent, fetch as undiciFetch } from 'undici';
|
|
import type { Dispatcher } from 'undici';
|
|
import { z } from 'zod';
|
|
import { type Db, eq, federationPeers } from '@mosaicstack/db';
|
|
import {
|
|
FederationListResponseSchema,
|
|
FederationGetResponseSchema,
|
|
FederationCapabilitiesResponseSchema,
|
|
FederationErrorEnvelopeSchema,
|
|
type FederationListResponse,
|
|
type FederationGetResponse,
|
|
type FederationCapabilitiesResponse,
|
|
} from '@mosaicstack/types';
|
|
import { DB } from '../../database/database.module.js';
|
|
import { unsealClientKey } from '../peer-key.util.js';
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Error taxonomy
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Error codes raised by the federation client.
 *
 * Kept separate from the server-side `FederationErrorCode` in
 * `@mosaicstack/types`: the client can fail before a peer ever answers
 * (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK), and those
 * codes never appear in a server response.
 *
 * `HTTP_${number}` is the catch-all shape for non-2xx statuses.
 */
export type FederationClientErrorCode =
  // Local peer-row / configuration failures
  | 'PEER_NOT_FOUND'
  | 'PEER_INACTIVE'
  | 'PEER_MISCONFIGURED'
  // Transport failures
  | 'NETWORK'
  // The peer answered, but not with something usable
  | 'FORBIDDEN'
  | 'INVALID_RESPONSE'
  | `HTTP_${number}`;
|
|
|
|
/** Constructor options for `FederationClientError`. */
export interface FederationClientErrorOptions {
  /** Machine-readable failure code callers can switch on. */
  code: FederationClientErrorCode;
  /** Human-readable description; passed to the `Error` constructor as the message. */
  message: string;
  /** Peer the failing call targeted (may be empty for configuration-level failures). */
  peerId: string;
  /** HTTP status; only set when the peer actually returned a non-2xx response. */
  status?: number;
  /** Underlying error that triggered this failure, if any. */
  cause?: unknown;
}
|
|
|
|
/**
 * The single error type FederationClientService throws on every failure path.
 *
 * Dispatch on `code` for programmatic handling. `status` is only populated for
 * HTTP-level failures, and `cause` carries the underlying error when there is one.
 */
export class FederationClientError extends Error {
  readonly status?: number;
  readonly code: FederationClientErrorCode;
  readonly peerId: string;
  readonly cause?: unknown;

  constructor({ status, code, message, peerId, cause }: FederationClientErrorOptions) {
    super(message);
    this.name = 'FederationClientError';
    // Assigned in declaration order so the own-property layout is stable.
    this.status = status;
    this.code = code;
    this.peerId = peerId;
    this.cause = cause;
  }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Internal cache types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Everything cached per peer: the mTLS dispatcher plus the peer-row values it
 * was built from.
 */
type AgentCacheEntry = {
  /** undici Agent configured with the peer's client cert/key and the pinned Step-CA root. */
  agent: Agent;
  /** Base URL of the peer gateway; federation verb paths are appended to it. */
  endpointUrl: string;
  /** Client certificate PEM presented to the peer during the TLS handshake. */
  certPem: string;
  /** Serial of that client certificate (used in log output). */
  certSerial: string;
};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Service
|
|
// ---------------------------------------------------------------------------
|
|
|
|
@Injectable()
export class FederationClientService {
  private readonly logger = new Logger(FederationClientService.name);

  /**
   * Per-peer undici Agent cache, keyed by peerId (UUID string).
   *
   * Values are either a resolved `AgentCacheEntry` or an in-flight
   * `Promise<AgentCacheEntry>` (promise-cache pattern). Storing the promise
   * prevents duplicate DB lookups and duplicate key-unseal operations when two
   * requests for the same peer arrive before the first build completes.
   *
   * Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
   */
  private readonly cache = new Map<string, AgentCacheEntry | Promise<AgentCacheEntry>>();

  /**
   * Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
   * Used as the trust anchor for peer server certificates so federation TLS is
   * pinned to our PKI, not the public trust store. Lazily loaded on first use
   * so unit tests that don't exercise the agent path can run without the env var.
   */
  private cachedCaPem: string | null = null;

  constructor(@Inject(DB) private readonly db: Db) {}

  // -------------------------------------------------------------------------
  // Public verb API
  // -------------------------------------------------------------------------

  /**
   * Invoke the `list` verb on a remote peer.
   *
   * @param peerId UUID of the peer row in `federation_peers`.
   * @param resource Resource path, e.g. "tasks".
   * @param request Free-form body sent as JSON in the POST body.
   * @returns Parsed `FederationListResponse<T>`.
   * @throws FederationClientError on any peer-resolution, transport, HTTP or validation failure.
   */
  async list<T>(
    peerId: string,
    resource: string,
    request: Record<string, unknown>,
  ): Promise<FederationListResponse<T>> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/list/${encodeURIComponent(resource)}`;
    const body = await this.doPost(peerId, url, agent, request);
    return this.parseWith<FederationListResponse<T>>(
      peerId,
      body,
      FederationListResponseSchema(z.unknown()),
    );
  }

  /**
   * Invoke the `get` verb on a remote peer.
   *
   * @param peerId UUID of the peer row in `federation_peers`.
   * @param resource Resource path, e.g. "tasks".
   * @param id Resource identifier.
   * @param request Free-form body sent as JSON in the POST body.
   * @returns Parsed `FederationGetResponse<T>`.
   * @throws FederationClientError on any peer-resolution, transport, HTTP or validation failure.
   */
  async get<T>(
    peerId: string,
    resource: string,
    id: string,
    request: Record<string, unknown>,
  ): Promise<FederationGetResponse<T>> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/get/${encodeURIComponent(resource)}/${encodeURIComponent(id)}`;
    const body = await this.doPost(peerId, url, agent, request);
    return this.parseWith<FederationGetResponse<T>>(
      peerId,
      body,
      FederationGetResponseSchema(z.unknown()),
    );
  }

  /**
   * Invoke the `capabilities` verb on a remote peer.
   *
   * @param peerId UUID of the peer row in `federation_peers`.
   * @returns Parsed `FederationCapabilitiesResponse`.
   * @throws FederationClientError on any peer-resolution, transport, HTTP or validation failure.
   */
  async capabilities(peerId: string): Promise<FederationCapabilitiesResponse> {
    const { endpointUrl, agent } = await this.resolveEntry(peerId);
    const url = `${endpointUrl}/api/federation/v1/capabilities`;
    const body = await this.doGet(peerId, url, agent);
    return this.parseWith<FederationCapabilitiesResponse>(
      peerId,
      body,
      FederationCapabilitiesResponseSchema,
    );
  }

  // -------------------------------------------------------------------------
  // Cache management
  // -------------------------------------------------------------------------

  /**
   * Flush the cached Agent for a specific peer.
   *
   * M5/M6 MUST call this on:
   *  - cert rotation events (so new cert material is picked up)
   *  - peer revocation events (so future requests fail at PEER_INACTIVE)
   *
   * After flushing, the next call to `list`, `get`, or `capabilities` for
   * this peer will re-read the DB and rebuild the Agent. This also holds when
   * the flush lands while a build is still in flight: that build's result is
   * handed to the callers already awaiting it but is NOT written back into
   * the cache (see `resolveEntry`).
   */
  flushPeer(peerId: string): void {
    const entry = this.cache.get(peerId);
    if (entry === undefined) {
      return;
    }
    this.cache.delete(peerId);

    if (!(entry instanceof Promise)) {
      // Best-effort: tear down the Agent built from the old cert material so its
      // pooled TLS connections close. Destroy errors are not actionable.
      entry.agent.destroy().catch(() => {
        // intentionally ignored
      });
    }
    // An in-flight build (Promise entry) is not destroyed here, because callers
    // already awaiting it will use its Agent. Since the promise is no longer the
    // cached value, resolveEntry() will not re-cache the (possibly stale) result.

    this.logger.log(`Cache flushed for peer ${peerId}`);
  }

  // -------------------------------------------------------------------------
  // Internal helpers
  // -------------------------------------------------------------------------

  /**
   * Load and cache the Step-CA root cert PEM from `STEP_CA_ROOT_CERT_PATH`.
   *
   * Throws `FederationClientError` (PEER_MISCONFIGURED) if the env var is unset,
   * the file cannot be read, or the file is empty — mTLS to a peer without a
   * pinned trust anchor would silently fall back to the public trust store.
   *
   * @param peerId Peer being dialled; attached to any thrown error for context.
   */
  private loadStepCaRoot(peerId: string): string {
    if (this.cachedCaPem !== null) {
      return this.cachedCaPem;
    }

    const path = process.env['STEP_CA_ROOT_CERT_PATH'];
    if (!path) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: 'STEP_CA_ROOT_CERT_PATH is not set; refusing to dial peer without pinned CA trust',
        peerId,
      });
    }

    let pem: string;
    try {
      pem = readFileSync(path, 'utf8');
    } catch (err) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Failed to read STEP_CA_ROOT_CERT_PATH (${path})`,
        peerId,
        cause: err,
      });
    }

    // An empty trust bundle is a misconfiguration; don't cache it.
    if (pem.trim() === '') {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `STEP_CA_ROOT_CERT_PATH (${path}) is empty; refusing to dial peer without pinned CA trust`,
        peerId,
      });
    }

    this.cachedCaPem = pem;
    return pem;
  }

  /**
   * Resolve the cache entry for a peer, reading the DB on a miss.
   *
   * Uses a promise-cache pattern: concurrent callers for the same uncached
   * `peerId` all `await` the same in-flight `Promise<AgentCacheEntry>`, so
   * only one DB lookup and one key-unseal runs per peer per cache miss.
   *
   * When the build settles, the cache is only touched if it still holds THIS
   * promise. If `flushPeer` ran in the meantime, the result is returned to the
   * callers already waiting but is not cached, and a failure does not evict a
   * newer in-flight build. A rejected build is removed so a transient error
   * does not poison the cache.
   *
   * Throws `FederationClientError` with the appropriate code if the peer is not
   * found, is inactive, or is missing required fields.
   */
  private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
    const cached = this.cache.get(peerId);
    if (cached !== undefined) {
      return cached; // Promise or concrete entry — both are awaitable
    }

    // Explicit annotation needed: the callbacks reference `inflight` itself.
    const inflight: Promise<AgentCacheEntry> = this.buildEntry(peerId).then(
      (entry) => {
        if (this.cache.get(peerId) === inflight) {
          this.cache.set(peerId, entry); // replace promise with concrete value
          this.logger.log(`Agent cached for peer ${peerId} (serial: ${entry.certSerial})`);
        } else {
          this.logger.log(
            `Agent for peer ${peerId} not cached: cache was flushed while it was being built`,
          );
        }
        return entry;
      },
      (err: unknown) => {
        if (this.cache.get(peerId) === inflight) {
          this.cache.delete(peerId); // don't poison the cache with a rejected promise
        }
        throw err;
      },
    );

    this.cache.set(peerId, inflight);
    return inflight;
  }

  /**
   * Build the `AgentCacheEntry` for a peer by reading the DB, validating the
   * peer's state, resolving the pinned CA, unsealing the private key, and
   * constructing the mTLS Agent.
   *
   * Throws `FederationClientError` with the appropriate code if the peer is not
   * found, is inactive, is missing required fields, the CA root is unavailable,
   * or the key cannot be unsealed.
   */
  private async buildEntry(peerId: string): Promise<AgentCacheEntry> {
    const [peer] = await this.db
      .select()
      .from(federationPeers)
      .where(eq(federationPeers.id, peerId))
      .limit(1);

    if (!peer) {
      throw new FederationClientError({
        code: 'PEER_NOT_FOUND',
        message: `Federation peer ${peerId} not found`,
        peerId,
      });
    }

    if (peer.state !== 'active') {
      throw new FederationClientError({
        code: 'PEER_INACTIVE',
        message: `Federation peer ${peerId} is not active (state: ${peer.state})`,
        peerId,
      });
    }

    if (!peer.endpointUrl || !peer.clientKeyPem) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Federation peer ${peerId} is missing endpointUrl or clientKeyPem`,
        peerId,
      });
    }

    // Resolve the trust anchor first so a CA misconfiguration fails before any
    // plaintext key material is produced.
    const caPem = this.loadStepCaRoot(peerId);

    let privateKeyPem: string;
    try {
      privateKeyPem = unsealClientKey(peer.clientKeyPem);
    } catch (err) {
      throw new FederationClientError({
        code: 'PEER_MISCONFIGURED',
        message: `Failed to unseal client key for peer ${peerId}`,
        peerId,
        cause: err,
      });
    }

    // Build mTLS agent — pin trust to Step-CA root so we never accept
    // a peer cert signed by a public CA (defense against MITM with a
    // publicly-trusted DV cert for the peer's hostname).
    const agent = new Agent({
      connect: {
        cert: peer.certPem,
        key: privateKeyPem,
        ca: caPem,
        // rejectUnauthorized: true is the undici default for HTTPS
      },
    });

    return {
      agent,
      // Strip trailing slashes so verb URLs never contain `//api/...`.
      endpointUrl: peer.endpointUrl.replace(/\/+$/, ''),
      certPem: peer.certPem,
      certSerial: peer.certSerial,
    };
  }

  /**
   * Execute a POST request with a JSON body.
   * Returns the parsed response body as an unknown value.
   * Throws `FederationClientError` on network errors and non-2xx responses.
   */
  private async doPost(
    peerId: string,
    url: string,
    agent: Dispatcher,
    body: Record<string, unknown>,
  ): Promise<unknown> {
    return this.doRequest(peerId, url, agent, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    });
  }

  /**
   * Execute a GET request.
   * Returns the parsed response body as an unknown value.
   * Throws `FederationClientError` on network errors and non-2xx responses.
   */
  private async doGet(peerId: string, url: string, agent: Dispatcher): Promise<unknown> {
    return this.doRequest(peerId, url, agent, { method: 'GET' });
  }

  /**
   * Perform one HTTP exchange with a peer and return the JSON-decoded 2xx body.
   *
   * Failure mapping:
   *  - fetch rejects, or reading a 2xx body fails          → NETWORK
   *  - 403                                                 → FORBIDDEN
   *  - any other non-2xx                                   → HTTP_{status}
   *  - 2xx body is not JSON                                → INVALID_RESPONSE
   */
  private async doRequest(
    peerId: string,
    url: string,
    agent: Dispatcher,
    init: { method: string; headers?: Record<string, string>; body?: string },
  ): Promise<unknown> {
    let response: Awaited<ReturnType<typeof undiciFetch>>;

    try {
      response = await undiciFetch(url, {
        ...init,
        dispatcher: agent,
      });
    } catch (err) {
      throw new FederationClientError({
        code: 'NETWORK',
        message: `Network error calling peer ${peerId} at ${url}: ${err instanceof Error ? err.message : String(err)}`,
        peerId,
        cause: err,
      });
    }

    if (!response.ok) {
      const status = response.status;

      // Best-effort read: the body only enriches the error message.
      const errorBody = await response.text().catch(() => '');

      // Attempt to parse as federation error envelope
      let serverMessage = `HTTP ${status}`;
      try {
        const json: unknown = JSON.parse(errorBody);
        const result = FederationErrorEnvelopeSchema.safeParse(json);
        if (result.success) {
          serverMessage = result.data.error.message;
        }
      } catch {
        // Not valid JSON or not a federation envelope — use generic message
      }

      // Specific code for 403 (most actionable for callers); generic HTTP_{n} for others
      const code: FederationClientErrorCode = status === 403 ? 'FORBIDDEN' : `HTTP_${status}`;

      throw new FederationClientError({
        status,
        code,
        message: `Peer ${peerId} returned ${status}: ${serverMessage}`,
        peerId,
      });
    }

    // A failure while streaming a successful body (socket reset, body timeout)
    // is a transport problem, not a malformed response.
    let rawBody: string;
    try {
      rawBody = await response.text();
    } catch (err) {
      throw new FederationClientError({
        code: 'NETWORK',
        message: `Network error reading response from peer ${peerId} at ${url}: ${err instanceof Error ? err.message : String(err)}`,
        peerId,
        cause: err,
      });
    }

    try {
      return JSON.parse(rawBody) as unknown;
    } catch (err) {
      throw new FederationClientError({
        code: 'INVALID_RESPONSE',
        message: `Peer ${peerId} returned non-JSON body`,
        peerId,
        cause: err,
      });
    }
  }

  /**
   * Parse and validate a response body against a Zod schema.
   *
   * For list/get, callers pass the result of `FederationListResponseSchema(z.unknown())`
   * so that the envelope structure is validated without requiring a concrete item schema
   * at the client level. The generic `T` provides compile-time typing.
   *
   * Throws `FederationClientError({ code: 'INVALID_RESPONSE' })` on parse failure.
   */
  private parseWith<T>(peerId: string, body: unknown, schema: z.ZodTypeAny): T {
    const result = schema.safeParse(body);
    if (!result.success) {
      const issues = result.error.issues
        .map((e: z.ZodIssue) => `[${e.path.join('.') || 'root'}] ${e.message}`)
        .join('; ');
      throw new FederationClientError({
        code: 'INVALID_RESPONSE',
        message: `Peer ${peerId} returned invalid response shape: ${issues}`,
        peerId,
      });
    }
    return result.data as T;
  }
}
|