Compare commits

..

13 Commits

Author SHA1 Message Date
Jarvis
2f5b6ca2e7 fix(git): avoid duplicate gh repo flag in pr-diff
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/pr/ci Pipeline failed
ci/woodpecker/manual/ci Pipeline was successful
2026-05-25 14:20:13 -05:00
Jarvis
4741b3e16f fix(git): pass explicit repo to Gitea wrappers
Some checks failed
ci/woodpecker/push/ci Pipeline was canceled
ci/woodpecker/pr/ci Pipeline was canceled
2026-05-25 14:19:52 -05:00
Jarvis
2d4d799c26 fix(ci): avoid postgres service collision in k8s backend
Some checks failed
ci/woodpecker/push/ci Pipeline was canceled
ci/woodpecker/pr/ci Pipeline was canceled
2026-05-25 14:08:45 -05:00
Jarvis
410ada409c fix: handle legacy woodpecker mosaic credentials
Some checks failed
ci/woodpecker/push/ci Pipeline is running
ci/woodpecker/pr/ci Pipeline failed
2026-05-25 11:29:27 -05:00
755df9079e Merge pull request 'fix(db): bootstrap migrations on local-tier gateway startup' (#510) from fix/db-bootstrap-migrations into main 2026-05-04 22:13:14 +00:00
ac5650d9f9 fix(db): bootstrap migrations on local-tier gateway startup
Fresh `mosaic gateway install` (npm) left the gateway DB schema empty —
sign-in 500'd with `relation "users" does not exist`, and every entry
point (auth, bootstrap setup) failed because they all query the users
table first. Five stacked bugs on the local (PGlite) tier:

1. `packages/db/package.json` `files: ["dist"]` excluded the `drizzle/`
   SQL migrations from the published tarball.
2. `runMigrations()` only supports postgres-js — unusable for embedded
   PGlite.
3. `apps/gateway/src/database/database.module.ts` never invoked
   migrations at startup.
4. `createPgliteDb` didn't load pgvector, so migration 0001's
   `CREATE EXTENSION vector` failed.
5. Drizzle's PG migrator wraps every migration in one outer
   transaction, which trips Postgres' `check_safe_enum_use` on
   migration 0009 (`ALTER TYPE ADD VALUE 'pending'` → `SET DEFAULT
   'pending'` in the same tx).

Changes:
- Ship `drizzle/` in the published tarball.
- `createPgliteDb` loads `@electric-sql/pglite/vector`.
- New `runPgliteMigrations(handle)` walks the Drizzle journal and
  runs each statement-breakpoint chunk through PGlite's `client.exec()`
  (autocommit per statement). Records into `drizzle.__drizzle_migrations`
  for interop with the postgres-js path. Per-statement try/catch
  surfaces which statement of which migration failed.
- `DatabaseModule` runs migrations in `OnModuleInit` before
  `app.listen()`. Local tier: explicit `runPgliteMigrations` then
  `storageAdapter.migrate()`. Postgres tier: just `storageAdapter.migrate()`,
  which already calls `runMigrations(url)` internally — no double-call.
- Removed `packages/storage/src/test-utils/pglite-with-vector.ts`. The
  "intentionally not exported" rationale is moot now that migration
  0001 forces pgvector load anyway. The integration test uses
  `createPgliteDb` + `runPgliteMigrations` from `@mosaicstack/db`.

Tests: BetterAuth tables exist after migrate; idempotent (re-runs 0009);
partial-failure surfaces statement-level context and leaves no ledger row.

QA on a fresh PGlite install:
- `Applying PGlite schema migrations...` then `Initializing storage
  adapter (pglite)...` in startup log.
- `GET /api/bootstrap/status` → `{"needsSetup":true}` HTTP 200 (was 500).
- `POST /api/bootstrap/setup` reaches Zod validator (was 500).

Scope: this PR fixes the local (PGlite) tier. Postgres-tier first
install still has the outer-transaction problem and a journal ordering
bug (0009's `when` < 0008's). Documented inline as TODO and in the
scratchpad — needs a separate change with real-Postgres validation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 17:06:50 -05:00
bd83f86740 Merge pull request 'feat(federation): mTLS AuthGuard with OID-based grant resolution (FED-M3-03)' (#509) from feat/federation-m3-auth-guard into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful
2026-04-25 13:27:20 +00:00
Jarvis
0af3e218a1 fix(federation/auth-guard): remediate CRIT-1/CRIT-2 + HIGH-1..4 review findings
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
- CRIT-1: Validate cert subjectUserId against grant.subjectUserId from DB;
  use authoritative DB value in FederationContext
- CRIT-2: Add @Inject(GrantsService) decorator (tsx/esbuild requirement)
- HIGH-1: Validate UTF8String TLV tag, length, and bounds in OID parser
- HIGH-2: Collapse all 403 wire messages to a generic string to prevent
  grant enumeration; keep internal logger detail
- HIGH-3: Assert federation wire envelope shape in all guard tests
- HIGH-4: Regression test for subjectUserId cert/DB mismatch

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 06:33:37 -05:00
Jarvis
b01c9b3bb0 feat(federation): mTLS AuthGuard with OID-based grant resolution (FED-M3-03)
Adds FederationAuthGuard that validates inbound mTLS client certs on
federation API routes. Extracts custom OIDs (grantId, subjectUserId),
loads the grant+peer from DB in one query, asserts active status, and
validates cert serial as defense-in-depth. Attaches FederationContext
to requests on success and uses federation wire-format error envelopes
(not raw NestJS exceptions) for 401/403 responses.

New files:
- apps/gateway/src/federation/oid.util.ts — shared OID extraction (no dupe ASN.1 logic)
- apps/gateway/src/federation/server/federation-auth.guard.ts — guard impl
- apps/gateway/src/federation/server/federation-context.ts — FederationContext type + module augment
- apps/gateway/src/federation/server/index.ts — barrel export
- apps/gateway/src/federation/server/__tests__/federation-auth.guard.spec.ts — 11 unit tests

Modified:
- apps/gateway/src/federation/grants.service.ts — adds getGrantWithPeer() with join
- apps/gateway/src/federation/federation.module.ts — registers FederationAuthGuard as provider

Closes #462

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 06:33:37 -05:00
b67f2c9f08 Merge pull request 'feat(federation): outbound mTLS FederationClient (FED-M3-08)' (#508) from feat/federation-m3-client into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful
2026-04-24 04:30:29 +00:00
Jarvis
37675ae3f2 fix(federation/client): serialize cache fills, destroy evicted Agent, cover env-var guard
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
- HIGH-A: resolveEntry now uses promise-cache pattern so concurrent
  callers serialize on a single in-flight build, eliminating duplicate
  key material in heap and duplicate DB round-trips
- HIGH-B: flushPeer destroys the evicted undici Agent so stale TLS
  connections close on cert rotation
- MED-C: add regression test for PEER_MISCONFIGURED when
  STEP_CA_ROOT_CERT_PATH is unset

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 22:56:57 -05:00
Jarvis
a4a6769a6d fix(federation/client): pin Step-CA root, fix lockfile, harden cache test
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
CRIT-1: regenerate pnpm-lock.yaml so apps/gateway resolves undici@7.24.6
(prior PR pushed package.json without lockfile update; CI failed with
ERR_PNPM_OUTDATED_LOCKFILE). Incidentally cleans 57 lines of stale
peer-dep entries.

CRIT-2: cache-hit test no longer swallows resolveEntry errors. Calls the
private method directly twice and asserts identity equality plus a
single DB select, removing the silent-failure path the prior assertion
allowed.

HIGH-1: mTLS Agent now pins Step-CA root via STEP_CA_ROOT_CERT_PATH.
Without the env var resolveEntry throws PEER_MISCONFIGURED, refusing to
dial peers against the public trust store. PEM is read once and cached
on the service instance.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 22:30:09 -05:00
Jarvis
21650fb194 feat(federation): outbound mTLS FederationClient (FED-M3-08)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/pr/ci Pipeline failed
Implements FederationClientService — a NestJS injectable that dials peer
gateways over mTLS (undici Agent with cert+sealed-key from federation_peers),
invokes list/get/capabilities verbs, validates responses via Zod, and surfaces
all failure modes as typed FederationClientError with a coherent error code
taxonomy (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK,
FORBIDDEN, HTTP_{status}, INVALID_RESPONSE).

Per-peer Agent instances are cached in a Map for the service lifetime;
flushPeer(peerId) invalidates the cache for M5/M6 cert rotation and
revocation events.

Wired into FederationModule providers + exports so QuerySourceService
(M3-09) can inject it.

13 unit tests covering all required scenarios via undici MockAgent +
real sealClientKey/unsealClientKey round-trip.

Closes #462

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 22:16:52 -05:00
24 changed files with 1620 additions and 205 deletions

View File

@@ -46,18 +46,28 @@ steps:
test:
image: *node_image
environment:
DATABASE_URL: postgresql://mosaic:mosaic@postgres:5432/mosaic
# Avoid the namespace-level Woodpecker DB service named "postgres".
# The Kubernetes backend exposes service containers by step name.
DATABASE_URL: postgresql://mosaic:mosaic@ci-postgres:5432/mosaic
commands:
- *enable_pnpm
# Install postgresql-client for pg_isready
- apk add --no-cache postgresql-client
# Wait up to 30s for postgres to be ready
# Wait up to 60s for CI postgres to be ready; fail fast if it never comes up.
- |
for i in $(seq 1 30); do
pg_isready -h postgres -p 5432 -U mosaic && break
echo "Waiting for postgres ($i/30)..."
ready=0
for i in $(seq 1 60); do
if pg_isready -h ci-postgres -p 5432 -U mosaic; then
ready=1
break
fi
echo "Waiting for ci-postgres ($i/60)..."
sleep 1
done
if [ "$ready" -ne 1 ]; then
echo "ci-postgres did not become ready" >&2
exit 1
fi
# Run migrations (DATABASE_URL is set in environment above)
- pnpm --filter @mosaicstack/db run db:migrate
# Run all tests
@@ -66,7 +76,7 @@ steps:
- typecheck
services:
postgres:
ci-postgres:
image: pgvector/pgvector:pg17
environment:
POSTGRES_USER: mosaic

View File

@@ -73,6 +73,7 @@
"rxjs": "^7.8.0",
"socket.io": "^4.8.0",
"uuid": "^11.0.0",
"undici": "^7.24.6",
"zod": "^4.3.6"
},
"devDependencies": {

View File

@@ -1,8 +1,21 @@
import { mkdirSync } from 'node:fs';
import { homedir } from 'node:os';
import { join } from 'node:path';
import { Global, Inject, Module, type OnApplicationShutdown } from '@nestjs/common';
import { createDb, createPgliteDb, type Db, type DbHandle } from '@mosaicstack/db';
import {
Global,
Inject,
Logger,
Module,
type OnApplicationShutdown,
type OnModuleInit,
} from '@nestjs/common';
import {
createDb,
createPgliteDb,
runPgliteMigrations,
type Db,
type DbHandle,
} from '@mosaicstack/db';
import { createStorageAdapter, type StorageAdapter } from '@mosaicstack/storage';
import type { MosaicConfig } from '@mosaicstack/config';
import { MOSAIC_CONFIG } from '../config/config.module.js';
@@ -39,12 +52,37 @@ export const STORAGE_ADAPTER = 'STORAGE_ADAPTER';
],
exports: [DB, STORAGE_ADAPTER],
})
export class DatabaseModule implements OnApplicationShutdown {
export class DatabaseModule implements OnApplicationShutdown, OnModuleInit {
private readonly logger = new Logger(DatabaseModule.name);
constructor(
@Inject(DB_HANDLE) private readonly handle: DbHandle,
@Inject(STORAGE_ADAPTER) private readonly storageAdapter: StorageAdapter,
@Inject(MOSAIC_CONFIG) private readonly config: MosaicConfig,
) {}
// Migrations must complete before any module that injects DB starts serving
// requests. NestJS awaits onModuleInit before app.listen(), and modules that
// inject DB are initialized after this one — so all DB-dependent code sees a
// populated schema before the first HTTP request lands.
//
// Local (PGlite) tier: we run gateway-DB migrations explicitly here. The
// storage adapter writes to a separate PGlite directory and only manages its
// own KV tables, so we still call its migrate() afterwards.
//
// Postgres tier: PostgresAdapter.migrate() already calls runMigrations() on
// the same DATABASE_URL, so a single call covers both the gateway DB and
// the storage tables. We deliberately do NOT call runMigrations() here to
// avoid opening a second short-lived connection and doubling startup cost.
async onModuleInit(): Promise<void> {
if (this.config.tier === 'local') {
this.logger.log('Applying PGlite schema migrations...');
await runPgliteMigrations(this.handle);
}
this.logger.log(`Initializing storage adapter (${this.storageAdapter.name})...`);
await this.storageAdapter.migrate();
}
async onApplicationShutdown(): Promise<void> {
await Promise.all([this.handle.close(), this.storageAdapter.close()]);
}

View File

@@ -0,0 +1,553 @@
/**
* Unit tests for FederationClientService (FED-M3-08).
*
* HTTP mocking strategy:
* undici MockAgent is used to intercept outbound HTTP requests. The service
* uses `undici.fetch` with a `dispatcher` option, so MockAgent is set as the
* global dispatcher and all requests flow through it.
*
* Because the service builds one `undici.Agent` per peer and passes it as
* the dispatcher on every fetch call, we cannot intercept at the Agent level
* in unit tests without significant refactoring. Instead, we set the global
* dispatcher to a MockAgent and override the service's `doRequest` indirection
* by spying on the internal fetch call.
*
* For the cert/key wiring, we use the real `sealClientKey` function from
* peer-key.util.ts with a test secret — no stubs.
*
* Sealed-key setup:
* Each test (or beforeAll) calls `sealClientKey(TEST_PRIVATE_KEY_PEM)` with
* BETTER_AUTH_SECRET set to a deterministic test value so that
* `unsealClientKey` in the service recovers the original PEM.
*/
import 'reflect-metadata';
import { describe, it, expect, vi, beforeEach, afterEach, beforeAll, afterAll } from 'vitest';
import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from 'undici';
import type { Dispatcher } from 'undici';
import { writeFileSync, unlinkSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { Db } from '@mosaicstack/db';
import { FederationClientService, FederationClientError } from '../federation-client.service.js';
import { sealClientKey } from '../../peer-key.util.js';
// ---------------------------------------------------------------------------
// Test constants
// ---------------------------------------------------------------------------
const TEST_SECRET = 'test-secret-for-federation-client-spec-only';
const PEER_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa';
const ENDPOINT = 'https://peer.example.com';
// Minimal valid RSA/EC private key PEM — does NOT need to be a real key for
// unit tests because we only verify it round-trips through seal/unseal, not
// that it actually negotiates TLS (MockAgent handles that).
const TEST_PRIVATE_KEY_PEM = `-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDummyKeyForTests
-----END PRIVATE KEY-----`;
// Minimal self-signed cert PEM (dummy — only used for mTLS Agent construction)
const TEST_CERT_PEM = `-----BEGIN CERTIFICATE-----
MIIBdummyCertForFederationClientTests==
-----END CERTIFICATE-----`;
const TEST_CERT_SERIAL = 'ABCDEF1234567890';
// ---------------------------------------------------------------------------
// Sealed key (computed once in beforeAll)
// ---------------------------------------------------------------------------
let SEALED_KEY: string;
// Path to a stub Step-CA root cert file written in beforeAll. The cert is never
// actually used to negotiate TLS in unit tests (MockAgent + spy on resolveEntry
// short-circuit the network), but loadStepCaRoot() requires the file to exist.
const STUB_CA_PEM_PATH = join(tmpdir(), 'federation-client-spec-ca.pem');
const STUB_CA_PEM = `-----BEGIN CERTIFICATE-----
MIIBdummyCAforFederationClientSpecOnly==
-----END CERTIFICATE-----
`;
// ---------------------------------------------------------------------------
// Peer row factory
// ---------------------------------------------------------------------------
function makePeerRow(overrides: Partial<Record<string, unknown>> = {}) {
return {
id: PEER_ID,
commonName: 'peer-example-com',
displayName: 'Test Peer',
certPem: TEST_CERT_PEM,
certSerial: TEST_CERT_SERIAL,
certNotAfter: new Date('2030-01-01T00:00:00Z'),
clientKeyPem: SEALED_KEY,
state: 'active' as const,
endpointUrl: ENDPOINT,
lastSeenAt: null,
createdAt: new Date('2026-01-01T00:00:00Z'),
revokedAt: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Mock DB builder
// ---------------------------------------------------------------------------
function makeDb(selectRows: unknown[] = [makePeerRow()]): Db {
const limitSelect = vi.fn().mockResolvedValue(selectRows);
const whereSelect = vi.fn().mockReturnValue({ limit: limitSelect });
const fromSelect = vi.fn().mockReturnValue({ where: whereSelect });
const selectMock = vi.fn().mockReturnValue({ from: fromSelect });
return {
select: selectMock,
insert: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
transaction: vi.fn(),
} as unknown as Db;
}
// ---------------------------------------------------------------------------
// Helpers for MockAgent HTTP interception
// ---------------------------------------------------------------------------
/**
* Create a MockAgent + MockPool for the peer endpoint, set it as the global
* dispatcher, and return both for per-test configuration.
*/
function makeMockAgent() {
const mockAgent = new MockAgent({ connections: 1 });
mockAgent.disableNetConnect();
setGlobalDispatcher(mockAgent);
const pool = mockAgent.get(ENDPOINT);
return { mockAgent, pool };
}
/**
* Build a FederationClientService with a mock DB and a spy on the internal
* fetch so we can intercept at the HTTP layer via MockAgent.
*
* The service calls `fetch(url, { dispatcher: agent })` where `agent` is the
* mTLS undici.Agent built from the peer's cert+key. To make MockAgent work,
* we need the fetch dispatcher to be the MockAgent, not the per-peer Agent.
*
* Strategy: we replace the private `resolveEntry` result's `agent` field with
* the MockAgent's pool, so fetch uses our interceptor. We do this by spying
* on `resolveEntry` and returning a controlled entry.
*/
function makeService(db: Db, mockPool: Dispatcher): FederationClientService {
const svc = new FederationClientService(db);
// Override resolveEntry to inject MockAgent pool as the dispatcher
vi.spyOn(
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> },
'resolveEntry',
).mockImplementation(async (_peerId: string) => {
// Still call DB (via the real logic) to exercise peer validation,
// but return mock pool as the agent.
// For simplicity in unit tests, directly return a controlled entry.
return {
agent: mockPool,
endpointUrl: ENDPOINT,
certPem: TEST_CERT_PEM,
certSerial: TEST_CERT_SERIAL,
};
});
return svc;
}
// ---------------------------------------------------------------------------
// Test setup
// ---------------------------------------------------------------------------
let originalDispatcher: Dispatcher;
beforeAll(() => {
// Seal the test key once — requires BETTER_AUTH_SECRET
const saved = process.env['BETTER_AUTH_SECRET'];
process.env['BETTER_AUTH_SECRET'] = TEST_SECRET;
try {
SEALED_KEY = sealClientKey(TEST_PRIVATE_KEY_PEM);
} finally {
if (saved === undefined) {
delete process.env['BETTER_AUTH_SECRET'];
} else {
process.env['BETTER_AUTH_SECRET'] = saved;
}
}
writeFileSync(STUB_CA_PEM_PATH, STUB_CA_PEM, 'utf8');
});
afterAll(() => {
try {
unlinkSync(STUB_CA_PEM_PATH);
} catch {
// best-effort cleanup
}
});
beforeEach(() => {
originalDispatcher = getGlobalDispatcher();
process.env['BETTER_AUTH_SECRET'] = TEST_SECRET;
process.env['STEP_CA_ROOT_CERT_PATH'] = STUB_CA_PEM_PATH;
});
afterEach(() => {
setGlobalDispatcher(originalDispatcher);
vi.restoreAllMocks();
delete process.env['BETTER_AUTH_SECRET'];
delete process.env['STEP_CA_ROOT_CERT_PATH'];
});
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Successful list response body */
const LIST_BODY = {
items: [{ id: '1', title: 'Task One' }],
nextCursor: undefined,
_partial: false,
};
/** Successful get response body */
const GET_BODY = {
item: { id: '1', title: 'Task One' },
_partial: false,
};
/** Successful capabilities response body */
const CAP_BODY = {
resources: ['tasks'],
excluded_resources: [],
max_rows_per_query: 100,
supported_verbs: ['list', 'get', 'capabilities'] as const,
};
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('FederationClientService', () => {
// ─── Successful verb calls ─────────────────────────────────────────────────
describe('list()', () => {
it('returns parsed typed response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/list/tasks',
method: 'POST',
})
.reply(200, LIST_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.list(PEER_ID, 'tasks', {});
expect(result.items).toHaveLength(1);
expect(result.items[0]).toMatchObject({ id: '1', title: 'Task One' });
await mockAgent.close();
});
});
describe('get()', () => {
it('returns parsed typed response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/get/tasks/1',
method: 'POST',
})
.reply(200, GET_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.get(PEER_ID, 'tasks', '1', {});
expect(result.item).toMatchObject({ id: '1', title: 'Task One' });
await mockAgent.close();
});
});
describe('capabilities()', () => {
it('returns parsed capabilities response on success', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({
path: '/api/federation/v1/capabilities',
method: 'GET',
})
.reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } });
const result = await svc.capabilities(PEER_ID);
expect(result.resources).toContain('tasks');
expect(result.max_rows_per_query).toBe(100);
await mockAgent.close();
});
});
// ─── HTTP error surfaces ──────────────────────────────────────────────────
describe('non-2xx responses', () => {
it('surfaces 403 as FederationClientError({ status: 403, code: "FORBIDDEN" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool.intercept({ path: '/api/federation/v1/list/tasks', method: 'POST' }).reply(
403,
{ error: { code: 'forbidden', message: 'Access denied' } },
{
headers: { 'content-type': 'application/json' },
},
);
await expect(svc.list(PEER_ID, 'tasks', {})).rejects.toMatchObject({
status: 403,
code: 'FORBIDDEN',
peerId: PEER_ID,
});
await mockAgent.close();
});
it('surfaces 404 as FederationClientError({ status: 404, code: "HTTP_404" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool.intercept({ path: '/api/federation/v1/get/tasks/999', method: 'POST' }).reply(
404,
{ error: { code: 'not_found', message: 'Not found' } },
{
headers: { 'content-type': 'application/json' },
},
);
await expect(svc.get(PEER_ID, 'tasks', '999', {})).rejects.toMatchObject({
status: 404,
code: 'HTTP_404',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Network error ─────────────────────────────────────────────────────────
describe('network errors', () => {
it('surfaces network error as FederationClientError({ code: "NETWORK" })', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.replyWithError(new Error('ECONNREFUSED'));
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'NETWORK',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Invalid response body ─────────────────────────────────────────────────
describe('invalid response body', () => {
it('surfaces as FederationClientError({ code: "INVALID_RESPONSE" }) when body shape is wrong', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
// capabilities returns wrong shape (missing required fields)
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.reply(200, { totally: 'wrong' }, { headers: { 'content-type': 'application/json' } });
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'INVALID_RESPONSE',
peerId: PEER_ID,
});
await mockAgent.close();
});
});
// ─── Peer DB validation ────────────────────────────────────────────────────
describe('peer validation (without resolveEntry spy)', () => {
/**
* These tests exercise the real `resolveEntry` path — no spy on resolveEntry.
*/
it('throws PEER_NOT_FOUND when peer is not in DB', async () => {
// DB returns empty array (peer not found)
const db = makeDb([]);
const svc = new FederationClientService(db);
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'PEER_NOT_FOUND',
peerId: PEER_ID,
});
});
it('throws PEER_INACTIVE when peer state is not "active"', async () => {
const db = makeDb([makePeerRow({ state: 'suspended' })]);
const svc = new FederationClientService(db);
await expect(svc.capabilities(PEER_ID)).rejects.toMatchObject({
code: 'PEER_INACTIVE',
peerId: PEER_ID,
});
});
});
// ─── Cache behaviour ───────────────────────────────────────────────────────
describe('cache behaviour', () => {
it('hits cache on second call — only one DB lookup happens', async () => {
// Verify cache by calling the private resolveEntry directly twice and
// asserting the DB was queried only once. This avoids the HTTP layer,
// which would require either a real network or per-peer Agent rewiring
// that the cache invariant doesn't depend on.
const db = makeDb();
const selectSpy = vi.spyOn(db, 'select');
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> }
).resolveEntry.bind(svc);
const first = await resolveEntry(PEER_ID);
const second = await resolveEntry(PEER_ID);
expect(first).toBe(second);
expect(selectSpy).toHaveBeenCalledTimes(1);
});
it('serializes concurrent resolveEntry calls — only one DB lookup', async () => {
const db = makeDb();
const selectSpy = vi.spyOn(db, 'select');
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<unknown>;
}
).resolveEntry.bind(svc);
const [a, b] = await Promise.all([resolveEntry(PEER_ID), resolveEntry(PEER_ID)]);
expect(a).toBe(b);
expect(selectSpy).toHaveBeenCalledTimes(1);
});
it('flushPeer destroys the evicted Agent so old TLS connections close', async () => {
const db = makeDb();
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<{ agent: { destroy: () => Promise<void> } }>;
}
).resolveEntry.bind(svc);
const entry = await resolveEntry(PEER_ID);
const destroySpy = vi.spyOn(entry.agent, 'destroy').mockResolvedValue();
svc.flushPeer(PEER_ID);
expect(destroySpy).toHaveBeenCalledTimes(1);
});
it('flushPeer() invalidates cache — next call re-reads DB', async () => {
const db = makeDb();
const { mockAgent, pool } = makeMockAgent();
const svc = makeService(db, pool);
pool
.intercept({ path: '/api/federation/v1/capabilities', method: 'GET' })
.reply(200, CAP_BODY, { headers: { 'content-type': 'application/json' } })
.times(2);
// First call — populates cache (via mock resolveEntry)
await svc.capabilities(PEER_ID);
// Flush the cache
svc.flushPeer(PEER_ID);
// The spy on resolveEntry is still active — check it's called again after flush
const resolveEntrySpy = vi.spyOn(
svc as unknown as { resolveEntry: (peerId: string) => Promise<unknown> },
'resolveEntry',
);
// Second call after flush — should call resolveEntry again
await svc.capabilities(PEER_ID);
// resolveEntry should have been called once after we started spying (post-flush)
expect(resolveEntrySpy).toHaveBeenCalledTimes(1);
await mockAgent.close();
});
});
// ─── loadStepCaRoot env-var guard ─────────────────────────────────────────
describe('loadStepCaRoot() env-var guard', () => {
it('throws PEER_MISCONFIGURED when STEP_CA_ROOT_CERT_PATH is not set', async () => {
delete process.env['STEP_CA_ROOT_CERT_PATH'];
const db = makeDb();
const svc = new FederationClientService(db);
const resolveEntry = (
svc as unknown as {
resolveEntry: (peerId: string) => Promise<unknown>;
}
).resolveEntry.bind(svc);
await expect(resolveEntry(PEER_ID)).rejects.toMatchObject({
code: 'PEER_MISCONFIGURED',
});
});
});
// ─── FederationClientError class ──────────────────────────────────────────
describe('FederationClientError', () => {
it('is instanceof Error and FederationClientError', () => {
const err = new FederationClientError({
code: 'PEER_NOT_FOUND',
message: 'test',
peerId: PEER_ID,
});
expect(err).toBeInstanceOf(Error);
expect(err).toBeInstanceOf(FederationClientError);
expect(err.name).toBe('FederationClientError');
});
it('carries status, code, and peerId', () => {
const err = new FederationClientError({
status: 403,
code: 'FORBIDDEN',
message: 'forbidden',
peerId: PEER_ID,
});
expect(err.status).toBe(403);
expect(err.code).toBe('FORBIDDEN');
expect(err.peerId).toBe(PEER_ID);
});
});
});

View File

@@ -0,0 +1,500 @@
/**
* FederationClientService — outbound mTLS client for federation requests (FED-M3-08).
*
* Dials peer gateways over mTLS using the cert+sealed-key stored in `federation_peers`,
* invokes federation verbs (list / get / capabilities), and surfaces all failure modes
* as typed `FederationClientError` instances.
*
* ## Error code taxonomy
*
* | Code | When |
* | ------------------ | ------------------------------------------------------------- |
* | PEER_NOT_FOUND | No row in federation_peers for the given peerId |
* | PEER_INACTIVE | Peer row exists but state !== 'active' |
* | PEER_MISCONFIGURED | Peer row is active but missing endpointUrl or clientKeyPem |
* | NETWORK | undici threw a connection / TLS / timeout error |
* | HTTP_{status} | Peer returned a non-2xx response (e.g. HTTP_403, HTTP_404) |
* | FORBIDDEN | Peer returned 403 (convenience alias alongside HTTP_403) |
* | INVALID_RESPONSE | Response body failed Zod schema validation |
*
* ## Cache strategy
*
* Per-peer `undici.Agent` instances are cached in a `Map<peerId, AgentCacheEntry>` for
* the lifetime of the service instance. The cache is keyed on peerId (UUID).
*
* Cache invalidation:
* - `flushPeer(peerId)` — removes the entry immediately. M5/M6 MUST call this on
* cert rotation or peer revocation events so the next request re-reads the DB and
* builds a fresh TLS Agent with the new cert material.
* - On cache miss: re-reads the DB, checks state === 'active', rebuilds Agent.
*
* Cache does NOT auto-expire. The service is expected to be a singleton scoped to the
* NestJS application lifecycle; flushing on revocation/rotation is the only invalidation
* path by design (avoids redundant DB round-trips on the hot path).
*/
import { Injectable, Inject, Logger } from '@nestjs/common';
import { readFileSync } from 'node:fs';
import { Agent, fetch as undiciFetch } from 'undici';
import type { Dispatcher } from 'undici';
import { z } from 'zod';
import { type Db, eq, federationPeers } from '@mosaicstack/db';
import {
FederationListResponseSchema,
FederationGetResponseSchema,
FederationCapabilitiesResponseSchema,
FederationErrorEnvelopeSchema,
type FederationListResponse,
type FederationGetResponse,
type FederationCapabilitiesResponse,
} from '@mosaicstack/types';
import { DB } from '../../database/database.module.js';
import { unsealClientKey } from '../peer-key.util.js';
// ---------------------------------------------------------------------------
// Error taxonomy
// ---------------------------------------------------------------------------
/**
* Client-side error code set. Distinct from the server-side `FederationErrorCode`
* (which lives in `@mosaicstack/types`) because the client has additional failure
* modes (PEER_NOT_FOUND, PEER_INACTIVE, PEER_MISCONFIGURED, NETWORK) that the
* server never emits.
*/
export type FederationClientErrorCode =
| 'PEER_NOT_FOUND'
| 'PEER_INACTIVE'
| 'PEER_MISCONFIGURED'
| 'NETWORK'
| 'FORBIDDEN'
| 'INVALID_RESPONSE'
| `HTTP_${number}`;
export interface FederationClientErrorOptions {
status?: number;
code: FederationClientErrorCode;
message: string;
peerId: string;
cause?: unknown;
}
/**
* Thrown by FederationClientService on every failure path.
* Callers can dispatch on `error.code` for programmatic handling.
*/
export class FederationClientError extends Error {
readonly status?: number;
readonly code: FederationClientErrorCode;
readonly peerId: string;
readonly cause?: unknown;
constructor(opts: FederationClientErrorOptions) {
super(opts.message);
this.name = 'FederationClientError';
this.status = opts.status;
this.code = opts.code;
this.peerId = opts.peerId;
this.cause = opts.cause;
}
}
// ---------------------------------------------------------------------------
// Internal cache types
// ---------------------------------------------------------------------------
interface AgentCacheEntry {
agent: Agent;
endpointUrl: string;
certPem: string;
certSerial: string;
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
@Injectable()
export class FederationClientService {
private readonly logger = new Logger(FederationClientService.name);
/**
* Per-peer undici Agent cache.
* Key = peerId (UUID string).
*
* Values are either a resolved `AgentCacheEntry` or an in-flight
* `Promise<AgentCacheEntry>` (promise-cache pattern). Storing the promise
* prevents duplicate DB lookups and duplicate key-unseal operations when two
* requests for the same peer arrive before the first build completes.
*
* Flush via `flushPeer(peerId)` on cert rotation / peer revocation (M5/M6).
*/
private readonly cache = new Map<string, AgentCacheEntry | Promise<AgentCacheEntry>>();
/**
* Step-CA root cert PEM, loaded once from `STEP_CA_ROOT_CERT_PATH`.
* Used as the trust anchor for peer server certificates so federation TLS is
* pinned to our PKI, not the public trust store. Lazily loaded on first use
* so unit tests that don't exercise the agent path can run without the env var.
*/
private cachedCaPem: string | null = null;
constructor(@Inject(DB) private readonly db: Db) {}
// -------------------------------------------------------------------------
// Public verb API
// -------------------------------------------------------------------------
/**
* Invoke the `list` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @param resource Resource path, e.g. "tasks".
* @param request Free-form body sent as JSON in the POST body.
* @returns Parsed `FederationListResponse<T>`.
*/
async list<T>(
peerId: string,
resource: string,
request: Record<string, unknown>,
): Promise<FederationListResponse<T>> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/list/${encodeURIComponent(resource)}`;
const body = await this.doPost(peerId, url, agent, request);
return this.parseWith<FederationListResponse<T>>(
peerId,
body,
FederationListResponseSchema(z.unknown()),
);
}
/**
* Invoke the `get` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @param resource Resource path, e.g. "tasks".
* @param id Resource identifier.
* @param request Free-form body sent as JSON in the POST body.
* @returns Parsed `FederationGetResponse<T>`.
*/
async get<T>(
peerId: string,
resource: string,
id: string,
request: Record<string, unknown>,
): Promise<FederationGetResponse<T>> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/get/${encodeURIComponent(resource)}/${encodeURIComponent(id)}`;
const body = await this.doPost(peerId, url, agent, request);
return this.parseWith<FederationGetResponse<T>>(
peerId,
body,
FederationGetResponseSchema(z.unknown()),
);
}
/**
* Invoke the `capabilities` verb on a remote peer.
*
* @param peerId UUID of the peer row in `federation_peers`.
* @returns Parsed `FederationCapabilitiesResponse`.
*/
async capabilities(peerId: string): Promise<FederationCapabilitiesResponse> {
const { endpointUrl, agent } = await this.resolveEntry(peerId);
const url = `${endpointUrl}/api/federation/v1/capabilities`;
const body = await this.doGet(peerId, url, agent);
return this.parseWith<FederationCapabilitiesResponse>(
peerId,
body,
FederationCapabilitiesResponseSchema,
);
}
// -------------------------------------------------------------------------
// Cache management
// -------------------------------------------------------------------------
/**
* Flush the cached Agent for a specific peer.
*
* M5/M6 MUST call this on:
* - cert rotation events (so new cert material is picked up)
* - peer revocation events (so future requests fail at PEER_INACTIVE)
*
* After flushing, the next call to `list`, `get`, or `capabilities` for
* this peer will re-read the DB and rebuild the Agent.
*/
flushPeer(peerId: string): void {
const entry = this.cache.get(peerId);
if (entry === undefined) {
return;
}
this.cache.delete(peerId);
if (!(entry instanceof Promise)) {
// best-effort destroy; promise-cached entries skip destroy because
// the in-flight build owns its own Agent which will be GC'd when the
// owning request handles the rejection from the cache miss
entry.agent.destroy().catch(() => {
// intentionally ignored — destroy errors are not actionable
});
}
this.logger.log(`Cache flushed for peer ${peerId}`);
}
// -------------------------------------------------------------------------
// Internal helpers
// -------------------------------------------------------------------------
/**
* Load and cache the Step-CA root cert PEM from `STEP_CA_ROOT_CERT_PATH`.
* Throws `FederationClientError` if the env var is unset or the file cannot
* be read — mTLS to a peer without a pinned trust anchor would silently
* fall back to the public trust store.
*/
private loadStepCaRoot(): string {
if (this.cachedCaPem !== null) {
return this.cachedCaPem;
}
const path = process.env['STEP_CA_ROOT_CERT_PATH'];
if (!path) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: 'STEP_CA_ROOT_CERT_PATH is not set; refusing to dial peer without pinned CA trust',
peerId: '',
});
}
try {
const pem = readFileSync(path, 'utf8');
this.cachedCaPem = pem;
return pem;
} catch (err) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Failed to read STEP_CA_ROOT_CERT_PATH (${path})`,
peerId: '',
cause: err,
});
}
}
/**
* Resolve the cache entry for a peer, reading DB on miss.
*
* Uses a promise-cache pattern: concurrent callers for the same uncached
* `peerId` all `await` the same in-flight `Promise<AgentCacheEntry>` so
* only one DB lookup and one key-unseal ever runs per peer per cache miss.
* The promise is replaced with the concrete entry on success, or deleted on
* rejection so a transient error does not poison the cache permanently.
*
* Throws `FederationClientError` with appropriate code if the peer is not
* found, is inactive, or is missing required fields.
*/
private async resolveEntry(peerId: string): Promise<AgentCacheEntry> {
const cached = this.cache.get(peerId);
if (cached) {
return cached; // Promise or concrete entry — both are awaitable
}
const inflight = this.buildEntry(peerId).then(
(entry) => {
this.cache.set(peerId, entry); // replace promise with concrete value
return entry;
},
(err: unknown) => {
this.cache.delete(peerId); // don't poison the cache with a rejected promise
throw err;
},
);
this.cache.set(peerId, inflight);
return inflight;
}
/**
* Build the `AgentCacheEntry` for a peer by reading the DB, validating the
* peer's state, unsealing the private key, and constructing the mTLS Agent.
*
* Throws `FederationClientError` with appropriate code if the peer is not
* found, is inactive, or is missing required fields.
*/
private async buildEntry(peerId: string): Promise<AgentCacheEntry> {
// DB lookup
const [peer] = await this.db
.select()
.from(federationPeers)
.where(eq(federationPeers.id, peerId))
.limit(1);
if (!peer) {
throw new FederationClientError({
code: 'PEER_NOT_FOUND',
message: `Federation peer ${peerId} not found`,
peerId,
});
}
if (peer.state !== 'active') {
throw new FederationClientError({
code: 'PEER_INACTIVE',
message: `Federation peer ${peerId} is not active (state: ${peer.state})`,
peerId,
});
}
if (!peer.endpointUrl || !peer.clientKeyPem) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Federation peer ${peerId} is missing endpointUrl or clientKeyPem`,
peerId,
});
}
// Unseal the private key
let privateKeyPem: string;
try {
privateKeyPem = unsealClientKey(peer.clientKeyPem);
} catch (err) {
throw new FederationClientError({
code: 'PEER_MISCONFIGURED',
message: `Failed to unseal client key for peer ${peerId}`,
peerId,
cause: err,
});
}
// Build mTLS agent — pin trust to Step-CA root so we never accept
// a peer cert signed by a public CA (defense against MITM with a
// publicly-trusted DV cert for the peer's hostname).
const agent = new Agent({
connect: {
cert: peer.certPem,
key: privateKeyPem,
ca: this.loadStepCaRoot(),
// rejectUnauthorized: true is the undici default for HTTPS
},
});
const entry: AgentCacheEntry = {
agent,
endpointUrl: peer.endpointUrl,
certPem: peer.certPem,
certSerial: peer.certSerial,
};
this.logger.log(`Agent cached for peer ${peerId} (serial: ${peer.certSerial})`);
return entry;
}
/**
* Execute a POST request with a JSON body.
* Returns the parsed response body as an unknown value.
* Throws `FederationClientError` on network errors and non-2xx responses.
*/
private async doPost(
peerId: string,
url: string,
agent: Dispatcher,
body: Record<string, unknown>,
): Promise<unknown> {
return this.doRequest(peerId, url, agent, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
}
/**
* Execute a GET request.
* Returns the parsed response body as an unknown value.
* Throws `FederationClientError` on network errors and non-2xx responses.
*/
private async doGet(peerId: string, url: string, agent: Dispatcher): Promise<unknown> {
return this.doRequest(peerId, url, agent, { method: 'GET' });
}
private async doRequest(
peerId: string,
url: string,
agent: Dispatcher,
init: { method: string; headers?: Record<string, string>; body?: string },
): Promise<unknown> {
let response: Awaited<ReturnType<typeof undiciFetch>>;
try {
response = await undiciFetch(url, {
...init,
dispatcher: agent,
});
} catch (err) {
throw new FederationClientError({
code: 'NETWORK',
message: `Network error calling peer ${peerId} at ${url}: ${err instanceof Error ? err.message : String(err)}`,
peerId,
cause: err,
});
}
const rawBody = await response.text().catch(() => '');
if (!response.ok) {
const status = response.status;
// Attempt to parse as federation error envelope
let serverMessage = `HTTP ${status}`;
try {
const json: unknown = JSON.parse(rawBody);
const result = FederationErrorEnvelopeSchema.safeParse(json);
if (result.success) {
serverMessage = result.data.error.message;
}
} catch {
// Not valid JSON or not a federation envelope — use generic message
}
// Specific code for 403 (most actionable for callers); generic HTTP_{n} for others
const code: FederationClientErrorCode = status === 403 ? 'FORBIDDEN' : `HTTP_${status}`;
throw new FederationClientError({
status,
code,
message: `Peer ${peerId} returned ${status}: ${serverMessage}`,
peerId,
});
}
try {
return JSON.parse(rawBody) as unknown;
} catch (err) {
throw new FederationClientError({
code: 'INVALID_RESPONSE',
message: `Peer ${peerId} returned non-JSON body`,
peerId,
cause: err,
});
}
}
/**
* Parse and validate a response body against a Zod schema.
*
* For list/get, callers pass the result of `FederationListResponseSchema(z.unknown())`
* so that the envelope structure is validated without requiring a concrete item schema
* at the client level. The generic `T` provides compile-time typing.
*
* Throws `FederationClientError({ code: 'INVALID_RESPONSE' })` on parse failure.
*/
private parseWith<T>(peerId: string, body: unknown, schema: z.ZodTypeAny): T {
const result = schema.safeParse(body);
if (!result.success) {
const issues = result.error.issues
.map((e: z.ZodIssue) => `[${e.path.join('.') || 'root'}] ${e.message}`)
.join('; ');
throw new FederationClientError({
code: 'INVALID_RESPONSE',
message: `Peer ${peerId} returned invalid response shape: ${issues}`,
peerId,
});
}
return result.data as T;
}
}

View File

@@ -0,0 +1,13 @@
/**
* Federation client barrel — re-exports for FederationModule consumers.
*
* M3-09 (QuerySourceService) and future milestones should import from here,
* not directly from the implementation file.
*/
export {
FederationClientService,
FederationClientError,
type FederationClientErrorCode,
type FederationClientErrorOptions,
} from './federation-client.service.js';

View File

@@ -5,11 +5,25 @@ import { EnrollmentController } from './enrollment.controller.js';
import { EnrollmentService } from './enrollment.service.js';
import { FederationController } from './federation.controller.js';
import { GrantsService } from './grants.service.js';
import { FederationClientService } from './client/index.js';
import { FederationAuthGuard } from './server/index.js';
@Module({
controllers: [EnrollmentController, FederationController],
providers: [AdminGuard, CaService, EnrollmentService, GrantsService, FederationAuthGuard],
exports: [CaService, EnrollmentService, GrantsService, FederationAuthGuard],
providers: [
AdminGuard,
CaService,
EnrollmentService,
GrantsService,
FederationClientService,
FederationAuthGuard,
],
exports: [
CaService,
EnrollmentService,
GrantsService,
FederationClientService,
FederationAuthGuard,
],
})
export class FederationModule {}

View File

@@ -42,6 +42,7 @@
"access": "public"
},
"files": [
"dist"
"dist",
"drizzle"
]
}

View File

@@ -1,10 +1,12 @@
import { PGlite } from '@electric-sql/pglite';
import { vector } from '@electric-sql/pglite/vector';
import { drizzle } from 'drizzle-orm/pglite';
import * as schema from './schema.js';
import type { DbHandle } from './client.js';
export function createPgliteDb(dataDir: string): DbHandle {
const client = new PGlite(dataDir);
// pgvector extension is required by migration 0001 (insights.embedding column).
const client = new PGlite(dataDir, { extensions: { vector } });
const db = drizzle(client, { schema });
return {
db: db as unknown as DbHandle['db'],

View File

@@ -1,6 +1,6 @@
export { createDb, type Db, type DbHandle } from './client.js';
export { createPgliteDb } from './client-pglite.js';
export { runMigrations } from './migrate.js';
export { runMigrations, runPgliteMigrations } from './migrate.js';
export * from './schema.js';
export * from './federation.js';
export {

View File

@@ -0,0 +1,70 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { sql } from 'drizzle-orm';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createPgliteDb } from './client-pglite.js';
import { runPgliteMigrations } from './migrate.js';
import type { DbHandle } from './client.js';
interface PgliteExec {
exec(query: string): Promise<unknown>;
}
describe('runPgliteMigrations', () => {
let dataDir: string;
let handle: DbHandle;
beforeEach(() => {
dataDir = mkdtempSync(join(tmpdir(), 'mosaic-db-migrate-test-'));
handle = createPgliteDb(dataDir);
});
afterEach(async () => {
await handle.close();
rmSync(dataDir, { recursive: true, force: true });
});
it('creates the BetterAuth tables required by the gateway', async () => {
await runPgliteMigrations(handle);
const result = (await handle.db.execute(sql`
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name
`)) as unknown as { rows: Array<{ table_name: string }> };
const tables = result.rows.map((r) => r.table_name);
// Auth tables — required for sign-in / bootstrap to function.
expect(tables).toContain('users');
expect(tables).toContain('sessions');
expect(tables).toContain('accounts');
expect(tables).toContain('verifications');
// Schema sanity check — admin token table consumed by mosaic gateway config.
expect(tables).toContain('admin_tokens');
});
it('is idempotent — running twice does not error', async () => {
await runPgliteMigrations(handle);
await expect(runPgliteMigrations(handle)).resolves.toBeUndefined();
});
it('surfaces statement-level error context on failure and leaves no ledger row', async () => {
// Pre-create a `users` table that conflicts with migration 0000's CREATE TABLE,
// forcing it to fail without IF NOT EXISTS.
const client = (handle.db as unknown as { $client: PgliteExec }).$client;
await client.exec('CREATE TABLE users (sentinel text)');
await expect(runPgliteMigrations(handle)).rejects.toThrow(
/migration hash=[a-f0-9]+ statement #\d+ failed/,
);
// Ledger should be empty — partial application must not pretend to be complete.
const ledger = (await handle.db.execute(
sql`SELECT count(*)::int AS count FROM drizzle.__drizzle_migrations`,
)) as unknown as { rows: Array<{ count: number }> };
expect(ledger.rows[0]?.count).toBe(0);
});
});

View File

@@ -1,18 +1,109 @@
import { dirname, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { drizzle } from 'drizzle-orm/postgres-js';
import { migrate } from 'drizzle-orm/postgres-js/migrator';
import { sql } from 'drizzle-orm';
import { drizzle as drizzlePostgres } from 'drizzle-orm/postgres-js';
import { migrate as migratePostgres } from 'drizzle-orm/postgres-js/migrator';
import { readMigrationFiles } from 'drizzle-orm/migrator';
import postgres from 'postgres';
import { DEFAULT_DATABASE_URL } from './defaults.js';
import type { DbHandle } from './client.js';
interface PgliteExecutor {
exec(query: string): Promise<unknown>;
}
interface ExecuteRows<T> {
rows: T[];
}
function migrationsFolder(): string {
const here = dirname(fileURLToPath(import.meta.url));
return resolve(here, '../drizzle');
}
export async function runMigrations(url?: string): Promise<void> {
const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
const sql = postgres(connectionString, { max: 1 });
const db = drizzle(sql);
const __dirname = dirname(fileURLToPath(import.meta.url));
const sqlClient = postgres(connectionString, { max: 1 });
const db = drizzlePostgres(sqlClient);
try {
await migrate(db, { migrationsFolder: resolve(__dirname, '../drizzle') });
// TODO: postgres-tier first-install also fails because (a) Drizzle wraps every
// migration in one transaction (breaks 0009's ALTER TYPE ADD VALUE → SET DEFAULT
// sequence) and (b) drizzle/meta/_journal.json has 0009 ordered before 0008,
// which the postgres-js migrator skips by `created_at < folderMillis`. The
// PGlite path below sidesteps both. A follow-up should either share the
// per-statement loop (see runPgliteMigrations) or fix the journal ordering.
await migratePostgres(db, { migrationsFolder: migrationsFolder() });
} finally {
await sql.end();
await sqlClient.end();
}
}
// Apply Drizzle migrations against an embedded PGlite database.
//
// We don't reuse drizzle's pglite migrator because it wraps ALL migrations in
// one outer transaction, which breaks Postgres' `check_safe_enum_use` rule —
// e.g. migration 0009 does `ALTER TYPE ADD VALUE 'pending'` then references
// `'pending'` as a default in the same tx. PGlite's `exec()` runs each
// statement under the Simple Query protocol, autocommitting between them.
//
// We still write to the standard `drizzle.__drizzle_migrations` ledger so the
// result is interoperable with `runMigrations()` on a postgres-backed deploy
// (modulo the journal-ordering bug noted above).
//
// We skip-by-hash rather than skip-by-folderMillis (which is what Drizzle's
// postgres-js migrator does). That's deliberate — out-of-order timestamps in
// `_journal.json` won't silently drop migrations.
//
// Failure model: each statement autocommits, and the ledger row is written
// only after all statements in a migration succeed. A crash mid-migration
// leaves the prefix applied with no ledger entry, so the next boot will
// replay those statements and fail loudly on "already exists". Recovery:
// drop the partially-applied objects, or insert the migration's hash into
// `drizzle.__drizzle_migrations` manually. The error log identifies which
// statement of which migration was the culprit.
export async function runPgliteMigrations(handle: DbHandle): Promise<void> {
const client = (handle.db as unknown as { $client?: PgliteExecutor }).$client;
if (!client || typeof client.exec !== 'function') {
throw new Error('runPgliteMigrations: handle.db is not backed by a PGlite client');
}
await client.exec('CREATE SCHEMA IF NOT EXISTS drizzle');
await client.exec(`
CREATE TABLE IF NOT EXISTS drizzle.__drizzle_migrations (
id SERIAL PRIMARY KEY,
hash text NOT NULL,
created_at bigint
)
`);
const appliedRows = (await handle.db.execute(
sql`SELECT hash FROM drizzle.__drizzle_migrations`,
)) as unknown as ExecuteRows<{ hash: string }>;
const applied = new Set(appliedRows.rows.map((r) => r.hash));
const migrations = readMigrationFiles({ migrationsFolder: migrationsFolder() });
for (const migration of migrations) {
if (applied.has(migration.hash)) continue;
// Run each statement-breakpoint chunk in its own exec() call so PGlite
// commits between statements — this is what lets `ALTER TYPE ADD VALUE`
// become visible before a subsequent statement references the new value.
for (const [stmtIdx, stmt] of migration.sql.entries()) {
const trimmed = stmt.trim();
if (!trimmed) continue;
try {
await client.exec(trimmed);
} catch (err) {
const cause = err instanceof Error ? err.message : String(err);
throw new Error(
`runPgliteMigrations: migration hash=${migration.hash} statement #${stmtIdx} failed: ${cause}\n` +
`Statement: ${trimmed.slice(0, 200)}${trimmed.length > 200 ? '…' : ''}`,
{ cause: err },
);
}
}
await handle.db.execute(
sql`INSERT INTO drizzle.__drizzle_migrations (hash, created_at) VALUES (${migration.hash}, ${migration.folderMillis})`,
);
}
}

View File

@@ -52,6 +52,20 @@ _mosaic_sync_woodpecker_env() {
printf '%s\n' "$expected" > "$env_file"
}
# Load legacy flat Woodpecker credentials (.woodpecker.url / .woodpecker.token).
# Some environments export WOODPECKER_INSTANCE=mosaic, but the current
# credentials.json may still use the legacy flat schema. Treat "mosaic" as the
# default flat instance when a nested .woodpecker.mosaic object is absent.
_mosaic_load_woodpecker_legacy() {
export WOODPECKER_URL="$(_mosaic_read_cred '.woodpecker.url')"
export WOODPECKER_TOKEN="$(_mosaic_read_cred '.woodpecker.token')"
export WOODPECKER_INSTANCE="${WOODPECKER_INSTANCE:-mosaic}"
WOODPECKER_URL="${WOODPECKER_URL%/}"
[[ -n "$WOODPECKER_URL" ]] || { echo "Error: woodpecker.url not found" >&2; return 1; }
[[ -n "$WOODPECKER_TOKEN" ]] || { echo "Error: woodpecker.token not found" >&2; return 1; }
_mosaic_sync_woodpecker_env "$WOODPECKER_INSTANCE" "$WOODPECKER_URL" "$WOODPECKER_TOKEN"
}
load_credentials() {
local service="$1"
@@ -155,7 +169,14 @@ EOF
;;
woodpecker-*)
local wp_instance="${service#woodpecker-}"
# credentials.json is authoritative — always read from it, ignore env
# credentials.json is authoritative — always read from it, ignore env.
# Backward compatibility: the default Mosaic Woodpecker instance may be
# stored in the legacy flat schema (.woodpecker.url/.token) instead of
# .woodpecker.mosaic.url/.token.
if [[ "$wp_instance" == "mosaic" ]] && [[ -z "$(_mosaic_read_cred '.woodpecker.mosaic.url')" ]] && [[ -n "$(_mosaic_read_cred '.woodpecker.url')" ]]; then
WOODPECKER_INSTANCE="mosaic" _mosaic_load_woodpecker_legacy
return $?
fi
export WOODPECKER_URL="$(_mosaic_read_cred ".woodpecker.${wp_instance}.url")"
export WOODPECKER_TOKEN="$(_mosaic_read_cred ".woodpecker.${wp_instance}.token")"
export WOODPECKER_INSTANCE="$wp_instance"
@@ -166,7 +187,10 @@ EOF
_mosaic_sync_woodpecker_env "$wp_instance" "$WOODPECKER_URL" "$WOODPECKER_TOKEN"
;;
woodpecker)
# Resolve default instance, then load it
# Resolve default instance, then load it. If WOODPECKER_INSTANCE is set to
# "mosaic" by a shell/profile but credentials.json still uses the legacy
# flat .woodpecker.url/.token schema, load the flat credentials instead of
# failing with "woodpecker.mosaic.url not found".
local wp_default
wp_default="${WOODPECKER_INSTANCE:-$(_mosaic_read_cred '.woodpecker.default')}"
if [[ -z "$wp_default" ]]; then
@@ -174,18 +198,18 @@ EOF
local legacy_url
legacy_url="$(_mosaic_read_cred '.woodpecker.url')"
if [[ -n "$legacy_url" ]]; then
export WOODPECKER_URL="${WOODPECKER_URL:-$legacy_url}"
export WOODPECKER_TOKEN="${WOODPECKER_TOKEN:-$(_mosaic_read_cred '.woodpecker.token')}"
WOODPECKER_URL="${WOODPECKER_URL%/}"
[[ -n "$WOODPECKER_URL" ]] || { echo "Error: woodpecker.url not found" >&2; return 1; }
[[ -n "$WOODPECKER_TOKEN" ]] || { echo "Error: woodpecker.token not found" >&2; return 1; }
_mosaic_load_woodpecker_legacy
else
echo "Error: woodpecker.default not set and no WOODPECKER_INSTANCE env var" >&2
echo "Available instances: $(jq -r '.woodpecker | keys | join(", ")' "$MOSAIC_CREDENTIALS_FILE" 2>/dev/null)" >&2
return 1
fi
else
load_credentials "woodpecker-${wp_default}"
if [[ "$wp_default" == "mosaic" ]] && [[ -z "$(_mosaic_read_cred '.woodpecker.mosaic.url')" ]] && [[ -n "$(_mosaic_read_cred '.woodpecker.url')" ]]; then
WOODPECKER_INSTANCE="mosaic" _mosaic_load_woodpecker_legacy
else
load_credentials "woodpecker-${wp_default}"
fi
fi
;;
cloudflare-*)

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# issue-list.sh - List issues on Gitea or GitHub
# Usage: issue-list.sh [-s state] [-l label] [-m milestone] [-a assignee]
# Usage: issue-list.sh [-r owner/repo] [-s state] [-l label] [-m milestone] [-a assignee]
set -e
@@ -13,6 +13,7 @@ LABEL=""
MILESTONE=""
ASSIGNEE=""
LIMIT=100
REPO_OVERRIDE=""
usage() {
cat <<EOF
@@ -26,12 +27,14 @@ Options:
-m, --milestone NAME Filter by milestone name
-a, --assignee USER Filter by assignee
-n, --limit N Maximum issues to show (default: 100)
-r, --repo OWNER/REPO Repository slug (default: infer from git origin)
-h, --help Show this help message
Examples:
$(basename "$0") # List open issues
$(basename "$0") -s all -l bug # All issues with 'bug' label
$(basename "$0") -m "0.2.0" # Issues in milestone 0.2.0
$(basename "$0") --repo ddk/ai-bma # List issues from anywhere
EOF
exit 1
}
@@ -59,6 +62,10 @@ while [[ $# -gt 0 ]]; do
LIMIT="$2"
shift 2
;;
-r|--repo)
REPO_OVERRIDE="$2"
shift 2
;;
-h|--help)
usage
;;
@@ -69,25 +76,33 @@ while [[ $# -gt 0 ]]; do
esac
done
PLATFORM=$(detect_platform)
if [[ -n "$REPO_OVERRIDE" ]]; then
REPO_INFO="$REPO_OVERRIDE"
PLATFORM=$(detect_platform 2>/dev/null || echo gitea)
else
PLATFORM=$(detect_platform)
REPO_INFO=$(get_repo_info)
fi
if [[ -z "$REPO_INFO" || "$REPO_INFO" == error:* ]]; then
echo "Error: Could not determine repository from git origin. Run from a repo or pass --repo." >&2
exit 1
fi
case "$PLATFORM" in
github)
CMD="gh issue list --state $STATE --limit $LIMIT"
[[ -n "$LABEL" ]] && CMD="$CMD --label \"$LABEL\""
[[ -n "$MILESTONE" ]] && CMD="$CMD --milestone \"$MILESTONE\""
[[ -n "$ASSIGNEE" ]] && CMD="$CMD --assignee \"$ASSIGNEE\""
eval "$CMD"
CMD=(gh issue list --repo "$REPO_INFO" --state "$STATE" --limit "$LIMIT")
[[ -n "$LABEL" ]] && CMD+=(--label "$LABEL")
[[ -n "$MILESTONE" ]] && CMD+=(--milestone "$MILESTONE")
[[ -n "$ASSIGNEE" ]] && CMD+=(--assignee "$ASSIGNEE")
"${CMD[@]}"
;;
gitea)
CMD="tea issues list --state $STATE --limit $LIMIT"
[[ -n "$LABEL" ]] && CMD="$CMD --labels \"$LABEL\""
[[ -n "$MILESTONE" ]] && CMD="$CMD --milestones \"$MILESTONE\""
# Note: tea may not support assignee filter directly
eval "$CMD"
if [[ -n "$ASSIGNEE" ]]; then
echo "Note: Assignee filtering may require manual review for Gitea" >&2
fi
CMD=(tea issues list --repo "$REPO_INFO" --state "$STATE" --limit "$LIMIT")
[[ -n "$LABEL" ]] && CMD+=(--labels "$LABEL")
[[ -n "$MILESTONE" ]] && CMD+=(--milestones "$MILESTONE")
[[ -n "$ASSIGNEE" ]] && CMD+=(--assignee "$ASSIGNEE")
"${CMD[@]}"
;;
*)
echo "Error: Could not detect git platform" >&2

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# pr-ci-wait.sh - Wait for PR CI status to reach terminal state (GitHub/Gitea)
# Usage: pr-ci-wait.sh -n <pr_number> [-t timeout_sec] [-i interval_sec]
# Usage: pr-ci-wait.sh -n <pr_number> [-r owner/repo] [-t timeout_sec] [-i interval_sec]
set -euo pipefail
@@ -10,6 +10,7 @@ source "$SCRIPT_DIR/detect-platform.sh"
PR_NUMBER=""
TIMEOUT_SEC=1800
INTERVAL_SEC=15
REPO_OVERRIDE=""
usage() {
cat <<EOF
@@ -17,12 +18,14 @@ Usage: $(basename "$0") -n <pr_number> [-t timeout_sec] [-i interval_sec]
Options:
-n, --number NUMBER PR number (required)
-r, --repo OWNER/REPO Repository slug (default: infer from git origin)
-t, --timeout SECONDS Max wait time in seconds (default: 1800)
-i, --interval SECONDS Poll interval in seconds (default: 15)
-h, --help Show this help
Examples:
$(basename "$0") -n 643
$(basename "$0") -n 643 --repo ddk/ai-bma
$(basename "$0") -n 643 -t 900 -i 10
EOF
}
@@ -95,7 +98,7 @@ PY
}
github_get_pr_head_sha() {
gh pr view "$PR_NUMBER" --json headRefOid --jq '.headRefOid'
gh pr view "$PR_NUMBER" --repo "$OWNER/$REPO" --json headRefOid --jq '.headRefOid'
}
github_get_commit_status_json() {
@@ -132,6 +135,10 @@ while [[ $# -gt 0 ]]; do
PR_NUMBER="$2"
shift 2
;;
-r|--repo)
REPO_OVERRIDE="$2"
shift 2
;;
-t|--timeout)
TIMEOUT_SEC="$2"
shift 2
@@ -163,10 +170,21 @@ if ! [[ "$TIMEOUT_SEC" =~ ^[0-9]+$ ]] || ! [[ "$INTERVAL_SEC" =~ ^[0-9]+$ ]]; th
exit 1
fi
detect_platform > /dev/null
if [[ -n "$REPO_OVERRIDE" ]]; then
REPO_INFO="$REPO_OVERRIDE"
PLATFORM=$(detect_platform 2>/dev/null || echo gitea)
else
detect_platform > /dev/null
REPO_INFO=$(get_repo_info)
fi
OWNER=$(get_repo_owner)
REPO=$(get_repo_name)
if [[ -z "$REPO_INFO" || "$REPO_INFO" == error:* || "$REPO_INFO" != */* ]]; then
echo "Error: Could not determine repository from git origin. Run from a repo or pass --repo owner/repo." >&2
exit 1
fi
OWNER=${REPO_INFO%%/*}
REPO=${REPO_INFO##*/}
START_TS=$(date +%s)
DEADLINE_TS=$((START_TS + TIMEOUT_SEC))
@@ -182,10 +200,7 @@ if [[ "$PLATFORM" == "github" ]]; then
fi
echo "[pr-ci-wait] Platform=github PR=#${PR_NUMBER} head_sha=${HEAD_SHA}"
elif [[ "$PLATFORM" == "gitea" ]]; then
HOST=$(get_remote_host) || {
echo "Error: Could not determine remote host." >&2
exit 1
}
HOST=$(get_remote_host 2>/dev/null || echo "git.mosaicstack.dev")
TOKEN=$(get_gitea_token "$HOST") || {
echo "Error: Gitea token not found. Set GITEA_TOKEN or configure ~/.git-credentials." >&2
exit 1
@@ -195,7 +210,7 @@ elif [[ "$PLATFORM" == "gitea" ]]; then
echo "Error: Could not resolve head SHA for PR #$PR_NUMBER." >&2
exit 1
fi
echo "[pr-ci-wait] Platform=gitea host=${HOST} PR=#${PR_NUMBER} head_sha=${HEAD_SHA}"
echo "[pr-ci-wait] Platform=gitea host=${HOST} repo=${OWNER}/${REPO} PR=#${PR_NUMBER} head_sha=${HEAD_SHA}"
else
echo "Error: Unsupported platform '${PLATFORM}'." >&2
exit 1

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# pr-diff.sh - Get the diff for a pull request on GitHub or Gitea
# Usage: pr-diff.sh -n <pr_number> [-o <output_file>]
# Usage: pr-diff.sh -n <pr_number> [-r owner/repo] [-o <output_file>]
set -e
@@ -10,6 +10,7 @@ source "$SCRIPT_DIR/detect-platform.sh"
# Parse arguments
PR_NUMBER=""
OUTPUT_FILE=""
REPO_OVERRIDE=""
while [[ $# -gt 0 ]]; do
case $1 in
@@ -21,11 +22,16 @@ while [[ $# -gt 0 ]]; do
OUTPUT_FILE="$2"
shift 2
;;
-r|--repo)
REPO_OVERRIDE="$2"
shift 2
;;
-h|--help)
echo "Usage: pr-diff.sh -n <pr_number> [-o <output_file>]"
echo "Usage: pr-diff.sh -n <pr_number> [-r owner/repo] [-o <output_file>]"
echo ""
echo "Options:"
echo " -n, --number PR number (required)"
echo " -r, --repo Repository slug (default: infer from git origin)"
echo " -o, --output Output file (optional, prints to stdout if omitted)"
echo " -h, --help Show this help"
exit 0
@@ -42,31 +48,30 @@ if [[ -z "$PR_NUMBER" ]]; then
exit 1
fi
detect_platform > /dev/null
if [[ -n "$REPO_OVERRIDE" ]]; then
REPO_INFO="$REPO_OVERRIDE"
PLATFORM=$(detect_platform 2>/dev/null || echo gitea)
else
detect_platform > /dev/null
REPO_INFO=$(get_repo_info)
fi
if [[ -z "$REPO_INFO" || "$REPO_INFO" == error:* ]]; then
echo "Error: Could not determine repository from git origin. Run from a repo or pass --repo." >&2
exit 1
fi
if [[ "$PLATFORM" == "github" ]]; then
if [[ -n "$OUTPUT_FILE" ]]; then
gh pr diff "$PR_NUMBER" > "$OUTPUT_FILE"
gh pr diff "$PR_NUMBER" --repo "$REPO_INFO" > "$OUTPUT_FILE"
else
gh pr diff "$PR_NUMBER"
gh pr diff "$PR_NUMBER" --repo "$REPO_INFO"
fi
elif [[ "$PLATFORM" == "gitea" ]]; then
# tea doesn't have a direct diff command — use the API
OWNER=$(get_repo_owner)
REPO=$(get_repo_name)
REMOTE_URL=$(git remote get-url origin 2>/dev/null)
HOST=$(get_remote_host 2>/dev/null || echo "git.mosaicstack.dev")
# Extract host from remote URL
if [[ "$REMOTE_URL" == https://* ]]; then
HOST=$(echo "$REMOTE_URL" | sed -E 's|https://([^/]+)/.*|\1|')
elif [[ "$REMOTE_URL" == git@* ]]; then
HOST=$(echo "$REMOTE_URL" | sed -E 's|git@([^:]+):.*|\1|')
else
echo "Error: Cannot determine host from remote URL" >&2
exit 1
fi
DIFF_URL="https://${HOST}/api/v1/repos/${OWNER}/${REPO}/pulls/${PR_NUMBER}.diff"
DIFF_URL="https://${HOST}/api/v1/repos/${REPO_INFO}/pulls/${PR_NUMBER}.diff"
GITEA_API_TOKEN=$(get_gitea_token "$HOST" || true)

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# pr-list.sh - List pull requests on Gitea or GitHub
# Usage: pr-list.sh [-s state] [-l label] [-a author]
# Usage: pr-list.sh [-r owner/repo] [-s state] [-l label] [-a author]
set -e
@@ -12,6 +12,7 @@ STATE="open"
LABEL=""
AUTHOR=""
LIMIT=100
REPO_OVERRIDE=""
usage() {
cat <<EOF
@@ -24,12 +25,14 @@ Options:
-l, --label LABEL Filter by label
-a, --author USER Filter by author
-n, --limit N Maximum PRs to show (default: 100)
-r, --repo OWNER/REPO Repository slug (default: infer from git origin)
-h, --help Show this help message
Examples:
$(basename "$0") # List open PRs
$(basename "$0") -s all # All PRs
$(basename "$0") -s merged -a username # Merged PRs by user
$(basename "$0") --repo ddk/ai-bma # List PRs from anywhere
EOF
exit 1
}
@@ -53,6 +56,10 @@ while [[ $# -gt 0 ]]; do
LIMIT="$2"
shift 2
;;
-r|--repo)
REPO_OVERRIDE="$2"
shift 2
;;
-h|--help)
usage
;;
@@ -63,18 +70,30 @@ while [[ $# -gt 0 ]]; do
esac
done
PLATFORM=$(detect_platform)
if [[ -n "$REPO_OVERRIDE" ]]; then
REPO_INFO="$REPO_OVERRIDE"
# Explicit --repo is primarily for Gitea wrappers; if a git origin is present,
# still honor GitHub detection for cross-platform behavior.
PLATFORM=$(detect_platform 2>/dev/null || echo gitea)
else
PLATFORM=$(detect_platform)
REPO_INFO=$(get_repo_info)
fi
if [[ -z "$REPO_INFO" || "$REPO_INFO" == error:* ]]; then
echo "Error: Could not determine repository from git origin. Run from a repo or pass --repo." >&2
exit 1
fi
case "$PLATFORM" in
github)
CMD="gh pr list --state $STATE --limit $LIMIT"
[[ -n "$LABEL" ]] && CMD="$CMD --label \"$LABEL\""
[[ -n "$AUTHOR" ]] && CMD="$CMD --author \"$AUTHOR\""
eval "$CMD"
CMD=(gh pr list --repo "$REPO_INFO" --state "$STATE" --limit "$LIMIT")
[[ -n "$LABEL" ]] && CMD+=(--label "$LABEL")
[[ -n "$AUTHOR" ]] && CMD+=(--author "$AUTHOR")
"${CMD[@]}"
;;
gitea)
# tea pr list - note: tea uses 'pulls' subcommand in some versions
CMD="tea pr list --state $STATE --limit $LIMIT"
CMD=(tea pr list --repo "$REPO_INFO" --state "$STATE" --limit "$LIMIT")
# tea filtering may be limited
if [[ -n "$LABEL" ]]; then
@@ -84,7 +103,7 @@ case "$PLATFORM" in
echo "Note: Author filtering may require manual review for Gitea" >&2
fi
eval "$CMD"
"${CMD[@]}"
;;
*)
echo "Error: Could not detect git platform" >&2

View File

@@ -1,6 +1,6 @@
#!/bin/bash
# pr-view.sh - View pull request details on GitHub or Gitea
# Usage: pr-view.sh -n <pr_number>
# Usage: pr-view.sh -n <pr_number> [-r owner/repo]
set -e
@@ -9,6 +9,7 @@ source "$SCRIPT_DIR/detect-platform.sh"
# Parse arguments
PR_NUMBER=""
REPO_OVERRIDE=""
while [[ $# -gt 0 ]]; do
case $1 in
@@ -16,11 +17,16 @@ while [[ $# -gt 0 ]]; do
PR_NUMBER="$2"
shift 2
;;
-r|--repo)
REPO_OVERRIDE="$2"
shift 2
;;
-h|--help)
echo "Usage: pr-view.sh -n <pr_number>"
echo "Usage: pr-view.sh -n <pr_number> [-r owner/repo]"
echo ""
echo "Options:"
echo " -n, --number PR number (required)"
echo " -r, --repo Repository slug (default: infer from git origin)"
echo " -h, --help Show this help"
exit 0
;;
@@ -36,12 +42,23 @@ if [[ -z "$PR_NUMBER" ]]; then
exit 1
fi
detect_platform
if [[ -n "$REPO_OVERRIDE" ]]; then
REPO_INFO="$REPO_OVERRIDE"
PLATFORM=$(detect_platform 2>/dev/null || echo gitea)
else
detect_platform > /dev/null
REPO_INFO=$(get_repo_info)
fi
if [[ -z "$REPO_INFO" || "$REPO_INFO" == error:* ]]; then
echo "Error: Could not determine repository from git origin. Run from a repo or pass --repo." >&2
exit 1
fi
if [[ "$PLATFORM" == "github" ]]; then
gh pr view "$PR_NUMBER"
gh pr view "$PR_NUMBER" --repo "$REPO_INFO"
elif [[ "$PLATFORM" == "gitea" ]]; then
tea pr "$PR_NUMBER"
tea pr "$PR_NUMBER" --repo "$REPO_INFO"
else
echo "Error: Unknown platform"
exit 1

View File

@@ -50,7 +50,7 @@ REPO_ID=$(wp_resolve_repo_id "$REPO") || exit 1
response=$(curl -sk -w "\n%{http_code}" \
-H "Authorization: Bearer $WOODPECKER_TOKEN" \
"${WOODPECKER_URL}/api/repos/${REPO_ID}/pipelines?per_page=${LIMIT}")
"${WOODPECKER_URL}/api/repos/${REPO_ID}/pipelines?perPage=${LIMIT}")
http_code=$(echo "$response" | tail -n1)
body=$(echo "$response" | sed '$d')

View File

@@ -64,7 +64,7 @@ _wp_fetch() {
if [[ -z "$NUMBER" ]]; then
# Get latest pipeline number from list, then fetch full detail
list_body=$(_wp_fetch "${WOODPECKER_URL}/api/repos/${REPO_ID}/pipelines?per_page=1") || exit 1
list_body=$(_wp_fetch "${WOODPECKER_URL}/api/repos/${REPO_ID}/pipelines?perPage=1") || exit 1
NUMBER=$(echo "$list_body" | jq -r '.[0].number // empty')
if [[ -z "$NUMBER" ]]; then
echo "Error: No pipelines found" >&2

View File

@@ -16,8 +16,15 @@ import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { users, teams, teamMembers, conversations, messages } from '@mosaicstack/db';
import { createPgliteDbWithVector, runPgliteMigrations } from './test-utils/pglite-with-vector.js';
import {
users,
teams,
teamMembers,
conversations,
messages,
createPgliteDb,
runPgliteMigrations,
} from '@mosaicstack/db';
import postgres from 'postgres';
import { afterAll, describe, expect, it } from 'vitest';
@@ -102,11 +109,8 @@ describe.skipIf(!run)('migrate-tier — PGlite → federated PG', () => {
/* ---- 1. Create a temp PGlite db ---------------------------------- */
pgliteDataDir = await fs.mkdtemp(path.join(os.tmpdir(), 'fed-m1-08-'));
const handle = createPgliteDbWithVector(pgliteDataDir);
// Run Drizzle migrations against PGlite.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await runPgliteMigrations(handle.db as any);
const handle = createPgliteDb(pgliteDataDir);
await runPgliteMigrations(handle);
/* ---- 2. Seed representative data --------------------------------- */

View File

@@ -1,52 +0,0 @@
/**
* Test-only helpers for creating a PGlite database with the pgvector extension
* and running Drizzle migrations against it.
*
* These are intentionally NOT exported from @mosaicstack/db to avoid pulling
* the WASM vector bundle into the public API surface.
*/
import { createRequire } from 'node:module';
import { dirname, resolve } from 'node:path';
import { PGlite } from '@electric-sql/pglite';
import { vector } from '@electric-sql/pglite/vector';
import { drizzle } from 'drizzle-orm/pglite';
import { migrate as migratePglite } from 'drizzle-orm/pglite/migrator';
import type { PgliteDatabase } from 'drizzle-orm/pglite';
import * as schema from '@mosaicstack/db';
import type { DbHandle } from '@mosaicstack/db';
/**
* Create a PGlite DB handle with the pgvector extension loaded.
* Required for running Drizzle migrations that include `CREATE EXTENSION vector`.
*/
export function createPgliteDbWithVector(dataDir: string): DbHandle {
const client = new PGlite(dataDir, { extensions: { vector } });
const db = drizzle(client, { schema });
return {
db: db as unknown as DbHandle['db'],
close: async () => {
await client.close();
},
};
}
/**
* Run Drizzle migrations against an already-open PGlite database handle.
* Resolves the migrations folder from @mosaicstack/db's installed location.
*
* @param db A PgliteDatabase instance (from drizzle-orm/pglite).
*/
export async function runPgliteMigrations(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
db: PgliteDatabase<any>,
): Promise<void> {
// Resolve @mosaicstack/db package root to locate its drizzle migrations folder.
const _require = createRequire(import.meta.url);
const dbPkgMain = _require.resolve('@mosaicstack/db');
// dbPkgMain → …/packages/db/dist/index.js → dirname = dist/
// go up one level from dist/ to find the sibling drizzle/ folder
const migrationsFolder = resolve(dirname(dbPkgMain), '../drizzle');
await migratePglite(db, { migrationsFolder });
}

64
pnpm-lock.yaml generated
View File

@@ -179,6 +179,9 @@ importers:
socket.io:
specifier: ^4.8.0
version: 4.8.3
undici:
specifier: ^7.24.6
version: 7.24.6
uuid:
specifier: ^11.0.0
version: 11.1.0
@@ -713,10 +716,10 @@ importers:
dependencies:
'@mariozechner/pi-agent-core':
specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
'@mariozechner/pi-ai':
specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
'@sinclair/typebox':
specifier: ^0.34.41
version: 0.34.48
@@ -6993,10 +6996,6 @@ packages:
resolution: {integrity: sha512-gBLkYIlEnSp8pFbT64yFgGE6UIB9tAkhukC23PmMDCe5Nd+cRqKxSjw5y54MK2AZMgZfJWMaNE4nYUHgi1XEOw==}
engines: {node: '>=18.17'}
undici@7.24.3:
resolution: {integrity: sha512-eJdUmK/Wrx2d+mnWWmwwLRyA7OQCkLap60sk3dOK4ViZR7DKwwptwuIvFBg2HaiP9ESaEdhtpSymQPvytpmkCA==}
engines: {node: '>=20.18.1'}
undici@7.24.6:
resolution: {integrity: sha512-Xi4agocCbRzt0yYMZGMA6ApD7gvtUFaxm4ZmeacWI4cZxaF6C+8I8QfofC20NAePiB/IcvZmzkJ7XPa471AEtA==}
engines: {node: '>=20.18.1'}
@@ -7329,12 +7328,6 @@ snapshots:
'@jridgewell/gen-mapping': 0.3.13
'@jridgewell/trace-mapping': 0.3.31
'@anthropic-ai/sdk@0.73.0(zod@3.25.76)':
dependencies:
json-schema-to-ts: 3.1.1
optionalDependencies:
zod: 3.25.76
'@anthropic-ai/sdk@0.73.0(zod@4.3.6)':
dependencies:
json-schema-to-ts: 3.1.1
@@ -8676,18 +8669,6 @@ snapshots:
- ws
- zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
@@ -8736,30 +8717,6 @@ snapshots:
- ws
- zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@3.25.76)
'@aws-sdk/client-bedrock-runtime': 3.1008.0
'@google/genai': 1.45.0(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))
'@mistralai/mistralai': 1.14.1
'@sinclair/typebox': 0.34.48
ajv: 8.18.0
ajv-formats: 3.0.1(ajv@8.18.0)
chalk: 5.6.2
openai: 6.26.0(ws@8.20.0)(zod@3.25.76)
partial-json: 0.1.7
proxy-agent: 6.5.0
undici: 7.24.3
zod-to-json-schema: 3.25.1(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@4.3.6)
@@ -8773,7 +8730,7 @@ snapshots:
openai: 6.26.0(ws@8.20.0)(zod@4.3.6)
partial-json: 0.1.7
proxy-agent: 6.5.0
undici: 7.24.3
undici: 7.24.6
zod-to-json-schema: 3.25.1(zod@4.3.6)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
@@ -12632,7 +12589,7 @@ snapshots:
saxes: 6.0.0
symbol-tree: 3.2.4
tough-cookie: 6.0.1
undici: 7.24.3
undici: 7.24.6
w3c-xmlserializer: 5.0.0
webidl-conversions: 8.0.1
whatwg-mimetype: 5.0.0
@@ -13352,11 +13309,6 @@ snapshots:
dependencies:
mimic-function: 5.0.1
openai@6.26.0(ws@8.20.0)(zod@3.25.76):
optionalDependencies:
ws: 8.20.0
zod: 3.25.76
openai@6.26.0(ws@8.20.0)(zod@4.3.6):
optionalDependencies:
ws: 8.20.0
@@ -14488,8 +14440,6 @@ snapshots:
undici@6.21.3: {}
undici@7.24.3: {}
undici@7.24.6: {}
unhomoglyph@1.0.6: {}

View File

@@ -0,0 +1,125 @@
# fix(db): bootstrap migrations on local-tier gateway startup
## Problem
Fresh `mosaic gateway install` (npm-installed) leaves the gateway DB schema empty:
```
relation "users" does not exist
```
Sign-in 500s, `auth users create` says "Not signed in", `admin/bootstrap setup`
also fails — every entry point queries `users` before doing anything else.
## Scope
This PR fixes the **local (PGlite) tier** end-to-end. The postgres-tier path
has additional pre-existing bugs (see "Known issues, out of scope" below) and
needs a separate change with real Postgres validation.
## Root causes addressed (5 stacked bugs on the local-tier path)
1. **`packages/db/package.json` `files: ["dist"]`** — the `drizzle/` SQL
migrations folder is excluded from the published tarball. Even if a
migrate runner existed, it would have nothing to apply.
2. **`packages/db/src/migrate.ts`** only supports `postgres-js`. Local-tier
gateways use embedded PGlite, which can't be reached over a postgres wire
protocol — so `runMigrations()` is unusable for the local tier.
3. **`apps/gateway/src/database/database.module.ts`** never invokes
migrations at startup. The module creates the DB handle and storage
adapter, but no consumer calls `.migrate()` on either. `mosaic storage
migrate` CLI even claims "pglite runs schema setup automatically on first
connection via `adapter.migrate()`" — but `adapter.migrate()` is only
called by tests, never at runtime.
4. **`createPgliteDb` does not load the pgvector extension.** Migration 0001
declares `CREATE EXTENSION IF NOT EXISTS vector;` for the
`insights.embedding` column. Bare PGlite has no pgvector — the migration
fails on extension control file lookup.
5. **Drizzle's PG migrator wraps every migration in one outer transaction.**
Migration 0009 does `ALTER TYPE grant_status ADD VALUE 'pending'` and then
`ALTER TABLE federation_grants ALTER COLUMN status SET DEFAULT 'pending'`.
Postgres' `check_safe_enum_use` rejects the second statement because the
new enum value isn't committed yet. Splitting the migration into two
files doesn't help — drizzle batches all migrations into one outer tx.
## Fix
- `packages/db/package.json` — ship `drizzle/` in `files`.
- `packages/db/src/client-pglite.ts` — load `@electric-sql/pglite/vector`.
- `packages/db/src/migrate.ts` — add `runPgliteMigrations(handle)`. Walks the
Drizzle journal and runs each statement-breakpoint chunk through PGlite's
`client.exec()` (Simple Query protocol → autocommit per statement). Writes
to the standard `drizzle.__drizzle_migrations` ledger so the result is
interoperable with `runMigrations()` on a postgres-backed deployment.
Per-statement try/catch surfaces which statement of which migration failed
and the ledger row is only written on full success.
- `packages/db/src/index.ts` — re-export.
- `apps/gateway/src/database/database.module.ts` — implement `OnModuleInit`:
- Local tier → `runPgliteMigrations(handle)`, then `storageAdapter.migrate()`
(the local storage adapter has its own kv tables in a separate PGlite dir).
- Postgres tier → `storageAdapter.migrate()` only, since
`PostgresAdapter.migrate()` already calls `runMigrations(url)` against
the same DATABASE_URL — we deliberately don't double-call.
NestJS awaits `onModuleInit` before `app.listen()`, so DB-dependent modules
see a populated schema before any HTTP traffic is accepted.
- `packages/storage/src/test-utils/pglite-with-vector.ts`**deleted**.
The "intentionally not exported" rationale is moot now that migration 0001
forces pgvector load anyway. `migrate-tier.integration.test.ts` switched
to `createPgliteDb` + `runPgliteMigrations` from `@mosaicstack/db`.
## Tests
`packages/db/src/migrate.test.ts`:
- Verifies `runPgliteMigrations` creates the BetterAuth tables (the original
failure mode).
- Idempotence (transitively re-runs migration 0009).
- Partial-failure: pre-creates a conflicting `users` table, asserts the
thrown error includes statement context (`hash=… statement #N failed`)
and that no ledger row was written.
## QA evidence
End-to-end on a fresh PGlite install:
- `[DatabaseModule] Applying PGlite schema migrations...` then
`Initializing storage adapter (pglite)...` in startup log.
- `GET /api/bootstrap/status``{"needsSetup":true}` HTTP 200 (was 500
with `relation "users" does not exist`).
- `POST /api/bootstrap/setup` with empty body → HTTP 400 with Zod
validation error (was 500), confirming the request reached the
validator past the table-existence check.
## Known issues, out of scope (file separately)
- **Postgres-tier first install is still broken.** `runMigrations()` uses
Drizzle's `migratePostgres`, which has the same outer-transaction problem
as PGlite's migrator. A fresh standalone-tier install would also fail at
migration 0009. Inline TODO in `migrate.ts:31-35` flags this. Fixing it
needs either (a) a shared per-statement loop reused for both drivers, or
(b) splitting migration 0009.
- **`drizzle/meta/_journal.json` has 0009 ordered before 0008** (`when`
values `1745280000000` < `1776822435828`). `migratePostgres` skips by
`created_at < folderMillis`, so on a postgres deployment that already
applied 0008, 0009 would be skipped forever. Our hash-based skip in the
PGlite path sidesteps this.
- **No advisory lock around the migration loop.** Two gateway processes
pointed at the same DATABASE_URL would race. PGlite is single-process by
file lock so the local tier is fine; postgres-tier deployments should add
`pg_advisory_lock(<deterministic-id>)` around the loop in a follow-up.
- **`mosaic storage migrate` CLI message is misleading** — it claims
"automatic on first connection via adapter.migrate()" but the adapter
doesn't self-migrate. With this PR the gateway invokes it explicitly, but
the CLI message could still be tightened.
- **Crash mid-migration leaves a partial-state PGlite DB without a ledger
row.** Detected loudly on next boot (the replay errors on "already
exists"), but recovery is manual (drop the partially-applied objects or
insert the migration hash into `drizzle.__drizzle_migrations`). A robust
fix would add a "started_at" column to a sidecar table to detect
half-applied state and refuse to start with actionable guidance.