diff --git a/apps/gateway/package.json b/apps/gateway/package.json index 79824f3..1a2cb97 100644 --- a/apps/gateway/package.json +++ b/apps/gateway/package.json @@ -73,6 +73,7 @@ "rxjs": "^7.8.0", "socket.io": "^4.8.0", "uuid": "^11.0.0", + "undici": "^7.24.6", "zod": "^4.3.6" }, "devDependencies": { diff --git a/apps/gateway/src/federation/client/__tests__/federation-client.service.spec.ts b/apps/gateway/src/federation/client/__tests__/federation-client.service.spec.ts new file mode 100644 index 0000000..3f92a33 --- /dev/null +++ b/apps/gateway/src/federation/client/__tests__/federation-client.service.spec.ts @@ -0,0 +1,492 @@ +/** + * Unit tests for FederationClientService (FED-M3-08). + * + * HTTP mocking strategy: + * undici MockAgent is used to intercept outbound HTTP requests. The service + * uses `undici.fetch` with a `dispatcher` option, so MockAgent is set as the + * global dispatcher and all requests flow through it. + * + * Because the service builds one `undici.Agent` per peer and passes it as + * the dispatcher on every fetch call, we cannot intercept at the Agent level + * in unit tests without significant refactoring. Instead, we set the global + * dispatcher to a MockAgent and override the service's `doRequest` indirection + * by spying on the internal fetch call. + * + * For the cert/key wiring, we use the real `sealClientKey` function from + * peer-key.util.ts with a test secret — no stubs. + * + * Sealed-key setup: + * Each test (or beforeAll) calls `sealClientKey(TEST_PRIVATE_KEY_PEM)` with + * BETTER_AUTH_SECRET set to a deterministic test value so that + * `unsealClientKey` in the service recovers the original PEM. + */ + +import 'reflect-metadata'; +import { describe, it, expect, vi, beforeEach, afterEach, beforeAll } from 'vitest'; +import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici'; +import type { Dispatcher } from 'undici'; +import type { Db } from '@mosaicstack/db'; +import { FederationClientService, FederationClientError } from '../federation-client.service.js'; +import { sealClientKey } from '../../peer-key.util.js'; + +// --------------------------------------------------------------------------- +// Test constants +// --------------------------------------------------------------------------- + +const TEST_SECRET = 'test-secret-for-federation-client-spec-only'; +const PEER_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'; +const ENDPOINT = 'https://peer.example.com'; + +// Minimal valid RSA/EC private key PEM — does NOT need to be a real key for +// unit tests because we only verify it round-trips through seal/unseal, not +// that it actually negotiates TLS (MockAgent handles that). +const TEST_PRIVATE_KEY_PEM = `-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDummyKeyForTests +-----END PRIVATE KEY-----`; + +// Minimal self-signed cert PEM (dummy — only used for mTLS Agent construction) +const TEST_CERT_PEM = `-----BEGIN CERTIFICATE----- +MIIBdummyCertForFederationClientTests== +-----END CERTIFICATE-----`; + +const TEST_CERT_SERIAL = 'ABCDEF1234567890'; + +// --------------------------------------------------------------------------- +// Sealed key (computed once in beforeAll) +// --------------------------------------------------------------------------- + +let SEALED_KEY: string; + +// --------------------------------------------------------------------------- +// Peer row factory +// --------------------------------------------------------------------------- + +function makePeerRow(overrides: Partial> = {}) { + return { + id: PEER_ID, + commonName: 'peer-example-com', + displayName: 'Test Peer', + certPem: TEST_CERT_PEM, + certSerial: TEST_CERT_SERIAL, + certNotAfter: new Date('2030-01-01T00:00:00Z'), + clientKeyPem: SEALED_KEY, + state: 'active' as const, + endpointUrl: ENDPOINT, + lastSeenAt: null, + createdAt: new Date('2026-01-01T00:00:00Z'), + revokedAt: null, + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Mock DB builder +// --------------------------------------------------------------------------- + +function makeDb(selectRows: unknown[] = [makePeerRow()]): Db { + const limitSelect = vi.fn().mockResolvedValue(selectRows); + const whereSelect = vi.fn().mockReturnValue({ limit: limitSelect }); + const fromSelect = vi.fn().mockReturnValue({ where: whereSelect }); + const selectMock = vi.fn().mockReturnValue({ from: fromSelect }); + + return { + select: selectMock, + insert: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + transaction: vi.fn(), + } as unknown as Db; +} + +// --------------------------------------------------------------------------- +// Helpers for MockAgent HTTP interception +// --------------------------------------------------------------------------- + +/** + * Create a MockAgent + MockPool for the peer endpoint, set it as the global + * dispatcher, and return both for per-test configuration. + */ +function makeMockAgent() { + const mockAgent = new MockAgent({ connections: 1 }); + mockAgent.disableNetConnect(); + setGlobalDispatcher(mockAgent); + const pool = mockAgent.get(ENDPOINT); + return { mockAgent, pool }; +} + +/** + * Build a FederationClientService with a mock DB and a spy on the internal + * fetch so we can intercept at the HTTP layer via MockAgent. + * + * The service calls `fetch(url, { dispatcher: agent })` where `agent` is the + * mTLS undici.Agent built from the peer's cert+key. To make MockAgent work, + * we need the fetch dispatcher to be the MockAgent, not the per-peer Agent. + * + * Strategy: we replace the private `resolveEntry` result's `agent` field with + * the MockAgent's pool, so fetch uses our interceptor. We do this by spying + * on `resolveEntry` and returning a controlled entry. + */ +function makeService(db: Db, mockPool: Dispatcher): FederationClientService { + const svc = new FederationClientService(db); + + // Override resolveEntry to inject MockAgent pool as the dispatcher + vi.spyOn( + svc as unknown as { resolveEntry: (peerId: string) => Promise }, + 'resolveEntry', + ).mockImplementation(async (_peerId: string) => { + // Still call DB (via the real logic) to exercise peer validation, + // but return mock pool as the agent. + // For simplicity in unit tests, directly return a controlled entry. + return { + agent: mockPool, + endpointUrl: ENDPOINT, + certPem: TEST_CERT_PEM, + certSerial: TEST_CERT_SERIAL, + }; + }); + + return svc; +} + +// --------------------------------------------------------------------------- +// Test setup +// --------------------------------------------------------------------------- + +let originalDispatcher: Dispatcher; + +beforeAll(() => { + // Seal the test key once — requires BETTER_AUTH_SECRET + const saved = process.env['BETTER_AUTH_SECRET']; + process.env['BETTER_AUTH_SECRET'] = TEST_SECRET; + try { + SEALED_KEY = sealClientKey(TEST_PRIVATE_KEY_PEM); + } finally { + if (saved === undefined) { + delete process.env['BETTER_AUTH_SECRET']; + } else { + process.env['BETTER_AUTH_SECRET'] = saved; + } + } +}); + +beforeEach(() => { + originalDispatcher = getGlobalDispatcher(); + process.env['BETTER_AUTH_SECRET'] = TEST_SECRET; +}); + +afterEach(() => { + setGlobalDispatcher(originalDispatcher); + vi.restoreAllMocks(); + delete process.env['BETTER_AUTH_SECRET']; +}); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Successful list response body */ +const LIST_BODY = { + items: [{ id: '1', title: 'Task One' }], + nextCursor: undefined, + _partial: false, +}; + +/** Successful get response body */ +const GET_BODY = { + item: { id: '1', title: 'Task One' }, + _partial: false, +}; + +/** Successful capabilities response body */ +const CAP_BODY = { + resources: ['tasks'], + excluded_resources: [], + max_rows_per_query: 100, + supported_verbs: ['list', 'get', 'capabilities'] as const, +}; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('FederationClientService', () => { + // ─── Successful verb calls ───────────────────────────────────────────────── + + describe('list()', () => { + it('returns parsed typed response on success', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool + .intercept({ + path: '/api/federation/v1/list/tasks', + method: 'POST', + }) + .reply(200, LIST_BODY, { headers: { 'content-type': 'application/json' } }); + + const result = await svc.list(PEER_ID, 'tasks', {}); + + expect(result.items).toHaveLength(1); + expect(result.items[0]).toMatchObject({ id: '1', title: 'Task One' }); + + await mockAgent.close(); + }); + }); + + describe('get()', () => { + it('returns parsed typed response on success', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool + .intercept({ + path: '/api/federation/v1/get/tasks/1', + method: 'POST', + }) + .reply(200, GET_BODY, { headers: { 'content-type': 'application/json' } }); + + const result = await svc.get(PEER_ID, 'tasks', '1', {}); + + expect(result.item).toMatchObject({ id: '1', title: 'Task One' }); + + await mockAgent.close(); + }); + }); + + describe('capabilities()', () => { + it('returns parsed capabilities response on success', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool + .intercept({ + path: '/api/federation/v1/capabilities', + method: 'GET', + }) + .reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } }); + + const result = await svc.capabilities(PEER_ID); + + expect(result.resources).toContain('tasks'); + expect(result.max_rows_per_query).toBe(100); + + await mockAgent.close(); + }); + }); + + // ─── HTTP error surfaces ────────────────────────────────────────────────── + + describe('non-2xx responses', () => { + it('surfaces 403 as FederationClientError({ status: 403, code: "FORBIDDEN" })', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool.intercept({ path: '/api/federation/v1/list/tasks', method: 'POST' }).reply( + 403, + { error: { code: 'forbidden', message: 'Access denied' } }, + { + headers: { 'content-type': 'application/json' }, + }, + ); + + await expect(svc.list(PEER_ID, 'tasks', {})).rejects.toMatchObject({ + status: 403, + code: 'FORBIDDEN', + peerId: PEER_ID, + }); + + await mockAgent.close(); + }); + + it('surfaces 404 as FederationClientError({ status: 404, code: "HTTP_404" })', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool.intercept({ path: '/api/federation/v1/get/tasks/999', method: 'POST' }).reply( + 404, + { error: { code: 'not_found', message: 'Not found' } }, + { + headers: { 'content-type': 'application/json' }, + }, + ); + + await expect(svc.get(PEER_ID, 'tasks', '999', {})).rejects.toMatchObject({ + status: 404, + code: 'HTTP_404', + peerId: PEER_ID, + }); + + await mockAgent.close(); + }); + }); + + // ─── Network error ───────────────────────────────────────────────────────── + + describe('network errors', () => { + it('surfaces network error as FederationClientError({ code: "NETWORK" })', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool + .intercept({ path: '/api/federation/v1/capabilities', method: 'GET' }) + .replyWithError(new Error('ECONNREFUSED')); + + await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({ + code: 'NETWORK', + peerId: PEER_ID, + }); + + await mockAgent.close(); + }); + }); + + // ─── Invalid response body ───────────────────────────────────────────────── + + describe('invalid response body', () => { + it('surfaces as FederationClientError({ code: "INVALID_RESPONSE" }) when body shape is wrong', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + // capabilities returns wrong shape (missing required fields) + pool + .intercept({ path: '/api/federation/v1/capabilities', method: 'GET' }) + .reply(200, { totally: 'wrong' }, { headers: { 'content-type': 'application/json' } }); + + await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({ + code: 'INVALID_RESPONSE', + peerId: PEER_ID, + }); + + await mockAgent.close(); + }); + }); + + // ─── Peer DB validation ──────────────────────────────────────────────────── + + describe('peer validation (without resolveEntry spy)', () => { + /** + * These tests exercise the real `resolveEntry` path — no spy on resolveEntry. + */ + + it('throws PEER_NOT_FOUND when peer is not in DB', async () => { + // DB returns empty array (peer not found) + const db = makeDb([]); + const svc = new FederationClientService(db); + + await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({ + code: 'PEER_NOT_FOUND', + peerId: PEER_ID, + }); + }); + + it('throws PEER_INACTIVE when peer state is not "active"', async () => { + const db = makeDb([makePeerRow({ state: 'suspended' })]); + const svc = new FederationClientService(db); + + await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({ + code: 'PEER_INACTIVE', + peerId: PEER_ID, + }); + }); + }); + + // ─── Cache behaviour ─────────────────────────────────────────────────────── + + describe('cache behaviour', () => { + it('hits cache on second call — only one DB lookup happens', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + + // Use real resolveEntry — let it build cache from DB + const svc = new FederationClientService(db); + + // First capabilities call — will build + cache entry + pool + .intercept({ path: '/api/federation/v1/capabilities', method: 'GET' }) + .reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } }) + .times(2); // allow two HTTP calls (one per call below) + + // Spy on the private resolveEntry to count DB calls via the DB select spy + const selectSpy = vi.spyOn(db, 'select'); + + // First call + await svc.capabilities(PEER_ID).catch(() => { + // May fail with PEER_MISCONFIGURED if key unseal fails in test — that's OK + // for this specific test which only cares about select spy count + }); + + // Second call — should use cache + await svc.capabilities(PEER_ID).catch(() => {}); + + // DB select should have been called at most once (cache hit on second call) + expect(selectSpy).toHaveBeenCalledTimes(1); + + await mockAgent.close(); + }); + + it('flushPeer() invalidates cache — next call re-reads DB', async () => { + const db = makeDb(); + const { mockAgent, pool } = makeMockAgent(); + const svc = makeService(db, pool); + + pool + .intercept({ path: '/api/federation/v1/capabilities', method: 'GET' }) + .reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } }) + .times(2); + + // First call — populates cache (via mock resolveEntry) + await svc.capabilities(PEER_ID); + + // Flush the cache + svc.flushPeer(PEER_ID); + + // The spy on resolveEntry is still active — check it's called again after flush + const resolveEntrySpy = vi.spyOn( + svc as unknown as { resolveEntry: (peerId: string) => Promise }, + 'resolveEntry', + ); + + // Second call after flush — should call resolveEntry again + await svc.capabilities(PEER_ID); + + // resolveEntry should have been called once after we started spying (post-flush) + expect(resolveEntrySpy).toHaveBeenCalledTimes(1); + + await mockAgent.close(); + }); + }); + + // ─── FederationClientError class ────────────────────────────────────────── + + describe('FederationClientError', () => { + it('is instanceof Error and FederationClientError', () => { + const err = new FederationClientError({ + code: 'PEER_NOT_FOUND', + message: 'test', + peerId: PEER_ID, + }); + expect(err).toBeInstanceOf(Error); + expect(err).toBeInstanceOf(FederationClientError); + expect(err.name).toBe('FederationClientError'); + }); + + it('carries status, code, and peerId', () => { + const err = new FederationClientError({ + status: 403, + code: 'FORBIDDEN', + message: 'forbidden', + peerId: PEER_ID, + }); + expect(err.status).toBe(403); + expect(err.code).toBe('FORBIDDEN'); + expect(err.peerId).toBe(PEER_ID); + }); + }); +}); diff --git a/apps/gateway/src/federation/client/federation-client.service.ts b/apps/gateway/src/federation/client/federation-client.service.ts new file mode 100644 index 0000000..798ab51 --- /dev/null +++ b/apps/gateway/src/federation/client/federation-client.service.ts @@ -0,0 +1,412 @@ +/** + * FederationClientService — outbound mTLS client for federation requests (FED-M3-08). + * + * Dials peer gateways over mTLS using the cert+sealed-key stored in `federation_peers`, + * invokes federation verbs (list / get / capabilities), and surfaces all failure modes + * as typed `FederationClientError` instances. + * + * ## Error code taxonomy + * + * | Code | When | + * | ------------------ | ------------------------------------------------------------- | + * | PEER_NOT_FOUND | No row in federation_peers for the given peerId | + * | PEER_INACTIVE | Peer row exists but state !== 'active' | + * | PEER_MISCONFIGURED | Peer row is active but missing endpointUrl or clientKeyPem | + * | NETWORK | undici threw a connection / TLS / timeout error | + * | HTTP_{status} | Peer returned a non-2xx response (e.g. HTTP_403, HTTP_404) | + * | FORBIDDEN | Peer returned 403 (convenience alias alongside HTTP_403) | + * | INVALID_RESPONSE | Response body failed Zod schema validation | + * + * ## Cache strategy + * + * Per-peer `undici.Agent` instances are cached in a `Map` for + * the lifetime of the service instance. The cache is keyed on peerId (UUID). + * + * Cache invalidation: + * - `flushPeer(peerId)` — removes the entry immediately. M5/M6 MUST call this on + * cert rotation or peer revocation events so the next request re-reads the DB and + * builds a fresh TLS Agent with the new cert material. + * - On cache miss: re-reads the DB, checks state === 'active', rebuilds Agent. + * + * Cache does NOT auto-expire. The service is expected to be a singleton scoped to the + * NestJS application lifecycle; flushing on revocation/rotation is the only invalidation + * path by design (avoids redundant DB round-trips on the hot path). + */ + +import { Injectable, Inject, Logger } from '@nestjs/common'; +import { Agent, fetch as undiciFetch } from 'undici'; +import type { Dispatcher } from 'undici'; +import { z } from 'zod'; +import { type Db, eq, federationPeers } from '@mosaicstack/db'; +import { + FederationListResponseSchema, + FederationGetResponseSchema, + FederationCapabilitiesResponseSchema, + FederationErrorEnvelopeSchema, + type FederationListResponse, + type FederationGetResponse, + type FederationCapabilitiesResponse, +} from '@mosaicstack/types'; +import { DB } from '../../database/database.module.js'; +import { unsealClientKey } from '../peer-key.util.js'; + +// --------------------------------------------------------------------------- +// Error taxonomy +// --------------------------------------------------------------------------- + +/** + * Client-side error code set. Distinct from the server-side `FederationErrorCode` + * (which lives in `@mosaicstack/types`) because the client has additional failure + * modes (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK) that the + * server never emits. + */ +export type FederationClientErrorCode = + | 'PEER_NOT_FOUND' + | 'PEER_INACTIVE' + | 'PEER_MISCONFIGURED' + | 'NETWORK' + | 'FORBIDDEN' + | 'INVALID_RESPONSE' + | `HTTP_${number}`; + +export interface FederationClientErrorOptions { + status?: number; + code: FederationClientErrorCode; + message: string; + peerId: string; + cause?: unknown; +} + +/** + * Thrown by FederationClientService on every failure path. + * Callers can dispatch on `error.code` for programmatic handling. + */ +export class FederationClientError extends Error { + readonly status?: number; + readonly code: FederationClientErrorCode; + readonly peerId: string; + readonly cause?: unknown; + + constructor(opts: FederationClientErrorOptions) { + super(opts.message); + this.name = 'FederationClientError'; + this.status = opts.status; + this.code = opts.code; + this.peerId = opts.peerId; + this.cause = opts.cause; + } +} + +// --------------------------------------------------------------------------- +// Internal cache types +// --------------------------------------------------------------------------- + +interface AgentCacheEntry { + agent: Agent; + endpointUrl: string; + certPem: string; + certSerial: string; +} + +// --------------------------------------------------------------------------- +// Service +// --------------------------------------------------------------------------- + +@Injectable() +export class FederationClientService { + private readonly logger = new Logger(FederationClientService.name); + + /** + * Per-peer undici Agent cache. + * Key = peerId (UUID string). + * Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6). + */ + private readonly cache = new Map(); + + constructor(@Inject(DB) private readonly db: Db) {} + + // ------------------------------------------------------------------------- + // Public verb API + // ------------------------------------------------------------------------- + + /** + * Invoke the `list` verb on a remote peer. + * + * @param peerId UUID of the peer row in `federation_peers`. + * @param resource Resource path, e.g. "tasks". + * @param request Free-form body sent as JSON in the POST body. + * @returns Parsed `FederationListResponse`. + */ + async list( + peerId: string, + resource: string, + request: Record, + ): Promise> { + const { endpointUrl, agent } = await this.resolveEntry(peerId); + const url = `${endpointUrl}/api/federation/v1/list/${encodeURIComponent(resource)}`; + const body = await this.doPost(peerId, url, agent, request); + return this.parseWith>( + peerId, + body, + FederationListResponseSchema(z.unknown()), + ); + } + + /** + * Invoke the `get` verb on a remote peer. + * + * @param peerId UUID of the peer row in `federation_peers`. + * @param resource Resource path, e.g. "tasks". + * @param id Resource identifier. + * @param request Free-form body sent as JSON in the POST body. + * @returns Parsed `FederationGetResponse`. + */ + async get( + peerId: string, + resource: string, + id: string, + request: Record, + ): Promise> { + const { endpointUrl, agent } = await this.resolveEntry(peerId); + const url = `${endpointUrl}/api/federation/v1/get/${encodeURIComponent(resource)}/${encodeURIComponent(id)}`; + const body = await this.doPost(peerId, url, agent, request); + return this.parseWith>( + peerId, + body, + FederationGetResponseSchema(z.unknown()), + ); + } + + /** + * Invoke the `capabilities` verb on a remote peer. + * + * @param peerId UUID of the peer row in `federation_peers`. + * @returns Parsed `FederationCapabilitiesResponse`. + */ + async capabilities(peerId: string): Promise { + const { endpointUrl, agent } = await this.resolveEntry(peerId); + const url = `${endpointUrl}/api/federation/v1/capabilities`; + const body = await this.doGet(peerId, url, agent); + return this.parseWith( + peerId, + body, + FederationCapabilitiesResponseSchema, + ); + } + + // ------------------------------------------------------------------------- + // Cache management + // ------------------------------------------------------------------------- + + /** + * Flush the cached Agent for a specific peer. + * + * M5/M6 MUST call this on: + * - cert rotation events (so new cert material is picked up) + * - peer revocation events (so future requests fail at PEER_INACTIVE) + * + * After flushing, the next call to `list`, `get`, or `capabilities` for + * this peer will re-read the DB and rebuild the Agent. + */ + flushPeer(peerId: string): void { + if (this.cache.has(peerId)) { + this.cache.delete(peerId); + this.logger.log(`Cache flushed for peer ${peerId}`); + } + } + + // ------------------------------------------------------------------------- + // Internal helpers + // ------------------------------------------------------------------------- + + /** + * Resolve the cache entry for a peer, reading DB on miss. + * + * Throws `FederationClientError` with appropriate code if the peer is not + * found, is inactive, or is missing required fields. + */ + private async resolveEntry(peerId: string): Promise { + const cached = this.cache.get(peerId); + if (cached) { + return cached; + } + + // DB lookup + const [peer] = await this.db + .select() + .from(federationPeers) + .where(eq(federationPeers.id, peerId)) + .limit(1); + + if (!peer) { + throw new FederationClientError({ + code: 'PEER_NOT_FOUND', + message: `Federation peer ${peerId} not found`, + peerId, + }); + } + + if (peer.state !== 'active') { + throw new FederationClientError({ + code: 'PEER_INACTIVE', + message: `Federation peer ${peerId} is not active (state: ${peer.state})`, + peerId, + }); + } + + if (!peer.endpointUrl || !peer.clientKeyPem) { + throw new FederationClientError({ + code: 'PEER_MISCONFIGURED', + message: `Federation peer ${peerId} is missing endpointUrl or clientKeyPem`, + peerId, + }); + } + + // Unseal the private key + let privateKeyPem: string; + try { + privateKeyPem = unsealClientKey(peer.clientKeyPem); + } catch (err) { + throw new FederationClientError({ + code: 'PEER_MISCONFIGURED', + message: `Failed to unseal client key for peer ${peerId}`, + peerId, + cause: err, + }); + } + + // Build mTLS agent + const agent = new Agent({ + connect: { + cert: peer.certPem, + key: privateKeyPem, + // rejectUnauthorized: true is the undici default for HTTPS + }, + }); + + const entry: AgentCacheEntry = { + agent, + endpointUrl: peer.endpointUrl, + certPem: peer.certPem, + certSerial: peer.certSerial, + }; + + this.cache.set(peerId, entry); + this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`); + + return entry; + } + + /** + * Execute a POST request with a JSON body. + * Returns the parsed response body as an unknown value. + * Throws `FederationClientError` on network errors and non-2xx responses. + */ + private async doPost( + peerId: string, + url: string, + agent: Dispatcher, + body: Record, + ): Promise { + return this.doRequest(peerId, url, agent, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }); + } + + /** + * Execute a GET request. + * Returns the parsed response body as an unknown value. + * Throws `FederationClientError` on network errors and non-2xx responses. + */ + private async doGet(peerId: string, url: string, agent: Dispatcher): Promise { + return this.doRequest(peerId, url, agent, { method: 'GET' }); + } + + private async doRequest( + peerId: string, + url: string, + agent: Dispatcher, + init: { method: string; headers?: Record; body?: string }, + ): Promise { + let response: Awaited>; + + try { + response = await undiciFetch(url, { + ...init, + dispatcher: agent, + }); + } catch (err) { + throw new FederationClientError({ + code: 'NETWORK', + message: `Network error calling peer ${peerId} at ${url}: ${err instanceof Error ? err.message : String(err)}`, + peerId, + cause: err, + }); + } + + const rawBody = await response.text().catch(() => ''); + + if (!response.ok) { + const status = response.status; + + // Attempt to parse as federation error envelope + let serverMessage = `HTTP ${status}`; + try { + const json: unknown = JSON.parse(rawBody); + const result = FederationErrorEnvelopeSchema.safeParse(json); + if (result.success) { + serverMessage = result.data.error.message; + } + } catch { + // Not valid JSON or not a federation envelope — use generic message + } + + // Specific code for 403 (most actionable for callers); generic HTTP_{n} for others + const code: FederationClientErrorCode = status === 403 ? 'FORBIDDEN' : `HTTP_${status}`; + + throw new FederationClientError({ + status, + code, + message: `Peer ${peerId} returned ${status}: ${serverMessage}`, + peerId, + }); + } + + try { + return JSON.parse(rawBody) as unknown; + } catch (err) { + throw new FederationClientError({ + code: 'INVALID_RESPONSE', + message: `Peer ${peerId} returned non-JSON body`, + peerId, + cause: err, + }); + } + } + + /** + * Parse and validate a response body against a Zod schema. + * + * For list/get, callers pass the result of `FederationListResponseSchema(z.unknown())` + * so that the envelope structure is validated without requiring a concrete item schema + * at the client level. The generic `T` provides compile-time typing. + * + * Throws `FederationClientError({ code: 'INVALID_RESPONSE' })` on parse failure. + */ + private parseWith(peerId: string, body: unknown, schema: z.ZodTypeAny): T { + const result = schema.safeParse(body); + if (!result.success) { + const issues = result.error.issues + .map((e: z.ZodIssue) => `[${e.path.join('.') || 'root'}] ${e.message}`) + .join('; '); + throw new FederationClientError({ + code: 'INVALID_RESPONSE', + message: `Peer ${peerId} returned invalid response shape: ${issues}`, + peerId, + }); + } + return result.data as T; + } +} diff --git a/apps/gateway/src/federation/client/index.ts b/apps/gateway/src/federation/client/index.ts new file mode 100644 index 0000000..18bc610 --- /dev/null +++ b/apps/gateway/src/federation/client/index.ts @@ -0,0 +1,13 @@ +/** + * Federation client barrel — re-exports for FederationModule consumers. + * + * M3-09 (QuerySourceService) and future milestones should import from here, + * not directly from the implementation file. + */ + +export { + FederationClientService, + FederationClientError, + type FederationClientErrorCode, + type FederationClientErrorOptions, +} from './federation-client.service.js'; diff --git a/apps/gateway/src/federation/federation.module.ts b/apps/gateway/src/federation/federation.module.ts index ebb6295..946b57c 100644 --- a/apps/gateway/src/federation/federation.module.ts +++ b/apps/gateway/src/federation/federation.module.ts @@ -5,10 +5,11 @@ import { EnrollmentController } from './enrollment.controller.js'; import { EnrollmentService } from './enrollment.service.js'; import { FederationController } from './federation.controller.js'; import { GrantsService } from './grants.service.js'; +import { FederationClientService } from './client/index.js'; @Module({ controllers: [EnrollmentController, FederationController], - providers: [AdminGuard, CaService, EnrollmentService, GrantsService], - exports: [CaService, EnrollmentService, GrantsService], + providers: [AdminGuard, CaService, EnrollmentService, GrantsService, FederationClientService], + exports: [CaService, EnrollmentService, GrantsService, FederationClientService], }) export class FederationModule {}