fix(storage): redact credentials in driver errors + advisory lock (FED-M1-10) (#479)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/push/publish Pipeline failed

This commit was merged in pull request #479.
This commit is contained in:
2026-04-20 02:02:57 +00:00
parent 1e2b8ac8de
commit dc4afee848
6 changed files with 261 additions and 70 deletions

View File

@@ -29,6 +29,7 @@
import postgres from 'postgres';
import * as schema from '@mosaicstack/db';
import { sql as drizzleSql } from '@mosaicstack/db';
import { redactErrMsg } from './redact-error.js';
/* ------------------------------------------------------------------ */
/* Types */
@@ -72,6 +73,20 @@ export interface MigrationTarget {
/** Close the target connection. */
close(): Promise<void>;
/**
 * Attempt to acquire a session-level Postgres advisory lock for migrate-tier.
 *
 * Optional: targets that do not support advisory locks (e.g. test mocks) may
 * omit this method. runMigrateTier checks for its presence and treats a
 * missing method as "lock acquired", so such targets run without locking.
 *
 * @returns true if the lock was acquired, false if another process holds it.
 *   runMigrateTier throws an error when it receives false.
 */
tryAcquireAdvisoryLock?(): Promise<boolean>;
/**
 * Release the session-level advisory lock acquired by tryAcquireAdvisoryLock.
 *
 * Optional, like tryAcquireAdvisoryLock. Must be called in a finally block so
 * the lock is released on both success and failure; runMigrateTier does this
 * whenever the target implements the method.
 */
releaseAdvisoryLock?(): Promise<void>;
}
export interface MigrateTierOptions {
@@ -107,9 +122,28 @@ export interface MigrateTierResult {
/**
 * SKIP_TABLES: ephemeral or environment-specific tables that migrate-tier
 * deliberately does not copy to the target.
 *
 * Skipped, and why:
 * - sessions: TTL'd auth sessions. They are invalid in the new environment
 *   and would immediately expire or fail JWT verification anyway.
 * - verifications: one-time tokens (email verify, password-reset links, etc.).
 *   They have already expired or been consumed; re-sending from the new
 *   environment is the correct action.
 * - admin_tokens: hashed tokens bound to the old environment's secret keys.
 *   The hash is environment-specific, so tokens must be re-issued on the
 *   target.
 *
 * Intentionally NOT skipped (these are migrated):
 * - accounts (OAuth tokens): durable credentials bound to the user's identity
 *   rather than to the deployment environment. They survive environment
 *   changes and should follow the user to the federated tier.
 * - provider_credentials (AI provider keys): durable, user-owned API keys for
 *   AI providers (e.g. OpenAI, Anthropic). They belong to the user, not the
 *   server, and must be preserved so AI features work immediately after
 *   migration.
 *
 * OPERATOR NOTE: when migrating to a shared or multi-tenant federated tier,
 * review whether `accounts` and `provider_credentials` should be wiped
 * post-migration to prevent unintended cross-tenant credential exposure.
 */
export const SKIP_TABLES: Set<string> = new Set([
  'sessions',
  'verifications',
  'admin_tokens',
]);
@@ -482,6 +516,33 @@ export class PostgresMigrationTarget implements MigrationTarget {
return rows.length > 0;
}
/**
 * Try, without blocking, to take the session-level Postgres advisory lock
 * keyed by hashtext('mosaic-migrate-tier').
 *
 * The lock is session-scoped: Postgres drops it automatically when the
 * connection closes, and releaseAdvisoryLock() releases it explicitly.
 *
 * NOTE(review): a session-level lock belongs to one connection. If `this.sql`
 * is backed by a pool with more than one connection, the acquire and release
 * queries could run on different sessions — confirm the client is limited to
 * a single (or reserved) connection.
 *
 * @returns true when this session now holds the lock; false when another
 *   session already holds it or the query returned no row.
 */
async tryAcquireAdvisoryLock(): Promise<boolean> {
  const result = await this.sql`
SELECT pg_try_advisory_lock(hashtext('mosaic-migrate-tier')) AS acquired
`;
  const lockRow = result[0] as { acquired: boolean } | undefined;
  if (lockRow == null) {
    // No row came back: report "not acquired" rather than guessing.
    return false;
  }
  return lockRow.acquired ?? false;
}
/**
 * Explicitly release the session-level advisory lock taken by
 * tryAcquireAdvisoryLock().
 *
 * Calling this when the lock is not held is harmless: pg_advisory_unlock
 * returns false in that case but does not throw. The query result is
 * discarded.
 */
async releaseAdvisoryLock(): Promise<void> {
  const unlockQuery = this.sql`
SELECT pg_advisory_unlock(hashtext('mosaic-migrate-tier'))
`;
  await unlockQuery;
}
/**
 * Close the target connection by ending `this.sql`.
 * Resolves once the `end()` call has completed.
 */
async close(): Promise<void> {
await this.sql.end();
}
@@ -647,68 +708,92 @@ export async function runMigrateTier(
return { tables, totalRows: 0, dryRun: true };
}
// Check preconditions before writing.
await checkTargetPreconditions(target, allowNonEmpty, tablesToMigrate);
// Acquire a Postgres advisory lock on the target BEFORE checking preconditions
// so that two concurrent invocations cannot both pass the non-empty guard and
// race each other. Use non-blocking pg_try_advisory_lock so we fail fast
// instead of deadlocking.
//
// Targets that don't implement tryAcquireAdvisoryLock (e.g. test mocks) skip
// this step — the conditional check below treats a missing method as "acquired".
const lockAcquired = target.tryAcquireAdvisoryLock ? await target.tryAcquireAdvisoryLock() : true; // mocks / test doubles — no locking needed
const results: TableMigrationResult[] = [];
let totalRows = 0;
if (!lockAcquired) {
throw new Error(
'Another migrate-tier process is already running against this target. ' +
'Wait for it to complete or check for stuck locks via ' +
"SELECT * FROM pg_locks WHERE locktype='advisory'.",
);
}
for (const table of tablesToMigrate) {
const sourceCount = sourceCounts.get(table) ?? 0;
try {
// Check preconditions before writing.
await checkTargetPreconditions(target, allowNonEmpty, tablesToMigrate);
if (sourceCount === 0) {
onProgress(`[migrate-tier] ${table}: 0 rows — skipping.`);
results.push({ table, rowsMigrated: 0, skipped: false });
continue;
}
const results: TableMigrationResult[] = [];
let totalRows = 0;
onProgress(`[migrate-tier] ${table}: migrating ${sourceCount.toString()} rows...`);
for (const table of tablesToMigrate) {
const sourceCount = sourceCounts.get(table) ?? 0;
let offset = 0;
let tableTotal = 0;
let lastSuccessfulId: string | undefined;
if (sourceCount === 0) {
onProgress(`[migrate-tier] ${table}: 0 rows — skipping.`);
results.push({ table, rowsMigrated: 0, skipped: false });
continue;
}
try {
while (offset < sourceCount) {
const rows = await source.readTable(table, { limit: batchSize, offset });
if (rows.length === 0) break;
onProgress(`[migrate-tier] ${table}: migrating ${sourceCount.toString()} rows...`);
const normalised = rows.map((row) => normaliseSourceRow(table, row, sourceHasVector));
let offset = 0;
let tableTotal = 0;
let lastSuccessfulId: string | undefined;
await target.upsertBatch(table, normalised);
try {
while (offset < sourceCount) {
const rows = await source.readTable(table, { limit: batchSize, offset });
if (rows.length === 0) break;
lastSuccessfulId = rows[rows.length - 1]?.['id'] as string | undefined;
tableTotal += rows.length;
offset += rows.length;
const normalised = rows.map((row) => normaliseSourceRow(table, row, sourceHasVector));
onProgress(
`[migrate-tier] ${table}: ${tableTotal.toString()}/${sourceCount.toString()} rows written`,
await target.upsertBatch(table, normalised);
lastSuccessfulId = rows[rows.length - 1]?.['id'] as string | undefined;
tableTotal += rows.length;
offset += rows.length;
onProgress(
`[migrate-tier] ${table}: ${tableTotal.toString()}/${sourceCount.toString()} rows written`,
);
}
} catch (err) {
const errMsg = redactErrMsg(err instanceof Error ? err.message : String(err));
throw new Error(
`[migrate-tier] Failed on table "${table}" after ${tableTotal.toString()} rows ` +
`(last id: ${lastSuccessfulId ?? 'none'}). Error: ${errMsg}\n` +
`Remediation: Re-run with --allow-non-empty to resume (upsert is idempotent).`,
);
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
throw new Error(
`[migrate-tier] Failed on table "${table}" after ${tableTotal.toString()} rows ` +
`(last id: ${lastSuccessfulId ?? 'none'}). Error: ${errMsg}\n` +
`Remediation: Re-run with --allow-non-empty to resume (upsert is idempotent).`,
);
results.push({ table, rowsMigrated: tableTotal, skipped: false });
totalRows += tableTotal;
onProgress(`[migrate-tier] ${table}: done (${tableTotal.toString()} rows).`);
}
results.push({ table, rowsMigrated: tableTotal, skipped: false });
totalRows += tableTotal;
onProgress(`[migrate-tier] ${table}: done (${tableTotal.toString()} rows).`);
}
// Add skipped table records.
for (const skipped of SKIP_TABLES) {
results.push({
table: skipped,
rowsMigrated: 0,
skipped: true,
skipReason: 'ephemeral or environment-specific — re-issue on target',
});
}
// Add skipped table records.
for (const skipped of SKIP_TABLES) {
results.push({
table: skipped,
rowsMigrated: 0,
skipped: true,
skipReason: 'ephemeral or environment-specific — re-issue on target',
});
onProgress(`[migrate-tier] Complete. ${totalRows.toString()} total rows migrated.`);
return { tables: results, totalRows, dryRun: false };
} finally {
// Release the advisory lock regardless of success or failure.
if (target.releaseAdvisoryLock) {
await target.releaseAdvisoryLock();
}
}
onProgress(`[migrate-tier] Complete. ${totalRows.toString()} total rows migrated.`);
return { tables: results, totalRows, dryRun: false };
}