From ccad30dd27e029bafffc512dc2ba41e8db08a1ff Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Mon, 20 Apr 2026 00:35:08 +0000 Subject: [PATCH] feat(storage): mosaic storage migrate-tier with dry-run + idempotency (FED-M1-05) (#474) --- docs/federation/TASKS.md | 28 +- eslint.config.mjs | 1 + packages/storage/package.json | 3 +- packages/storage/src/cli.ts | 198 ++++++ packages/storage/src/index.ts | 19 + packages/storage/src/migrate-tier.spec.ts | 495 +++++++++++++++ packages/storage/src/migrate-tier.ts | 697 ++++++++++++++++++++++ packages/storage/vitest.config.ts | 8 + pnpm-lock.yaml | 54 +- 9 files changed, 1486 insertions(+), 17 deletions(-) create mode 100644 packages/storage/src/migrate-tier.spec.ts create mode 100644 packages/storage/src/migrate-tier.ts create mode 100644 packages/storage/vitest.config.ts diff --git a/docs/federation/TASKS.md b/docs/federation/TASKS.md index 971433d..b6c8d3b 100644 --- a/docs/federation/TASKS.md +++ b/docs/federation/TASKS.md @@ -15,20 +15,20 @@ Goal: Gateway runs in `federated` tier with containerized PG+pgvector+Valkey. No federation logic yet. Existing standalone behavior does not regress. -| id | status | description | issue | agent | branch | depends_on | estimate | notes | -| --------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----- | ------ | ------------------------------- | ---------- | -------- | -------------------------------------------------------------------------------------------------------------------------------- | -| FED-M1-01 | done | Extend `mosaic.config.json` schema: add `"federated"` to `tier` enum in validator + TS types. Keep `local` and `standalone` working. Update schema docs/README where referenced. | #460 | sonnet | feat/federation-m1-tier-config | — | 4K | Shipped in PR #470. 
Renamed `team` → `standalone`; added `team` deprecation alias; added `DEFAULT_FEDERATED_CONFIG`. | -| FED-M1-02 | done | Author `docker-compose.federated.yml` as an overlay profile: Postgres 17 + pgvector extension (port 5433), Valkey (6380), named volumes, healthchecks. Compose-up should boot cleanly on a clean machine. | #460 | sonnet | feat/federation-m1-compose | FED-M1-01 | 5K | Shipped in PR #471. Overlay defines `postgres-federated`/`valkey-federated`, profile-gated, with pg-init for pgvector extension. | -| FED-M1-03 | done | Add pgvector support to `packages/storage/src/adapters/postgres.ts`: create extension on init (idempotent), expose vector column type in schema helpers. No adapter changes for non-federated tiers. | #460 | sonnet | feat/federation-m1-pgvector | FED-M1-02 | 8K | Shipped in PR #472. `enableVector` flag on postgres StorageConfig; idempotent CREATE EXTENSION before migrations. | -| FED-M1-04 | in-progress | Implement `apps/gateway/src/bootstrap/tier-detector.ts`: reads config, asserts PG/Valkey/pgvector reachable for `federated`, fail-fast with actionable error message on failure. Unit tests for each failure mode. | #460 | sonnet | feat/federation-m1-detector | FED-M1-03 | 8K | Worker delivered; reviewer flagged 3 issues (Valkey timeout, pgvector error discrimination, federated/non-bullmq guard) — fixed. | -| FED-M1-05 | not-started | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | codex | feat/federation-m1-migrate | FED-M1-04 | 10K | Do NOT run automatically. CLI subcommand `mosaic migrate tier --to federated --dry-run`. Safety rails. | -| FED-M1-06 | not-started | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. 
| #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. | -| FED-M1-07 | not-started | Integration test: gateway boots in `federated` tier with docker-compose `federated` profile; refuses to boot when PG unreachable (asserts fail-fast); pgvector extension query succeeds. | #460 | sonnet | feat/federation-m1-integration | FED-M1-04 | 8K | Vitest + docker-compose test profile. One test file per assertion; real services, no mocks. | -| FED-M1-08 | not-started | Integration test for migration script: seed a local PGlite with representative data (tasks, notes, users, teams), run migration, assert row counts + key samples equal on federated PG. | #460 | sonnet | feat/federation-m1-migrate-test | FED-M1-05 | 6K | Runs against docker-compose federated profile; uses temp PGlite file; deterministic seed. | -| FED-M1-09 | not-started | Standalone regression: full agent-session E2E on existing `standalone` tier with a gateway built from this branch. Must pass without referencing any federation module. | #460 | haiku | feat/federation-m1-regression | FED-M1-07 | 4K | Reuse existing e2e harness; just re-point at the federation branch build. Canary that we didn't break it. | -| FED-M1-10 | not-started | Code review pass: security-focused on the migration script (data-at-rest during migration) + tier detector (error-message sensitivity leakage). Independent reviewer, not authors of tasks 01-09. | #460 | sonnet | — | FED-M1-09 | 8K | Use `feature-dev:code-reviewer` agent. Specifically: no secrets in error messages; no partial-migration footguns. | -| FED-M1-11 | not-started | Docs update: `docs/federation/` operator notes for tier setup; README blurb on federated tier; `docs/guides/` entry for migration. Do NOT touch runbook yet (deferred to FED-M7). | #460 | haiku | feat/federation-m1-docs | FED-M1-10 | 4K | Short, actionable. Link from MISSION-MANIFEST. 
No decisions captured here — those belong in PRD. | -| FED-M1-12 | not-started | PR, CI green, merge to main, close #460. | #460 | — | (aggregate) | FED-M1-11 | 3K | Queue-guard before push; wait for green; merge squashed; tea `issue-close` #460. | +| id | status | description | issue | agent | branch | depends_on | estimate | notes | +| --------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----- | ------ | ------------------------------- | ---------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------- | +| FED-M1-01 | done | Extend `mosaic.config.json` schema: add `"federated"` to `tier` enum in validator + TS types. Keep `local` and `standalone` working. Update schema docs/README where referenced. | #460 | sonnet | feat/federation-m1-tier-config | — | 4K | Shipped in PR #470. Renamed `team` → `standalone`; added `team` deprecation alias; added `DEFAULT_FEDERATED_CONFIG`. | +| FED-M1-02 | done | Author `docker-compose.federated.yml` as an overlay profile: Postgres 17 + pgvector extension (port 5433), Valkey (6380), named volumes, healthchecks. Compose-up should boot cleanly on a clean machine. | #460 | sonnet | feat/federation-m1-compose | FED-M1-01 | 5K | Shipped in PR #471. Overlay defines `postgres-federated`/`valkey-federated`, profile-gated, with pg-init for pgvector extension. | +| FED-M1-03 | done | Add pgvector support to `packages/storage/src/adapters/postgres.ts`: create extension on init (idempotent), expose vector column type in schema helpers. No adapter changes for non-federated tiers. | #460 | sonnet | feat/federation-m1-pgvector | FED-M1-02 | 8K | Shipped in PR #472. `enableVector` flag on postgres StorageConfig; idempotent CREATE EXTENSION before migrations. 
| +| FED-M1-04 | done | Implement `apps/gateway/src/bootstrap/tier-detector.ts`: reads config, asserts PG/Valkey/pgvector reachable for `federated`, fail-fast with actionable error message on failure. Unit tests for each failure mode. | #460 | sonnet | feat/federation-m1-detector | FED-M1-03 | 8K | Shipped in PR #473. 12 tests; 5s timeouts on probes; pgvector library/permission discrimination; rejects non-bullmq for federated. | +| FED-M1-05 | in-progress | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | sonnet | feat/federation-m1-migrate | FED-M1-04 | 10K | Do NOT run automatically. CLI subcommand `mosaic migrate tier --to federated --dry-run`. Safety rails. | +| FED-M1-06 | not-started | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. | #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. | +| FED-M1-07 | not-started | Integration test: gateway boots in `federated` tier with docker-compose `federated` profile; refuses to boot when PG unreachable (asserts fail-fast); pgvector extension query succeeds. | #460 | sonnet | feat/federation-m1-integration | FED-M1-04 | 8K | Vitest + docker-compose test profile. One test file per assertion; real services, no mocks. | +| FED-M1-08 | not-started | Integration test for migration script: seed a local PGlite with representative data (tasks, notes, users, teams), run migration, assert row counts + key samples equal on federated PG. | #460 | sonnet | feat/federation-m1-migrate-test | FED-M1-05 | 6K | Runs against docker-compose federated profile; uses temp PGlite file; deterministic seed. 
| +| FED-M1-09 | not-started | Standalone regression: full agent-session E2E on existing `standalone` tier with a gateway built from this branch. Must pass without referencing any federation module. | #460 | haiku | feat/federation-m1-regression | FED-M1-07 | 4K | Reuse existing e2e harness; just re-point at the federation branch build. Canary that we didn't break it. | +| FED-M1-10 | not-started | Code review pass: security-focused on the migration script (data-at-rest during migration) + tier detector (error-message sensitivity leakage). Independent reviewer, not authors of tasks 01-09. | #460 | sonnet | — | FED-M1-09 | 8K | Use `feature-dev:code-reviewer` agent. Specifically: no secrets in error messages; no partial-migration footguns. | +| FED-M1-11 | not-started | Docs update: `docs/federation/` operator notes for tier setup; README blurb on federated tier; `docs/guides/` entry for migration. Do NOT touch runbook yet (deferred to FED-M7). | #460 | haiku | feat/federation-m1-docs | FED-M1-10 | 4K | Short, actionable. Link from MISSION-MANIFEST. No decisions captured here — those belong in PRD. | +| FED-M1-12 | not-started | PR, CI green, merge to main, close #460. | #460 | — | (aggregate) | FED-M1-11 | 3K | Queue-guard before push; wait for green; merge squashed; tea `issue-close` #460. 
| **M1 total estimate:** ~74K tokens (over-budget vs 20K PRD estimate — explanation below) diff --git a/eslint.config.mjs b/eslint.config.mjs index 9e610ad..e7e631a 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -28,6 +28,7 @@ export default tseslint.config( 'apps/web/e2e/helpers/*.ts', 'apps/web/playwright.config.ts', 'apps/gateway/vitest.config.ts', + 'packages/storage/vitest.config.ts', 'packages/mosaic/__tests__/*.ts', ], }, diff --git a/packages/storage/package.json b/packages/storage/package.json index e7235f1..f144a8a 100644 --- a/packages/storage/package.json +++ b/packages/storage/package.json @@ -24,7 +24,8 @@ "@electric-sql/pglite": "^0.2.17", "@mosaicstack/db": "workspace:^", "@mosaicstack/types": "workspace:*", - "commander": "^13.0.0" + "commander": "^13.0.0", + "postgres": "^3.4.8" }, "devDependencies": { "typescript": "^5.8.0", diff --git a/packages/storage/src/cli.ts b/packages/storage/src/cli.ts index e6fc1cf..3ff5c8e 100644 --- a/packages/storage/src/cli.ts +++ b/packages/storage/src/cli.ts @@ -1,4 +1,5 @@ import type { Command } from 'commander'; +import type { MigrationSource } from './migrate-tier.js'; /** * Reads the DATABASE_URL environment variable and redacts the password portion. 
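The cli.ts doc comment above describes redacting the password portion of `DATABASE_URL`. Extracted as a standalone sketch, the same shape as the `redactUrl` helper this patch adds to the migrate-tier command; the regex fallback covers connection strings the WHATWG `URL` parser rejects:

```typescript
// Standalone sketch of the password-redaction helper used for
// connection strings (mirrors the shape of the patch's redactUrl).
export function redactUrl(url: string): string {
  try {
    const parsed = new URL(url);
    if (parsed.password) parsed.password = '***';
    return parsed.toString();
  } catch {
    // Fallback for strings the WHATWG URL parser rejects:
    // mask the userinfo password segment directly.
    return url.replace(/:([^@/]+)@/, ':***@');
  }
}
```

Node's WHATWG `URL` parses userinfo even for non-special schemes like `postgresql:`, so the happy path handles both Postgres URLs and anything else URL-shaped.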
@@ -209,6 +210,203 @@ export function registerStorageCommand(parent: Command): void { } }); + // ── storage migrate-tier ───────────────────────────────────────────────── + + storage + .command('migrate-tier') + .description('Migrate data from tier: local/standalone → tier: federated (Postgres + pgvector)') + .requiredOption( + '--to ', + 'Target tier to migrate to (only "federated" is supported)', + 'federated', + ) + .requiredOption('--target-url ', 'Target federated Postgres connection string (required)') + .option( + '--source-config ', + 'Path to mosaic.config.json (default: cwd/mosaic.config.json)', + ) + .option('--dry-run', 'Print what would be migrated without writing anything') + .option('--yes', 'Skip interactive confirmation prompt (required for non-TTY environments)') + .option('--batch-size ', 'Rows per transaction batch', '1000') + .option('--allow-non-empty', 'Allow writing to a non-empty target (upsert — idempotent)') + .action( + async (opts: { + to: string; + targetUrl: string; + sourceConfig?: string; + dryRun?: boolean; + yes?: boolean; + batchSize?: string; + allowNonEmpty?: boolean; + }) => { + if (opts.to !== 'federated') { + console.error( + `[migrate-tier] --to "${opts.to}" is not supported. Only "federated" is allowed.`, + ); + process.exitCode = 1; + return; + } + + const batchSize = parseInt(opts.batchSize ?? '1000', 10); + if (isNaN(batchSize) || batchSize < 1) { + console.error('[migrate-tier] --batch-size must be a positive integer.'); + process.exitCode = 1; + return; + } + + // Redact target URL password for display. + function redactUrl(url: string): string { + try { + const parsed = new URL(url); + if (parsed.password) parsed.password = '***'; + return parsed.toString(); + } catch { + return url.replace(/:([^@/]+)@/, ':***@'); + } + } + + const redactedTarget = redactUrl(opts.targetUrl); + const isDryRun = opts.dryRun ?? false; + const allowNonEmpty = opts.allowNonEmpty ?? false; + + // Determine source tier from environment. 
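The command above validates `--batch-size` because the migration streams each table in `limit`/`offset` batches through the `MigrationSource` contract (the shape is inferred from the spec's mocks). A sketch of such a batching loop; `BatchSource` and `copyTable` are illustrative names for this sketch, not identifiers from the patch:

```typescript
// Illustrative batching loop over a MigrationSource-shaped reader.
// The real loop lives in migrate-tier.ts (not shown in this hunk).
interface BatchSource {
  count(table: string): Promise<number>;
  readTable(
    table: string,
    opts?: { limit?: number; offset?: number },
  ): Promise<Record<string, unknown>[]>;
}

export async function copyTable(
  source: BatchSource,
  writeBatch: (rows: Record<string, unknown>[]) => Promise<void>,
  table: string,
  batchSize: number,
): Promise<number> {
  const total = await source.count(table);
  let migrated = 0;
  for (let offset = 0; offset < total; offset += batchSize) {
    const rows = await source.readTable(table, { limit: batchSize, offset });
    if (rows.length === 0) break; // source shrank mid-run; stop cleanly
    await writeBatch(rows);
    migrated += rows.length;
  }
  return migrated;
}
```

Bounding each write to `batchSize` rows keeps transactions small, which is why the CLI rejects a non-positive `--batch-size` up front.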
+ const sourceTier = activeTier(); + const sourceDesc = configSource(); + + console.log(''); + console.log('[migrate-tier] ─────────────────────────────────────────'); + console.log(`[migrate-tier] Source tier: ${sourceTier}`); + console.log(`[migrate-tier] Source: ${sourceDesc}`); + console.log(`[migrate-tier] Target tier: federated (Postgres + pgvector)`); + console.log(`[migrate-tier] Target: ${redactedTarget}`); + console.log(`[migrate-tier] Batch size: ${batchSize.toString()}`); + console.log(`[migrate-tier] Dry run: ${isDryRun.toString()}`); + console.log(`[migrate-tier] Allow non-empty: ${allowNonEmpty.toString()}`); + console.log('[migrate-tier] ─────────────────────────────────────────'); + console.log(''); + + // Lazy-import core migration logic to keep the CLI thin. + const { + runMigrateTier, + PostgresMigrationTarget, + DrizzleMigrationSource, + getMigrationOrder, + } = await import('./migrate-tier.js'); + + // Build source adapter using Drizzle-backed DrizzleMigrationSource. + // Both local (PGlite) and standalone (Postgres) sources expose the same + // normalized Drizzle schema — this is where the actual domain data lives. + let sourceAdapter: MigrationSource; + if (sourceTier === 'pglite') { + const { createPgliteDb } = await import('@mosaicstack/db'); + const pgliteDataDir = process.env['PGLITE_DATA_DIR']; + if (!pgliteDataDir) { + console.error( + '[migrate-tier] PGLITE_DATA_DIR is not set. ' + + 'Cannot open PGlite source — set it to the data directory path.', + ); + process.exitCode = 1; + return; + } + const handle = createPgliteDb(pgliteDataDir); + // Local/PGlite sources do not have pgvector registered — the embedding + // column is omitted from the insights SELECT and set to null on target. 
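The comment above describes the embedding projection for sources without pgvector; the spec later asserts the same contract for `normaliseSourceRow`. A minimal sketch of that contract (`projectRow` is an illustrative name, not the exported function):

```typescript
// Sketch of the embedding projection the normaliseSourceRow tests
// assert: insights rows from a non-pgvector source get embedding: null;
// every other table passes through untouched.
export function projectRow(
  table: string,
  row: Record<string, unknown>,
  sourceHasVector: boolean,
): Record<string, unknown> {
  if (table !== 'insights' || sourceHasVector) {
    return row; // non-vector tables and vector-capable sources: unchanged
  }
  return { ...row, embedding: null }; // pgvector absent at source
}
```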
+ sourceAdapter = new DrizzleMigrationSource(handle.db, /* sourceHasVector= */ false); + } else { + const { createDb } = await import('@mosaicstack/db'); + const url = process.env['DATABASE_URL']; + if (!url) { + console.error('[migrate-tier] DATABASE_URL is not set for postgres source.'); + process.exitCode = 1; + return; + } + const handle = createDb(url); + // Standalone Postgres may or may not have pgvector — assume it does not + // (it is a non-federated tier) so embedding is treated as null. + sourceAdapter = new DrizzleMigrationSource(handle.db, /* sourceHasVector= */ false); + } + + // Print per-table row counts for the confirmation prompt. + const tablesToMigrate = getMigrationOrder(); + const counts: Array<{ table: string; count: number }> = []; + for (const table of tablesToMigrate) { + const n = await sourceAdapter.count(table); + counts.push({ table, count: n }); + } + + console.log('[migrate-tier] Source row counts:'); + for (const { table, count } of counts) { + console.log(` ${table}: ${count.toString()}`); + } + console.log(' sessions: SKIPPED (ephemeral)'); + console.log(' verifications: SKIPPED (ephemeral)'); + console.log(' admin_tokens: SKIPPED (environment-specific)'); + console.log(''); + + // Interactive confirmation unless --yes or dry-run. + const isTTY = process.stdin.isTTY; + if (!isDryRun) { + if (!opts.yes && !isTTY) { + console.error( + '[migrate-tier] Not running in a TTY and --yes was not passed. ' + + 'Pass --yes to confirm in headless environments.', + ); + process.exitCode = 1; + await sourceAdapter.close(); + return; + } + + if (!opts.yes) { + const { createInterface } = await import('node:readline'); + const rl = createInterface({ input: process.stdin, output: process.stdout }); + const answer = await new Promise((resolve) => { + rl.question(`This will WRITE to ${redactedTarget}. Continue? 
[y/N] `, (ans) => { + rl.close(); + resolve(ans); + }); + }); + if (answer.trim().toLowerCase() !== 'y') { + console.log('[migrate-tier] Aborted.'); + await sourceAdapter.close(); + return; + } + } + } + + const target = new PostgresMigrationTarget(opts.targetUrl); + + try { + const result = await runMigrateTier( + sourceAdapter, + target, + { + targetUrl: opts.targetUrl, + dryRun: isDryRun, + allowNonEmpty, + batchSize, + onProgress: (msg) => console.log(msg), + }, + /* sourceHasVector= */ sourceTier === 'postgres', + ); + + if (result.dryRun) { + console.log('[migrate-tier] Dry run complete. No data was written.'); + } else { + console.log( + `[migrate-tier] Migration complete. ${result.totalRows.toString()} rows migrated.`, + ); + } + } catch (err) { + console.error( + `[migrate-tier] ERROR: ${err instanceof Error ? err.message : String(err)}`, + ); + process.exitCode = 1; + } finally { + await Promise.all([sourceAdapter.close(), target.close()]); + } + }, + ); + // ── storage migrate ────────────────────────────────────────────────────── storage diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 4d44cea..18e06f7 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -3,6 +3,25 @@ export { createStorageAdapter, registerStorageAdapter } from './factory.js'; export { PostgresAdapter } from './adapters/postgres.js'; export { PgliteAdapter } from './adapters/pglite.js'; export { registerStorageCommand } from './cli.js'; +export { + getMigrationOrder, + topoSort, + runMigrateTier, + checkTargetPreconditions, + normaliseSourceRow, + PostgresMigrationTarget, + DrizzleMigrationSource, + SKIP_TABLES, + MIGRATION_ORDER, + MigrationPreconditionError, +} from './migrate-tier.js'; +export type { + MigrationSource, + MigrationTarget, + MigrateTierOptions, + MigrateTierResult, + TableMigrationResult, +} from './migrate-tier.js'; import { registerStorageAdapter } from './factory.js'; import { PostgresAdapter } from 
'./adapters/postgres.js'; diff --git a/packages/storage/src/migrate-tier.spec.ts b/packages/storage/src/migrate-tier.spec.ts new file mode 100644 index 0000000..d2ce1a0 --- /dev/null +++ b/packages/storage/src/migrate-tier.spec.ts @@ -0,0 +1,495 @@ +/** + * migrate-tier.spec.ts — Unit tests for the migrate-tier core logic. + * + * These are pure unit tests — no real database connections. + * FED-M1-08 will add integration tests against real services. + */ + +import { describe, it, expect, vi } from 'vitest'; +import { + getMigrationOrder, + topoSort, + runMigrateTier, + checkTargetPreconditions, + normaliseSourceRow, + SKIP_TABLES, + MigrationPreconditionError, + type MigrationSource, + type MigrationTarget, +} from './migrate-tier.js'; + +/* ------------------------------------------------------------------ */ +/* Mock factories */ +/* ------------------------------------------------------------------ */ + +/** + * Build a mock MigrationSource backed by an in-memory table map. + * Implements the DrizzleMigrationSource-shaped contract: + * - readTable(tableName, opts?) returns paginated rows + * - count(tableName) returns row count + * + * The `sourceHasVector` flag controls whether the mock simulates the + * no-pgvector projection: when false and tableName is 'insights', rows + * are returned WITHOUT the 'embedding' field (matching DrizzleMigrationSource + * behaviour for local/PGlite sources). + */ +function makeMockSource( + data: Record[]>, + sourceHasVector = true, +): MigrationSource & { + readTableCalls: Array<{ table: string; opts?: { limit?: number; offset?: number } }>; +} { + const readTableCalls: Array<{ table: string; opts?: { limit?: number; offset?: number } }> = []; + return { + readTableCalls, + readTable: vi.fn(async (tableName: string, opts?: { limit?: number; offset?: number }) => { + readTableCalls.push({ table: tableName, opts }); + let rows = data[tableName] ?? 
[];
+      // Simulate no-vector projection: omit 'embedding' from insights rows
+      // when sourceHasVector is false (matches DrizzleMigrationSource behaviour).
+      if (tableName === 'insights' && !sourceHasVector) {
+        rows = rows.map(({ embedding: _omit, ...rest }) => rest);
+      }
+      const offset = opts?.offset ?? 0;
+      const limit = opts?.limit ?? rows.length;
+      return rows.slice(offset, offset + limit);
+    }),
+    count: vi.fn(async (tableName: string) => (data[tableName] ?? []).length),
+    close: vi.fn(async () => undefined),
+  };
+}
+
+function makeMockTarget(opts?: {
+  hasPgvector?: boolean;
+  nonEmptyTable?: string;
+}): MigrationTarget & { upsertCalls: Array<{ table: string; rows: Record<string, unknown>[] }> } {
+  const upsertCalls: Array<{ table: string; rows: Record<string, unknown>[] }> = [];
+  const storedCounts: Record<string, number> = {};
+
+  return {
+    upsertCalls,
+    upsertBatch: vi.fn(async (table: string, rows: Record<string, unknown>[]) => {
+      upsertCalls.push({ table, rows });
+      storedCounts[table] = (storedCounts[table] ?? 0) + rows.length;
+    }),
+    count: vi.fn(async (table: string) => {
+      if (opts?.nonEmptyTable === table) return 5;
+      return storedCounts[table] ?? 0;
+    }),
+    hasPgvector: vi.fn(async () => opts?.hasPgvector ?? true),
+    close: vi.fn(async () => undefined),
+  };
+}
+
+function noopProgress(): (msg: string) => void {
+  return () => undefined;
+}
+
+/* ------------------------------------------------------------------ */
+/* 1.
Topological ordering */
+/* ------------------------------------------------------------------ */
+
+describe('topoSort', () => {
+  it('returns empty array for empty input', () => {
+    expect(topoSort(new Map())).toEqual([]);
+  });
+
+  it('orders parents before children — linear chain', () => {
+    // users -> teams -> messages
+    const deps = new Map<string, string[]>([
+      ['users', []],
+      ['teams', ['users']],
+      ['messages', ['teams']],
+    ]);
+    const order = topoSort(deps);
+    expect(order.indexOf('users')).toBeLessThan(order.indexOf('teams'));
+    expect(order.indexOf('teams')).toBeLessThan(order.indexOf('messages'));
+  });
+
+  it('orders parents before children — diamond graph', () => {
+    // a -> (b, c) -> d
+    const deps = new Map<string, string[]>([
+      ['a', []],
+      ['b', ['a']],
+      ['c', ['a']],
+      ['d', ['b', 'c']],
+    ]);
+    const order = topoSort(deps);
+    expect(order.indexOf('a')).toBeLessThan(order.indexOf('b'));
+    expect(order.indexOf('a')).toBeLessThan(order.indexOf('c'));
+    expect(order.indexOf('b')).toBeLessThan(order.indexOf('d'));
+    expect(order.indexOf('c')).toBeLessThan(order.indexOf('d'));
+  });
+
+  it('throws on cyclic dependencies', () => {
+    const deps = new Map<string, string[]>([
+      ['a', ['b']],
+      ['b', ['a']],
+    ]);
+    expect(() => topoSort(deps)).toThrow('Cycle detected');
+  });
+});
+
+/* ------------------------------------------------------------------ */
+/* 2.
getMigrationOrder — sessions / verifications excluded */ +/* ------------------------------------------------------------------ */ + +describe('getMigrationOrder', () => { + it('does not include "sessions"', () => { + expect(getMigrationOrder()).not.toContain('sessions'); + }); + + it('does not include "verifications"', () => { + expect(getMigrationOrder()).not.toContain('verifications'); + }); + + it('does not include "admin_tokens"', () => { + expect(getMigrationOrder()).not.toContain('admin_tokens'); + }); + + it('includes "users" before "teams"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('users')).toBeLessThan(order.indexOf('teams')); + }); + + it('includes "users" before "conversations"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('users')).toBeLessThan(order.indexOf('conversations')); + }); + + it('includes "conversations" before "messages"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('conversations')).toBeLessThan(order.indexOf('messages')); + }); + + it('includes "projects" before "agents"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('projects')).toBeLessThan(order.indexOf('agents')); + }); + + it('includes "agents" before "conversations"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('agents')).toBeLessThan(order.indexOf('conversations')); + }); + + it('includes "missions" before "mission_tasks"', () => { + const order = getMigrationOrder(); + expect(order.indexOf('missions')).toBeLessThan(order.indexOf('mission_tasks')); + }); + + it('includes all expected tables', () => { + const order = getMigrationOrder(); + const expected = [ + 'users', + 'teams', + 'accounts', + 'projects', + 'agents', + 'conversations', + 'messages', + 'insights', + ]; + for (const t of expected) { + expect(order).toContain(t); + } + }); +}); + +/* ------------------------------------------------------------------ */ +/* 3. 
Dry-run makes no writes */ +/* ------------------------------------------------------------------ */ + +describe('runMigrateTier — dry-run', () => { + it('makes no calls to upsertBatch', async () => { + const source = makeMockSource({ + users: [{ id: 'u1', name: 'Alice', email: 'alice@example.com' }], + }); + const target = makeMockTarget(); + + const result = await runMigrateTier(source, target, { + targetUrl: 'postgresql://localhost/test', + dryRun: true, + allowNonEmpty: false, + batchSize: 100, + onProgress: noopProgress(), + }); + + expect(target.upsertCalls).toHaveLength(0); + expect(result.dryRun).toBe(true); + expect(result.totalRows).toBe(0); + }); + + it('does not call checkTargetPreconditions in dry-run', async () => { + // Even if hasPgvector is false, dry-run should not throw. + const source = makeMockSource({}); + const target = makeMockTarget({ hasPgvector: false }); + + await expect( + runMigrateTier(source, target, { + targetUrl: 'postgresql://localhost/test', + dryRun: true, + allowNonEmpty: false, + batchSize: 100, + onProgress: noopProgress(), + }), + ).resolves.not.toThrow(); + + // hasPgvector should NOT have been called during dry run. + expect(target.hasPgvector).not.toHaveBeenCalled(); + }); +}); + +/* ------------------------------------------------------------------ */ +/* 4. Idempotency */ +/* ------------------------------------------------------------------ */ + +describe('runMigrateTier — idempotency', () => { + it('produces the same logical row count on second run (upsert semantics)', async () => { + const userData = [ + { id: 'u1', name: 'Alice', email: 'alice@example.com' }, + { id: 'u2', name: 'Bob', email: 'bob@example.com' }, + ]; + + const source = makeMockSource({ users: userData }); + + // First run target. 
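The idempotency this test exercises comes from upsert semantics: replaying a batch updates rows in place rather than duplicating them. An in-memory sketch of that invariant (hypothetical helper; in Postgres the analogous mechanism would be a conflict-on-primary-key update, which this patch does not show):

```typescript
// In-memory sketch of upsert idempotency: writes are keyed by primary
// key, so applying the same batch twice leaves the same logical row set.
type Row = { id: string } & Record<string, unknown>;

export function upsertRows(store: Map<string, Row>, rows: Row[]): Map<string, Row> {
  for (const row of rows) {
    store.set(row.id, { ...row }); // insert or overwrite by id
  }
  return store;
}
```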
+ const target1 = makeMockTarget(); + await runMigrateTier(source, target1, { + targetUrl: 'postgresql://localhost/test', + dryRun: false, + allowNonEmpty: false, + batchSize: 100, + onProgress: noopProgress(), + }); + + const firstRunUpserts = target1.upsertCalls.filter((c) => c.table === 'users'); + const firstRunRows = firstRunUpserts.reduce((acc, c) => acc + c.rows.length, 0); + + // Second run — allowNonEmpty because first run already wrote rows. + const target2 = makeMockTarget(); + await runMigrateTier(source, target2, { + targetUrl: 'postgresql://localhost/test', + dryRun: false, + allowNonEmpty: true, + batchSize: 100, + onProgress: noopProgress(), + }); + + const secondRunUpserts = target2.upsertCalls.filter((c) => c.table === 'users'); + const secondRunRows = secondRunUpserts.reduce((acc, c) => acc + c.rows.length, 0); + + // Both runs write the same number of rows (upsert — second run updates in place). + expect(firstRunRows).toBe(userData.length); + expect(secondRunRows).toBe(userData.length); + }); +}); + +/* ------------------------------------------------------------------ */ +/* 5. 
Empty-target precondition */
+/* ------------------------------------------------------------------ */
+
+describe('checkTargetPreconditions', () => {
+  it('throws when target table is non-empty and allowNonEmpty is false', async () => {
+    const target = makeMockTarget({ nonEmptyTable: 'users' });
+
+    await expect(checkTargetPreconditions(target, false, ['users'])).rejects.toThrow(
+      MigrationPreconditionError,
+    );
+  });
+
+  it('includes remediation hint in thrown error', async () => {
+    const target = makeMockTarget({ nonEmptyTable: 'users' });
+
+    await expect(checkTargetPreconditions(target, false, ['users'])).rejects.toMatchObject({
+      name: 'MigrationPreconditionError',
+      remediation: expect.stringContaining('--allow-non-empty'),
+    });
+  });
+
+  it('does NOT throw when allowNonEmpty is true', async () => {
+    const target = makeMockTarget({ nonEmptyTable: 'users' });
+    await expect(checkTargetPreconditions(target, true, ['users'])).resolves.not.toThrow();
+  });
+
+  it('throws when pgvector extension is missing', async () => {
+    const target = makeMockTarget({ hasPgvector: false });
+
+    await expect(checkTargetPreconditions(target, false, ['users'])).rejects.toMatchObject({
+      name: 'MigrationPreconditionError',
+      remediation: expect.stringContaining('pgvector'),
+    });
+  });
+
+  it('passes when target is empty and pgvector is present', async () => {
+    const target = makeMockTarget({ hasPgvector: true });
+    await expect(checkTargetPreconditions(target, false, ['users'])).resolves.not.toThrow();
+  });
+});
+
+/* ------------------------------------------------------------------ */
+/* 6.
Skipped tables documented */
+/* ------------------------------------------------------------------ */
+
+describe('SKIP_TABLES', () => {
+  it('includes "sessions"', () => {
+    expect(SKIP_TABLES.has('sessions')).toBe(true);
+  });
+
+  it('includes "verifications"', () => {
+    expect(SKIP_TABLES.has('verifications')).toBe(true);
+  });
+
+  it('includes "admin_tokens"', () => {
+    expect(SKIP_TABLES.has('admin_tokens')).toBe(true);
+  });
+
+  it('migration result includes skipped table entries', async () => {
+    const source = makeMockSource({});
+    const target = makeMockTarget();
+
+    const result = await runMigrateTier(source, target, {
+      targetUrl: 'postgresql://localhost/test',
+      dryRun: false,
+      allowNonEmpty: false,
+      batchSize: 100,
+      onProgress: noopProgress(),
+    });
+
+    const skippedNames = result.tables.filter((t) => t.skipped).map((t) => t.table);
+    expect(skippedNames).toContain('sessions');
+    expect(skippedNames).toContain('verifications');
+    expect(skippedNames).toContain('admin_tokens');
+  });
+});
+
+/* ------------------------------------------------------------------ */
+/* 7.
Embedding NULL on migrate from non-pgvector source */ +/* ------------------------------------------------------------------ */ + +describe('normaliseSourceRow — embedding handling', () => { + it('sets embedding to null when sourceHasVector is false and table is insights', () => { + const row: Record<string, unknown> = { + id: 'ins-1', + content: 'Some insight', + userId: 'u1', + }; + const normalised = normaliseSourceRow('insights', row, false); + expect(normalised['embedding']).toBeNull(); + }); + + it('preserves existing embedding when sourceHasVector is true', () => { + const embedding = [0.1, 0.2, 0.3]; + const row: Record<string, unknown> = { + id: 'ins-1', + content: 'Some insight', + userId: 'u1', + embedding, + }; + const normalised = normaliseSourceRow('insights', row, true); + expect(normalised['embedding']).toBe(embedding); + }); + + it('does not add embedding field to non-vector tables', () => { + const row: Record<string, unknown> = { id: 'u1', name: 'Alice' }; + const normalised = normaliseSourceRow('users', row, false); + expect('embedding' in normalised).toBe(false); + }); + + it('passes through rows for non-vector tables unchanged', () => { + const row: Record<string, unknown> = { id: 'u1', name: 'Alice', email: 'alice@test.com' }; + const normalised = normaliseSourceRow('users', row, false); + expect(normalised).toEqual(row); + }); +}); + +/* ------------------------------------------------------------------ */ +/* 8.
End-to-end: correct order of upsert calls */ +/* ------------------------------------------------------------------ */ + +describe('runMigrateTier — migration order', () => { + it('writes users before messages', async () => { + const source = makeMockSource({ + users: [{ id: 'u1', name: 'Alice', email: 'alice@test.com' }], + messages: [{ id: 'm1', conversationId: 'c1', role: 'user', content: 'Hi' }], + }); + const target = makeMockTarget(); + + await runMigrateTier(source, target, { + targetUrl: 'postgresql://localhost/test', + dryRun: false, + allowNonEmpty: false, + batchSize: 100, + onProgress: noopProgress(), + }); + + const tableOrder = target.upsertCalls.map((c) => c.table); + const usersIdx = tableOrder.indexOf('users'); + const messagesIdx = tableOrder.indexOf('messages'); + // users must appear before messages in the upsert call sequence. + expect(usersIdx).toBeGreaterThanOrEqual(0); + expect(messagesIdx).toBeGreaterThanOrEqual(0); + expect(usersIdx).toBeLessThan(messagesIdx); + }); +}); + +/* ------------------------------------------------------------------ */ +/* 9. Embedding-null projection: no-pgvector source */ +/* ------------------------------------------------------------------ */ + +describe('DrizzleMigrationSource embedding-null projection', () => { + it( + 'when sourceHasVector is false, readTable for insights omits embedding column ' + + 'and normaliseSourceRow sets it to null for the target insert', + async () => { + // Source has insights data but no vector — embedding omitted at read time. + const insightRowWithEmbedding = { + id: 'ins-1', + userId: 'u1', + content: 'Test insight', + embedding: [0.1, 0.2, 0.3], // present in raw data but omitted by source + source: 'agent', + category: 'general', + relevanceScore: 1.0, + }; + + // makeMockSource with sourceHasVector=false simulates DrizzleMigrationSource + // behaviour: the embedding field is stripped from the returned row. 
+ const source = makeMockSource( + { + users: [{ id: 'u1', name: 'Alice', email: 'alice@test.com' }], + insights: [insightRowWithEmbedding], + }, + /* sourceHasVector= */ false, + ); + const target = makeMockTarget(); + + await runMigrateTier( + source, + target, + { + targetUrl: 'postgresql://localhost/test', + dryRun: false, + allowNonEmpty: false, + batchSize: 100, + onProgress: noopProgress(), + }, + /* sourceHasVector= */ false, + ); + + // Assert: readTable was called for insights + const insightsRead = source.readTableCalls.find((c) => c.table === 'insights'); + expect(insightsRead).toBeDefined(); + + // Assert: the upsert to insights has embedding === null (not the original vector) + const insightsUpsert = target.upsertCalls.find((c) => c.table === 'insights'); + expect(insightsUpsert).toBeDefined(); + const upsertedRow = insightsUpsert!.rows[0]; + expect(upsertedRow).toBeDefined(); + // embedding must be null — not the original [0.1, 0.2, 0.3] + expect(upsertedRow!['embedding']).toBeNull(); + // Other fields must pass through unchanged + expect(upsertedRow!['id']).toBe('ins-1'); + expect(upsertedRow!['content']).toBe('Test insight'); + }, + ); +}); diff --git a/packages/storage/src/migrate-tier.ts b/packages/storage/src/migrate-tier.ts new file mode 100644 index 0000000..c85da3b --- /dev/null +++ b/packages/storage/src/migrate-tier.ts @@ -0,0 +1,697 @@ +/** + * migrate-tier.ts — Core logic for `mosaic storage migrate-tier`. + * + * Migrates data from `tier: local` (PGlite, normalized Drizzle schema) or + * `tier: standalone` (Postgres without pgvector) → `tier: federated` + * (Postgres + pgvector). + * + * Source: DrizzleMigrationSource — reads from the NORMALIZED Drizzle/relational + * schema tables (not the flat `id TEXT + data JSONB` PgliteAdapter schema). + * Both local (PGlite) and standalone (Postgres) sources use the same Drizzle + * abstraction via createPgliteDb() or createDb() from @mosaicstack/db. 
+ * Target: PostgresMigrationTarget — upserts via raw SQL into the same schema. + * + * Key design decisions: + * - Tables are migrated in topological (FK-dependency) order so that + * parent rows exist before child rows are inserted. + * - sessions + verifications are skipped — they are ephemeral / TTL'd. + * - adminTokens is skipped — token hashes are environment-specific + * and should be re-issued on the target. + * - insights.embedding is omitted from source SELECT when the source lacks + * pgvector (local/PGlite tier); target insert gets NULL for that column. + * insights.embedding is nullable per schema (no .notNull() constraint). + * - Each table's batch is wrapped in a transaction for atomicity. + * - Upsert semantics (ON CONFLICT DO UPDATE) make re-runs idempotent. + * + * TODO (FED-M1-08): Add integration tests against real PGlite → real PG. + */ + +import postgres from 'postgres'; +import * as schema from '@mosaicstack/db'; +import { sql as drizzleSql } from '@mosaicstack/db'; + +/* ------------------------------------------------------------------ */ +/* Types */ +/* ------------------------------------------------------------------ */ + +export interface MigrationSource { + /** + * Return all rows from a table (normalized Drizzle schema rows). + * When sourceHasVector is false and the table has a vector column, + * the source MUST omit the vector column from the result and the + * caller will set it to null (see normaliseSourceRow). + */ + readTable( + tableName: string, + opts?: { limit?: number; offset?: number }, + ): Promise<Record<string, unknown>[]>; + + /** Count rows in a table. */ + count(tableName: string): Promise<number>; + + /** Close the source connection. */ + close(): Promise<void>; +} + +export interface MigrationTarget { + /** + * Upsert a batch of rows into a table. + * Must use ON CONFLICT (id) DO UPDATE semantics. + */ + upsertBatch(tableName: string, rows: Record<string, unknown>[]): Promise<void>; + + /** + * Count rows in a target table.
+ */ + count(tableName: string): Promise<number>; + + /** + * Check whether pgvector extension is installed. + */ + hasPgvector(): Promise<boolean>; + + /** Close the target connection. */ + close(): Promise<void>; +} + +export interface MigrateTierOptions { + /** Target postgres connection URL. */ + targetUrl: string; + /** Whether to skip all writes (dry-run). */ + dryRun: boolean; + /** Skip the non-empty target guard. */ + allowNonEmpty: boolean; + /** Rows per transaction batch. */ + batchSize: number; + /** Called with progress messages. */ + onProgress: (msg: string) => void; +} + +export interface TableMigrationResult { + table: string; + rowsMigrated: number; + skipped: boolean; + skipReason?: string; +} + +export interface MigrateTierResult { + tables: TableMigrationResult[]; + totalRows: number; + dryRun: boolean; +} + +/* ------------------------------------------------------------------ */ +/* Schema: FK-aware topological table order */ +/* ------------------------------------------------------------------ */ + +/** + * SKIP_TABLES: ephemeral or environment-specific tables not worth migrating. + * + * - sessions: TTL'd auth sessions — invalid in new environment. + * - verifications: one-time tokens (email verify, etc.) — already expired. + * - admin_tokens: hashed tokens bound to old environment keys — re-issue. + */ +export const SKIP_TABLES = new Set(['sessions', 'verifications', 'admin_tokens']); + +/** + * Topologically ordered table list (parents before children).
+ * + * Derived from FK references in packages/db/src/schema.ts: + * + * users (no FKs) + * teams → users + * team_members → teams, users + * accounts → users + * projects → users, teams + * agents → projects, users + * missions → projects, users + * tasks → projects, missions + * mission_tasks → missions, tasks, users + * conversations → users, projects, agents + * messages → conversations + * preferences → users + * insights → users [has embedding vector column] + * agent_logs → users + * skills → users (installedBy, nullable) + * routing_rules → users (userId, nullable) + * provider_credentials → users + * appreciations (no FKs) + * events (no FKs) + * tickets (no FKs) + * summarization_jobs (no FKs) + * + * Skipped (not in this list): + * sessions → users (ephemeral) + * verifications (no FKs, ephemeral) + * admin_tokens → users (environment-specific) + */ +export const MIGRATION_ORDER: string[] = [ + 'users', + 'teams', + 'team_members', + 'accounts', + 'projects', + 'agents', + 'missions', + 'tasks', + 'mission_tasks', + 'conversations', + 'messages', + 'preferences', + 'insights', + 'agent_logs', + 'skills', + 'routing_rules', + 'provider_credentials', + 'appreciations', + 'events', + 'tickets', + 'summarization_jobs', +]; + +/** Tables that carry a vector embedding column on the target. */ +const VECTOR_TABLES = new Set(['insights']); + +/* ------------------------------------------------------------------ */ +/* Utility: derive topological order from an adjacency list */ +/* ------------------------------------------------------------------ */ + +/** + * Given an adjacency list (table → list of tables it depends on), + * return a valid topological ordering (Kahn's algorithm). + * + * Exposed for unit testing. 
+ */ +export function topoSort(deps: Map<string, string[]>): string[] { + const nodes = [...deps.keys()]; + const inDegree = new Map<string, number>(); + const adjReverse = new Map<string, string[]>(); + + for (const node of nodes) { + if (!inDegree.has(node)) inDegree.set(node, 0); + if (!adjReverse.has(node)) adjReverse.set(node, []); + for (const dep of deps.get(node) ?? []) { + inDegree.set(node, (inDegree.get(node) ?? 0) + 1); + if (!adjReverse.has(dep)) adjReverse.set(dep, []); + adjReverse.get(dep)!.push(node); + } + } + + // Start with nodes that have no dependencies. + const queue: string[] = []; + for (const [node, deg] of inDegree) { + if (deg === 0) queue.push(node); + } + + const result: string[] = []; + while (queue.length > 0) { + const node = queue.shift()!; + result.push(node); + for (const dependent of adjReverse.get(node) ?? []) { + const newDeg = (inDegree.get(dependent) ?? 0) - 1; + inDegree.set(dependent, newDeg); + if (newDeg === 0) queue.push(dependent); + } + } + + if (result.length !== nodes.length) { + throw new Error('Cycle detected in FK dependency graph'); + } + return result; +} + +/** + * Return the migration table order, excluding SKIP_TABLES. + * Uses the pre-computed MIGRATION_ORDER (verified against schema.ts). + */ +export function getMigrationOrder(): string[] { + return MIGRATION_ORDER.filter((t) => !SKIP_TABLES.has(t)); +} + +/* ------------------------------------------------------------------ */ +/* TABLE_OBJECTS: migration table name → Drizzle table object */ +/* ------------------------------------------------------------------ */ + +/** + * Maps MIGRATION_ORDER table names to their corresponding Drizzle table + * objects from the normalized schema. Used by DrizzleMigrationSource to + * execute typed `db.select().from(table)` queries. + * + * Keyed by snake_case table name (matching MIGRATION_ORDER + SKIP_TABLES).
+ */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const TABLE_OBJECTS: Record<string, any> = { + users: schema.users, + teams: schema.teams, + team_members: schema.teamMembers, + accounts: schema.accounts, + projects: schema.projects, + agents: schema.agents, + missions: schema.missions, + tasks: schema.tasks, + mission_tasks: schema.missionTasks, + conversations: schema.conversations, + messages: schema.messages, + preferences: schema.preferences, + insights: schema.insights, + agent_logs: schema.agentLogs, + skills: schema.skills, + routing_rules: schema.routingRules, + provider_credentials: schema.providerCredentials, + appreciations: schema.appreciations, + events: schema.events, + tickets: schema.tickets, + summarization_jobs: schema.summarizationJobs, + // Skipped tables — included so count() works for preflight but never passed + // to upsertBatch. + sessions: schema.sessions, + verifications: schema.verifications, + admin_tokens: schema.adminTokens, +}; + +/* ------------------------------------------------------------------ */ +/* DrizzleMigrationSource */ +/* ------------------------------------------------------------------ */ + +/** + * MigrationSource backed by a Drizzle DB handle (works with both + * PostgresJsDatabase and PgliteDatabase — they share the same Drizzle + * query API for schema-defined tables). + * + * For the `insights` table (the only vector-column table), when the source + * lacks pgvector (local/PGlite tier), the `embedding` column is excluded + * from the SELECT projection via a raw `db.execute()` query that lists + * only non-vector columns. This prevents a type-registration error from + * PGlite, which does not know the `vector` type. The caller (runMigrateTier + * via normaliseSourceRow) will set embedding to null on the resulting rows. + * + * Column projection is opt-in: pass `sourceHasVector: false` to activate it.
+ */ +export class DrizzleMigrationSource implements MigrationSource { + constructor( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private readonly db: any, + private readonly sourceHasVector: boolean = true, + ) {} + + /** + * Columns of the insights table that do NOT include the vector embedding. + * Used for the no-pgvector projection path. + */ + private static readonly INSIGHTS_COLUMNS_NO_VECTOR = [ + 'id', + 'user_id', + 'content', + 'source', + 'category', + 'relevance_score', + 'metadata', + 'created_at', + 'updated_at', + 'decayed_at', + ] as const; + + async readTable( + tableName: string, + opts?: { limit?: number; offset?: number }, + ): Promise<Record<string, unknown>[]> { + const table = TABLE_OBJECTS[tableName]; + if (!table) throw new Error(`DrizzleMigrationSource: unknown table "${tableName}"`); + + // For vector tables when source lacks pgvector: use column-allowlist raw query + // to avoid type-registration errors. + if (VECTOR_TABLES.has(tableName) && !this.sourceHasVector) { + const cols = DrizzleMigrationSource.INSIGHTS_COLUMNS_NO_VECTOR.map((c) => `"${c}"`).join( + ', ', + ); + let sql = `SELECT ${cols} FROM "${tableName}"`; + const params: unknown[] = []; + if (opts?.limit !== undefined) { + params.push(opts.limit); + sql += ` LIMIT $${params.length.toString()}`; + } + if (opts?.offset !== undefined) { + params.push(opts.offset); + sql += ` OFFSET $${params.length.toString()}`; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const result = await (this.db as any).execute( + // drizzle-orm/pglite and drizzle-orm/postgres-js both accept a raw + // SQL template; use the tagged-template sql helper from drizzle-orm. + // Since we need dynamic params, we use db.execute with a raw string + // via the PGlite/postgres.js driver directly. + { sql, params, typings: [] }, + ); + // drizzle execute returns { rows: unknown[][] } for PGlite driver, + // or a RowList for postgres.js. Normalise both shapes.
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any + const raw = result as any; + if (Array.isArray(raw)) { + // postgres.js shape: array of row objects + return raw as Record<string, unknown>[]; + } + if (raw?.rows && Array.isArray(raw.rows)) { + // PGlite shape: { rows: unknown[][] } OR { rows: Record<string, unknown>[] } + const rows = raw.rows as unknown[]; + if (rows.length === 0) return []; + if (Array.isArray(rows[0])) { + // Columnar: convert to objects using fields array if available + const fields: string[] = + (raw.fields as Array<{ name: string }> | undefined)?.map((f) => f.name) ?? + DrizzleMigrationSource.INSIGHTS_COLUMNS_NO_VECTOR.slice(); + return (rows as unknown[][]).map((row) => { + const obj: Record<string, unknown> = {}; + for (let i = 0; i < fields.length; i++) { + obj[fields[i]!] = row[i]; + } + return obj; + }); + } + return rows as Record<string, unknown>[]; + } + return []; + } + + // Standard Drizzle select for all other tables (and vector tables when + // the source has pgvector registered). + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let query = (this.db as any).select().from(table); + if (opts?.limit !== undefined) query = query.limit(opts.limit); + if (opts?.offset !== undefined) query = query.offset(opts.offset); + return (await query) as Record<string, unknown>[]; + } + + async count(tableName: string): Promise<number> { + const table = TABLE_OBJECTS[tableName]; + if (!table) throw new Error(`DrizzleMigrationSource: unknown table "${tableName}"`); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const [row] = await (this.db as any) + .select({ n: drizzleSql<number>`COUNT(*)::int` }) + .from(table); + return (row as { n: number } | undefined)?.n ?? 0; + } + + async close(): Promise<void> { + // Lifecycle managed externally — caller closes the db handle.
+ } +} + +/* ------------------------------------------------------------------ */ +/* Real postgres target adapter */ +/* ------------------------------------------------------------------ */ + +/** + * Live implementation of MigrationTarget backed by a real Postgres connection. + * Used by the CLI; mocked in tests. + */ +export class PostgresMigrationTarget implements MigrationTarget { + private sql: ReturnType; + + constructor(url: string) { + this.sql = postgres(url, { + max: 5, + connect_timeout: 10, + idle_timeout: 30, + }); + } + + async upsertBatch(tableName: string, rows: Record[]): Promise { + if (rows.length === 0) return; + + // Collect all column names from the batch (union of all row keys). + const colSet = new Set(); + for (const row of rows) { + for (const k of Object.keys(row)) colSet.add(k); + } + const cols = [...colSet]; + if (!cols.includes('id')) { + throw new Error(`Table ${tableName}: rows missing 'id' column`); + } + + // Build VALUES list — use postgres tagged-template helpers for safety. + // postgres.js supports bulk inserts via array of objects. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await this.sql.begin(async (tx: any) => { + // Insert in chunks to avoid enormous single queries. + for (let i = 0; i < rows.length; i += 500) { + const chunk = rows.slice(i, i + 500); + + // Normalise rows: fill missing columns with null. + const normalised = chunk.map((row) => { + const out: Record = {}; + for (const col of cols) { + out[col] = row[col] ?? null; + } + return out; + }); + + const colList = cols.map((c) => `"${c}"`).join(', '); + const updateList = cols + .filter((c) => c !== 'id') + .map((c) => `"${c}" = EXCLUDED."${c}"`) + .join(', '); + + // Build values placeholders + const valuePlaceholders = normalised + .map((_, ri) => `(${cols.map((_, ci) => `$${ri * cols.length + ci + 1}`).join(', ')})`) + .join(', '); + + const flatValues = normalised.flatMap((row) => cols.map((c) => row[c] ?? 
null)); + + const query = ` + INSERT INTO "${tableName}" (${colList}) + VALUES ${valuePlaceholders} + ON CONFLICT (id) DO UPDATE SET ${updateList} + `; + + await tx.unsafe(query, flatValues as never[]); + } + }); + } + + async count(tableName: string): Promise<number> { + const rows = await this.sql.unsafe(`SELECT COUNT(*)::int AS n FROM "${tableName}"`); + const row = rows[0] as unknown as { n: number } | undefined; + return row?.n ?? 0; + } + + async hasPgvector(): Promise<boolean> { + const rows = await this.sql` + SELECT 1 FROM pg_extension WHERE extname = 'vector' + `; + return rows.length > 0; + } + + async close(): Promise<void> { + await this.sql.end(); + } +} + +/* ------------------------------------------------------------------ */ +/* Source-row normalisation */ +/* ------------------------------------------------------------------ */ + +/** + * Drizzle returns rows as camelCase TypeScript objects (e.g. `userId`, not + * `user_id`). The PostgresMigrationTarget upserts via raw SQL and uses the + * column names as given — the `insights` no-vector path uses snake_case column + * aliases in the SELECT, so those rows already arrive as snake_case. + * + * For vector tables (insights), if `embedding` is absent from the source row + * (because DrizzleMigrationSource omitted it in the no-vector projection), we + * explicitly set it to null so the target ON CONFLICT UPDATE doesn't error. + * + * NOTE: insights.embedding is defined as `vector('embedding', { dimensions: + * 1536 })` with no `.notNull()` in schema.ts — it accepts NULL. + */ +export function normaliseSourceRow( + tableName: string, + row: Record<string, unknown>, + sourceHasVector: boolean, +): Record<string, unknown> { + const out = { ...row }; + + if (VECTOR_TABLES.has(tableName) && !sourceHasVector) { + // Source cannot have embeddings — explicitly null them so ON CONFLICT + // UPDATE doesn't try to write undefined.
+ out['embedding'] = null; + } + + return out; +} + +/* ------------------------------------------------------------------ */ +/* Precondition checks */ +/* ------------------------------------------------------------------ */ + +export class MigrationPreconditionError extends Error { + constructor( + message: string, + public readonly remediation: string, + ) { + super(message); + this.name = 'MigrationPreconditionError'; + } +} + +/** + * Verify target preconditions before writing any data. + * + * Checks: + * 1. pgvector extension installed. + * 2. User-data tables are empty (unless --allow-non-empty). + */ +export async function checkTargetPreconditions( + target: MigrationTarget, + allowNonEmpty: boolean, + tablesToMigrate: string[], +): Promise<void> { + const hasVector = await target.hasPgvector(); + if (!hasVector) { + throw new MigrationPreconditionError( + 'Target Postgres does not have the pgvector extension installed.', + 'Run: CREATE EXTENSION IF NOT EXISTS vector; — or use the pgvector/pgvector:pg17 Docker image.', + ); + } + + if (!allowNonEmpty) { + // Throw on the first non-empty user-data table. + for (const table of tablesToMigrate) { + const n = await target.count(table); + if (n > 0) { + throw new MigrationPreconditionError( + `Target table "${table}" already contains ${n.toString()} rows.`, + 'Pass --allow-non-empty to overwrite existing data (upsert semantics), ' + + 'or point to an empty target database.', + ); + } + } + } +} + +/* ------------------------------------------------------------------ */ +/* Core migration runner */ +/* ------------------------------------------------------------------ */ + +/** + * Run the tier migration. + * + * @param source Adapter for reading source rows. + * @param target Adapter for writing rows to target. + * @param opts Migration options. + * @param sourceHasVector True if the source tier supports vector columns.
+ */ +export async function runMigrateTier( + source: MigrationSource, + target: MigrationTarget, + opts: MigrateTierOptions, + sourceHasVector = false, +): Promise<MigrateTierResult> { + const { dryRun, allowNonEmpty, batchSize, onProgress } = opts; + + const tablesToMigrate = getMigrationOrder(); + + // Preflight: gather row counts from source. + onProgress('[migrate-tier] Gathering source row counts...'); + const sourceCounts = new Map<string, number>(); + for (const table of tablesToMigrate) { + const n = await source.count(table); + sourceCounts.set(table, n); + } + + // Log preflight summary. + onProgress('[migrate-tier] Tables to migrate:'); + for (const table of tablesToMigrate) { + const n = sourceCounts.get(table) ?? 0; + onProgress(` ${table}: ${n.toString()} rows`); + } + for (const skipped of SKIP_TABLES) { + onProgress(` ${skipped}: SKIPPED (ephemeral/environment-specific)`); + } + + // Vector column notice. + if (!sourceHasVector) { + onProgress( + '[migrate-tier] NOTE: Source tier has no pgvector support. ' + + 'insights.embedding will be NULL on all migrated rows.', + ); + } + + if (dryRun) { + onProgress('[migrate-tier] DRY RUN — no writes will be made.'); + const tables: TableMigrationResult[] = tablesToMigrate.map((t) => ({ + table: t, + rowsMigrated: 0, + skipped: false, + })); + for (const skipped of SKIP_TABLES) { + tables.push({ table: skipped, rowsMigrated: 0, skipped: true, skipReason: 'ephemeral' }); + } + return { tables, totalRows: 0, dryRun: true }; + } + + // Check preconditions before writing. + await checkTargetPreconditions(target, allowNonEmpty, tablesToMigrate); + + const results: TableMigrationResult[] = []; + let totalRows = 0; + + for (const table of tablesToMigrate) { + const sourceCount = sourceCounts.get(table) ??
0; + + if (sourceCount === 0) { + onProgress(`[migrate-tier] ${table}: 0 rows — skipping.`); + results.push({ table, rowsMigrated: 0, skipped: false }); + continue; + } + + onProgress(`[migrate-tier] ${table}: migrating ${sourceCount.toString()} rows...`); + + let offset = 0; + let tableTotal = 0; + let lastSuccessfulId: string | undefined; + + try { + while (offset < sourceCount) { + const rows = await source.readTable(table, { limit: batchSize, offset }); + if (rows.length === 0) break; + + const normalised = rows.map((row) => normaliseSourceRow(table, row, sourceHasVector)); + + await target.upsertBatch(table, normalised); + + lastSuccessfulId = rows[rows.length - 1]?.['id'] as string | undefined; + tableTotal += rows.length; + offset += rows.length; + + onProgress( + `[migrate-tier] ${table}: ${tableTotal.toString()}/${sourceCount.toString()} rows written`, + ); + } + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + throw new Error( + `[migrate-tier] Failed on table "${table}" after ${tableTotal.toString()} rows ` + + `(last id: ${lastSuccessfulId ?? 'none'}). Error: ${errMsg}\n` + + `Remediation: Re-run with --allow-non-empty to resume (upsert is idempotent).`, + ); + } + + results.push({ table, rowsMigrated: tableTotal, skipped: false }); + totalRows += tableTotal; + onProgress(`[migrate-tier] ${table}: done (${tableTotal.toString()} rows).`); + } + + // Add skipped table records. + for (const skipped of SKIP_TABLES) { + results.push({ + table: skipped, + rowsMigrated: 0, + skipped: true, + skipReason: 'ephemeral or environment-specific — re-issue on target', + }); + } + + onProgress(`[migrate-tier] Complete. 
${totalRows.toString()} total rows migrated.`); + return { tables: results, totalRows, dryRun: false }; +} diff --git a/packages/storage/vitest.config.ts b/packages/storage/vitest.config.ts new file mode 100644 index 0000000..8e730d5 --- /dev/null +++ b/packages/storage/vitest.config.ts @@ -0,0 +1,8 @@ +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + test: { + globals: true, + environment: 'node', + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f8ce05f..7e90200 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -648,6 +648,9 @@ importers: commander: specifier: ^13.0.0 version: 13.1.0 + postgres: + specifier: ^3.4.8 + version: 3.4.8 devDependencies: typescript: specifier: ^5.8.0 @@ -695,10 +698,10 @@ importers: dependencies: '@mariozechner/pi-agent-core': specifier: ^0.63.1 - version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) + version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) '@mariozechner/pi-ai': specifier: ^0.63.1 - version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) + version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) '@sinclair/typebox': specifier: ^0.34.41 version: 0.34.48 @@ -7256,6 +7259,12 @@ snapshots: '@jridgewell/gen-mapping': 0.3.13 '@jridgewell/trace-mapping': 0.3.31 + '@anthropic-ai/sdk@0.73.0(zod@3.25.76)': + dependencies: + json-schema-to-ts: 3.1.1 + optionalDependencies: + zod: 3.25.76 + '@anthropic-ai/sdk@0.73.0(zod@4.3.6)': dependencies: json-schema-to-ts: 3.1.1 @@ -8597,6 +8606,18 @@ snapshots: - ws - zod + '@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)': + dependencies: + '@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) + transitivePeerDependencies: + - '@modelcontextprotocol/sdk' + - aws-crt + - bufferutil + - supports-color + - utf-8-validate + - ws + - zod + 
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': dependencies: '@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) @@ -8645,6 +8666,30 @@ snapshots: - ws - zod + '@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)': + dependencies: + '@anthropic-ai/sdk': 0.73.0(zod@3.25.76) + '@aws-sdk/client-bedrock-runtime': 3.1008.0 + '@google/genai': 1.45.0(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6)) + '@mistralai/mistralai': 1.14.1 + '@sinclair/typebox': 0.34.48 + ajv: 8.18.0 + ajv-formats: 3.0.1(ajv@8.18.0) + chalk: 5.6.2 + openai: 6.26.0(ws@8.20.0)(zod@3.25.76) + partial-json: 0.1.7 + proxy-agent: 6.5.0 + undici: 7.24.3 + zod-to-json-schema: 3.25.1(zod@3.25.76) + transitivePeerDependencies: + - '@modelcontextprotocol/sdk' + - aws-crt + - bufferutil + - supports-color + - utf-8-validate + - ws + - zod + '@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': dependencies: '@anthropic-ai/sdk': 0.73.0(zod@4.3.6) @@ -13140,6 +13185,11 @@ snapshots: dependencies: mimic-function: 5.0.1 + openai@6.26.0(ws@8.20.0)(zod@3.25.76): + optionalDependencies: + ws: 8.20.0 + zod: 3.25.76 + openai@6.26.0(ws@8.20.0)(zod@4.3.6): optionalDependencies: ws: 8.20.0
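For reviewers who want to sanity-check the parents-before-children guarantee outside the package, here is a minimal standalone sketch of the Kahn's-algorithm ordering that `topoSort` in `migrate-tier.ts` implements. The helper name `topoSortSketch` and the toy dependency map are illustrative, not part of the patch; like the real export, it assumes every table appears as a key in the map.

```typescript
// Kahn's algorithm: `deps` maps each table to the tables it depends on
// (its FK parents). Parents come out before children; a cycle leaves
// nodes unprocessed, which we surface as an error.
function topoSortSketch(deps: Map<string, string[]>): string[] {
  const inDegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();

  // in-degree = number of parents; also build the reverse adjacency list.
  for (const node of deps.keys()) {
    inDegree.set(node, (deps.get(node) ?? []).length);
    if (!dependents.has(node)) dependents.set(node, []);
  }
  for (const [node, parents] of deps) {
    for (const parent of parents) dependents.get(parent)?.push(node);
  }

  // Seed the queue with tables that have no FK parents.
  const queue = [...inDegree.entries()].filter(([, d]) => d === 0).map(([n]) => n);
  const result: string[] = [];
  while (queue.length > 0) {
    const node = queue.shift()!;
    result.push(node);
    for (const child of dependents.get(node) ?? []) {
      const d = (inDegree.get(child) ?? 0) - 1;
      inDegree.set(child, d);
      if (d === 0) queue.push(child);
    }
  }

  if (result.length !== deps.size) {
    throw new Error('Cycle detected in FK dependency graph');
  }
  return result;
}

// Toy subset of the real FK graph: users ← conversations ← messages.
const order = topoSortSketch(
  new Map([
    ['messages', ['conversations']],
    ['conversations', ['users']],
    ['users', []],
  ]),
);
console.log(order.join(' → ')); // users → conversations → messages
```

This is also why the spec's "writes users before messages" test holds for any valid ordering the precomputed `MIGRATION_ORDER` list encodes: a child's upsert can never precede its FK parent's.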