/**
 * Tier Detection — pre-flight service reachability probes.
 *
 * Lifted from apps/gateway/src/bootstrap/tier-detector.ts so both the gateway
 * and the mosaic CLI can share the same probe logic without duplicating code or
 * creating circular workspace dependencies.
 *
 * Library choices:
 * - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm).
 * - Valkey: `ioredis` (compatible with Valkey; same URL convention used by bullmq).
 */

import postgres from 'postgres';
import { Redis } from 'ioredis';
import { redactErrMsg } from './redact-error.js';

/* ------------------------------------------------------------------ */
/* Local structural type — avoids circular dependency                  */
/* ------------------------------------------------------------------ */

/**
 * Minimal structural shape required for tier detection.
 * Mirrors the relevant fields of MosaicConfig (from @mosaicstack/config) without
 * creating a dependency cycle (config depends on storage for StorageConfig).
 * Any object that satisfies MosaicConfig also satisfies this type.
 */
export interface TierConfig {
  tier: 'local' | 'standalone' | 'federated';
  storage:
    | { type: 'pglite'; dataDir?: string }
    | { type: 'postgres'; url: string; enableVector?: boolean }
    | { type: 'files'; dataDir: string; format?: 'json' | 'md' };
  queue: { type: string; url?: string };
}

/* ------------------------------------------------------------------ */
/* Public types                                                        */
/* ------------------------------------------------------------------ */

/** Outcome of probing (or skipping) one external service. */
export interface ServiceCheck {
  name: 'postgres' | 'valkey' | 'pgvector';
  status: 'ok' | 'fail' | 'skipped';
  host?: string;
  port?: number;
  durationMs: number;
  error?: { message: string; remediation: string };
}

/** Aggregated result returned by {@link probeServiceHealth}. */
export interface TierHealthReport {
  tier: 'local' | 'standalone' | 'federated';
  configPath?: string;
  overall: 'green' | 'yellow' | 'red';
  services: ServiceCheck[];
}

/* ------------------------------------------------------------------ */
/* Structured error type                                               */
/* ------------------------------------------------------------------ */

/**
 * Error thrown by {@link detectAndAssertTier} when a required service probe
 * fails or the configuration is unusable for the requested tier.
 *
 * The message embeds the service name, `host:port` and the remediation hint;
 * the same values are also exposed as readonly fields.
 */
export class TierDetectionError extends Error {
  public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config';
  public readonly host: string;
  public readonly port: number;
  public readonly remediation: string;

  constructor(opts: {
    service: 'postgres' | 'valkey' | 'pgvector' | 'config';
    host: string;
    port: number;
    remediation: string;
    cause?: unknown;
  }) {
    const message =
      `[tier-detector] ${opts.service} unreachable or unusable at ` +
      `${opts.host}:${opts.port} — ${opts.remediation}`;
    super(message, { cause: opts.cause });
    this.name = 'TierDetectionError';
    this.service = opts.service;
    this.host = opts.host;
    this.port = opts.port;
    this.remediation = opts.remediation;
  }
}

/* ------------------------------------------------------------------ */
/* Shared constants                                                    */
/* ------------------------------------------------------------------ */

/** Postgres URL probed when the configured storage backend is not `postgres`. */
const DEFAULT_POSTGRES_URL = 'postgresql://localhost:5432/mosaic';

/** Valkey URL probed when a bullmq queue is configured without an explicit URL. */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';

const POSTGRES_REMEDIATION =
  'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`';

const VALKEY_REMEDIATION =
  'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`';

const PGVECTOR_LIBRARY_MISSING_REMEDIATION =
  'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.';

const PGVECTOR_PERMISSION_REMEDIATION =
  'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';

/* ------------------------------------------------------------------ */
/* URL helpers                                                         */
/* ------------------------------------------------------------------ */

/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */
function parseHostPort(url: string, defaultPort: number): { host: string; port: number } {
  try {
    const parsed = new URL(url);
    const host = parsed.hostname || 'unknown';
    const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort;
    return { host, port };
  } catch {
    return { host: 'unknown', port: defaultPort };
  }
}

/* ------------------------------------------------------------------ */
/* Internal probe results and helpers                                  */
/* ------------------------------------------------------------------ */

/** Result of a non-throwing ("measured") probe; `error` is set only on failure. */
interface ProbeResult {
  host: string;
  port: number;
  durationMs: number;
  error?: { message: string; remediation: string };
}

type ServiceName = ServiceCheck['name'];

/** Client handle returned by the `postgres` factory. */
type PostgresClient = ReturnType<typeof postgres>;

/** Turn a caught value into a redacted, human-readable message. */
function describeFailure(cause: unknown): string {
  return redactErrMsg(cause instanceof Error ? cause.message : String(cause));
}

/** Convert a measured probe result into the public `ok` / `fail` service entry. */
function toServiceCheck(name: ServiceName, result: ProbeResult): ServiceCheck {
  const { host, port, durationMs, error } = result;
  // The `error` key is deliberately absent (not `undefined`) on success.
  return error
    ? { name, status: 'fail', host, port, durationMs, error }
    : { name, status: 'ok', host, port, durationMs };
}

/** Service entry for a probe that is not required by the current tier. */
function skippedCheck(name: ServiceName): ServiceCheck {
  return { name, status: 'skipped', durationMs: 0 };
}

/** Postgres URL to probe: the configured one, or the local default. */
function resolvePostgresUrl(config: TierConfig): string {
  return config.storage.type === 'postgres' ? config.storage.url : DEFAULT_POSTGRES_URL;
}

/** Valkey URL to probe, or `null` when the queue is not bullmq. */
function resolveValkeyUrl(config: TierConfig): string | null {
  return config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
}

/* ------------------------------------------------------------------ */
/* Postgres probe                                                      */
/* ------------------------------------------------------------------ */

/** Create a single-connection client with short connect/idle timeouts. */
function openPostgres(url: string): PostgresClient {
  return postgres(url, {
    max: 1,
    connect_timeout: 5,
    idle_timeout: 5,
  });
}

/** Close a probe client, if one was created; cleanup errors are ignored. */
async function closePostgres(sql: PostgresClient | undefined): Promise<void> {
  if (sql) {
    await sql.end({ timeout: 2 }).catch(() => {
      // Ignore cleanup errors — the probe outcome is already known.
    });
  }
}

/**
 * Assert that Postgres at `url` answers a trivial query.
 *
 * @throws TierDetectionError wrapping the underlying failure as `cause`.
 */
async function probePostgres(url: string): Promise<void> {
  const { host, port } = parseHostPort(url, 5432);
  let sql: PostgresClient | undefined;
  try {
    sql = openPostgres(url);
    // Run a trivial query to confirm connectivity.
    await sql`SELECT 1`;
  } catch (cause) {
    throw new TierDetectionError({
      service: 'postgres',
      host,
      port,
      remediation: POSTGRES_REMEDIATION,
      cause,
    });
  } finally {
    await closePostgres(sql);
  }
}

/**
 * Non-throwing Postgres probe: reports failure through `ProbeResult.error`.
 * `durationMs` is sampled before the connection is closed.
 */
async function probePostgresMeasured(url: string): Promise<ProbeResult> {
  const { host, port } = parseHostPort(url, 5432);
  const start = Date.now();
  let sql: PostgresClient | undefined;
  try {
    sql = openPostgres(url);
    await sql`SELECT 1`;
    return { host, port, durationMs: Date.now() - start };
  } catch (cause) {
    return {
      host,
      port,
      durationMs: Date.now() - start,
      error: {
        message: describeFailure(cause),
        remediation: POSTGRES_REMEDIATION,
      },
    };
  } finally {
    await closePostgres(sql);
  }
}

/* ------------------------------------------------------------------ */
/* pgvector probe                                                      */
/* ------------------------------------------------------------------ */

/**
 * Choose the remediation hint for a failed `CREATE EXTENSION`.
 *
 * A message containing `extension "vector" is not available` is treated as a
 * missing pgvector library; every other failure gets the permission hint.
 */
function pgvectorRemediation(cause: unknown): string {
  const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
  const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
  return isLibraryMissing ? PGVECTOR_LIBRARY_MISSING_REMEDIATION : PGVECTOR_PERMISSION_REMEDIATION;
}

/**
 * Assert that the `vector` extension can be created (or already exists).
 *
 * @throws TierDetectionError wrapping the underlying failure as `cause`.
 */
async function probePgvector(url: string): Promise<void> {
  const { host, port } = parseHostPort(url, 5432);
  let sql: PostgresClient | undefined;
  try {
    sql = openPostgres(url);
    // IF NOT EXISTS makes this a no-op when the extension is already installed.
    await sql`CREATE EXTENSION IF NOT EXISTS vector`;
  } catch (cause) {
    throw new TierDetectionError({
      service: 'pgvector',
      host,
      port,
      remediation: pgvectorRemediation(cause),
      cause,
    });
  } finally {
    await closePostgres(sql);
  }
}

/**
 * Non-throwing pgvector probe: reports failure through `ProbeResult.error`.
 * `durationMs` is sampled before the connection is closed.
 */
async function probePgvectorMeasured(url: string): Promise<ProbeResult> {
  const { host, port } = parseHostPort(url, 5432);
  const start = Date.now();
  let sql: PostgresClient | undefined;
  try {
    sql = openPostgres(url);
    await sql`CREATE EXTENSION IF NOT EXISTS vector`;
    return { host, port, durationMs: Date.now() - start };
  } catch (cause) {
    const remediation = pgvectorRemediation(cause);
    return {
      host,
      port,
      durationMs: Date.now() - start,
      error: {
        message: describeFailure(cause),
        remediation,
      },
    };
  } finally {
    await closePostgres(sql);
  }
}

/* ------------------------------------------------------------------ */
/* Valkey probe                                                        */
/* ------------------------------------------------------------------ */

/** Create a lazily-connecting client configured to fail fast (no retries). */
function openValkey(url: string): Redis {
  return new Redis(url, {
    enableReadyCheck: false,
    maxRetriesPerRequest: 0,
    retryStrategy: () => null, // no retries — fail fast
    lazyConnect: true,
    connectTimeout: 5000, // 5-second cap on the connection attempt
  });
}

/** Connect and require a `PONG` reply to `PING`. */
async function pingValkey(client: Redis): Promise<void> {
  await client.connect();
  const pong = await client.ping();
  if (pong !== 'PONG') {
    throw new Error(`Unexpected PING response: ${pong}`);
  }
}

/**
 * Assert that Valkey at `url` accepts a connection and answers `PING`.
 *
 * @throws TierDetectionError wrapping the underlying failure as `cause`.
 */
async function probeValkey(url: string): Promise<void> {
  const { host, port } = parseHostPort(url, 6380);
  let client: Redis | undefined;
  try {
    // Constructed inside the try so a constructor failure is reported like
    // any other probe failure instead of escaping as a raw error.
    client = openValkey(url);
    await pingValkey(client);
  } catch (cause) {
    throw new TierDetectionError({
      service: 'valkey',
      host,
      port,
      remediation: VALKEY_REMEDIATION,
      cause,
    });
  } finally {
    client?.disconnect();
  }
}

/**
 * Non-throwing Valkey probe: reports failure through `ProbeResult.error`.
 * `durationMs` is sampled before the client is disconnected.
 */
async function probeValkeyMeasured(url: string): Promise<ProbeResult> {
  const { host, port } = parseHostPort(url, 6380);
  const start = Date.now();
  let client: Redis | undefined;
  try {
    // Inside the try so this variant stays non-throwing.
    client = openValkey(url);
    await pingValkey(client);
    return { host, port, durationMs: Date.now() - start };
  } catch (cause) {
    return {
      host,
      port,
      durationMs: Date.now() - start,
      error: {
        message: describeFailure(cause),
        remediation: VALKEY_REMEDIATION,
      },
    };
  } finally {
    client?.disconnect();
  }
}

/* ------------------------------------------------------------------ */
/* Public entry points                                                 */
/* ------------------------------------------------------------------ */

/**
 * Assert that all services required by `config.tier` are reachable.
 *
 * - `local`      — no-op (PGlite is in-process; no external services).
 * - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq').
 * - `federated`  — assert Postgres + Valkey + pgvector installability.
 *
 * Probes run sequentially and stop at the first failure.
 *
 * @throws TierDetectionError on the first failure, with host:port and a
 *   remediation hint. For `federated`, a non-bullmq queue is rejected before
 *   any probe runs.
 */
export async function detectAndAssertTier(config: TierConfig): Promise<void> {
  if (config.tier === 'local') {
    // PGlite runs in-process — nothing to probe.
    return;
  }

  const pgUrl = resolvePostgresUrl(config);

  if (config.tier === 'standalone') {
    const valkeyUrl = resolveValkeyUrl(config);
    await probePostgres(pgUrl);
    if (valkeyUrl) {
      await probeValkey(valkeyUrl);
    }
    return;
  }

  // tier === 'federated'
  // Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL.
  if (config.queue.type !== 'bullmq') {
    throw new TierDetectionError({
      service: 'config',
      host: 'localhost',
      port: 0,
      remediation:
        "Federated tier requires queue.type === 'bullmq'. " +
        "Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
    });
  }

  const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
  await probePostgres(pgUrl);
  await probeValkey(federatedValkeyUrl);
  await probePgvector(pgUrl);
}

/**
 * Non-throwing variant for `mosaic gateway doctor`.
 *
 * Probes ALL required services even if some fail, returning a structured report.
 * Services not required for the current tier are reported as `skipped`.
 * The `services` array is always ordered postgres, valkey, pgvector.
 *
 * Overall status:
 * - `green`  — all required services OK
 * - `yellow` — all required services OK, but a non-critical check failed
 *              (currently unused — reserved for future optional probes)
 * - `red`    — at least one required service failed
 *
 * @param config     Tier, storage and queue settings to probe against.
 * @param configPath Optional path echoed back unchanged in the report.
 */
export async function probeServiceHealth(
  config: TierConfig,
  configPath?: string,
): Promise<TierHealthReport> {
  const tier = config.tier;

  // local tier: PGlite is in-process, no external services needed.
  if (tier === 'local') {
    return {
      tier,
      configPath,
      overall: 'green',
      services: [skippedCheck('postgres'), skippedCheck('valkey'), skippedCheck('pgvector')],
    };
  }

  const pgUrl = resolvePostgresUrl(config);
  const services: ServiceCheck[] = [];

  // Postgres — required for both standalone and federated.
  services.push(toServiceCheck('postgres', await probePostgresMeasured(pgUrl)));

  if (tier === 'standalone') {
    // Valkey — required only if the queue is bullmq.
    const valkeyUrl = resolveValkeyUrl(config);
    if (valkeyUrl) {
      services.push(toServiceCheck('valkey', await probeValkeyMeasured(valkeyUrl)));
    } else {
      services.push(skippedCheck('valkey'));
    }

    // pgvector — not required for standalone.
    services.push(skippedCheck('pgvector'));
  } else {
    // tier === 'federated'
    // Valkey — required; a non-bullmq queue is reported as a failed check.
    if (config.queue.type !== 'bullmq') {
      services.push({
        name: 'valkey',
        status: 'fail',
        host: 'localhost',
        port: 0,
        durationMs: 0,
        error: {
          message: "Federated tier requires queue.type === 'bullmq'",
          remediation:
            "Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
        },
      });
    } else {
      const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
      services.push(toServiceCheck('valkey', await probeValkeyMeasured(federatedValkeyUrl)));
    }

    // pgvector — required for federated.
    services.push(toServiceCheck('pgvector', await probePgvectorMeasured(pgUrl)));
  }

  const hasFailure = services.some((check) => check.status === 'fail');

  return {
    tier,
    configPath,
    overall: hasFailure ? 'red' : 'green',
    services,
  };
}