feat(gateway,storage): mosaic gateway doctor with tier health JSON (FED-M1-06) (#475)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful

This commit was merged in pull request #475.
This commit is contained in:
2026-04-20 01:00:39 +00:00
parent ccad30dd27
commit 1a4b1ebbf1
11 changed files with 1233 additions and 289 deletions

View File

@@ -0,0 +1,555 @@
/**
* Tier Detection — pre-flight service reachability probes.
*
* Lifted from apps/gateway/src/bootstrap/tier-detector.ts so both the gateway
* and the mosaic CLI can share the same probe logic without duplicating code or
* creating circular workspace dependencies.
*
* Library choices:
* - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm).
* - Valkey: `ioredis` (compatible with Valkey; same URL convention used by bullmq).
*/
import postgres from 'postgres';
import { Redis } from 'ioredis';
/* ------------------------------------------------------------------ */
/* Local structural type — avoids circular dependency */
/* ------------------------------------------------------------------ */
/**
* Minimal structural shape required for tier detection.
* Mirrors the relevant fields of MosaicConfig (from @mosaicstack/config) without
* creating a dependency cycle (config depends on storage for StorageConfig).
* Any object that satisfies MosaicConfig also satisfies this type.
*/
export interface TierConfig {
tier: 'local' | 'standalone' | 'federated';
storage:
| { type: 'pglite'; dataDir?: string }
| { type: 'postgres'; url: string; enableVector?: boolean }
| { type: 'files'; dataDir: string; format?: 'json' | 'md' };
queue: { type: string; url?: string };
}
/* ------------------------------------------------------------------ */
/* Public types */
/* ------------------------------------------------------------------ */
export interface ServiceCheck {
name: 'postgres' | 'valkey' | 'pgvector';
status: 'ok' | 'fail' | 'skipped';
host?: string;
port?: number;
durationMs: number;
error?: { message: string; remediation: string };
}
export interface TierHealthReport {
tier: 'local' | 'standalone' | 'federated';
configPath?: string;
overall: 'green' | 'yellow' | 'red';
services: ServiceCheck[];
}
/* ------------------------------------------------------------------ */
/* Structured error type */
/* ------------------------------------------------------------------ */
export class TierDetectionError extends Error {
public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config';
public readonly host: string;
public readonly port: number;
public readonly remediation: string;
constructor(opts: {
service: 'postgres' | 'valkey' | 'pgvector' | 'config';
host: string;
port: number;
remediation: string;
cause?: unknown;
}) {
const message =
`[tier-detector] ${opts.service} unreachable or unusable at ` +
`${opts.host}:${opts.port}${opts.remediation}`;
super(message, { cause: opts.cause });
this.name = 'TierDetectionError';
this.service = opts.service;
this.host = opts.host;
this.port = opts.port;
this.remediation = opts.remediation;
}
}
/* ------------------------------------------------------------------ */
/* URL helpers */
/* ------------------------------------------------------------------ */
/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */
function parseHostPort(url: string, defaultPort: number): { host: string; port: number } {
try {
const parsed = new URL(url);
const host = parsed.hostname || 'unknown';
const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort;
return { host, port };
} catch {
return { host: 'unknown', port: defaultPort };
}
}
/* ------------------------------------------------------------------ */
/* Internal probe results */
/* ------------------------------------------------------------------ */
interface ProbeResult {
host: string;
port: number;
durationMs: number;
error?: { message: string; remediation: string };
}
/* ------------------------------------------------------------------ */
/* Postgres probe */
/* ------------------------------------------------------------------ */
async function probePostgres(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// Run a trivial query to confirm connectivity.
await sql`SELECT 1`;
} catch (cause) {
throw new TierDetectionError({
service: 'postgres',
host,
port,
remediation:
'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`',
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors — we already have what we need.
});
}
}
}
async function probePostgresMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 5432);
const start = Date.now();
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
await sql`SELECT 1`;
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
return {
host,
port,
durationMs: Date.now() - start,
error: {
message: cause instanceof Error ? cause.message : String(cause),
remediation:
'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`',
},
};
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {});
}
}
}
/* ------------------------------------------------------------------ */
/* pgvector probe */
/* ------------------------------------------------------------------ */
async function probePgvector(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// This succeeds whether the extension is already installed or freshly created.
// It errors only if the pgvector shared library is missing from the Postgres binary.
await sql`CREATE EXTENSION IF NOT EXISTS vector`;
} catch (cause) {
const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
const remediation = isLibraryMissing
? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.'
: 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';
throw new TierDetectionError({
service: 'pgvector',
host,
port,
remediation,
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors.
});
}
}
}
async function probePgvectorMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 5432);
const start = Date.now();
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
await sql`CREATE EXTENSION IF NOT EXISTS vector`;
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
const remediation = isLibraryMissing
? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.'
: 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';
return {
host,
port,
durationMs: Date.now() - start,
error: { message: cause instanceof Error ? cause.message : String(cause), remediation },
};
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {});
}
}
}
/* ------------------------------------------------------------------ */
/* Valkey probe */
/* ------------------------------------------------------------------ */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
async function probeValkey(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 6380);
const client = new Redis(url, {
enableReadyCheck: false,
maxRetriesPerRequest: 0,
retryStrategy: () => null, // no retries — fail fast
lazyConnect: true,
connectTimeout: 5000, // fail-fast: 5-second hard cap on connection attempt
});
try {
await client.connect();
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Unexpected PING response: ${pong}`);
}
} catch (cause) {
throw new TierDetectionError({
service: 'valkey',
host,
port,
remediation:
'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`',
cause,
});
} finally {
client.disconnect();
}
}
async function probeValkeyMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 6380);
const start = Date.now();
const client = new Redis(url, {
enableReadyCheck: false,
maxRetriesPerRequest: 0,
retryStrategy: () => null,
lazyConnect: true,
connectTimeout: 5000,
});
try {
await client.connect();
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Unexpected PING response: ${pong}`);
}
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
return {
host,
port,
durationMs: Date.now() - start,
error: {
message: cause instanceof Error ? cause.message : String(cause),
remediation:
'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`',
},
};
} finally {
client.disconnect();
}
}
/* ------------------------------------------------------------------ */
/* Public entry points */
/* ------------------------------------------------------------------ */
/**
* Assert that all services required by `config.tier` are reachable.
*
* - `local` — no-op (PGlite is in-process; no external services).
* - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq').
* - `federated` — assert Postgres + Valkey + pgvector installability.
*
* Throws `TierDetectionError` on the first failure with host:port and
* a remediation hint.
*/
export async function detectAndAssertTier(config: TierConfig): Promise<void> {
if (config.tier === 'local') {
// PGlite runs in-process — nothing to probe.
return;
}
const pgUrl =
config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic';
const valkeyUrl =
config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
if (config.tier === 'standalone') {
await probePostgres(pgUrl);
if (valkeyUrl) {
await probeValkey(valkeyUrl);
}
return;
}
// tier === 'federated'
// Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL.
if (config.queue.type !== 'bullmq') {
throw new TierDetectionError({
service: 'config',
host: 'localhost',
port: 0,
remediation:
"Federated tier requires queue.type === 'bullmq'. " +
"Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
});
}
const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
await probePostgres(pgUrl);
await probeValkey(federatedValkeyUrl);
await probePgvector(pgUrl);
}
/**
* Non-throwing variant for `mosaic gateway doctor`.
*
* Probes ALL required services even if some fail, returning a structured report.
* Services not required for the current tier are reported as `skipped`.
*
* Overall status:
* - `green` — all required services OK
* - `yellow` — all required services OK, but a non-critical check failed
* (currently unused — reserved for future optional probes)
* - `red` — at least one required service failed
*/
export async function probeServiceHealth(
config: TierConfig,
configPath?: string,
): Promise<TierHealthReport> {
const tier = config.tier;
// local tier: PGlite is in-process, no external services needed.
if (tier === 'local') {
return {
tier,
configPath,
overall: 'green',
services: [
{ name: 'postgres', status: 'skipped', durationMs: 0 },
{ name: 'valkey', status: 'skipped', durationMs: 0 },
{ name: 'pgvector', status: 'skipped', durationMs: 0 },
],
};
}
const pgUrl =
config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic';
const valkeyUrl =
config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
const services: ServiceCheck[] = [];
let hasFailure = false;
if (tier === 'standalone') {
// Postgres — required
const pgResult = await probePostgresMeasured(pgUrl);
if (pgResult.error) {
hasFailure = true;
services.push({
name: 'postgres',
status: 'fail',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
error: pgResult.error,
});
} else {
services.push({
name: 'postgres',
status: 'ok',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
});
}
// Valkey — required if bullmq
if (valkeyUrl) {
const vkResult = await probeValkeyMeasured(valkeyUrl);
if (vkResult.error) {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
error: vkResult.error,
});
} else {
services.push({
name: 'valkey',
status: 'ok',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
});
}
} else {
services.push({ name: 'valkey', status: 'skipped', durationMs: 0 });
}
// pgvector — not required for standalone
services.push({ name: 'pgvector', status: 'skipped', durationMs: 0 });
return {
tier,
configPath,
overall: hasFailure ? 'red' : 'green',
services,
};
}
// tier === 'federated'
// Postgres — required
const pgResult = await probePostgresMeasured(pgUrl);
if (pgResult.error) {
hasFailure = true;
services.push({
name: 'postgres',
status: 'fail',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
error: pgResult.error,
});
} else {
services.push({
name: 'postgres',
status: 'ok',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
});
}
// Valkey — required for federated (queue.type must be bullmq)
if (config.queue.type !== 'bullmq') {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: 'localhost',
port: 0,
durationMs: 0,
error: {
message: "Federated tier requires queue.type === 'bullmq'",
remediation:
"Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
},
});
} else {
const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
const vkResult = await probeValkeyMeasured(federatedValkeyUrl);
if (vkResult.error) {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
error: vkResult.error,
});
} else {
services.push({
name: 'valkey',
status: 'ok',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
});
}
}
// pgvector — required for federated
const pvResult = await probePgvectorMeasured(pgUrl);
if (pvResult.error) {
hasFailure = true;
services.push({
name: 'pgvector',
status: 'fail',
host: pvResult.host,
port: pvResult.port,
durationMs: pvResult.durationMs,
error: pvResult.error,
});
} else {
services.push({
name: 'pgvector',
status: 'ok',
host: pvResult.host,
port: pvResult.port,
durationMs: pvResult.durationMs,
});
}
return {
tier,
configPath,
overall: hasFailure ? 'red' : 'green',
services,
};
}