diff --git a/apps/gateway/src/federation/client/__tests__/query-source.service.spec.ts b/apps/gateway/src/federation/client/__tests__/query-source.service.spec.ts new file mode 100644 index 0000000..c691c26 --- /dev/null +++ b/apps/gateway/src/federation/client/__tests__/query-source.service.spec.ts @@ -0,0 +1,255 @@ +import 'reflect-metadata'; +import { describe, expect, it, vi } from 'vitest'; +import type { Db } from '@mosaicstack/db'; +import type { FederationListResponse } from '@mosaicstack/types'; +import { + FederationClientError, + type FederationClientService, +} from '../federation-client.service.js'; +import { type QuerySourceError, QuerySourceService } from '../query-source.service.js'; + +interface TestRow { + id: string; + title: string; +} + +interface PeerRow { + id: string; + commonName: string; + endpointUrl: string | null; + clientKeyPem: string | null; + state: 'active' | 'pending' | 'suspended' | 'revoked'; +} + +const LOCAL_ROWS: TestRow[] = [ + { id: 'local-1', title: 'Local One' }, + { id: 'local-2', title: 'Local Two' }, +]; + +const PEER_A: PeerRow = { + id: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', + commonName: 'peer-a', + endpointUrl: 'https://peer-a.example.com', + clientKeyPem: 'sealed-key-a', + state: 'active', +}; + +const PEER_B: PeerRow = { + id: 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', + commonName: 'peer-b', + endpointUrl: 'https://peer-b.example.com', + clientKeyPem: 'sealed-key-b', + state: 'active', +}; + +const PEER_LOCALHOST: PeerRow = { + id: 'cccccccc-cccc-cccc-cccc-cccccccccccc', + commonName: 'peer-localhost', + endpointUrl: 'https://localhost:3001', + clientKeyPem: 'sealed-key-c', + state: 'active', +}; + +function makeDb(activePeers: PeerRow[]): Db { + const orderBy = vi.fn().mockResolvedValue(activePeers); + const where = vi.fn().mockReturnValue({ orderBy }); + const from = vi.fn().mockReturnValue({ where }); + const select = vi.fn().mockReturnValue({ from }); + + return { + select, + insert: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + transaction: vi.fn(), + } as unknown as Db; +} + +function makeFederationClient( + list: ( + peerId: string, + resource: string, + request: Record, + ) => Promise>, +): FederationClientService { + return { + list: list as unknown as FederationClientService['list'], + } as FederationClientService; +} + +function makeLocalResponse(rows: TestRow[] = LOCAL_ROWS): Promise> { + return Promise.resolve({ items: rows }); +} + +describe('QuerySourceService', () => { + it('routes source="local" to the local executor and tags rows as local', async () => { + const list = vi.fn(async (): Promise> => ({ items: [] })); + const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); + + const result = await service.list({ + source: 'local', + resource: 'tasks', + request: { cursor: 'ignored-for-local-test' }, + local: () => makeLocalResponse(), + }); + + expect(result).toEqual({ + items: [ + { id: 'local-1', title: 'Local One', _source: 'local' }, + { id: 'local-2', title: 'Local Two', _source: 'local' }, + ], + }); + expect(list).not.toHaveBeenCalled(); + }); + + it('routes source="federated:" to the matching active peer and tags rows with peer commonName', async () => { + const list = vi.fn( + async (): Promise> => ({ + items: [{ id: 'remote-1', title: 'Remote One' }], + }), + ); + const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); + + const result = await service.list({ + source: 'federated:peer-b.example.com', + resource: 'tasks', + request: { status: 'open' }, + local: () => makeLocalResponse(), + }); + + expect(result).toEqual({ + items: [{ id: 'remote-1', title: 'Remote One', _source: 'peer-b' }], + }); + expect(list).toHaveBeenCalledWith(PEER_B.id, 'tasks', { status: 'open' }); + }); + + it('matches federated hosts by endpoint host including non-default port', async () => { + const list = vi.fn( + async (): Promise> => ({ + items: [{ id: 'remote-port', title: 'Remote Port' }], + }), + ); + const service = new QuerySourceService(makeDb([PEER_LOCALHOST]), makeFederationClient(list)); + + const result = await service.list({ + source: 'federated:localhost:3001', + resource: 'tasks', + request: {}, + local: () => makeLocalResponse(), + }); + + expect(result).toEqual({ + items: [{ id: 'remote-port', title: 'Remote Port', _source: 'peer-localhost' }], + }); + expect(list).toHaveBeenCalledWith(PEER_LOCALHOST.id, 'tasks', {}); + }); + + it('fans out source="all" to local plus every active outbound peer in parallel and merges tagged rows', async () => { + const callOrder: string[] = []; + const list = vi.fn(async (peerId: string): Promise> => { + callOrder.push(`remote-start:${peerId}`); + await Promise.resolve(); + return { + items: [{ id: `remote-${peerId.slice(0, 1)}`, title: `Remote ${peerId.slice(0, 1)}` }], + }; + }); + const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); + + const result = await service.list({ + source: 'all', + resource: 'tasks', + request: { limit: 25 }, + local: async () => { + callOrder.push('local-start'); + await Promise.resolve(); + return { items: [{ id: 'local-1', title: 'Local One' }] }; + }, + }); + + expect(result).toEqual({ + items: [ + { id: 'local-1', title: 'Local One', _source: 'local' }, + { id: 'remote-a', title: 'Remote a', _source: 'peer-a' }, + { id: 'remote-b', title: 'Remote b', _source: 'peer-b' }, + ], + }); + expect(list).toHaveBeenCalledTimes(2); + expect(callOrder).toEqual([ + 'local-start', + `remote-start:${PEER_A.id}`, + `remote-start:${PEER_B.id}`, + ]); + }); + + it('marks source="all" as partial and truncated when any subquery returns a cursor', async () => { + const list = vi.fn( + async (): Promise> => ({ + items: [{ id: 'remote-a', title: 'Remote A' }], + nextCursor: 'remote-next', + }), + ); + const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); + + const result = await service.list({ + source: 'all', + resource: 'tasks', + request: {}, + local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]), + }); + + expect(result).toEqual({ + items: [ + { id: 'local-1', title: 'Local One', _source: 'local' }, + { id: 'remote-a', title: 'Remote A', _source: 'peer-a' }, + ], + _partial: true, + _truncated: true, + }); + }); + + it('returns _partial=true for source="all" when one peer fails without dropping successful sources', async () => { + const list = vi.fn(async (peerId: string): Promise> => { + if (peerId === PEER_B.id) { + throw new FederationClientError({ + code: 'NETWORK', + message: 'peer unavailable', + peerId, + }); + } + return { items: [{ id: 'remote-a', title: 'Remote A' }] }; + }); + const service = new QuerySourceService(makeDb([PEER_A, PEER_B]), makeFederationClient(list)); + + const result = await service.list({ + source: 'all', + resource: 'tasks', + request: {}, + local: () => makeLocalResponse([{ id: 'local-1', title: 'Local One' }]), + }); + + expect(result).toEqual({ + items: [ + { id: 'local-1', title: 'Local One', _source: 'local' }, + { id: 'remote-a', title: 'Remote A', _source: 'peer-a' }, + ], + _partial: true, + }); + }); + + it('throws QuerySourceError when a federated host does not match an active outbound peer', async () => { + const list = vi.fn(async (): Promise> => ({ items: [] })); + const service = new QuerySourceService(makeDb([PEER_A]), makeFederationClient(list)); + + await expect( + service.list({ + source: 'federated:missing.example.com', + resource: 'tasks', + request: {}, + local: () => makeLocalResponse(), + }), + ).rejects.toMatchObject({ + name: 'QuerySourceError', + code: 'PEER_NOT_FOUND', + } satisfies Partial); + }); +}); diff --git a/apps/gateway/src/federation/client/index.ts b/apps/gateway/src/federation/client/index.ts index 18bc610..89bbb12 100644 --- a/apps/gateway/src/federation/client/index.ts +++ b/apps/gateway/src/federation/client/index.ts @@ -11,3 +11,13 @@ export { type FederationClientErrorCode, type FederationClientErrorOptions, } from './federation-client.service.js'; +export { + QuerySourceService, + QuerySourceError, + type QuerySource, + type QuerySourceErrorCode, + type QuerySourceErrorOptions, + type QuerySourceListOptions, + type QuerySourceListResponse, + type LocalListExecutor, +} from './query-source.service.js'; diff --git a/apps/gateway/src/federation/client/query-source.service.ts b/apps/gateway/src/federation/client/query-source.service.ts new file mode 100644 index 0000000..0421a3e --- /dev/null +++ b/apps/gateway/src/federation/client/query-source.service.ts @@ -0,0 +1,261 @@ +/** + * QuerySourceService — gateway query source router (FED-M3-09). + * + * Accepts the federation query-layer `source` selector and routes list-style + * reads to local storage, one federated peer, or all active outbound peers. + * + * `source: "all"` is intentionally tolerant of per-peer failures: local data + * and successful peer responses are returned, and the envelope is marked + * `_partial: true`. Local failures still reject because there is no safe local + * fallback and the gateway's own storage is expected to be authoritative. + */ + +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { and, eq, federationPeers, isNotNull, type Db } from '@mosaicstack/db'; +import { + SOURCE_LOCAL, + tagWithSource, + type FederationListResponse, + type SourceTag, +} from '@mosaicstack/types'; +import { DB } from '../../database/database.module.js'; +import { FederationClientService } from './federation-client.service.js'; + +export type QuerySource = 'local' | 'all' | `federated:${string}`; + +export type QuerySourceErrorCode = 'INVALID_SOURCE' | 'PEER_NOT_FOUND'; + +export interface QuerySourceErrorOptions { + code: QuerySourceErrorCode; + message: string; + source: string; +} + +export class QuerySourceError extends Error { + readonly code: QuerySourceErrorCode; + readonly source: string; + + constructor(opts: QuerySourceErrorOptions) { + super(opts.message); + this.name = 'QuerySourceError'; + this.code = opts.code; + this.source = opts.source; + } +} + +export type LocalListExecutor = () => Promise | T[]>; + +export interface QuerySourceListOptions { + source: QuerySource; + resource: string; + request?: Record; + local: LocalListExecutor; +} + +export type QuerySourceListResponse = FederationListResponse; + +interface OutboundPeer { + id: string; + commonName: string; + endpointUrl: string; +} + +interface TaggedList { + items: Array; + partial: boolean; + truncated: boolean; + nextCursor?: string; +} + +@Injectable() +export class QuerySourceService { + private readonly logger = new Logger(QuerySourceService.name); + + constructor( + @Inject(DB) private readonly db: Db, + @Inject(FederationClientService) private readonly federationClient: FederationClientService, + ) {} + + async list( + options: QuerySourceListOptions, + ): Promise> { + const request = options.request ?? {}; + + if (options.source === 'local') { + const local = await this.runLocal(options.local); + return this.toResponse(this.tagList(local, SOURCE_LOCAL)); + } + + if (options.source === 'all') { + return this.listAll(options.resource, request, options.local); + } + + if (options.source.startsWith('federated:')) { + const host = options.source.slice('federated:'.length).trim(); + if (!host) { + throw new QuerySourceError({ + code: 'INVALID_SOURCE', + message: 'Federated source must include a host after federated:', + source: options.source, + }); + } + + const peer = await this.findPeerByHost(host, options.source); + const remote = await this.federationClient.list(peer.id, options.resource, request); + return this.toResponse(this.tagList(remote, peer.commonName)); + } + + throw new QuerySourceError({ + code: 'INVALID_SOURCE', + message: `Unsupported query source: ${options.source}`, + source: options.source, + }); + } + + private async listAll( + resource: string, + request: Record, + local: LocalListExecutor, + ): Promise> { + const peers = await this.listActiveOutboundPeers(); + + const localPromise = this.runLocal(local).then((response) => + this.tagList(response, SOURCE_LOCAL), + ); + const remotePromises = peers.map(async (peer: OutboundPeer): Promise | null> => { + try { + const response = await this.federationClient.list(peer.id, resource, request); + return this.tagList(response, peer.commonName); + } catch (error: unknown) { + this.logger.warn( + `Federated query to peer ${peer.commonName} (${peer.id}) failed; returning partial all-source response: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + return null; + } + }); + + const [localResult, ...remoteResults] = await Promise.all([localPromise, ...remotePromises]); + const successfulRemoteResults = remoteResults.filter( + (result: TaggedList | null): result is TaggedList => result !== null, + ); + const allResults = [localResult, ...successfulRemoteResults]; + const peerFailure = successfulRemoteResults.length !== peers.length; + + return this.mergeTaggedLists(allResults, peerFailure); + } + + private async runLocal( + local: LocalListExecutor, + ): Promise> { + const response = await local(); + if (Array.isArray(response)) { + return { items: response }; + } + return response; + } + + private tagList( + response: FederationListResponse, + source: string, + ): TaggedList { + return { + items: tagWithSource(response.items, source), + partial: response._partial === true, + truncated: response._truncated === true || response.nextCursor !== undefined, + nextCursor: response.nextCursor, + }; + } + + private mergeTaggedLists( + lists: Array>, + peerFailure: boolean, + ): QuerySourceListResponse { + const items = lists.flatMap((list: TaggedList) => list.items); + const partial = + peerFailure || + lists.some((list: TaggedList) => list.partial || list.nextCursor !== undefined); + const truncated = lists.some((list: TaggedList) => list.truncated); + + const response: QuerySourceListResponse = { items }; + if (partial) { + response._partial = true; + } + if (truncated) { + response._truncated = true; + } + return response; + } + + private toResponse(tagged: TaggedList): QuerySourceListResponse { + const response: QuerySourceListResponse = { + items: tagged.items, + }; + if (tagged.nextCursor !== undefined) { + response.nextCursor = tagged.nextCursor; + } + if (tagged.partial) { + response._partial = true; + } + if (tagged.truncated) { + response._truncated = true; + } + return response; + } + + private async findPeerByHost(sourceHost: string, source: string): Promise { + const host = normalizeHost(sourceHost); + const peers = await this.listActiveOutboundPeers(); + const peer = peers.find((candidate: OutboundPeer) => { + const commonName = normalizeHost(candidate.commonName); + const endpointHosts = endpointHostKeys(candidate.endpointUrl).map((endpointHost: string) => + normalizeHost(endpointHost), + ); + return commonName === host || endpointHosts.includes(host); + }); + + if (!peer) { + throw new QuerySourceError({ + code: 'PEER_NOT_FOUND', + message: `No active outbound federation peer matches source ${source}`, + source, + }); + } + + return peer; + } + + private async listActiveOutboundPeers(): Promise { + const rows = await this.db + .select({ + id: federationPeers.id, + commonName: federationPeers.commonName, + endpointUrl: federationPeers.endpointUrl, + }) + .from(federationPeers) + .where( + and( + eq(federationPeers.state, 'active'), + isNotNull(federationPeers.endpointUrl), + isNotNull(federationPeers.clientKeyPem), + ), + ) + .orderBy(federationPeers.commonName); + + return rows.filter((row): row is OutboundPeer => typeof row.endpointUrl === 'string'); + } +} + +function normalizeHost(host: string): string { + return host.trim().toLowerCase(); +} + +function endpointHostKeys(endpointUrl: string): string[] { + try { + const url = new URL(endpointUrl); + return Array.from(new Set([url.host, url.hostname].filter((host: string) => host.length > 0))); + } catch { + return []; + } +} diff --git a/apps/gateway/src/federation/federation.module.ts b/apps/gateway/src/federation/federation.module.ts index a2e01ef..f85599f 100644 --- a/apps/gateway/src/federation/federation.module.ts +++ b/apps/gateway/src/federation/federation.module.ts @@ -5,7 +5,7 @@ import { EnrollmentController } from './enrollment.controller.js'; import { EnrollmentService } from './enrollment.service.js'; import { FederationController } from './federation.controller.js'; import { GrantsService } from './grants.service.js'; -import { FederationClientService } from './client/index.js'; +import { FederationClientService, QuerySourceService } from './client/index.js'; import { FederationAuthGuard } from './server/index.js'; @Module({ @@ -16,6 +16,7 @@ import { FederationAuthGuard } from './server/index.js'; EnrollmentService, GrantsService, FederationClientService, + QuerySourceService, FederationAuthGuard, ], exports: [ @@ -23,6 +24,7 @@ import { FederationAuthGuard } from './server/index.js'; EnrollmentService, GrantsService, FederationClientService, + QuerySourceService, FederationAuthGuard, ], }) diff --git a/docs/scratchpads/FED-M3-09-query-source.md b/docs/scratchpads/FED-M3-09-query-source.md new file mode 100644 index 0000000..b2db65f --- /dev/null +++ b/docs/scratchpads/FED-M3-09-query-source.md @@ -0,0 +1,67 @@ +# FED-M3-09 — Query Source Service Scratchpad + +## Objective + +Implement `apps/gateway/src/federation/client/query-source.service.ts` for `source: "local" | "federated:" | "all"` routing. + +## Scope + +- Add QuerySourceService in gateway federation client layer. +- Unit-test local-only, single federated peer, all-source fan-out/merge, and per-peer partial failures. +- Keep `docs/federation/TASKS.md` read-only per project agent guidance. + +## Constraints / assumptions + +- Issue: #462. +- Branch: `feat/federation-m3-query-source` from `origin/main` (`e0e7be70`). +- ASSUMPTION: `federated:` should match active outbound peers by `commonName` first and by `endpointUrl` host/hostname as compatibility fallback; source tags use `peer.commonName` per `@mosaicstack/types` source-tag docs. +- ASSUMPTION: QuerySourceService provides list/fan-out behavior; get/source routing can be layered later because card acceptance says merge rows. +- ASSUMPTION: `source: "all"` cannot safely return a single continuation cursor for multiple sub-sources; any subquery cursor marks the merged response `_partial: true` + `_truncated: true` while omitting `nextCursor`. +- Budget: no explicit hard cap from orchestrator; working cap ~8K-12K tokens for card 1 implementation + tests + PR cycle. +- OpenBrain unavailable: credential loader failed with missing `/home/jarvis/.config/mosaic/credentials.json`; not blocking code delivery. + +## Plan + +1. Review federation client/types/db patterns. +2. Write unit tests for source behavior. +3. Implement QuerySourceService and export/register it in FederationModule. +4. Run scoped tests, typecheck, lint, format. +5. Run codex uncommitted review and remediate. +6. Commit, queue guard, push, PR via wrapper. + +## Progress + +- 2026-06-24: Intake complete; using isolated worktree to avoid dirty orchestrator files in original checkout. +- 2026-06-24: Added QuerySourceService, module export, barrel export, and 7 unit tests. +- 2026-06-24: First Codex review found pagination and port-host matching issues; both remediated with tests. + +## Tests run + +- `pnpm --filter @mosaicstack/gateway test -- query-source.service.spec.ts` — PASS (7 tests). +- `pnpm --filter @mosaicstack/gateway typecheck` — PASS. +- `pnpm --filter @mosaicstack/gateway lint` — PASS. +- `pnpm format:check` — PASS. +- `pnpm typecheck` — PASS (41/41 turbo tasks). +- `pnpm lint` — PASS (23/23 turbo tasks). +- `pnpm test` — FAIL in pre-existing/live-DB integration suite: `apps/gateway/src/__tests__/cross-user-isolation.test.ts` cleanup hit `relation "messages" does not exist` against local PostgreSQL. Changed QuerySource unit tests passed; failure is outside FED-M3-09 surface and appears tied to local DB schema state. + +## Review evidence + +- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` — first pass request-changes, 2 should-fix findings (all-source cursor handling; endpoint port host matching). +- Remediation: `_partial` + `_truncated` when any all-source subquery has `nextCursor`; endpoint match accepts URL `host` and `hostname`; added tests for both. +- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` — PASS/approve, no findings. +- `~/.config/mosaic/tools/codex/codex-security-review.sh --uncommitted` — PASS, risk level none, no findings. + +## Risks / blockers + +- Federation query layer is not yet wired; service API needs to be stable and easy to compose. +- Must avoid hard-failing `source: all` on remote peer failures. + +## Acceptance evidence mapping + +| Acceptance criterion | Evidence | +| ------------------------------------------------------------------------------- | --------------------------------------------------------------------------------- | +| local source returns local rows tagged `_source: local` | `query-source.service.spec.ts` local test; scoped test PASS | +| `federated:` queries selected peer and tags rows with peer source | `query-source.service.spec.ts` commonName/endpoint-host tests; scoped test PASS | +| `all` fans out local + active outbound peers in parallel and merges tagged rows | `query-source.service.spec.ts` all-source call-order/merge test; scoped test PASS | +| per-peer failure on `all` returns `_partial: true`, not throw | `query-source.service.spec.ts` peer failure test; scoped test PASS |