feat(storage): mosaic storage migrate-tier with dry-run + idempotency (FED-M1-05) (#474)
This commit was merged in pull request #474.
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import type { Command } from 'commander';
|
||||
import type { MigrationSource } from './migrate-tier.js';
|
||||
|
||||
/**
|
||||
* Reads the DATABASE_URL environment variable and redacts the password portion.
|
||||
@@ -209,6 +210,203 @@ export function registerStorageCommand(parent: Command): void {
|
||||
}
|
||||
});
|
||||
|
||||
// ── storage migrate-tier ─────────────────────────────────────────────────
|
||||
|
||||
storage
|
||||
.command('migrate-tier')
|
||||
.description('Migrate data from tier: local/standalone → tier: federated (Postgres + pgvector)')
|
||||
.requiredOption(
|
||||
'--to <tier>',
|
||||
'Target tier to migrate to (only "federated" is supported)',
|
||||
'federated',
|
||||
)
|
||||
.requiredOption('--target-url <url>', 'Target federated Postgres connection string (required)')
|
||||
.option(
|
||||
'--source-config <path>',
|
||||
'Path to mosaic.config.json (default: cwd/mosaic.config.json)',
|
||||
)
|
||||
.option('--dry-run', 'Print what would be migrated without writing anything')
|
||||
.option('--yes', 'Skip interactive confirmation prompt (required for non-TTY environments)')
|
||||
.option('--batch-size <n>', 'Rows per transaction batch', '1000')
|
||||
.option('--allow-non-empty', 'Allow writing to a non-empty target (upsert — idempotent)')
|
||||
.action(
|
||||
async (opts: {
|
||||
to: string;
|
||||
targetUrl: string;
|
||||
sourceConfig?: string;
|
||||
dryRun?: boolean;
|
||||
yes?: boolean;
|
||||
batchSize?: string;
|
||||
allowNonEmpty?: boolean;
|
||||
}) => {
|
||||
if (opts.to !== 'federated') {
|
||||
console.error(
|
||||
`[migrate-tier] --to "${opts.to}" is not supported. Only "federated" is allowed.`,
|
||||
);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
const batchSize = parseInt(opts.batchSize ?? '1000', 10);
|
||||
if (isNaN(batchSize) || batchSize < 1) {
|
||||
console.error('[migrate-tier] --batch-size must be a positive integer.');
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
// Redact target URL password for display.
|
||||
function redactUrl(url: string): string {
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
if (parsed.password) parsed.password = '***';
|
||||
return parsed.toString();
|
||||
} catch {
|
||||
return url.replace(/:([^@/]+)@/, ':***@');
|
||||
}
|
||||
}
|
||||
|
||||
const redactedTarget = redactUrl(opts.targetUrl);
|
||||
const isDryRun = opts.dryRun ?? false;
|
||||
const allowNonEmpty = opts.allowNonEmpty ?? false;
|
||||
|
||||
// Determine source tier from environment.
|
||||
const sourceTier = activeTier();
|
||||
const sourceDesc = configSource();
|
||||
|
||||
console.log('');
|
||||
console.log('[migrate-tier] ─────────────────────────────────────────');
|
||||
console.log(`[migrate-tier] Source tier: ${sourceTier}`);
|
||||
console.log(`[migrate-tier] Source: ${sourceDesc}`);
|
||||
console.log(`[migrate-tier] Target tier: federated (Postgres + pgvector)`);
|
||||
console.log(`[migrate-tier] Target: ${redactedTarget}`);
|
||||
console.log(`[migrate-tier] Batch size: ${batchSize.toString()}`);
|
||||
console.log(`[migrate-tier] Dry run: ${isDryRun.toString()}`);
|
||||
console.log(`[migrate-tier] Allow non-empty: ${allowNonEmpty.toString()}`);
|
||||
console.log('[migrate-tier] ─────────────────────────────────────────');
|
||||
console.log('');
|
||||
|
||||
// Lazy-import core migration logic to keep the CLI thin.
|
||||
const {
|
||||
runMigrateTier,
|
||||
PostgresMigrationTarget,
|
||||
DrizzleMigrationSource,
|
||||
getMigrationOrder,
|
||||
} = await import('./migrate-tier.js');
|
||||
|
||||
// Build source adapter using Drizzle-backed DrizzleMigrationSource.
|
||||
// Both local (PGlite) and standalone (Postgres) sources expose the same
|
||||
// normalized Drizzle schema — this is where the actual domain data lives.
|
||||
let sourceAdapter: MigrationSource;
|
||||
if (sourceTier === 'pglite') {
|
||||
const { createPgliteDb } = await import('@mosaicstack/db');
|
||||
const pgliteDataDir = process.env['PGLITE_DATA_DIR'];
|
||||
if (!pgliteDataDir) {
|
||||
console.error(
|
||||
'[migrate-tier] PGLITE_DATA_DIR is not set. ' +
|
||||
'Cannot open PGlite source — set it to the data directory path.',
|
||||
);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
const handle = createPgliteDb(pgliteDataDir);
|
||||
// Local/PGlite sources do not have pgvector registered — the embedding
|
||||
// column is omitted from the insights SELECT and set to null on target.
|
||||
sourceAdapter = new DrizzleMigrationSource(handle.db, /* sourceHasVector= */ false);
|
||||
} else {
|
||||
const { createDb } = await import('@mosaicstack/db');
|
||||
const url = process.env['DATABASE_URL'];
|
||||
if (!url) {
|
||||
console.error('[migrate-tier] DATABASE_URL is not set for postgres source.');
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
const handle = createDb(url);
|
||||
// Standalone Postgres may or may not have pgvector — assume it does not
|
||||
// (it is a non-federated tier) so embedding is treated as null.
|
||||
sourceAdapter = new DrizzleMigrationSource(handle.db, /* sourceHasVector= */ false);
|
||||
}
|
||||
|
||||
// Print per-table row counts for the confirmation prompt.
|
||||
const tablesToMigrate = getMigrationOrder();
|
||||
const counts: Array<{ table: string; count: number }> = [];
|
||||
for (const table of tablesToMigrate) {
|
||||
const n = await sourceAdapter.count(table);
|
||||
counts.push({ table, count: n });
|
||||
}
|
||||
|
||||
console.log('[migrate-tier] Source row counts:');
|
||||
for (const { table, count } of counts) {
|
||||
console.log(` ${table}: ${count.toString()}`);
|
||||
}
|
||||
console.log(' sessions: SKIPPED (ephemeral)');
|
||||
console.log(' verifications: SKIPPED (ephemeral)');
|
||||
console.log(' admin_tokens: SKIPPED (environment-specific)');
|
||||
console.log('');
|
||||
|
||||
// Interactive confirmation unless --yes or dry-run.
|
||||
const isTTY = process.stdin.isTTY;
|
||||
if (!isDryRun) {
|
||||
if (!opts.yes && !isTTY) {
|
||||
console.error(
|
||||
'[migrate-tier] Not running in a TTY and --yes was not passed. ' +
|
||||
'Pass --yes to confirm in headless environments.',
|
||||
);
|
||||
process.exitCode = 1;
|
||||
await sourceAdapter.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!opts.yes) {
|
||||
const { createInterface } = await import('node:readline');
|
||||
const rl = createInterface({ input: process.stdin, output: process.stdout });
|
||||
const answer = await new Promise<string>((resolve) => {
|
||||
rl.question(`This will WRITE to ${redactedTarget}. Continue? [y/N] `, (ans) => {
|
||||
rl.close();
|
||||
resolve(ans);
|
||||
});
|
||||
});
|
||||
if (answer.trim().toLowerCase() !== 'y') {
|
||||
console.log('[migrate-tier] Aborted.');
|
||||
await sourceAdapter.close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const target = new PostgresMigrationTarget(opts.targetUrl);
|
||||
|
||||
try {
|
||||
const result = await runMigrateTier(
|
||||
sourceAdapter,
|
||||
target,
|
||||
{
|
||||
targetUrl: opts.targetUrl,
|
||||
dryRun: isDryRun,
|
||||
allowNonEmpty,
|
||||
batchSize,
|
||||
onProgress: (msg) => console.log(msg),
|
||||
},
|
||||
/* sourceHasVector= */ sourceTier === 'postgres',
|
||||
);
|
||||
|
||||
if (result.dryRun) {
|
||||
console.log('[migrate-tier] Dry run complete. No data was written.');
|
||||
} else {
|
||||
console.log(
|
||||
`[migrate-tier] Migration complete. ${result.totalRows.toString()} rows migrated.`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[migrate-tier] ERROR: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
process.exitCode = 1;
|
||||
} finally {
|
||||
await Promise.all([sourceAdapter.close(), target.close()]);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// ── storage migrate ──────────────────────────────────────────────────────
|
||||
|
||||
storage
|
||||
|
||||
Reference in New Issue
Block a user