/**
 * migrate-tier.ts — Core logic for `mosaic storage migrate-tier`.
 *
 * Migrates data from `tier: local` (PGlite, normalized Drizzle schema) or
 * `tier: standalone` (Postgres without pgvector) → `tier: federated`
 * (Postgres + pgvector).
 *
 * Source: DrizzleMigrationSource — reads from the NORMALIZED Drizzle/relational
 * schema tables (not the flat `id TEXT + data JSONB` PgliteAdapter schema).
 * Both local (PGlite) and standalone (Postgres) sources use the same Drizzle
 * abstraction via createPgliteDb() or createDb() from @mosaicstack/db.
 * Target: PostgresMigrationTarget — upserts via raw SQL into the same schema.
 *
 * Key design decisions:
 * - Tables are migrated in topological (FK-dependency) order so that
 *   parent rows exist before child rows are inserted.
 * - sessions + verifications are skipped — they are ephemeral / TTL'd.
 * - adminTokens is skipped — token hashes are environment-specific
 *   and should be re-issued on the target.
 * - insights.embedding is omitted from source SELECT when the source lacks
 *   pgvector (local/PGlite tier); target insert gets NULL for that column.
 *   insights.embedding is nullable per schema (no .notNull() constraint).
 * - Each table's batch is wrapped in a transaction for atomicity.
 * - Upsert semantics (ON CONFLICT DO UPDATE) make re-runs idempotent.
 *
 * TODO (FED-M1-08): Add integration tests against real PGlite → real PG.
 */
import postgres from 'postgres';
import * as schema from '@mosaicstack/db';
import { sql as drizzleSql } from '@mosaicstack/db';

/* ------------------------------------------------------------------ */
/* Types                                                              */
/* ------------------------------------------------------------------ */

export interface MigrationSource {
  /**
   * Return all rows from a table (normalized Drizzle schema rows).
   * When sourceHasVector is false and the table has a vector column,
   * the source MUST omit the vector column from the result and the
   * caller will set it to null (see normaliseSourceRow).
   */
  readTable(
    tableName: string,
    opts?: { limit?: number; offset?: number },
  ): Promise<Record<string, unknown>[]>;

  /** Count rows in a table. */
  count(tableName: string): Promise<number>;

  /** Close the source connection. */
  close(): Promise<void>;
}

export interface MigrationTarget {
  /**
   * Upsert a batch of rows into a table.
   * Must use ON CONFLICT (id) DO UPDATE semantics.
   */
  upsertBatch(tableName: string, rows: Record<string, unknown>[]): Promise<void>;

  /** Count rows in a target table. */
  count(tableName: string): Promise<number>;

  /** Check whether pgvector extension is installed. */
  hasPgvector(): Promise<boolean>;

  /** Close the target connection. */
  close(): Promise<void>;
}

export interface MigrateTierOptions {
  /** Target postgres connection URL. */
  targetUrl: string;
  /** Whether to skip all writes (dry-run). */
  dryRun: boolean;
  /** Skip the non-empty target guard. */
  allowNonEmpty: boolean;
  /** Rows per transaction batch. */
  batchSize: number;
  /** Called with progress messages. */
  onProgress: (msg: string) => void;
}

export interface TableMigrationResult {
  table: string;
  rowsMigrated: number;
  skipped: boolean;
  skipReason?: string;
}

export interface MigrateTierResult {
  tables: TableMigrationResult[];
  totalRows: number;
  dryRun: boolean;
}

/* ------------------------------------------------------------------ */
/* Schema: FK-aware topological table order                           */
/* ------------------------------------------------------------------ */

/**
 * SKIP_TABLES: ephemeral or environment-specific tables not worth migrating.
 *
 * - sessions: TTL'd auth sessions — invalid in new environment.
 * - verifications: one-time tokens (email verify, etc.) — already expired.
 * - admin_tokens: hashed tokens bound to old environment keys — re-issue.
 */
export const SKIP_TABLES = new Set(['sessions', 'verifications', 'admin_tokens']);

/**
 * Topologically ordered table list (parents before children).
* * Derived from FK references in packages/db/src/schema.ts: * * users (no FKs) * teams → users * team_members → teams, users * accounts → users * projects → users, teams * agents → projects, users * missions → projects, users * tasks → projects, missions * mission_tasks → missions, tasks, users * conversations → users, projects, agents * messages → conversations * preferences → users * insights → users [has embedding vector column] * agent_logs → users * skills → users (installedBy, nullable) * routing_rules → users (userId, nullable) * provider_credentials → users * appreciations (no FKs) * events (no FKs) * tickets (no FKs) * summarization_jobs (no FKs) * * Skipped (not in this list): * sessions → users (ephemeral) * verifications (no FKs, ephemeral) * admin_tokens → users (environment-specific) */ export const MIGRATION_ORDER: string[] = [ 'users', 'teams', 'team_members', 'accounts', 'projects', 'agents', 'missions', 'tasks', 'mission_tasks', 'conversations', 'messages', 'preferences', 'insights', 'agent_logs', 'skills', 'routing_rules', 'provider_credentials', 'appreciations', 'events', 'tickets', 'summarization_jobs', ]; /** Tables that carry a vector embedding column on the target. */ const VECTOR_TABLES = new Set(['insights']); /* ------------------------------------------------------------------ */ /* Utility: derive topological order from an adjacency list */ /* ------------------------------------------------------------------ */ /** * Given an adjacency list (table → list of tables it depends on), * return a valid topological ordering (Kahn's algorithm). * * Exposed for unit testing. */ export function topoSort(deps: Map): string[] { const nodes = [...deps.keys()]; const inDegree = new Map(); const adjReverse = new Map(); for (const node of nodes) { if (!inDegree.has(node)) inDegree.set(node, 0); if (!adjReverse.has(node)) adjReverse.set(node, []); for (const dep of deps.get(node) ?? []) { inDegree.set(node, (inDegree.get(node) ?? 
0) + 1); if (!adjReverse.has(dep)) adjReverse.set(dep, []); adjReverse.get(dep)!.push(node); } } // Start with nodes that have no dependencies. const queue: string[] = []; for (const [node, deg] of inDegree) { if (deg === 0) queue.push(node); } const result: string[] = []; while (queue.length > 0) { const node = queue.shift()!; result.push(node); for (const dependent of adjReverse.get(node) ?? []) { const newDeg = (inDegree.get(dependent) ?? 0) - 1; inDegree.set(dependent, newDeg); if (newDeg === 0) queue.push(dependent); } } if (result.length !== nodes.length) { throw new Error('Cycle detected in FK dependency graph'); } return result; } /** * Return the migration table order, excluding SKIP_TABLES. * Uses the pre-computed MIGRATION_ORDER (verified against schema.ts). */ export function getMigrationOrder(): string[] { return MIGRATION_ORDER.filter((t) => !SKIP_TABLES.has(t)); } /* ------------------------------------------------------------------ */ /* TABLE_OBJECTS: migration table name → Drizzle table object */ /* ------------------------------------------------------------------ */ /** * Maps MIGRATION_ORDER table names to their corresponding Drizzle table * objects from the normalized schema. Used by DrizzleMigrationSource to * execute typed `db.select().from(table)` queries. * * Keyed by snake_case table name (matching MIGRATION_ORDER + SKIP_TABLES). 
*/ // eslint-disable-next-line @typescript-eslint/no-explicit-any const TABLE_OBJECTS: Record = { users: schema.users, teams: schema.teams, team_members: schema.teamMembers, accounts: schema.accounts, projects: schema.projects, agents: schema.agents, missions: schema.missions, tasks: schema.tasks, mission_tasks: schema.missionTasks, conversations: schema.conversations, messages: schema.messages, preferences: schema.preferences, insights: schema.insights, agent_logs: schema.agentLogs, skills: schema.skills, routing_rules: schema.routingRules, provider_credentials: schema.providerCredentials, appreciations: schema.appreciations, events: schema.events, tickets: schema.tickets, summarization_jobs: schema.summarizationJobs, // Skipped tables — included so count() works for preflight but never passed // to upsertBatch. sessions: schema.sessions, verifications: schema.verifications, admin_tokens: schema.adminTokens, }; /* ------------------------------------------------------------------ */ /* DrizzleMigrationSource */ /* ------------------------------------------------------------------ */ /** * MigrationSource backed by a Drizzle DB handle (works with both * PostgresJsDatabase and PgliteDatabase — they share the same Drizzle * query API for schema-defined tables). * * For the `insights` table (the only vector-column table), when the source * lacks pgvector (local/PGlite tier), the `embedding` column is excluded * from the SELECT projection via a raw `db.execute()` query that lists * only non-vector columns. This prevents a type-registration error from * PGlite, which does not know the `vector` type. The caller (runMigrateTier * via normaliseSourceRow) will set embedding to null on the resulting rows. * * Column projection is opt-in: pass `sourceHasVector: false` to activate it. 
*/ export class DrizzleMigrationSource implements MigrationSource { constructor( // eslint-disable-next-line @typescript-eslint/no-explicit-any private readonly db: any, private readonly sourceHasVector: boolean = true, ) {} /** * Columns of the insights table that do NOT include the vector embedding. * Used for the no-pgvector projection path. */ private static readonly INSIGHTS_COLUMNS_NO_VECTOR = [ 'id', 'user_id', 'content', 'source', 'category', 'relevance_score', 'metadata', 'created_at', 'updated_at', 'decayed_at', ] as const; async readTable( tableName: string, opts?: { limit?: number; offset?: number }, ): Promise[]> { const table = TABLE_OBJECTS[tableName]; if (!table) throw new Error(`DrizzleMigrationSource: unknown table "${tableName}"`); // For vector tables when source lacks pgvector: use column-allowlist raw query // to avoid type-registration errors. if (VECTOR_TABLES.has(tableName) && !this.sourceHasVector) { const cols = DrizzleMigrationSource.INSIGHTS_COLUMNS_NO_VECTOR.map((c) => `"${c}"`).join( ', ', ); let sql = `SELECT ${cols} FROM "${tableName}"`; const params: unknown[] = []; if (opts?.limit !== undefined) { params.push(opts.limit); sql += ` LIMIT $${params.length.toString()}`; } if (opts?.offset !== undefined) { params.push(opts.offset); sql += ` OFFSET $${params.length.toString()}`; } // eslint-disable-next-line @typescript-eslint/no-explicit-any const result = await (this.db as any).execute( // drizzle-orm/pglite and drizzle-orm/postgres-js both accept a raw // SQL template; use the tagged-template sql helper from drizzle-orm. // Since we need dynamic params, we use db.execute with a raw string // via the PGlite/postgres.js driver directly. { sql, params, typings: [] }, ); // drizzle execute returns { rows: unknown[][] } for PGlite driver, // or a RowList for postgres.js. Normalise both shapes. 
// eslint-disable-next-line @typescript-eslint/no-explicit-any const raw = result as any; if (Array.isArray(raw)) { // postgres.js shape: array of row objects return raw as Record[]; } if (raw?.rows && Array.isArray(raw.rows)) { // PGlite shape: { rows: unknown[][] } OR { rows: Record[] } const rows = raw.rows as unknown[]; if (rows.length === 0) return []; if (Array.isArray(rows[0])) { // Columnar: convert to objects using fields array if available const fields: string[] = (raw.fields as Array<{ name: string }> | undefined)?.map((f) => f.name) ?? DrizzleMigrationSource.INSIGHTS_COLUMNS_NO_VECTOR.slice(); return (rows as unknown[][]).map((row) => { const obj: Record = {}; for (let i = 0; i < fields.length; i++) { obj[fields[i]!] = row[i]; } return obj; }); } return rows as Record[]; } return []; } // Standard Drizzle select for all other tables (and vector tables when // the source has pgvector registered). // eslint-disable-next-line @typescript-eslint/no-explicit-any let query = (this.db as any).select().from(table); if (opts?.limit !== undefined) query = query.limit(opts.limit); if (opts?.offset !== undefined) query = query.offset(opts.offset); return (await query) as Record[]; } async count(tableName: string): Promise { const table = TABLE_OBJECTS[tableName]; if (!table) throw new Error(`DrizzleMigrationSource: unknown table "${tableName}"`); // eslint-disable-next-line @typescript-eslint/no-explicit-any const [row] = await (this.db as any) .select({ n: drizzleSql`COUNT(*)::int` }) .from(table); return (row as { n: number } | undefined)?.n ?? 0; } async close(): Promise { // Lifecycle managed externally — caller closes the db handle. } } /* ------------------------------------------------------------------ */ /* Real postgres target adapter */ /* ------------------------------------------------------------------ */ /** * Live implementation of MigrationTarget backed by a real Postgres connection. * Used by the CLI; mocked in tests. 
*/ export class PostgresMigrationTarget implements MigrationTarget { private sql: ReturnType; constructor(url: string) { this.sql = postgres(url, { max: 5, connect_timeout: 10, idle_timeout: 30, }); } async upsertBatch(tableName: string, rows: Record[]): Promise { if (rows.length === 0) return; // Collect all column names from the batch (union of all row keys). const colSet = new Set(); for (const row of rows) { for (const k of Object.keys(row)) colSet.add(k); } const cols = [...colSet]; if (!cols.includes('id')) { throw new Error(`Table ${tableName}: rows missing 'id' column`); } // Build VALUES list — use postgres tagged-template helpers for safety. // postgres.js supports bulk inserts via array of objects. // eslint-disable-next-line @typescript-eslint/no-explicit-any await this.sql.begin(async (tx: any) => { // Insert in chunks to avoid enormous single queries. for (let i = 0; i < rows.length; i += 500) { const chunk = rows.slice(i, i + 500); // Normalise rows: fill missing columns with null. const normalised = chunk.map((row) => { const out: Record = {}; for (const col of cols) { out[col] = row[col] ?? null; } return out; }); const colList = cols.map((c) => `"${c}"`).join(', '); const updateList = cols .filter((c) => c !== 'id') .map((c) => `"${c}" = EXCLUDED."${c}"`) .join(', '); // Build values placeholders const valuePlaceholders = normalised .map((_, ri) => `(${cols.map((_, ci) => `$${ri * cols.length + ci + 1}`).join(', ')})`) .join(', '); const flatValues = normalised.flatMap((row) => cols.map((c) => row[c] ?? null)); const query = ` INSERT INTO "${tableName}" (${colList}) VALUES ${valuePlaceholders} ON CONFLICT (id) DO UPDATE SET ${updateList} `; await tx.unsafe(query, flatValues as never[]); } }); } async count(tableName: string): Promise { const rows = await this.sql.unsafe(`SELECT COUNT(*)::int AS n FROM "${tableName}"`); const row = rows[0] as unknown as { n: number } | undefined; return row?.n ?? 
0; } async hasPgvector(): Promise { const rows = await this.sql` SELECT 1 FROM pg_extension WHERE extname = 'vector' `; return rows.length > 0; } async close(): Promise { await this.sql.end(); } } /* ------------------------------------------------------------------ */ /* Source-row normalisation */ /* ------------------------------------------------------------------ */ /** * Convert a camelCase key to snake_case. * e.g. "userId" → "user_id", "emailVerified" → "email_verified". * Keys that are already snake_case (no uppercase letters) are returned as-is. */ function toSnakeCase(key: string): string { return key.replace(/[A-Z]/g, (c) => `_${c.toLowerCase()}`); } /** * Drizzle returns rows as camelCase TypeScript objects (e.g. `userId`, not * `user_id`). The PostgresMigrationTarget upserts via raw SQL and uses the * column names as given. We must convert camelCase keys → snake_case before * building the INSERT statement so column names match the PG schema. * * Exception: the `insights` no-vector path already returns snake_case keys * from its raw SQL projection — toSnakeCase() is idempotent for already- * snake_case keys so this conversion is safe in all paths. * * For vector tables (insights), if `embedding` is absent from the source row * (because DrizzleMigrationSource omitted it in the no-vector projection), we * explicitly set it to null so the target ON CONFLICT UPDATE doesn't error. * * NOTE: insights.embedding is defined as `vector('embedding', { dimensions: * 1536 })` with no `.notNull()` in schema.ts — it accepts NULL. */ export function normaliseSourceRow( tableName: string, row: Record, sourceHasVector: boolean, ): Record { // Convert all camelCase keys to snake_case for raw-SQL target compatibility. 
const out: Record = {}; for (const [k, v] of Object.entries(row)) { out[toSnakeCase(k)] = v; } if (VECTOR_TABLES.has(tableName) && !sourceHasVector) { // Source cannot have embeddings — explicitly null them so ON CONFLICT // UPDATE doesn't try to write undefined. out['embedding'] = null; } return out; } /* ------------------------------------------------------------------ */ /* Precondition checks */ /* ------------------------------------------------------------------ */ export class MigrationPreconditionError extends Error { constructor( message: string, public readonly remediation: string, ) { super(message); this.name = 'MigrationPreconditionError'; } } /** * Verify target preconditions before writing any data. * * Checks: * 1. pgvector extension installed. * 2. User-data tables are empty (unless --allow-non-empty). */ export async function checkTargetPreconditions( target: MigrationTarget, allowNonEmpty: boolean, tablesToMigrate: string[], ): Promise { const hasVector = await target.hasPgvector(); if (!hasVector) { throw new MigrationPreconditionError( 'Target Postgres does not have the pgvector extension installed.', 'Run: CREATE EXTENSION IF NOT EXISTS vector; — or use the pgvector/pgvector:pg17 Docker image.', ); } if (!allowNonEmpty) { // Check the first non-empty user-data table. for (const table of tablesToMigrate) { const n = await target.count(table); if (n > 0) { throw new MigrationPreconditionError( `Target table "${table}" already contains ${n.toString()} rows.`, 'Pass --allow-non-empty to overwrite existing data (upsert semantics), ' + 'or point to an empty target database.', ); } } } } /* ------------------------------------------------------------------ */ /* Core migration runner */ /* ------------------------------------------------------------------ */ /** * Run the tier migration. * * @param source Adapter for reading source rows. * @param target Adapter for writing rows to target. * @param opts Migration options. 
* @param sourceHasVector True if the source tier supports vector columns. */ export async function runMigrateTier( source: MigrationSource, target: MigrationTarget, opts: MigrateTierOptions, sourceHasVector = false, ): Promise { const { dryRun, allowNonEmpty, batchSize, onProgress } = opts; const tablesToMigrate = getMigrationOrder(); // Preflight: gather row counts from source. onProgress('[migrate-tier] Gathering source row counts...'); const sourceCounts = new Map(); for (const table of tablesToMigrate) { const n = await source.count(table); sourceCounts.set(table, n); } // Log preflight summary. onProgress('[migrate-tier] Tables to migrate:'); for (const table of tablesToMigrate) { const n = sourceCounts.get(table) ?? 0; onProgress(` ${table}: ${n.toString()} rows`); } for (const skipped of SKIP_TABLES) { onProgress(` ${skipped}: SKIPPED (ephemeral/environment-specific)`); } // Vector column notice. if (!sourceHasVector) { onProgress( '[migrate-tier] NOTE: Source tier has no pgvector support. ' + 'insights.embedding will be NULL on all migrated rows.', ); } if (dryRun) { onProgress('[migrate-tier] DRY RUN — no writes will be made.'); const tables: TableMigrationResult[] = tablesToMigrate.map((t) => ({ table: t, rowsMigrated: 0, skipped: false, })); for (const skipped of SKIP_TABLES) { tables.push({ table: skipped, rowsMigrated: 0, skipped: true, skipReason: 'ephemeral' }); } return { tables, totalRows: 0, dryRun: true }; } // Check preconditions before writing. await checkTargetPreconditions(target, allowNonEmpty, tablesToMigrate); const results: TableMigrationResult[] = []; let totalRows = 0; for (const table of tablesToMigrate) { const sourceCount = sourceCounts.get(table) ?? 
0; if (sourceCount === 0) { onProgress(`[migrate-tier] ${table}: 0 rows — skipping.`); results.push({ table, rowsMigrated: 0, skipped: false }); continue; } onProgress(`[migrate-tier] ${table}: migrating ${sourceCount.toString()} rows...`); let offset = 0; let tableTotal = 0; let lastSuccessfulId: string | undefined; try { while (offset < sourceCount) { const rows = await source.readTable(table, { limit: batchSize, offset }); if (rows.length === 0) break; const normalised = rows.map((row) => normaliseSourceRow(table, row, sourceHasVector)); await target.upsertBatch(table, normalised); lastSuccessfulId = rows[rows.length - 1]?.['id'] as string | undefined; tableTotal += rows.length; offset += rows.length; onProgress( `[migrate-tier] ${table}: ${tableTotal.toString()}/${sourceCount.toString()} rows written`, ); } } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); throw new Error( `[migrate-tier] Failed on table "${table}" after ${tableTotal.toString()} rows ` + `(last id: ${lastSuccessfulId ?? 'none'}). Error: ${errMsg}\n` + `Remediation: Re-run with --allow-non-empty to resume (upsert is idempotent).`, ); } results.push({ table, rowsMigrated: tableTotal, skipped: false }); totalRows += tableTotal; onProgress(`[migrate-tier] ${table}: done (${tableTotal.toString()} rows).`); } // Add skipped table records. for (const skipped of SKIP_TABLES) { results.push({ table: skipped, rowsMigrated: 0, skipped: true, skipReason: 'ephemeral or environment-specific — re-issue on target', }); } onProgress(`[migrate-tier] Complete. ${totalRows.toString()} total rows migrated.`); return { tables: results, totalRows, dryRun: false }; }