Compare commits

..

2 Commits

Author SHA1 Message Date
Jarvis
71c7b85026 fix(federation/auth-guard): remediate CRIT-1/CRIT-2 + HIGH-1..4 review findings
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
- CRIT-1: Validate cert subjectUserId against grant.subjectUserId from DB;
  use authoritative DB value in FederationContext
- CRIT-2: Add @Inject(GrantsService) decorator (tsx/esbuild requirement)
- HIGH-1: Validate UTF8String TLV tag, length, and bounds in OID parser
- HIGH-2: Collapse all 403 wire messages to a generic string to prevent
  grant enumeration; keep internal logger detail
- HIGH-3: Assert federation wire envelope shape in all guard tests
- HIGH-4: Regression test for subjectUserId cert/DB mismatch

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 22:45:35 -05:00
Jarvis
53ee36239b feat(federation): mTLS AuthGuard with OID-based grant resolution (FED-M3-03)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
Adds FederationAuthGuard that validates inbound mTLS client certs on
federation API routes. Extracts custom OIDs (grantId, subjectUserId),
loads the grant+peer from DB in one query, asserts active status, and
validates cert serial as defense-in-depth. Attaches FederationContext
to requests on success and uses federation wire-format error envelopes
(not raw NestJS exceptions) for 401/403 responses.

New files:
- apps/gateway/src/federation/oid.util.ts — shared OID extraction (no dupe ASN.1 logic)
- apps/gateway/src/federation/server/federation-auth.guard.ts — guard impl
- apps/gateway/src/federation/server/federation-context.ts — FederationContext type + module augment
- apps/gateway/src/federation/server/index.ts — barrel export
- apps/gateway/src/federation/server/__tests__/federation-auth.guard.spec.ts — 11 unit tests

Modified:
- apps/gateway/src/federation/grants.service.ts — adds getGrantWithPeer() with join
- apps/gateway/src/federation/federation.module.ts — registers FederationAuthGuard as provider

Closes #462

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 22:17:54 -05:00
22 changed files with 160 additions and 1707 deletions

View File

@@ -73,7 +73,6 @@
"rxjs": "^7.8.0",
"socket.io": "^4.8.0",
"uuid": "^11.0.0",
"undici": "^7.24.6",
"zod": "^4.3.6"
},
"devDependencies": {

View File

@@ -1,21 +1,8 @@
import { mkdirSync } from 'node:fs';
import { homedir } from 'node:os';
import { join } from 'node:path';
import {
Global,
Inject,
Logger,
Module,
type OnApplicationShutdown,
type OnModuleInit,
} from '@nestjs/common';
import {
createDb,
createPgliteDb,
runPgliteMigrations,
type Db,
type DbHandle,
} from '@mosaicstack/db';
import { Global, Inject, Module, type OnApplicationShutdown } from '@nestjs/common';
import { createDb, createPgliteDb, type Db, type DbHandle } from '@mosaicstack/db';
import { createStorageAdapter, type StorageAdapter } from '@mosaicstack/storage';
import type { MosaicConfig } from '@mosaicstack/config';
import { MOSAIC_CONFIG } from '../config/config.module.js';
@@ -52,37 +39,12 @@ export const STORAGE_ADAPTER = 'STORAGE_ADAPTER';
],
exports: [DB, STORAGE_ADAPTER],
})
export class DatabaseModule implements OnApplicationShutdown, OnModuleInit {
private readonly logger = new Logger(DatabaseModule.name);
export class DatabaseModule implements OnApplicationShutdown {
constructor(
@Inject(DB_HANDLE) private readonly handle: DbHandle,
@Inject(STORAGE_ADAPTER) private readonly storageAdapter: StorageAdapter,
@Inject(MOSAIC_CONFIG) private readonly config: MosaicConfig,
) {}
// Migrations must complete before any module that injects DB starts serving
// requests. NestJS awaits onModuleInit before app.listen(), and modules that
// inject DB are initialized after this one — so all DB-dependent code sees a
// populated schema before the first HTTP request lands.
//
// Local (PGlite) tier: we run gateway-DB migrations explicitly here. The
// storage adapter writes to a separate PGlite directory and only manages its
// own KV tables, so we still call its migrate() afterwards.
//
// Postgres tier: PostgresAdapter.migrate() already calls runMigrations() on
// the same DATABASE_URL, so a single call covers both the gateway DB and
// the storage tables. We deliberately do NOT call runMigrations() here to
// avoid opening a second short-lived connection and doubling startup cost.
async onModuleInit(): Promise<void> {
if (this.config.tier === 'local') {
this.logger.log('Applying PGlite schema migrations...');
await runPgliteMigrations(this.handle);
}
this.logger.log(`Initializing storage adapter (${this.storageAdapter.name})...`);
await this.storageAdapter.migrate();
}
async onApplicationShutdown(): Promise<void> {
await Promise.all([this.handle.close(), this.storageAdapter.close()]);
}

View File

@@ -1,553 +0,0 @@
/**
* Unit tests for FederationClientService (FED-M3-08).
*
* HTTP mocking strategy:
* undici MockAgent is used to intercept outbound HTTP requests. The service
* uses `undici.fetch` with a `dispatcher` option, so MockAgent is set as the
* global dispatcher and all requests flow through it.
*
* Because the service builds one `undici.Agent` per peer and passes it as
* the dispatcher on every fetch call, we cannot intercept at the Agent level
* in unit tests without significant refactoring. Instead, we set the global
* dispatcher to a MockAgent and override the service's `doRequest` indirection
* by spying on the internal fetch call.
*
* For the cert/key wiring, we use the real `sealClientKey` function from
* peer-key.util.ts with a test secret — no stubs.
*
* Sealed-key setup:
* Each test (or beforeAll) calls `sealClientKey(TEST_PRIVATE_KEY_PEM)` with
* BETTER_AUTH_SECRET set to a deterministic test value so that
* `unsealClientKey` in the service recovers the original PEM.
*/
import 'reflect-metadata';
import { describe, it, expect, vi, beforeEach, afterEach, beforeAll, afterAll } from 'vitest';
import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici';
import type { Dispatcher } from 'undici';
import { writeFileSync, unlinkSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { Db } from '@mosaicstack/db';
import { FederationClientService, FederationClientError } from '../federation-client.service.js';
import { sealClientKey } from '../../peer-key.util.js';
// ---------------------------------------------------------------------------
// Test constants
// ---------------------------------------------------------------------------
const TEST_SECRET = 'test-secret-for-federation-client-spec-only';
const PEER_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa';
const ENDPOINT = 'https://peer.example.com';
// Minimal valid RSA/EC private key PEM — does NOT need to be a real key for
// unit tests because we only verify it round-trips through seal/unseal, not
// that it actually negotiates TLS (MockAgent handles that).
const TEST_PRIVATE_KEY_PEM = `-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDummyKeyForTests
-----END PRIVATE KEY-----`;
// Minimal self-signed cert PEM (dummy — only used for mTLS Agent construction)
const TEST_CERT_PEM = `-----BEGIN CERTIFICATE-----
MIIBdummyCertForFederationClientTests==
-----END CERTIFICATE-----`;
const TEST_CERT_SERIAL = 'ABCDEF1234567890';
// ---------------------------------------------------------------------------
// Sealed key (computed once in beforeAll)
// ---------------------------------------------------------------------------
let SEALED_KEY: string;
// Path to a stub Step-CA root cert file written in beforeAll. The cert is never
// actually used to negotiate TLS in unit tests (MockAgent + spy on resolveEntry
// short-circuit the network), but loadStepCaRoot() requires the file to exist.
const STUB_CA_PEM_PATH = join(tmpdir(), 'federation-client-spec-ca.pem');
const STUB_CA_PEM = `-----BEGIN CERTIFICATE-----
MIIBdummyCAforFederationClientSpecOnly==
-----END CERTIFICATE-----
`;
// ---------------------------------------------------------------------------
// Peer row factory
// ---------------------------------------------------------------------------
function makePeerRow(overrides: Partial<Record<string, unknown>> = {}) {
return {
id: PEER_ID,
commonName: 'peer-example-com',
displayName: 'Test Peer',
certPem: TEST_CERT_PEM,
certSerial: TEST_CERT_SERIAL,
certNotAfter: new Date('2030-01-01T00:00:00Z'),
clientKeyPem: SEALED_KEY,
state: 'active' as const,
endpointUrl: ENDPOINT,
lastSeenAt: null,
createdAt: new Date('2026-01-01T00:00:00Z'),
revokedAt: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Mock DB builder
// ---------------------------------------------------------------------------
function makeDb(selectRows: unknown[] = [makePeerRow()]): Db {
const limitSelect = vi.fn().mockResolvedValue(selectRows);
const whereSelect = vi.fn().mockReturnValue({ limit: limitSelect });
const fromSelect = vi.fn().mockReturnValue({ where: whereSelect });
const selectMock = vi.fn().mockReturnValue({ from: fromSelect });
return {
select: selectMock,
insert: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
transaction: vi.fn(),
} as unknown as Db;
}
// ---------------------------------------------------------------------------
// Helpers for MockAgent HTTP interception
// ---------------------------------------------------------------------------
/**
* Create a MockAgent + MockPool for the peer endpoint, set it as the global
* dispatcher, and return both for per-test configuration.
*/
function makeMockAgent() {
const mockAgent = new MockAgent({ connections: 1 });
mockAgent.disableNetConnect();
setGlobalDispatcher(mockAgent);
const pool = mockAgent.get(ENDPOINT);
return { mockAgent, pool };
}
/**
* Build a FederationClientService with a mock DB and a spy on the internal
* fetch so we can intercept at the HTTP layer via MockAgent.
*
* The service calls `fetch(url, { dispatcher: agent })` where `agent` is the
* mTLS undici.Agent built from the peer's cert+key. To make MockAgent work,
* we need the fetch dispatcher to be the MockAgent, not the per-peer Agent.
*
* Strategy: we replace the private `resolveEntry` result's `agent` field with
* the MockAgent's pool, so fetch uses our interceptor. We do this by spying
* on `resolveEntry` and returning a controlled entry.
*/
function makeService(db: Db, mockPool: Dispatcher): FederationClientService {
const svc = new FederationClientService(db);
// Override resolveEntry to inject MockAgent pool as the dispatcher
vi.spyOn(
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> },
'resolveEntry',
).mockImplementation(async (_peerId: string) => {
// Still call DB (via the real logic) to exercise peer validation,
// but return mock pool as the agent.
// For simplicity in unit tests, directly return a controlled entry.
return {
agent: mockPool,
endpointUrl: ENDPOINT,
certPem: TEST_CERT_PEM,
certSerial: TEST_CERT_SERIAL,
};
});
return svc;
}
// ---------------------------------------------------------------------------
// Test setup
// ---------------------------------------------------------------------------
let originalDispatcher: Dispatcher;
beforeAll(() => {
// Seal the test key once — requires BETTER_AUTH_SECRET
const saved = process.env['BETTER_AUTH_SECRET'];
process.env['BETTER_AUTH_SECRET'] = TEST_SECRET;
try {
SEALED_KEY = sealClientKey(TEST_PRIVATE_KEY_PEM);
} finally {
if (saved === undefined) {
delete process.env['BETTER_AUTH_SECRET'];
} else {
process.env['BETTER_AUTH_SECRET'] = saved;
}
}
writeFileSync(STUB_CA_PEM_PATH, STUB_CA_PEM, 'utf8');
});
afterAll(() => {
try {
unlinkSync(STUB_CA_PEM_PATH);
} catch {
// best-effort cleanup
}
});
beforeEach(() => {
originalDispatcher = getGlobalDispatcher();
process.env['BETTER_AUTH_SECRET'] = TEST_SECRET;
process.env['STEP_CA_ROOT_CERT_PATH'] = STUB_CA_PEM_PATH;
});
afterEach(() => {
setGlobalDispatcher(originalDispatcher);
vi.restoreAllMocks();
delete process.env['BETTER_AUTH_SECRET'];
delete process.env['STEP_CA_ROOT_CERT_PATH'];
});
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Successful list response body */
const LIST_BODY = {
items: [{ id: '1', title: 'Task One' }],
nextCursor: undefined,
_partial: false,
};
/** Successful get response body */
const GET_BODY = {
item: { id: '1', title: 'Task One' },
_partial: false,
};
/** Successful capabilities response body */
const CAP_BODY = {
resources: ['tasks'],
excluded_resources: [],
max_rows_per_query: 100,
supported_verbs: ['list', 'get', 'capabilities'] as const,
};
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('FederationClientService', () => {
// ─── Successful verb calls ─────────────────────────────────────────────────
describe('list()', () => {
it('returns parsed typed response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/list/tasks',
method: 'POST',
})
.reply(200, LIST_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.list(PEER_ID, 'tasks', {});
expect(result.items).toHaveLength(1);
expect(result.items[0]).toMatchObject({ id: '1', title: 'Task One' });
await mockAgent.close();
});
});
describe('get()', () => {
it('returns parsed typed response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/get/tasks/1',
method: 'POST',
})
.reply(200, GET_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.get(PEER_ID, 'tasks', '1', {});
expect(result.item).toMatchObject({ id: '1', title: 'Task One' });
await mockAgent.close();
});
});
describe('capabilities()', () => {
it('returns parsed capabilities response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/capabilities',
method: 'GET',
})
.reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.capabilities(PEER_ID);
expect(result.resources).toContain('tasks');
expect(result.max_rows_per_query).toBe(100);
await mockAgent.close();
});
});
// ─── HTTP error surfaces ──────────────────────────────────────────────────
describe('non-2xx responses', () => {
it('surfaces 403 as FederationClientError({ status: 403, code: "FORBIDDEN" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool.intercept({ path: '/api/federation/v1/list/tasks', method: 'POST' }).reply(
403,
{ error: { code: 'forbidden', message: 'Access denied' } },
{
headers: { 'content-type': 'application/json' },
},
);
await expect(svc.list(PEER_ID, 'tasks', {})).rejects.toMatchObject({
status: 403,
code: 'FORBIDDEN',
peerId: PEER_ID,
});
await mockAgent.close();
});
it('surfaces 404 as FederationClientError({ status: 404, code: "HTTP_404" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool.intercept({ path: '/api/federation/v1/get/tasks/999', method: 'POST' }).reply(
404,
{ error: { code: 'not_found', message: 'Not found' } },
{
headers: { 'content-type': 'application/json' },
},
);
await expect(svc.get(PEER_ID, 'tasks', '999', {})).rejects.toMatchObject({
status: 404,
code: 'HTTP_404',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Network error ─────────────────────────────────────────────────────────
describe('network errors', () => {
it('surfaces network error as FederationClientError({ code: "NETWORK" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.replyWithError(new Error('ECONNREFUSED'));
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'NETWORK',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Invalid response body ─────────────────────────────────────────────────
describe('invalid response body', () => {
it('surfaces as FederationClientError({ code: "INVALID_RESPONSE" }) when body shape is wrong', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
// capabilities returns wrong shape (missing required fields)
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.reply(200, { totally: 'wrong' }, { headers: { 'content-type': 'application/json' } });
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'INVALID_RESPONSE',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Peer DB validation ────────────────────────────────────────────────────
describe('peer validation (without resolveEntry spy)', () => {
/**
* These tests exercise the real `resolveEntry` path — no spy on resolveEntry.
*/
it('throws PEER_NOT_FOUND when peer is not in DB', async () => {
// DB returns empty array (peer not found)
const db = makeDb([]);
const svc = new FederationClientService(db);
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'PEER_NOT_FOUND',
peerId: PEER_ID,
});
});
it('throws PEER_INACTIVE when peer state is not "active"', async () => {
const db = makeDb([makePeerRow({ state: 'suspended' })]);
const svc = new FederationClientService(db);
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'PEER_INACTIVE',
peerId: PEER_ID,
});
});
});
// ─── Cache behaviour ───────────────────────────────────────────────────────
describe('cache behaviour', () => {
it('hits cache on second call — only one DB lookup happens', async () => {
// Verify cache by calling the private resolveEntry directly twice and
// asserting the DB was queried only once. This avoids the HTTP layer,
// which would require either a real network or per-peer Agent rewiring
// that the cache invariant doesn't depend on.
const db = makeDb();
const selectSpy = vi.spyOn(db, 'select');
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> }
).resolveEntry.bind(svc);
const first = await resolveEntry(PEER_ID);
const second = await resolveEntry(PEER_ID);
expect(first).toBe(second);
expect(selectSpy).toHaveBeenCalledTimes(1);
});
it('serializes concurrent resolveEntry calls — only one DB lookup', async () => {
const db = makeDb();
const selectSpy = vi.spyOn(db, 'select');
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<unknown>;
}
).resolveEntry.bind(svc);
const [a, b] = await Promise.all([resolveEntry(PEER_ID), resolveEntry(PEER_ID)]);
expect(a).toBe(b);
expect(selectSpy).toHaveBeenCalledTimes(1);
});
it('flushPeer destroys the evicted Agent so old TLS connections close', async () => {
const db = makeDb();
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<{ agent: { destroy: () => Promise<void> } }>;
}
).resolveEntry.bind(svc);
const entry = await resolveEntry(PEER_ID);
const destroySpy = vi.spyOn(entry.agent, 'destroy').mockResolvedValue();
svc.flushPeer(PEER_ID);
expect(destroySpy).toHaveBeenCalledTimes(1);
});
it('flushPeer() invalidates cache — next call re-reads DB', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } })
.times(2);
// First call — populates cache (via mock resolveEntry)
await svc.capabilities(PEER_ID);
// Flush the cache
svc.flushPeer(PEER_ID);
// The spy on resolveEntry is still active — check it's called again after flush
const resolveEntrySpy = vi.spyOn(
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> },
'resolveEntry',
);
// Second call after flush — should call resolveEntry again
await svc.capabilities(PEER_ID);
// resolveEntry should have been called once after we started spying (post-flush)
expect(resolveEntrySpy).toHaveBeenCalledTimes(1);
await mockAgent.close();
});
});
// ─── loadStepCaRoot env-var guard ─────────────────────────────────────────
describe('loadStepCaRoot() env-var guard', () => {
it('throws PEER_MISCONFIGURED when STEP_CA_ROOT_CERT_PATH is not set', async () => {
delete process.env['STEP_CA_ROOT_CERT_PATH'];
const db = makeDb();
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<unknown>;
}
).resolveEntry.bind(svc);
await expect(resolveEntry(PEER_ID)).rejects.toMatchObject({
code: 'PEER_MISCONFIGURED',
});
});
});
// ─── FederationClientError class ──────────────────────────────────────────
describe('FederationClientError', () => {
it('is instanceof Error and FederationClientError', () => {
const err = new FederationClientError({
code: 'PEER_NOT_FOUND',
message: 'test',
peerId: PEER_ID,
});
expect(err).toBeInstanceOf(Error);
expect(err).toBeInstanceOf(FederationClientError);
expect(err.name).toBe('FederationClientError');
});
it('carries status, code, and peerId', () => {
const err = new FederationClientError({
status: 403,
code: 'FORBIDDEN',
message: 'forbidden',
peerId: PEER_ID,
});
expect(err.status).toBe(403);
expect(err.code).toBe('FORBIDDEN');
expect(err.peerId).toBe(PEER_ID);
});
});
});

View File

@@ -1,500 +0,0 @@
/**
* FederationClientService — outbound mTLS client for federation requests (FED-M3-08).
*
* Dials peer gateways over mTLS using the cert+sealed-key stored in `federation_peers`,
* invokes federation verbs (list / get / capabilities), and surfaces all failure modes
* as typed `FederationClientError` instances.
*
* ## Error code taxonomy
*
* | Code | When |
* | ------------------ | ------------------------------------------------------------- |
* | PEER_NOT_FOUND | No row in federation_peers for the given peerId |
* | PEER_INACTIVE | Peer row exists but state !== 'active' |
* | PEER_MISCONFIGURED | Peer row is active but missing endpointUrl or clientKeyPem |
* | NETWORK | undici threw a connection / TLS / timeout error |
* | HTTP_{status} | Peer returned a non-2xx response (e.g. HTTP_403, HTTP_404) |
* | FORBIDDEN | Peer returned 403 (convenience alias alongside HTTP_403) |
* | INVALID_RESPONSE | Response body failed Zod schema validation |
*
* ## Cache strategy
*
* Per-peer `undici.Agent` instances are cached in a `Map<peerId, AgentCacheEntry>` for
* the lifetime of the service instance. The cache is keyed on peerId (UUID).
*
* Cache invalidation:
* - `flushPeer(peerId)` — removes the entry immediately. M5/M6 MUST call this on
* cert rotation or peer revocation events so the next request re-reads the DB and
* builds a fresh TLS Agent with the new cert material.
* - On cache miss: re-reads the DB, checks state === 'active', rebuilds Agent.
*
* Cache does NOT auto-expire. The service is expected to be a singleton scoped to the
* NestJS application lifecycle; flushing on revocation/rotation is the only invalidation
* path by design (avoids redundant DB round-trips on the hot path).
*/
import { Injectable, Inject, Logger } from '@nestjs/common';
import { readFileSync } from 'node:fs';
import { Agent, fetch as undiciFetch } from 'undici';
import type { Dispatcher } from 'undici';
import { z } from 'zod';
import { type Db, eq, federationPeers } from '@mosaicstack/db';
import {
FederationListResponseSchema,
FederationGetResponseSchema,
FederationCapabilitiesResponseSchema,
FederationErrorEnvelopeSchema,
type FederationListResponse,
type FederationGetResponse,
type FederationCapabilitiesResponse,
} from '@mosaicstack/types';
import { DB } from '../../database/database.module.js';
import { unsealClientKey } from '../peer-key.util.js';
// ---------------------------------------------------------------------------
// Error taxonomy
// ---------------------------------------------------------------------------
/**
* Client-side error code set. Distinct from the server-side `FederationErrorCode`
* (which lives in `@mosaicstack/types`) because the client has additional failure
* modes (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK) that the
* server never emits.
*/
export type FederationClientErrorCode =
| 'PEER_NOT_FOUND'
| 'PEER_INACTIVE'
| 'PEER_MISCONFIGURED'
| 'NETWORK'
| 'FORBIDDEN'
| 'INVALID_RESPONSE'
| `HTTP_${number}`;
export interface FederationClientErrorOptions {
status?: number;
code: FederationClientErrorCode;
message: string;
peerId: string;
cause?: unknown;
}
/**
* Thrown by FederationClientService on every failure path.
* Callers can dispatch on `error.code` for programmatic handling.
*/
export class FederationClientError extends Error {
readonly status?: number;
readonly code: FederationClientErrorCode;
readonly peerId: string;
readonly cause?: unknown;
constructor(opts: FederationClientErrorOptions) {
super(opts.message);
this.name = 'FederationClientError';
this.status = opts.status;
this.code = opts.code;
this.peerId = opts.peerId;
this.cause = opts.cause;
}
}
// ---------------------------------------------------------------------------
// Internal cache types
// ---------------------------------------------------------------------------
interface AgentCacheEntry {
agent: Agent;
endpointUrl: string;
certPem: string;
certSerial: string;
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
@Injectable()
export class FederationClientService {
private readonly logger = new Logger(FederationClientService.name);
/**
* Per-peer undici Agent cache.
* Key = peerId (UUID string).
*
* Values are either a resolved `AgentCacheEntry` or an in-flight
* `Promise<AgentCacheEntry>` (promise-cache pattern). Storing the promise
* prevents duplicate DB lookups and duplicate key-unseal operations when two
* requests for the same peer arrive before the first build completes.
*
* Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
*/
private readonly cache = new Map<string, AgentCacheEntry | Promise<AgentCacheEntry>>();
/**
* Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
* Used as the trust anchor for peer server certificates so federation TLS is
* pinned to our PKI, not the public trust store. Lazily loaded on first use
* so unit tests that don't exercise the agent path can run without the env var.
*/
private cachedCaPem: string | null = null;
constructor(@Inject(DB) private readonly db: Db) {}
// -------------------------------------------------------------------------
// Public verb API
// -------------------------------------------------------------------------
/**
* Invoke the `list` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @param resource Resource path, e.g. "tasks".
* @param request Free-form body sent as JSON in the POST body.
* @returns Parsed `FederationListResponse<T>`.
*/
async list<T>(
peerId: string,
resource: string,
request: Record<string, unknown>,
): Promise<FederationListResponse<T>> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/list/${encodeURIComponent(resource)}`;
const body = await this.doPost(peerId, url, agent, request);
return this.parseWith<FederationListResponse<T>>(
peerId,
body,
FederationListResponseSchema(z.unknown()),
);
}
/**
* Invoke the `get` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @param resource Resource path, e.g. "tasks".
* @param id Resource identifier.
* @param request Free-form body sent as JSON in the POST body.
* @returns Parsed `FederationGetResponse<T>`.
*/
async get<T>(
peerId: string,
resource: string,
id: string,
request: Record<string, unknown>,
): Promise<FederationGetResponse<T>> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/get/${encodeURIComponent(resource)}/${encodeURIComponent(id)}`;
const body = await this.doPost(peerId, url, agent, request);
return this.parseWith<FederationGetResponse<T>>(
peerId,
body,
FederationGetResponseSchema(z.unknown()),
);
}
/**
* Invoke the `capabilities` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @returns Parsed `FederationCapabilitiesResponse`.
*/
async capabilities(peerId: string): Promise<FederationCapabilitiesResponse> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/capabilities`;
const body = await this.doGet(peerId, url, agent);
return this.parseWith<FederationCapabilitiesResponse>(
peerId,
body,
FederationCapabilitiesResponseSchema,
);
}
// -------------------------------------------------------------------------
// Cache management
// -------------------------------------------------------------------------
/**
* Flush the cached Agent for a specific peer.
*
* M5/M6 MUST call this on:
* - cert rotation events (so new cert material is picked up)
* - peer revocation events (so future requests fail at PEER_INACTIVE)
*
* After flushing, the next call to `list`, `get`, or `capabilities` for
* this peer will re-read the DB and rebuild the Agent.
*/
flushPeer(peerId: string): void {
const entry = this.cache.get(peerId);
if (entry === undefined) {
return;
}
this.cache.delete(peerId);
if (!(entry instanceof Promise)) {
// best-effort destroy; promise-cached entries skip destroy because
// the in-flight build owns its own Agent which will be GC'd when the
// owning request handles the rejection from the cache miss
entry.agent.destroy().catch(() => {
// intentionally ignored — destroy errors are not actionable
});
}
this.logger.log(`Cache flushed for peer ${peerId}`);
}
// -------------------------------------------------------------------------
// Internal helpers
// -------------------------------------------------------------------------
/**
* Load and cache the Step-CA root cert PEM from `STEP_CA_ROOT_CERT_PATH`.
* Throws `FederationClientError` if the env var is unset or the file cannot
* be read — mTLS to a peer without a pinned trust anchor would silently
* fall back to the public trust store.
*/
private loadStepCaRoot(): string {
if (this.cachedCaPem !== null) {
return this.cachedCaPem;
}
const path = process.env['STEP_CA_ROOT_CERT_PATH'];
if (!path) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: 'STEP_CA_ROOT_CERT_PATH is not set; refusing to dial peer without pinned CA trust',
peerId: '',
});
}
try {
const pem = readFileSync(path, 'utf8');
this.cachedCaPem = pem;
return pem;
} catch (err) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Failed to read STEP_CA_ROOT_CERT_PATH (${path})`,
peerId: '',
cause: err,
});
}
}
/**
* Resolve the cache entry for a peer, reading DB on miss.
*
* Uses a promise-cache pattern: concurrent callers for the same uncached
* `peerId` all `await` the same in-flight `Promise<AgentCacheEntry>` so
* only one DB lookup and one key-unseal ever runs per peer per cache miss.
* The promise is replaced with the concrete entry on success, or deleted on
* rejection so a transient error does not poison the cache permanently.
*
* Throws `FederationClientError` with appropriate code if the peer is not
* found, is inactive, or is missing required fields.
*/
private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
const cached = this.cache.get(peerId);
if (cached) {
return cached; // Promise or concrete entry — both are awaitable
}
const inflight = this.buildEntry(peerId).then(
(entry) => {
this.cache.set(peerId, entry); // replace promise with concrete value
return entry;
},
(err: unknown) => {
this.cache.delete(peerId); // don't poison the cache with a rejected promise
throw err;
},
);
this.cache.set(peerId, inflight);
return inflight;
}
/**
* Build the `AgentCacheEntry` for a peer by reading the DB, validating the
* peer's state, unsealing the private key, and constructing the mTLS Agent.
*
* Throws `FederationClientError` with appropriate code if the peer is not
* found, is inactive, or is missing required fields.
*/
private async buildEntry(peerId: string): Promise<AgentCacheEntry> {
// DB lookup
const [peer] = await this.db
.select()
.from(federationPeers)
.where(eq(federationPeers.id, peerId))
.limit(1);
if (!peer) {
throw new FederationClientError({
code: 'PEER_NOT_FOUND',
message: `Federation peer ${peerId} not found`,
peerId,
});
}
if (peer.state !== 'active') {
throw new FederationClientError({
code: 'PEER_INACTIVE',
message: `Federation peer ${peerId} is not active (state: ${peer.state})`,
peerId,
});
}
if (!peer.endpointUrl || !peer.clientKeyPem) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Federation peer ${peerId} is missing endpointUrl or clientKeyPem`,
peerId,
});
}
// Unseal the private key
let privateKeyPem: string;
try {
privateKeyPem = unsealClientKey(peer.clientKeyPem);
} catch (err) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Failed to unseal client key for peer ${peerId}`,
peerId,
cause: err,
});
}
// Build mTLS agent — pin trust to Step-CA root so we never accept
// a peer cert signed by a public CA (defense against MITM with a
// publicly-trusted DV cert for the peer's hostname).
const agent = new Agent({
connect: {
cert: peer.certPem,
key: privateKeyPem,
ca: this.loadStepCaRoot(),
// rejectUnauthorized: true is the undici default for HTTPS
},
});
const entry: AgentCacheEntry = {
agent,
endpointUrl: peer.endpointUrl,
certPem: peer.certPem,
certSerial: peer.certSerial,
};
this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`);
return entry;
}
/**
* Execute a POST request with a JSON body.
* Returns the parsed response body as an unknown value.
* Throws `FederationClientError` on network errors and non-2xx responses.
*/
private async doPost(
peerId: string,
url: string,
agent: Dispatcher,
body: Record<string, unknown>,
): Promise<unknown> {
return this.doRequest(peerId, url, agent, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
}
/**
* Execute a GET request.
* Returns the parsed response body as an unknown value.
* Throws `FederationClientError` on network errors and non-2xx responses.
*/
private async doGet(peerId: string, url: string, agent: Dispatcher): Promise<unknown> {
return this.doRequest(peerId, url, agent, { method: 'GET' });
}
private async doRequest(
peerId: string,
url: string,
agent: Dispatcher,
init: { method: string; headers?: Record<string, string>; body?: string },
): Promise<unknown> {
let response: Awaited<ReturnType<typeof undiciFetch>>;
try {
response = await undiciFetch(url, {
...init,
dispatcher: agent,
});
} catch (err) {
throw new FederationClientError({
code: 'NETWORK',
message: `Network error calling peer ${peerId} at ${url}: ${err instanceof Error ? err.message : String(err)}`,
peerId,
cause: err,
});
}
const rawBody = await response.text().catch(() => '');
if (!response.ok) {
const status = response.status;
// Attempt to parse as federation error envelope
let serverMessage = `HTTP ${status}`;
try {
const json: unknown = JSON.parse(rawBody);
const result = FederationErrorEnvelopeSchema.safeParse(json);
if (result.success) {
serverMessage = result.data.error.message;
}
} catch {
// Not valid JSON or not a federation envelope — use generic message
}
// Specific code for 403 (most actionable for callers); generic HTTP_{n} for others
const code: FederationClientErrorCode = status === 403 ? 'FORBIDDEN' : `HTTP_${status}`;
throw new FederationClientError({
status,
code,
message: `Peer ${peerId} returned ${status}: ${serverMessage}`,
peerId,
});
}
try {
return JSON.parse(rawBody) as unknown;
} catch (err) {
throw new FederationClientError({
code: 'INVALID_RESPONSE',
message: `Peer ${peerId} returned non-JSON body`,
peerId,
cause: err,
});
}
}
/**
* Parse and validate a response body against a Zod schema.
*
* For list/get, callers pass the result of `FederationListResponseSchema(z.unknown())`
* so that the envelope structure is validated without requiring a concrete item schema
* at the client level. The generic `T` provides compile-time typing.
*
* Throws `FederationClientError({ code: 'INVALID_RESPONSE' })` on parse failure.
*/
private parseWith<T>(peerId: string, body: unknown, schema: z.ZodTypeAny): T {
const result = schema.safeParse(body);
if (!result.success) {
const issues = result.error.issues
.map((e: z.ZodIssue) => `[${e.path.join('.') || 'root'}] ${e.message}`)
.join('; ');
throw new FederationClientError({
code: 'INVALID_RESPONSE',
message: `Peer ${peerId} returned invalid response shape: ${issues}`,
peerId,
});
}
return result.data as T;
}
}

View File

@@ -1,13 +0,0 @@
/**
* Federation client barrel — re-exports for FederationModule consumers.
*
* M3-09 (QuerySourceService) and future milestones should import from here,
* not directly from the implementation file.
*/
export {
FederationClientService,
FederationClientError,
type FederationClientErrorCode,
type FederationClientErrorOptions,
} from './federation-client.service.js';

View File

@@ -5,25 +5,11 @@ import { EnrollmentController } from './enrollment.controller.js';
import { EnrollmentService } from './enrollment.service.js';
import { FederationController } from './federation.controller.js';
import { GrantsService } from './grants.service.js';
import { FederationClientService } from './client/index.js';
import { FederationAuthGuard } from './server/index.js';
@Module({
controllers: [EnrollmentController, FederationController],
providers: [
AdminGuard,
CaService,
EnrollmentService,
GrantsService,
FederationClientService,
FederationAuthGuard,
],
exports: [
CaService,
EnrollmentService,
GrantsService,
FederationClientService,
FederationAuthGuard,
],
providers: [AdminGuard, CaService, EnrollmentService, GrantsService, FederationAuthGuard],
exports: [CaService, EnrollmentService, GrantsService, FederationAuthGuard],
})
export class FederationModule {}

View File

@@ -62,9 +62,8 @@ Jarvis (v0.2.0) is a self-hosted AI assistant with a Python FastAPI backend and
19. `@mosaicstack/prdy` — PRD wizard
20. `@mosaicstack/quality-rails` — code quality scaffolder
21. `@mosaicstack/cli` — unified `mosaic` CLI
22. Mosaic framework git wrappers — provider-aware issue/PR/CI shell wrappers for GitHub and self-hosted Gitea hosts used by Mosaic/USC repositories
23. Docker Compose deployment + bare-metal capability
24. Agent log service — ingest, parse, tier, summarize agent interaction logs
22. Docker Compose deployment + bare-metal capability
23. Agent log service — ingest, parse, tier, summarize agent interaction logs
### Out of Scope (v0.1.0)

View File

@@ -30,7 +30,6 @@ These are MVP-level checks that don't belong to any single workstream. Updated b
| MVP-T04 | not-started | Sync `.mosaic/orchestrator/mission.json` MVP slot with this manifest (milestone enumeration, etc.) | Coord state file; consider whether to repopulate via `mosaic coord` or accept hand-edit |
| MVP-T05 | in-progress | Kick off W1 / FED-M1 — federated tier infrastructure | Session 16 (2026-04-19): FED-M1-01 in-progress on `feat/federation-m1-tier-config` |
| MVP-T06 | not-started | Declare additional workstreams (web dashboard, TUI/CLI parity, remote control, etc.) as scope solidifies | Track each new workstream by adding a row to the Workstream Rollup |
| MVP-T07 | in-progress | Harden Mosaic framework Gitea PR metadata and merge preflight wrappers | Internal ref `t_a292e96f`; source branch `fix/gitea-pr-metadata-login-t-a292e96f` |
## Pointer to Active Workstream

View File

@@ -1,48 +0,0 @@
# t_a292e96f — Gitea PR metadata and merge wrapper fix
## Objective
Fix Mosaic git wrappers so Gitea repositories on `git.uscllc.com` resolve PR metadata and merge preflight through the correct host credentials, without selecting the stale `mosaicstack` Tea login.
## Acceptance criteria
- `pr-metadata.sh` returns `baseRefName=main` for U-Connect PR #1905 and PR #1908.
- `pr-metadata.sh` returns source-branch-style `headRefName`; for Gitea `refs/pull/<n>/head` responses, normalize to `head.label`.
- `pr-merge.sh` preserves Mosaic squash-only and base-branch policy, then uses host-matched Gitea API credentials for Gitea merges instead of a hard-coded Tea login.
- Add regression coverage/harness for Gitea metadata normalization and merge preflight.
- Do not print, log, or commit tokens.
## Plan
1. Reproduce current live metadata/login context with sanitized output.
2. Patch repo-source shell wrappers under `packages/mosaic/framework/tools/git/`.
3. Add a hermetic shell regression harness with fake `git`, `curl`, and `tea`.
4. Validate with `bash -n`, shellcheck if available, regression harness, and live sanitized U-Connect wrapper calls.
5. Apply the same script changes to the installed Mosaic wrapper location only after source changes validate, so active U-Connect merge wrappers are unblocked while the PR is reviewed.
6. Commit, push through queue guard, open PR, and hand off to Ultron review task `t_848435ab`; do not merge.
## Progress
- Live sanitized metadata check before source patch:
- PR #1905: `baseRefName=main`, `headRefName=edith/t_39ce717c-authentik-smoke-gate`.
- PR #1908: `baseRefName=main`, `headRefName=refs/pull/1908/head`; raw Gitea `head.label` is `fix/t_23fa9e1d-portal-health-backend`.
- `tea login list` contains only `git.mosaicstack.dev`, so the prior `--login mosaicstack` default cannot work for `git.uscllc.com`.
## Verification log
- `bash -n packages/mosaic/framework/tools/git/detect-platform.sh packages/mosaic/framework/tools/git/pr-metadata.sh packages/mosaic/framework/tools/git/pr-merge.sh packages/mosaic/framework/tools/git/tests/pr-gitea-wrapper-regression.sh` — pass.
- `shellcheck packages/mosaic/framework/tools/git/detect-platform.sh packages/mosaic/framework/tools/git/pr-metadata.sh packages/mosaic/framework/tools/git/pr-merge.sh packages/mosaic/framework/tools/git/tests/pr-gitea-wrapper-regression.sh` — pass when available in the Kanban runtime.
- `TMPDIR="$PWD/.agent-tmp" bash packages/mosaic/framework/tools/git/tests/pr-gitea-wrapper-regression.sh` — pass; proves host-matched Gitea credential selection, metadata normalization, and merge dry-run preflight without invoking `tea`.
- Live sanitized U-Connect metadata using the patched wrapper from `/src/uconnect`:
- PR #1905: `number=1905`, `baseRefName=main`, `headRefName=edith/t_39ce717c-authentik-smoke-gate`, `state=open`.
- PR #1908: `number=1908`, `baseRefName=main`, `headRefName=fix/t_23fa9e1d-portal-health-backend`, `state=closed`.
- Live sanitized U-Connect merge preflight using `pr-merge.sh --skip-queue-guard --dry-run`:
- PR #1905: `Dry run: Gitea merge preflight OK for USC/uconnect#1905 targeting main via git.uscllc.com API`.
- PR #1908: `Dry run: Gitea merge preflight OK for USC/uconnect#1908 targeting main via git.uscllc.com API`.
- Installed wrapper parity: `/home/hermes/.config/mosaic/tools/git/{detect-platform.sh,pr-metadata.sh,pr-merge.sh}` byte-match the PR source copies after validation, so active U-Connect wrapper invocations use the same fix while source PR review runs.
## Risks / notes
- `--dry-run` was added to `pr-merge.sh` to validate metadata/auth/preflight without merging a live PR.
- Gitea branch deletion after merge remains a documented warning, matching prior behavior, and is not expanded in this fix.
- Duplicate recovery PR #517 was closed after wrapper-first `pr-close.sh -n 517` failed headlessly with `/dev/tty`; PR #518 is the review target.

View File

@@ -42,7 +42,6 @@
"access": "public"
},
"files": [
"dist",
"drizzle"
"dist"
]
}

View File

@@ -1,12 +1,10 @@
import { PGlite } from '@electric-sql/pglite';
import { vector } from '@electric-sql/pglite/vector';
import { drizzle } from 'drizzle-orm/pglite';
import * as schema from './schema.js';
import type { DbHandle } from './client.js';
export function createPgliteDb(dataDir: string): DbHandle {
// pgvector extension is required by migration 0001 (insights.embedding column).
const client = new PGlite(dataDir, { extensions: { vector } });
const client = new PGlite(dataDir);
const db = drizzle(client, { schema });
return {
db: db as unknown as DbHandle['db'],

View File

@@ -1,6 +1,6 @@
export { createDb, type Db, type DbHandle } from './client.js';
export { createPgliteDb } from './client-pglite.js';
export { runMigrations, runPgliteMigrations } from './migrate.js';
export { runMigrations } from './migrate.js';
export * from './schema.js';
export * from './federation.js';
export {

View File

@@ -1,70 +0,0 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { sql } from 'drizzle-orm';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createPgliteDb } from './client-pglite.js';
import { runPgliteMigrations } from './migrate.js';
import type { DbHandle } from './client.js';
interface PgliteExec {
exec(query: string): Promise<unknown>;
}
describe('runPgliteMigrations', () => {
let dataDir: string;
let handle: DbHandle;
beforeEach(() => {
dataDir = mkdtempSync(join(tmpdir(), 'mosaic-db-migrate-test-'));
handle = createPgliteDb(dataDir);
});
afterEach(async () => {
await handle.close();
rmSync(dataDir, { recursive: true, force: true });
});
it('creates the BetterAuth tables required by the gateway', async () => {
await runPgliteMigrations(handle);
const result = (await handle.db.execute(sql`
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name
`)) as unknown as { rows: Array<{ table_name: string }> };
const tables = result.rows.map((r) => r.table_name);
// Auth tables — required for sign-in / bootstrap to function.
expect(tables).toContain('users');
expect(tables).toContain('sessions');
expect(tables).toContain('accounts');
expect(tables).toContain('verifications');
// Schema sanity check — admin token table consumed by mosaic gateway config.
expect(tables).toContain('admin_tokens');
});
it('is idempotent — running twice does not error', async () => {
await runPgliteMigrations(handle);
await expect(runPgliteMigrations(handle)).resolves.toBeUndefined();
});
it('surfaces statement-level error context on failure and leaves no ledger row', async () => {
// Pre-create a `users` table that conflicts with migration 0000's CREATE TABLE,
// forcing it to fail without IF NOT EXISTS.
const client = (handle.db as unknown as { $client: PgliteExec }).$client;
await client.exec('CREATE TABLE users (sentinel text)');
await expect(runPgliteMigrations(handle)).rejects.toThrow(
/migration hash=[a-f0-9]+ statement #\d+ failed/,
);
// Ledger should be empty — partial application must not pretend to be complete.
const ledger = (await handle.db.execute(
sql`SELECT count(*)::int AS count FROM drizzle.__drizzle_migrations`,
)) as unknown as { rows: Array<{ count: number }> };
expect(ledger.rows[0]?.count).toBe(0);
});
});

View File

@@ -1,109 +1,18 @@
import { dirname, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { sql } from 'drizzle-orm';
import { drizzle as drizzlePostgres } from 'drizzle-orm/postgres-js';
import { migrate as migratePostgres } from 'drizzle-orm/postgres-js/migrator';
import { readMigrationFiles } from 'drizzle-orm/migrator';
import { drizzle } from 'drizzle-orm/postgres-js';
import { migrate } from 'drizzle-orm/postgres-js/migrator';
import postgres from 'postgres';
import { DEFAULT_DATABASE_URL } from './defaults.js';
import type { DbHandle } from './client.js';
interface PgliteExecutor {
exec(query: string): Promise<unknown>;
}
interface ExecuteRows<T> {
rows: T[];
}
function migrationsFolder(): string {
const here = dirname(fileURLToPath(import.meta.url));
return resolve(here, '../drizzle');
}
export async function runMigrations(url?: string): Promise<void> {
const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
const sqlClient = postgres(connectionString, { max: 1 });
const db = drizzlePostgres(sqlClient);
const sql = postgres(connectionString, { max: 1 });
const db = drizzle(sql);
const __dirname = dirname(fileURLToPath(import.meta.url));
try {
// TODO: postgres-tier first-install also fails because (a) Drizzle wraps every
// migration in one transaction (breaks 0009's ALTER TYPE ADD VALUE → SET DEFAULT
// sequence) and (b) drizzle/meta/_journal.json has 0009 ordered before 0008,
// which the postgres-js migrator skips by `created_at < folderMillis`. The
// PGlite path below sidesteps both. A follow-up should either share the
// per-statement loop (see runPgliteMigrations) or fix the journal ordering.
await migratePostgres(db, { migrationsFolder: migrationsFolder() });
await migrate(db, { migrationsFolder: resolve(__dirname, '../drizzle') });
} finally {
await sqlClient.end();
}
}
// Apply Drizzle migrations against an embedded PGlite database.
//
// We don't reuse drizzle's pglite migrator because it wraps ALL migrations in
// one outer transaction, which breaks Postgres' `check_safe_enum_use` rule —
// e.g. migration 0009 does `ALTER TYPE ADD VALUE 'pending'` then references
// `'pending'` as a default in the same tx. PGlite's `exec()` runs each
// statement under the Simple Query protocol, autocommitting between them.
//
// We still write to the standard `drizzle.__drizzle_migrations` ledger so the
// result is interoperable with `runMigrations()` on a postgres-backed deploy
// (modulo the journal-ordering bug noted above).
//
// We skip-by-hash rather than skip-by-folderMillis (which is what Drizzle's
// postgres-js migrator does). That's deliberate — out-of-order timestamps in
// `_journal.json` won't silently drop migrations.
//
// Failure model: each statement autocommits, and the ledger row is written
// only after all statements in a migration succeed. A crash mid-migration
// leaves the prefix applied with no ledger entry, so the next boot will
// replay those statements and fail loudly on "already exists". Recovery:
// drop the partially-applied objects, or insert the migration's hash into
// `drizzle.__drizzle_migrations` manually. The error log identifies which
// statement of which migration was the culprit.
export async function runPgliteMigrations(handle: DbHandle): Promise<void> {
const client = (handle.db as unknown as { $client?: PgliteExecutor }).$client;
if (!client || typeof client.exec !== 'function') {
throw new Error('runPgliteMigrations: handle.db is not backed by a PGlite client');
}
await client.exec('CREATE SCHEMA IF NOT EXISTS drizzle');
await client.exec(`
CREATE TABLE IF NOT EXISTS drizzle.__drizzle_migrations (
id SERIAL PRIMARY KEY,
hash text NOT NULL,
created_at bigint
)
`);
const appliedRows = (await handle.db.execute(
sql`SELECT hash FROM drizzle.__drizzle_migrations`,
)) as unknown as ExecuteRows<{ hash: string }>;
const applied = new Set(appliedRows.rows.map((r) => r.hash));
const migrations = readMigrationFiles({ migrationsFolder: migrationsFolder() });
for (const migration of migrations) {
if (applied.has(migration.hash)) continue;
// Run each statement-breakpoint chunk in its own exec() call so PGlite
// commits between statements — this is what lets `ALTER TYPE ADD VALUE`
// become visible before a subsequent statement references the new value.
for (const [stmtIdx, stmt] of migration.sql.entries()) {
const trimmed = stmt.trim();
if (!trimmed) continue;
try {
await client.exec(trimmed);
} catch (err) {
const cause = err instanceof Error ? err.message : String(err);
throw new Error(
`runPgliteMigrations: migration hash=${migration.hash} statement #${stmtIdx} failed: ${cause}\n` +
`Statement: ${trimmed.slice(0, 200)}${trimmed.length > 200 ? '…' : ''}`,
{ cause: err },
);
}
}
await handle.db.execute(
sql`INSERT INTO drizzle.__drizzle_migrations (hash, created_at) VALUES (${migration.hash}, ${migration.folderMillis})`,
);
await sql.end();
}
}

View File

@@ -92,7 +92,7 @@ get_remote_host() {
}
# Resolve a Gitea API token for the given host.
# Priority: Mosaic credential loader → host-matched GITEA_TOKEN env → ~/.git-credentials
# Priority: Mosaic credential loader → GITEA_TOKEN env → ~/.git-credentials
get_gitea_token() {
local host="$1"
local script_dir
@@ -103,28 +103,16 @@ get_gitea_token() {
if [[ -f "$cred_loader" ]]; then
local token
token=$(
# shellcheck source=/dev/null
source "$cred_loader"
# Host-specific wrapper resolution must not inherit a caller/global GITEA_TOKEN.
# load_credentials intentionally preserves existing env vars for interactive use,
# but merge/metadata wrappers need the token matching the remote host.
unset GITEA_TOKEN GITEA_URL
case "$host" in
git.mosaicstack.dev) load_credentials gitea-mosaicstack 2>/dev/null ;;
git.uscllc.com) load_credentials gitea-usc 2>/dev/null ;;
*)
local matched=false
for svc in gitea-mosaicstack gitea-usc; do
unset GITEA_TOKEN GITEA_URL
load_credentials "$svc" 2>/dev/null || continue
if [[ "${GITEA_URL:-}" == "https://$host" || "${GITEA_URL:-}" == "http://$host" || "${GITEA_URL:-}" == *"//$host" ]]; then
matched=true
break
fi
done
if [[ "$matched" != true ]]; then
[[ "${GITEA_URL:-}" == *"$host"* ]] && break
unset GITEA_TOKEN GITEA_URL
fi
done
;;
esac
echo "${GITEA_TOKEN:-}"
@@ -135,13 +123,11 @@ get_gitea_token() {
fi
fi
# 2. GITEA_TOKEN env var (only when GITEA_URL, if present, matches the remote host)
# 2. GITEA_TOKEN env var (may be set by caller)
if [[ -n "${GITEA_TOKEN:-}" ]]; then
if [[ -z "${GITEA_URL:-}" || "${GITEA_URL:-}" == "https://$host" || "${GITEA_URL:-}" == "http://$host" || "${GITEA_URL:-}" == *"//$host" ]]; then
echo "$GITEA_TOKEN"
return 0
fi
fi
# 3. ~/.git-credentials file
local creds="$HOME/.git-credentials"

View File

@@ -1,11 +1,10 @@
#!/bin/bash
# pr-merge.sh - Merge pull requests on Gitea or GitHub
# Usage: pr-merge.sh -n PR_NUMBER [-m squash] [-d] [--skip-queue-guard] [--dry-run]
# Usage: pr-merge.sh -n PR_NUMBER [-m squash] [-d] [--skip-queue-guard]
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck disable=SC1091
source "$SCRIPT_DIR/detect-platform.sh"
# Default values
@@ -13,7 +12,6 @@ PR_NUMBER=""
MERGE_METHOD="squash"
DELETE_BRANCH=false
SKIP_QUEUE_GUARD=false
DRY_RUN=false
usage() {
cat <<EOF
@@ -26,7 +24,6 @@ Options:
-m, --method METHOD Merge method: squash only (default: squash)
-d, --delete-branch Delete the head branch after merge
--skip-queue-guard Skip CI queue guard wait before merge
--dry-run Validate metadata/auth/preflight without merging
-h, --help Show this help message
Examples:
@@ -57,10 +54,6 @@ while [[ $# -gt 0 ]]; do
SKIP_QUEUE_GUARD=true
shift
;;
--dry-run)
DRY_RUN=true
shift
;;
-h|--help)
usage
;;
@@ -81,8 +74,7 @@ if [[ "$MERGE_METHOD" != "squash" ]]; then
exit 1
fi
METADATA_JSON="$("$SCRIPT_DIR/pr-metadata.sh" -n "$PR_NUMBER")"
BASE_BRANCH="$(printf '%s' "$METADATA_JSON" | python3 -c 'import json, sys; print((json.load(sys.stdin).get("baseRefName") or "").strip())')"
BASE_BRANCH="$("$SCRIPT_DIR/pr-metadata.sh" -n "$PR_NUMBER" | python3 -c 'import json, sys; print((json.load(sys.stdin).get("baseRefName") or "").strip())')"
if [[ "$BASE_BRANCH" != "main" ]]; then
echo "Error: Mosaic policy allows merges only for PRs targeting 'main' (found '$BASE_BRANCH')." >&2
exit 1
@@ -102,55 +94,19 @@ REPO=$(get_repo_name)
case "$PLATFORM" in
github)
if [[ "$DRY_RUN" == true ]]; then
echo "Dry run: GitHub merge preflight OK for ${OWNER}/${REPO}#${PR_NUMBER} targeting ${BASE_BRANCH}"
exit 0
fi
CMD=(gh pr merge "$PR_NUMBER" --squash)
[[ "$DELETE_BRANCH" == true ]] && CMD+=(--delete-branch)
"${CMD[@]}"
CMD="gh pr merge $PR_NUMBER --squash"
[[ "$DELETE_BRANCH" == true ]] && CMD="$CMD --delete-branch"
eval "$CMD"
;;
gitea)
HOST=$(get_remote_host) || {
echo "Error: Cannot determine host from remote URL" >&2
exit 1
}
TOKEN=$(get_gitea_token "$HOST") || {
echo "Error: Could not resolve Gitea API token for ${HOST}" >&2
exit 1
}
if [[ "$DRY_RUN" == true ]]; then
echo "Dry run: Gitea merge preflight OK for ${OWNER}/${REPO}#${PR_NUMBER} targeting ${BASE_BRANCH} via ${HOST} API"
exit 0
fi
RESPONSE_FILE=$(mktemp)
trap 'rm -f "$RESPONSE_FILE"' EXIT
HTTP_CODE=$(curl -sS \
-X POST \
-H "Authorization: token $TOKEN" \
-H "Content-Type: application/json" \
-d '{"Do":"squash"}' \
-o "$RESPONSE_FILE" \
-w '%{http_code}' \
"https://${HOST}/api/v1/repos/${OWNER}/${REPO}/pulls/${PR_NUMBER}/merge")
RESPONSE_BODY=$(cat "$RESPONSE_FILE")
rm -f "$RESPONSE_FILE"
trap - EXIT
if [[ ! "$HTTP_CODE" =~ ^2 ]]; then
echo "Error: Gitea PR merge failed for ${OWNER}/${REPO}#${PR_NUMBER} (HTTP ${HTTP_CODE})" >&2
if [[ -n "$RESPONSE_BODY" ]]; then
printf '%s\n' "$RESPONSE_BODY" >&2
fi
exit 1
fi
CMD="tea pr merge $PR_NUMBER --style squash --repo $OWNER/$REPO --login ${GITEA_LOGIN:-mosaicstack}"
# Delete branch after merge if requested
if [[ "$DELETE_BRANCH" == true ]]; then
echo "Note: Branch deletion after merge may need to be done separately with the Gitea API" >&2
echo "Note: Branch deletion after merge may need to be done separately with tea" >&2
fi
eval "$CMD"
;;
*)
echo "Error: Could not detect git platform" >&2

View File

@@ -5,7 +5,6 @@
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck disable=SC1091
source "$SCRIPT_DIR/detect-platform.sh"
# Parse arguments
@@ -56,51 +55,39 @@ if [[ "$PLATFORM" == "github" ]]; then
elif [[ "$PLATFORM" == "gitea" ]]; then
OWNER=$(get_repo_owner)
REPO=$(get_repo_name)
HOST=$(get_remote_host) || {
REMOTE_URL=$(git remote get-url origin 2>/dev/null)
# Extract host from remote URL
if [[ "$REMOTE_URL" == https://* ]]; then
HOST=$(echo "$REMOTE_URL" | sed -E 's|https://([^/]+)/.*|\1|')
elif [[ "$REMOTE_URL" == git@* ]]; then
HOST=$(echo "$REMOTE_URL" | sed -E 's|git@([^:]+):.*|\1|')
else
echo "Error: Cannot determine host from remote URL" >&2
exit 1
}
fi
API_URL="https://${HOST}/api/v1/repos/${OWNER}/${REPO}/pulls/${PR_NUMBER}"
GITEA_API_TOKEN=$(get_gitea_token "$HOST" || true)
RESPONSE_FILE=$(mktemp)
trap 'rm -f "$RESPONSE_FILE"' EXIT
if [[ -n "$GITEA_API_TOKEN" ]]; then
HTTP_CODE=$(curl -sS -H "Authorization: token $GITEA_API_TOKEN" -o "$RESPONSE_FILE" -w '%{http_code}' "$API_URL")
RAW=$(curl -sS -H "Authorization: token $GITEA_API_TOKEN" "$API_URL")
else
HTTP_CODE=$(curl -sS -o "$RESPONSE_FILE" -w '%{http_code}' "$API_URL")
fi
RAW=$(cat "$RESPONSE_FILE")
rm -f "$RESPONSE_FILE"
trap - EXIT
if [[ ! "$HTTP_CODE" =~ ^2 ]]; then
echo "Error: Gitea PR metadata request failed for ${OWNER}/${REPO}#${PR_NUMBER} (HTTP ${HTTP_CODE})" >&2
exit 1
RAW=$(curl -sS "$API_URL")
fi
# Normalize Gitea response to match our expected schema
METADATA=$(echo "$RAW" | python3 -c "
import json, sys
data = json.load(sys.stdin)
if 'message' in data and not data.get('number'):
raise SystemExit('Error: Gitea PR metadata response did not contain PR data')
head = data.get('head') or {}
head_ref = head.get('ref') or ''
head_label = head.get('label') or ''
# Gitea can report closed/merged PR heads as refs/pull/<n>/head; callers need
# the source branch name equivalent to GitHub headRefName, so prefer label then.
if head_ref.startswith('refs/pull/') and head_label:
head_ref = head_label
normalized = {
'number': data.get('number'),
'title': data.get('title'),
'body': data.get('body', ''),
'state': data.get('state'),
'author': data.get('user', {}).get('login', ''),
'headRefName': head_ref,
'headRefName': data.get('head', {}).get('ref', ''),
'baseRefName': data.get('base', {}).get('ref', ''),
'labels': [l.get('name', '') for l in data.get('labels', [])],
'assignees': [a.get('login', '') for a in data.get('assignees', [])],

View File

@@ -1,116 +0,0 @@
#!/usr/bin/env bash
# Regression harness for Gitea PR metadata normalization and merge preflight.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
GIT_TOOLS_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
TEST_ROOT="${TEST_ROOT:-$(pwd)/.test-output/pr-gitea-wrapper-regression}"
FAKE_BIN="$TEST_ROOT/bin"
FAKE_REPO="$TEST_ROOT/repo"
rm -rf "$TEST_ROOT"
mkdir -p "$FAKE_BIN" "$FAKE_REPO" "$TEST_ROOT/state"
cat > "$FAKE_BIN/git" <<'SH'
#!/usr/bin/env bash
set -euo pipefail
if [[ "$*" == "remote get-url origin" ]]; then
echo "https://git.uscllc.com/usc/uconnect.git"
exit 0
fi
echo "unexpected git invocation: $*" >&2
exit 2
SH
chmod +x "$FAKE_BIN/git"
cat > "$FAKE_BIN/curl" <<'SH'
#!/usr/bin/env bash
set -euo pipefail
method="GET"
out_file=""
write_format=""
url=""
while [[ $# -gt 0 ]]; do
case "$1" in
-X)
method="$2"; shift 2 ;;
-o)
out_file="$2"; shift 2 ;;
-w)
write_format="$2"; shift 2 ;;
-H|-d)
shift 2 ;;
-s|-S|-f|-k|-sS|-fsS)
shift ;;
http*)
url="$1"; shift ;;
*)
shift ;;
esac
done
body='{}'
code="200"
if [[ "$method" == "GET" && "$url" == *"/api/v1/repos/usc/uconnect/pulls/1908" ]]; then
body='{"number":1908,"title":"Test PR","body":"","state":"open","user":{"login":"edith"},"head":{"label":"fix/t_23fa9e1d-portal-health-backend","ref":"refs/pull/1908/head","sha":"abc123"},"base":{"label":"main","ref":"main","sha":"def456"},"labels":[],"assignees":[],"created_at":"2026-05-22T00:00:00Z","updated_at":"2026-05-22T00:00:00Z","html_url":"https://git.uscllc.com/usc/uconnect/pulls/1908","draft":false,"mergeable":true,"diff_url":"https://git.uscllc.com/usc/uconnect/pulls/1908.diff"}'
elif [[ "$method" == "POST" && "$url" == *"/api/v1/repos/usc/uconnect/pulls/1908/merge" ]]; then
echo "$url" > "${TEST_ROOT:?}/state/merge-url"
body='{"merged":true}'
else
code="404"
body='{"message":"not found"}'
fi
if [[ -n "$out_file" ]]; then
printf '%s' "$body" > "$out_file"
else
printf '%s' "$body"
fi
if [[ -n "$write_format" ]]; then
printf '%s' "$code"
fi
SH
chmod +x "$FAKE_BIN/curl"
cat > "$FAKE_BIN/tea" <<'SH'
#!/usr/bin/env bash
echo "tea must not be invoked by Gitea merge preflight" >&2
exit 99
SH
chmod +x "$FAKE_BIN/tea"
cat > "$TEST_ROOT/credentials.json" <<'JSON'
{
"gitea": {
"usc": {"url": "https://git.uscllc.com", "token": "fake-token-usc"},
"mosaicstack": {"url": "https://git.mosaicstack.dev", "token": "fake-token-mosaic"}
}
}
JSON
export PATH="$FAKE_BIN:$PATH"
export TEST_ROOT
export MOSAIC_CREDENTIALS_FILE="$TEST_ROOT/credentials.json"
cd "$FAKE_REPO"
metadata="$("$GIT_TOOLS_DIR/pr-metadata.sh" -n 1908)"
python3 - "$metadata" <<'PY'
import json
import sys
metadata = json.loads(sys.argv[1])
assert metadata["baseRefName"] == "main", metadata
assert metadata["headRefName"] == "fix/t_23fa9e1d-portal-health-backend", metadata
PY
merge_output="$("$GIT_TOOLS_DIR/pr-merge.sh" -n 1908 -m squash --skip-queue-guard --dry-run 2>&1)"
if grep -q "mosaicstack\|Login name\|tea must not" <<<"$merge_output"; then
echo "$merge_output" >&2
exit 1
fi
if ! grep -q "Dry run: Gitea merge preflight OK" <<<"$merge_output"; then
echo "$merge_output" >&2
exit 1
fi
printf 'Gitea PR metadata and merge preflight regression passed\n'

View File

@@ -16,15 +16,8 @@ import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import {
users,
teams,
teamMembers,
conversations,
messages,
createPgliteDb,
runPgliteMigrations,
} from '@mosaicstack/db';
import { users, teams, teamMembers, conversations, messages } from '@mosaicstack/db';
import { createPgliteDbWithVector, runPgliteMigrations } from './test-utils/pglite-with-vector.js';
import postgres from 'postgres';
import { afterAll, describe, expect, it } from 'vitest';
@@ -109,8 +102,11 @@ describe.skipIf(!run)('migrate-tier — PGlite → federated PG', () => {
/* ---- 1. Create a temp PGlite db ---------------------------------- */
pgliteDataDir = await fs.mkdtemp(path.join(os.tmpdir(), 'fed-m1-08-'));
const handle = createPgliteDb(pgliteDataDir);
await runPgliteMigrations(handle);
const handle = createPgliteDbWithVector(pgliteDataDir);
// Run Drizzle migrations against PGlite.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await runPgliteMigrations(handle.db as any);
/* ---- 2. Seed representative data --------------------------------- */

View File

@@ -0,0 +1,52 @@
/**
* Test-only helpers for creating a PGlite database with the pgvector extension
* and running Drizzle migrations against it.
*
* These are intentionally NOT exported from @mosaicstack/db to avoid pulling
* the WASM vector bundle into the public API surface.
*/
import { createRequire } from 'node:module';
import { dirname, resolve } from 'node:path';
import { PGlite } from '@electric-sql/pglite';
import { vector } from '@electric-sql/pglite/vector';
import { drizzle } from 'drizzle-orm/pglite';
import { migrate as migratePglite } from 'drizzle-orm/pglite/migrator';
import type { PgliteDatabase } from 'drizzle-orm/pglite';
import * as schema from '@mosaicstack/db';
import type { DbHandle } from '@mosaicstack/db';
/**
* Create a PGlite DB handle with the pgvector extension loaded.
* Required for running Drizzle migrations that include `CREATE EXTENSION vector`.
*/
export function createPgliteDbWithVector(dataDir: string): DbHandle {
const client = new PGlite(dataDir, { extensions: { vector } });
const db = drizzle(client, { schema });
return {
db: db as unknown as DbHandle['db'],
close: async () => {
await client.close();
},
};
}
/**
* Run Drizzle migrations against an already-open PGlite database handle.
* Resolves the migrations folder from @mosaicstack/db's installed location.
*
* @param db A PgliteDatabase instance (from drizzle-orm/pglite).
*/
export async function runPgliteMigrations(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
db: PgliteDatabase<any>,
): Promise<void> {
// Resolve @mosaicstack/db package root to locate its drizzle migrations folder.
const _require = createRequire(import.meta.url);
const dbPkgMain = _require.resolve('@mosaicstack/db');
// dbPkgMain → …/packages/db/dist/index.js → dirname = dist/
// go up one level from dist/ to find the sibling drizzle/ folder
const migrationsFolder = resolve(dirname(dbPkgMain), '../drizzle');
await migratePglite(db, { migrationsFolder });
}

64
pnpm-lock.yaml generated
View File

@@ -179,9 +179,6 @@ importers:
socket.io:
specifier: ^4.8.0
version: 4.8.3
undici:
specifier: ^7.24.6
version: 7.24.6
uuid:
specifier: ^11.0.0
version: 11.1.0
@@ -716,10 +713,10 @@ importers:
dependencies:
'@mariozechner/pi-agent-core':
specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
'@mariozechner/pi-ai':
specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
'@sinclair/typebox':
specifier: ^0.34.41
version: 0.34.48
@@ -6996,6 +6993,10 @@ packages:
resolution: {integrity: sha512-gBLkYIlEnSp8pFbT64yFgGE6UIB9tAkhukC23PmMDCe5Nd+cRqKxSjw5y54MK2AZMgZfJWMaNE4nYUHgi1XEOw==}
engines: {node: '>=18.17'}
undici@7.24.3:
resolution: {integrity: sha512-eJdUmK/Wrx2d+mnWWmwwLRyA7OQCkLap60sk3dOK4ViZR7DKwwptwuIvFBg2HaiP9ESaEdhtpSymQPvytpmkCA==}
engines: {node: '>=20.18.1'}
undici@7.24.6:
resolution: {integrity: sha512-Xi4agocCbRzt0yYMZGMA6ApD7gvtUFaxm4ZmeacWI4cZxaF6C+8I8QfofC20NAePiB/IcvZmzkJ7XPa471AEtA==}
engines: {node: '>=20.18.1'}
@@ -7328,6 +7329,12 @@ snapshots:
'@jridgewell/gen-mapping': 0.3.13
'@jridgewell/trace-mapping': 0.3.31
'@anthropic-ai/sdk@0.73.0(zod@3.25.76)':
dependencies:
json-schema-to-ts: 3.1.1
optionalDependencies:
zod: 3.25.76
'@anthropic-ai/sdk@0.73.0(zod@4.3.6)':
dependencies:
json-schema-to-ts: 3.1.1
@@ -8669,6 +8676,18 @@ snapshots:
- ws
- zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
@@ -8717,6 +8736,30 @@ snapshots:
- ws
- zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@3.25.76)
'@aws-sdk/client-bedrock-runtime': 3.1008.0
'@google/genai': 1.45.0(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))
'@mistralai/mistralai': 1.14.1
'@sinclair/typebox': 0.34.48
ajv: 8.18.0
ajv-formats: 3.0.1(ajv@8.18.0)
chalk: 5.6.2
openai: 6.26.0(ws@8.20.0)(zod@3.25.76)
partial-json: 0.1.7
proxy-agent: 6.5.0
undici: 7.24.3
zod-to-json-schema: 3.25.1(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@4.3.6)
@@ -8730,7 +8773,7 @@ snapshots:
openai: 6.26.0(ws@8.20.0)(zod@4.3.6)
partial-json: 0.1.7
proxy-agent: 6.5.0
undici: 7.24.6
undici: 7.24.3
zod-to-json-schema: 3.25.1(zod@4.3.6)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
@@ -12589,7 +12632,7 @@ snapshots:
saxes: 6.0.0
symbol-tree: 3.2.4
tough-cookie: 6.0.1
undici: 7.24.6
undici: 7.24.3
w3c-xmlserializer: 5.0.0
webidl-conversions: 8.0.1
whatwg-mimetype: 5.0.0
@@ -13309,6 +13352,11 @@ snapshots:
dependencies:
mimic-function: 5.0.1
openai@6.26.0(ws@8.20.0)(zod@3.25.76):
optionalDependencies:
ws: 8.20.0
zod: 3.25.76
openai@6.26.0(ws@8.20.0)(zod@4.3.6):
optionalDependencies:
ws: 8.20.0
@@ -14440,6 +14488,8 @@ snapshots:
undici@6.21.3: {}
undici@7.24.3: {}
undici@7.24.6: {}
unhomoglyph@1.0.6: {}

View File

@@ -1,125 +0,0 @@
# fix(db): bootstrap migrations on local-tier gateway startup
## Problem
Fresh `mosaic gateway install` (npm-installed) leaves the gateway DB schema empty:
```
relation "users" does not exist
```
Sign-in 500s, `auth users create` says "Not signed in", `admin/bootstrap setup`
also fails — every entry point queries `users` before doing anything else.
## Scope
This PR fixes the **local (PGlite) tier** end-to-end. The postgres-tier path
has additional pre-existing bugs (see "Known issues, out of scope" below) and
needs a separate change with real Postgres validation.
## Root causes addressed (5 stacked bugs on the local-tier path)
1. **`packages/db/package.json` `files: ["dist"]`** — the `drizzle/` SQL
migrations folder is excluded from the published tarball. Even if a
migrate runner existed, it would have nothing to apply.
2. **`packages/db/src/migrate.ts`** only supports `postgres-js`. Local-tier
gateways use embedded PGlite, which can't be reached over a postgres wire
protocol — so `runMigrations()` is unusable for the local tier.
3. **`apps/gateway/src/database/database.module.ts`** never invokes
migrations at startup. The module creates the DB handle and storage
adapter, but no consumer calls `.migrate()` on either. `mosaic storage
migrate` CLI even claims "pglite runs schema setup automatically on first
connection via `adapter.migrate()`" — but `adapter.migrate()` is only
called by tests, never at runtime.
4. **`createPgliteDb` does not load the pgvector extension.** Migration 0001
declares `CREATE EXTENSION IF NOT EXISTS vector;` for the
`insights.embedding` column. Bare PGlite has no pgvector — the migration
fails on extension control file lookup.
5. **Drizzle's PG migrator wraps every migration in one outer transaction.**
Migration 0009 does `ALTER TYPE grant_status ADD VALUE 'pending'` and then
`ALTER TABLE federation_grants ALTER COLUMN status SET DEFAULT 'pending'`.
Postgres' `check_safe_enum_use` rejects the second statement because the
new enum value isn't committed yet. Splitting the migration into two
files doesn't help — drizzle batches all migrations into one outer tx.
## Fix
- `packages/db/package.json` — ship `drizzle/` in `files`.
- `packages/db/src/client-pglite.ts` — load `@electric-sql/pglite/vector`.
- `packages/db/src/migrate.ts` — add `runPgliteMigrations(handle)`. Walks the
Drizzle journal and runs each statement-breakpoint chunk through PGlite's
`client.exec()` (Simple Query protocol → autocommit per statement). Writes
to the standard `drizzle.__drizzle_migrations` ledger so the result is
interoperable with `runMigrations()` on a postgres-backed deployment.
Per-statement try/catch surfaces which statement of which migration failed
and the ledger row is only written on full success.
- `packages/db/src/index.ts` — re-export.
- `apps/gateway/src/database/database.module.ts` — implement `OnModuleInit`:
- Local tier → `runPgliteMigrations(handle)`, then `storageAdapter.migrate()`
(the local storage adapter has its own kv tables in a separate PGlite dir).
- Postgres tier → `storageAdapter.migrate()` only, since
`PostgresAdapter.migrate()` already calls `runMigrations(url)` against
the same DATABASE_URL — we deliberately don't double-call.
NestJS awaits `onModuleInit` before `app.listen()`, so DB-dependent modules
see a populated schema before any HTTP traffic is accepted.
- `packages/storage/src/test-utils/pglite-with-vector.ts`**deleted**.
The "intentionally not exported" rationale is moot now that migration 0001
forces pgvector load anyway. `migrate-tier.integration.test.ts` switched
to `createPgliteDb` + `runPgliteMigrations` from `@mosaicstack/db`.
## Tests
`packages/db/src/migrate.test.ts`:
- Verifies `runPgliteMigrations` creates the BetterAuth tables (the original
failure mode).
- Idempotence (transitively re-runs migration 0009).
- Partial-failure: pre-creates a conflicting `users` table, asserts the
thrown error includes statement context (`hash=… statement #N failed`)
and that no ledger row was written.
## QA evidence
End-to-end on a fresh PGlite install:
- `[DatabaseModule] Applying PGlite schema migrations...` then
`Initializing storage adapter (pglite)...` in startup log.
- `GET /api/bootstrap/status``{"needsSetup":true}` HTTP 200 (was 500
with `relation "users" does not exist`).
- `POST /api/bootstrap/setup` with empty body → HTTP 400 with Zod
validation error (was 500), confirming the request reached the
validator past the table-existence check.
## Known issues, out of scope (file separately)
- **Postgres-tier first install is still broken.** `runMigrations()` uses
Drizzle's `migratePostgres`, which has the same outer-transaction problem
as PGlite's migrator. A fresh standalone-tier install would also fail at
migration 0009. Inline TODO in `migrate.ts:31-35` flags this. Fixing it
needs either (a) a shared per-statement loop reused for both drivers, or
(b) splitting migration 0009.
- **`drizzle/meta/_journal.json` has 0009 ordered before 0008** (`when`
values `1745280000000` < `1776822435828`). `migratePostgres` skips by
`created_at < folderMillis`, so on a postgres deployment that already
applied 0008, 0009 would be skipped forever. Our hash-based skip in the
PGlite path sidesteps this.
- **No advisory lock around the migration loop.** Two gateway processes
pointed at the same DATABASE_URL would race. PGlite is single-process by
file lock so the local tier is fine; postgres-tier deployments should add
`pg_advisory_lock(<deterministic-id>)` around the loop in a follow-up.
- **`mosaic storage migrate` CLI message is misleading** — it claims
"automatic on first connection via adapter.migrate()" but the adapter
doesn't self-migrate. With this PR the gateway invokes it explicitly, but
the CLI message could still be tightened.
- **Crash mid-migration leaves a partial-state PGlite DB without a ledger
row.** Detected loudly on next boot (the replay errors on "already
exists"), but recovery is manual (drop the partially-applied objects or
insert the migration hash into `drizzle.__drizzle_migrations`). A robust
fix would add a "started_at" column to a sidecar table to detect
half-applied state and refuse to start with actionable guidance.