From 1a4b1ebbf13ba5e805d890c39b29fd8728d84508 Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Mon, 20 Apr 2026 01:00:39 +0000 Subject: [PATCH] feat(gateway,storage): mosaic gateway doctor with tier health JSON (FED-M1-06) (#475) --- apps/gateway/src/bootstrap/tier-detector.ts | 220 ------- apps/gateway/src/main.ts | 2 +- docs/federation/TASKS.md | 4 +- .../src/commands/gateway-doctor.spec.ts | 294 ++++++++++ .../mosaic/src/commands/gateway-doctor.ts | 143 +++++ packages/mosaic/src/commands/gateway.ts | 11 + packages/storage/package.json | 1 + packages/storage/src/index.ts | 2 + .../storage/src/tier-detection.spec.ts | 236 +++++++- packages/storage/src/tier-detection.ts | 555 ++++++++++++++++++ pnpm-lock.yaml | 54 +- 11 files changed, 1233 insertions(+), 289 deletions(-) delete mode 100644 apps/gateway/src/bootstrap/tier-detector.ts create mode 100644 packages/mosaic/src/commands/gateway-doctor.spec.ts create mode 100644 packages/mosaic/src/commands/gateway-doctor.ts rename apps/gateway/src/bootstrap/tier-detector.spec.ts => packages/storage/src/tier-detection.spec.ts (60%) create mode 100644 packages/storage/src/tier-detection.ts diff --git a/apps/gateway/src/bootstrap/tier-detector.ts b/apps/gateway/src/bootstrap/tier-detector.ts deleted file mode 100644 index 5b5fb3c..0000000 --- a/apps/gateway/src/bootstrap/tier-detector.ts +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Tier Detector — pre-flight service reachability assertions. - * - * Runs BEFORE NestFactory.create() to surface actionable errors immediately - * rather than crashing mid-boot with an opaque stack trace. - * - * Library choices: - * - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm). - * The spec mentions `pg`, but only `postgres` is installed in this monorepo. - * - Valkey: `ioredis` (already a dep via @mosaicstack/queue → bullmq; same URL - * convention used by the bullmq adapter). - */ - -import postgres from 'postgres'; -import { Redis } from 'ioredis'; -import type { MosaicConfig } from '@mosaicstack/config'; - -/* ------------------------------------------------------------------ */ -/* Structured error type */ -/* ------------------------------------------------------------------ */ - -export class TierDetectionError extends Error { - public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config'; - public readonly host: string; - public readonly port: number; - public readonly remediation: string; - - constructor(opts: { - service: 'postgres' | 'valkey' | 'pgvector' | 'config'; - host: string; - port: number; - remediation: string; - cause?: unknown; - }) { - const message = - `[tier-detector] ${opts.service} unreachable or unusable at ` + - `${opts.host}:${opts.port} — ${opts.remediation}`; - super(message, { cause: opts.cause }); - this.name = 'TierDetectionError'; - this.service = opts.service; - this.host = opts.host; - this.port = opts.port; - this.remediation = opts.remediation; - } -} - -/* ------------------------------------------------------------------ */ -/* URL helpers */ -/* ------------------------------------------------------------------ */ - -/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */ -function parseHostPort(url: string, defaultPort: number): { host: string; port: number } { - try { - const parsed = new URL(url); - const host = parsed.hostname || 'unknown'; - const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort; - return { host, port }; - } catch { - return { host: 'unknown', port: defaultPort }; - } -} - -/* ------------------------------------------------------------------ */ -/* Postgres probe */ -/* ------------------------------------------------------------------ */ - -async function probePostgres(url: string): Promise { - const { host, port } = parseHostPort(url, 5432); - let sql: ReturnType | undefined; - try { - sql = postgres(url, { - max: 1, - connect_timeout: 5, - idle_timeout: 5, - }); - // Run a trivial query to confirm connectivity. - await sql`SELECT 1`; - } catch (cause) { - throw new TierDetectionError({ - service: 'postgres', - host, - port, - remediation: - 'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`', - cause, - }); - } finally { - if (sql) { - await sql.end({ timeout: 2 }).catch(() => { - // Ignore cleanup errors — we already have what we need. - }); - } - } -} - -/* ------------------------------------------------------------------ */ -/* pgvector probe */ -/* ------------------------------------------------------------------ */ - -async function probePgvector(url: string): Promise { - const { host, port } = parseHostPort(url, 5432); - let sql: ReturnType | undefined; - try { - sql = postgres(url, { - max: 1, - connect_timeout: 5, - idle_timeout: 5, - }); - // This succeeds whether the extension is already installed or freshly created. - // It errors only if the pgvector shared library is missing from the Postgres binary. - await sql`CREATE EXTENSION IF NOT EXISTS vector`; - } catch (cause) { - const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : ''; - const isLibraryMissing = causeMsg.includes('extension "vector" is not available'); - const remediation = isLibraryMissing - ? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.' - : 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.'; - throw new TierDetectionError({ - service: 'pgvector', - host, - port, - remediation, - cause, - }); - } finally { - if (sql) { - await sql.end({ timeout: 2 }).catch(() => { - // Ignore cleanup errors. - }); - } - } -} - -/* ------------------------------------------------------------------ */ -/* Valkey probe */ -/* ------------------------------------------------------------------ */ - -const DEFAULT_VALKEY_URL = 'redis://localhost:6380'; - -async function probeValkey(url: string): Promise { - const { host, port } = parseHostPort(url, 6380); - const client = new Redis(url, { - enableReadyCheck: false, - maxRetriesPerRequest: 0, - retryStrategy: () => null, // no retries — fail fast - lazyConnect: true, - connectTimeout: 5000, // fail-fast: 5-second hard cap on connection attempt - }); - - try { - await client.connect(); - const pong = await client.ping(); - if (pong !== 'PONG') { - throw new Error(`Unexpected PING response: ${pong}`); - } - } catch (cause) { - throw new TierDetectionError({ - service: 'valkey', - host, - port, - remediation: - 'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`', - cause, - }); - } finally { - client.disconnect(); - } -} - -/* ------------------------------------------------------------------ */ -/* Public entry point */ -/* ------------------------------------------------------------------ */ - -/** - * Assert that all services required by `config.tier` are reachable. - * - * - `local` — no-op (PGlite is in-process; no external services). - * - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq'). - * - `federated` — assert Postgres + Valkey + pgvector installability. - * - * Throws `TierDetectionError` on the first failure with host:port and - * a remediation hint. - */ -export async function detectAndAssertTier(config: MosaicConfig): Promise { - if (config.tier === 'local') { - // PGlite runs in-process — nothing to probe. - return; - } - - const pgUrl = - config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic'; - - const valkeyUrl = - config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null; - - if (config.tier === 'standalone') { - await probePostgres(pgUrl); - if (valkeyUrl) { - await probeValkey(valkeyUrl); - } - return; - } - - // tier === 'federated' - // Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL. - if (config.queue.type !== 'bullmq') { - throw new TierDetectionError({ - service: 'config', - host: 'localhost', - port: 0, - remediation: - "Federated tier requires queue.type === 'bullmq'. " + - "Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.", - }); - } - const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL; - await probePostgres(pgUrl); - await probeValkey(federatedValkeyUrl); - await probePgvector(pgUrl); -} diff --git a/apps/gateway/src/main.ts b/apps/gateway/src/main.ts index 60a1be4..f70e88a 100644 --- a/apps/gateway/src/main.ts +++ b/apps/gateway/src/main.ts @@ -25,7 +25,7 @@ import { AppModule } from './app.module.js'; import { mountAuthHandler } from './auth/auth.controller.js'; import { mountMcpHandler } from './mcp/mcp.controller.js'; import { McpService } from './mcp/mcp.service.js'; -import { detectAndAssertTier, TierDetectionError } from './bootstrap/tier-detector.js'; +import { detectAndAssertTier, TierDetectionError } from '@mosaicstack/storage'; async function bootstrap(): Promise { const logger = new Logger('Bootstrap'); diff --git a/docs/federation/TASKS.md b/docs/federation/TASKS.md index b6c8d3b..b764399 100644 --- a/docs/federation/TASKS.md +++ b/docs/federation/TASKS.md @@ -21,8 +21,8 @@ Goal: Gateway runs in `federated` tier with containerized PG+pgvector+Valkey. No | FED-M1-02 | done | Author `docker-compose.federated.yml` as an overlay profile: Postgres 17 + pgvector extension (port 5433), Valkey (6380), named volumes, healthchecks. Compose-up should boot cleanly on a clean machine. | #460 | sonnet | feat/federation-m1-compose | FED-M1-01 | 5K | Shipped in PR #471. Overlay defines `postgres-federated`/`valkey-federated`, profile-gated, with pg-init for pgvector extension. | | FED-M1-03 | done | Add pgvector support to `packages/storage/src/adapters/postgres.ts`: create extension on init (idempotent), expose vector column type in schema helpers. No adapter changes for non-federated tiers. | #460 | sonnet | feat/federation-m1-pgvector | FED-M1-02 | 8K | Shipped in PR #472. `enableVector` flag on postgres StorageConfig; idempotent CREATE EXTENSION before migrations. | | FED-M1-04 | done | Implement `apps/gateway/src/bootstrap/tier-detector.ts`: reads config, asserts PG/Valkey/pgvector reachable for `federated`, fail-fast with actionable error message on failure. Unit tests for each failure mode. | #460 | sonnet | feat/federation-m1-detector | FED-M1-03 | 8K | Shipped in PR #473. 12 tests; 5s timeouts on probes; pgvector library/permission discrimination; rejects non-bullmq for federated. | -| FED-M1-05 | in-progress | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | sonnet | feat/federation-m1-migrate | FED-M1-04 | 10K | Do NOT run automatically. CLI subcommand `mosaic migrate tier --to federated --dry-run`. Safety rails. | -| FED-M1-06 | not-started | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. | #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. | +| FED-M1-05 | done | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | sonnet | feat/federation-m1-migrate | FED-M1-04 | 10K | Shipped in PR #474. `mosaic storage migrate-tier`; DrizzleMigrationSource (corrects P0 found in review); 32 tests; idempotent. | +| FED-M1-06 | in-progress | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. | #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. | | FED-M1-07 | not-started | Integration test: gateway boots in `federated` tier with docker-compose `federated` profile; refuses to boot when PG unreachable (asserts fail-fast); pgvector extension query succeeds. | #460 | sonnet | feat/federation-m1-integration | FED-M1-04 | 8K | Vitest + docker-compose test profile. One test file per assertion; real services, no mocks. | | FED-M1-08 | not-started | Integration test for migration script: seed a local PGlite with representative data (tasks, notes, users, teams), run migration, assert row counts + key samples equal on federated PG. | #460 | sonnet | feat/federation-m1-migrate-test | FED-M1-05 | 6K | Runs against docker-compose federated profile; uses temp PGlite file; deterministic seed. | | FED-M1-09 | not-started | Standalone regression: full agent-session E2E on existing `standalone` tier with a gateway built from this branch. Must pass without referencing any federation module. | #460 | haiku | feat/federation-m1-regression | FED-M1-07 | 4K | Reuse existing e2e harness; just re-point at the federation branch build. Canary that we didn't break it. | diff --git a/packages/mosaic/src/commands/gateway-doctor.spec.ts b/packages/mosaic/src/commands/gateway-doctor.spec.ts new file mode 100644 index 0000000..767297a --- /dev/null +++ b/packages/mosaic/src/commands/gateway-doctor.spec.ts @@ -0,0 +1,294 @@ +/** + * Unit tests for gateway-doctor.ts (mosaic gateway doctor). + * + * All external I/O is mocked — no live services required. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { TierHealthReport } from '@mosaicstack/storage'; + +/* ------------------------------------------------------------------ */ +/* Shared mock state */ +/* ------------------------------------------------------------------ */ + +const mocks = vi.hoisted(() => { + const mockLoadConfig = vi.fn(); + const mockProbeServiceHealth = vi.fn(); + const mockExistsSync = vi.fn(); + + return { mockLoadConfig, mockProbeServiceHealth, mockExistsSync }; +}); + +/* ------------------------------------------------------------------ */ +/* Module mocks */ +/* ------------------------------------------------------------------ */ + +vi.mock('@mosaicstack/config', () => ({ + loadConfig: mocks.mockLoadConfig, +})); + +vi.mock('@mosaicstack/storage', () => ({ + probeServiceHealth: mocks.mockProbeServiceHealth, +})); + +vi.mock('node:fs', () => ({ + existsSync: mocks.mockExistsSync, +})); + +/* ------------------------------------------------------------------ */ +/* Import SUT */ +/* ------------------------------------------------------------------ */ + +import { runGatewayDoctor } from './gateway-doctor.js'; +import type { MosaicConfig } from '@mosaicstack/config'; + +/* ------------------------------------------------------------------ */ +/* Fixtures */ +/* ------------------------------------------------------------------ */ + +const STANDALONE_CONFIG: MosaicConfig = { + tier: 'standalone', + storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@localhost:5432/mosaic' }, + queue: { type: 'bullmq', url: 'redis://localhost:6380' }, + memory: { type: 'keyword' }, +}; + +const GREEN_REPORT: TierHealthReport = { + tier: 'standalone', + configPath: '/some/mosaic.config.json', + overall: 'green', + services: [ + { name: 'postgres', status: 'ok', host: 'localhost', port: 5432, durationMs: 42 }, + { name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 10 }, + { name: 'pgvector', status: 'skipped', durationMs: 0 }, + ], +}; + +const RED_REPORT: TierHealthReport = { + tier: 'standalone', + configPath: '/some/mosaic.config.json', + overall: 'red', + services: [ + { + name: 'postgres', + status: 'fail', + host: 'localhost', + port: 5432, + durationMs: 5001, + error: { + message: 'connection refused', + remediation: 'Start Postgres: `docker compose ...`', + }, + }, + { name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 8 }, + { name: 'pgvector', status: 'skipped', durationMs: 0 }, + ], +}; + +const FEDERATED_GREEN_REPORT: TierHealthReport = { + tier: 'federated', + configPath: '/some/mosaic.config.json', + overall: 'green', + services: [ + { name: 'postgres', status: 'ok', host: 'localhost', port: 5433, durationMs: 30 }, + { name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 5 }, + { name: 'pgvector', status: 'ok', host: 'localhost', port: 5433, durationMs: 25 }, + ], +}; + +/* ------------------------------------------------------------------ */ +/* Process helpers */ +/* ------------------------------------------------------------------ */ + +let stdoutCapture = ''; +let exitCode: number | undefined; + +function captureOutput(): void { + stdoutCapture = ''; + exitCode = undefined; + + vi.spyOn(process.stdout, 'write').mockImplementation((chunk) => { + stdoutCapture += typeof chunk === 'string' ? chunk : chunk.toString(); + return true; + }); + vi.spyOn(process.stderr, 'write').mockImplementation(() => true); + vi.spyOn(process, 'exit').mockImplementation((code?: string | number | null) => { + exitCode = typeof code === 'number' ? code : code != null ? Number(code) : undefined; + throw new Error(`process.exit(${String(code)})`); + }); + vi.spyOn(console, 'log').mockImplementation((...args: unknown[]) => { + stdoutCapture += args.join(' ') + '\n'; + }); +} + +/* ------------------------------------------------------------------ */ +/* Tests */ +/* ------------------------------------------------------------------ */ + +describe('runGatewayDoctor', () => { + beforeEach(() => { + vi.clearAllMocks(); + captureOutput(); + + // By default: no config file on disk (env-detection path) + mocks.mockExistsSync.mockReturnValue(false); + mocks.mockLoadConfig.mockReturnValue(STANDALONE_CONFIG); + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + /* ---------------------------------------------------------------- */ + /* 1. JSON mode: parseable JSON matching the schema */ + /* ---------------------------------------------------------------- */ + + it('JSON mode emits parseable JSON matching TierHealthReport schema', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + + await runGatewayDoctor({ json: true }); + + const parsed = JSON.parse(stdoutCapture) as TierHealthReport; + expect(parsed.tier).toBe('standalone'); + expect(parsed.overall).toBe('green'); + expect(Array.isArray(parsed.services)).toBe(true); + expect(parsed.services).toHaveLength(3); + + // Validate shape of each service check + for (const svc of parsed.services) { + expect(['postgres', 'valkey', 'pgvector']).toContain(svc.name); + expect(['ok', 'fail', 'skipped']).toContain(svc.status); + expect(typeof svc.durationMs).toBe('number'); + } + + // JSON mode must be silent on console.log — output goes to process.stdout only. + expect(console.log).not.toHaveBeenCalled(); + }); + + it('JSON mode for federated with 3 ok services', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(FEDERATED_GREEN_REPORT); + + await runGatewayDoctor({ json: true }); + + const parsed = JSON.parse(stdoutCapture) as TierHealthReport; + expect(parsed.tier).toBe('federated'); + expect(parsed.overall).toBe('green'); + expect(parsed.services.every((s) => s.status === 'ok')).toBe(true); + + // JSON mode must be silent on console.log — output goes to process.stdout only. + expect(console.log).not.toHaveBeenCalled(); + }); + + /* ---------------------------------------------------------------- */ + /* 2. Plain text mode: service lines and overall verdict */ + /* ---------------------------------------------------------------- */ + + it('plain text mode includes service lines for each service', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + + await runGatewayDoctor({}); + + expect(stdoutCapture).toContain('postgres'); + expect(stdoutCapture).toContain('valkey'); + expect(stdoutCapture).toContain('pgvector'); + }); + + it('plain text mode includes Overall verdict', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + + await runGatewayDoctor({}); + + expect(stdoutCapture).toContain('Overall: GREEN'); + }); + + it('plain text mode shows tier and config path in header', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + + await runGatewayDoctor({}); + + expect(stdoutCapture).toContain('Tier: standalone'); + }); + + it('plain text mode shows remediation for failed services', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT); + + try { + await runGatewayDoctor({}); + } catch { + // process.exit throws in test + } + + expect(stdoutCapture).toContain('Remediations:'); + expect(stdoutCapture).toContain('Start Postgres'); + }); + + /* ---------------------------------------------------------------- */ + /* 3. Exit codes */ + /* ---------------------------------------------------------------- */ + + it('exits with code 1 when overall is red', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT); + + await expect(runGatewayDoctor({})).rejects.toThrow('process.exit(1)'); + expect(exitCode).toBe(1); + }); + + it('exits with code 0 (no exit call) when overall is green', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT); + + await runGatewayDoctor({}); + + // process.exit should NOT have been called for green. + expect(exitCode).toBeUndefined(); + }); + + it('JSON mode exits with code 1 when overall is red', async () => { + mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT); + + await expect(runGatewayDoctor({ json: true })).rejects.toThrow('process.exit(1)'); + expect(exitCode).toBe(1); + }); + + /* ---------------------------------------------------------------- */ + /* 4. --config path override is honored */ + /* ---------------------------------------------------------------- */ + + it('passes --config path to loadConfig when provided', async () => { + const customPath = '/custom/path/mosaic.config.json'; + + await runGatewayDoctor({ config: customPath }); + + // loadConfig should have been called with the resolved custom path. + expect(mocks.mockLoadConfig).toHaveBeenCalledWith( + expect.stringContaining('mosaic.config.json'), + ); + // The exact call should include the custom path (resolved). + const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined]; + expect(calledPath).toContain('custom/path/mosaic.config.json'); + }); + + it('calls loadConfig without path when no --config and no file on disk', async () => { + mocks.mockExistsSync.mockReturnValue(false); + + await runGatewayDoctor({}); + + const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined]; + // When no file found, resolveConfigPath returns undefined, so loadConfig is called with undefined + expect(calledPath).toBeUndefined(); + }); + + it('finds config from cwd when mosaic.config.json exists there', async () => { + // First candidate (cwd/mosaic.config.json) exists. + mocks.mockExistsSync.mockImplementation((p: unknown) => { + return typeof p === 'string' && p.endsWith('mosaic.config.json'); + }); + + await runGatewayDoctor({}); + + const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined]; + expect(calledPath).toBeDefined(); + expect(typeof calledPath).toBe('string'); + expect(calledPath!.endsWith('mosaic.config.json')).toBe(true); + }); +}); diff --git a/packages/mosaic/src/commands/gateway-doctor.ts b/packages/mosaic/src/commands/gateway-doctor.ts new file mode 100644 index 0000000..e554729 --- /dev/null +++ b/packages/mosaic/src/commands/gateway-doctor.ts @@ -0,0 +1,143 @@ +/** + * gateway-doctor.ts — `mosaic gateway doctor` implementation. + * + * Reports current tier and per-service health (PG, Valkey, pgvector) for the + * Mosaic gateway. Supports machine-readable JSON output for CI. + * + * Exit codes: + * 0 — overall green or yellow + * 1 — overall red (at least one required service failed) + */ + +import { existsSync } from 'node:fs'; +import { resolve, join } from 'node:path'; +import { homedir } from 'node:os'; +import { loadConfig } from '@mosaicstack/config'; +import { probeServiceHealth } from '@mosaicstack/storage'; +import type { TierHealthReport, ServiceCheck } from '@mosaicstack/storage'; + +/* ------------------------------------------------------------------ */ +/* Config resolution */ +/* ------------------------------------------------------------------ */ + +const CONFIG_CANDIDATES = [ + resolve(process.cwd(), 'mosaic.config.json'), + join(homedir(), '.mosaic', 'mosaic.config.json'), +]; + +/** + * Resolve the config path to report in output. + * + * Priority: + * 1. Explicit `--config ` flag + * 2. `./mosaic.config.json` (cwd) + * 3. `~/.mosaic/mosaic.config.json` + * 4. undefined — `loadConfig()` falls back to env-var detection + * + * `loadConfig()` itself already handles priority 1-3 when passed an explicit + * path, and falls back to env-detection when none exists. We resolve here + * only so we can surface the path in the health report. + */ +function resolveConfigPath(explicit?: string): string | undefined { + if (explicit) return resolve(explicit); + for (const candidate of CONFIG_CANDIDATES) { + if (existsSync(candidate)) return candidate; + } + return undefined; +} + +/* ------------------------------------------------------------------ */ +/* Output helpers */ +/* ------------------------------------------------------------------ */ + +const TICK = '\u2713'; // ✓ +const CROSS = '\u2717'; // ✗ +const SKIP = '-'; + +function padRight(s: string, n: number): string { + return s + ' '.repeat(Math.max(0, n - s.length)); +} + +function serviceLabel(svc: ServiceCheck): string { + const hostPort = + svc.host !== undefined && svc.port !== undefined ? `${svc.host}:${svc.port.toString()}` : ''; + const duration = `(${svc.durationMs.toString()}ms)`; + + switch (svc.status) { + case 'ok': + return ` ${TICK} ${padRight(svc.name, 10)} ${padRight(hostPort, 22)} ${duration}`; + case 'fail': { + const errMsg = svc.error?.message ?? 'unknown error'; + return ` ${CROSS} ${padRight(svc.name, 10)} ${padRight(hostPort, 22)} ${duration} \u2192 ${errMsg}`; + } + case 'skipped': + return ` ${SKIP} ${padRight(svc.name, 10)} (skipped)`; + } +} + +function printReport(report: TierHealthReport): void { + const configDisplay = report.configPath ?? '(auto-detected)'; + console.log(`Tier: ${report.tier} Config: ${configDisplay}`); + console.log(''); + + for (const svc of report.services) { + console.log(serviceLabel(svc)); + } + + console.log(''); + + // Print remediations for failed services. + const failed = report.services.filter((s) => s.status === 'fail' && s.error); + if (failed.length > 0) { + console.log('Remediations:'); + for (const svc of failed) { + if (svc.error) { + console.log(` ${svc.name}: ${svc.error.remediation}`); + } + } + console.log(''); + } + + console.log(`Overall: ${report.overall.toUpperCase()}`); +} + +/* ------------------------------------------------------------------ */ +/* Main runner */ +/* ------------------------------------------------------------------ */ + +export interface GatewayDoctorOptions { + json?: boolean; + config?: string; +} + +export async function runGatewayDoctor(opts: GatewayDoctorOptions): Promise { + const configPath = resolveConfigPath(opts.config); + + let mosaicConfig; + try { + mosaicConfig = loadConfig(configPath); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + if (opts.json) { + process.stdout.write( + JSON.stringify({ error: `Failed to load config: ${msg}` }, null, 2) + '\n', + ); + } else { + process.stderr.write(`Error: Failed to load config: ${msg}\n`); + } + process.exit(1); + } + + const report = await probeServiceHealth(mosaicConfig, configPath); + + if (opts.json) { + process.stdout.write(JSON.stringify(report, null, 2) + '\n'); + } else { + printReport(report); + } + + // Exit 1 if overall is red. + if (report.overall === 'red') { + process.exit(1); + } +} diff --git a/packages/mosaic/src/commands/gateway.ts b/packages/mosaic/src/commands/gateway.ts index 870292b..4a7837d 100644 --- a/packages/mosaic/src/commands/gateway.ts +++ b/packages/mosaic/src/commands/gateway.ts @@ -206,4 +206,15 @@ export function registerGatewayCommand(program: Command): void { const { runUninstall } = await import('./gateway/uninstall.js'); await runUninstall(); }); + + // ─── doctor ───────────────────────────────────────────────────────────────── + + gw.command('doctor') + .description('Check gateway tier and per-service health (PG, Valkey, pgvector)') + .option('--json', 'Emit TierHealthReport as JSON to stdout (suppresses all other output)') + .option('--config ', 'Path to mosaic.config.json (defaults to cwd or ~/.mosaic/)') + .action(async (cmdOpts: { json?: boolean; config?: string }) => { + const { runGatewayDoctor } = await import('./gateway-doctor.js'); + await runGatewayDoctor({ json: cmdOpts.json, config: cmdOpts.config }); + }); } diff --git a/packages/storage/package.json b/packages/storage/package.json index f144a8a..01294d8 100644 --- a/packages/storage/package.json +++ b/packages/storage/package.json @@ -25,6 +25,7 @@ "@mosaicstack/db": "workspace:^", "@mosaicstack/types": "workspace:*", "commander": "^13.0.0", + "ioredis": "^5.10.0", "postgres": "^3.4.8" }, "devDependencies": { diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 18e06f7..1f3f0e4 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -1,4 +1,6 @@ export type { StorageAdapter, StorageConfig } from './types.js'; +export { TierDetectionError, detectAndAssertTier, probeServiceHealth } from './tier-detection.js'; +export type { ServiceCheck, TierHealthReport } from './tier-detection.js'; export { createStorageAdapter, registerStorageAdapter } from './factory.js'; export { PostgresAdapter } from './adapters/postgres.js'; export { PgliteAdapter } from './adapters/pglite.js'; diff --git a/apps/gateway/src/bootstrap/tier-detector.spec.ts b/packages/storage/src/tier-detection.spec.ts similarity index 60% rename from apps/gateway/src/bootstrap/tier-detector.spec.ts rename to packages/storage/src/tier-detection.spec.ts index 3c6ca13..abce213 100644 --- a/apps/gateway/src/bootstrap/tier-detector.spec.ts +++ b/packages/storage/src/tier-detection.spec.ts @@ -1,5 +1,5 @@ /** - * Unit tests for tier-detector.ts. + * Unit tests for tier-detection.ts. * * All external I/O (postgres, ioredis) is mocked — no live services required. * @@ -59,28 +59,26 @@ vi.mock('ioredis', () => ({ /* Import SUT after mocks are registered */ /* ------------------------------------------------------------------ */ -import { detectAndAssertTier, TierDetectionError } from './tier-detector.js'; -import type { MosaicConfig } from '@mosaicstack/config'; +import { detectAndAssertTier, probeServiceHealth, TierDetectionError } from './tier-detection.js'; +import type { TierConfig } from './tier-detection.js'; /* ------------------------------------------------------------------ */ /* Config fixtures */ /* ------------------------------------------------------------------ */ -const LOCAL_CONFIG: MosaicConfig = { +const LOCAL_CONFIG: TierConfig = { tier: 'local', storage: { type: 'pglite', dataDir: '.mosaic/pglite' }, - queue: { type: 'local', dataDir: '.mosaic/queue' }, - memory: { type: 'keyword' }, + queue: { type: 'local' }, }; -const STANDALONE_CONFIG: MosaicConfig = { +const STANDALONE_CONFIG: TierConfig = { tier: 'standalone', storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@db-host:5432/mosaic' }, queue: { type: 'bullmq', url: 'redis://valkey-host:6380' }, - memory: { type: 'keyword' }, }; -const FEDERATED_CONFIG: MosaicConfig = { +const FEDERATED_CONFIG: TierConfig = { tier: 'federated', storage: { type: 'postgres', @@ -88,7 +86,6 @@ const FEDERATED_CONFIG: MosaicConfig = { enableVector: true, }, queue: { type: 'bullmq', url: 'redis://valkey-host:6380' }, - memory: { type: 'pgvector' }, }; /* ------------------------------------------------------------------ */ @@ -232,15 +229,17 @@ describe('detectAndAssertTier', () => { }); /* ---------------------------------------------------------------- */ - /* 7. probeValkey honors connectTimeout */ + /* 7. probeValkey honors connectTimeout and lazyConnect */ /* ---------------------------------------------------------------- */ it('constructs the ioredis Redis client with connectTimeout: 5000', async () => { await detectAndAssertTier(STANDALONE_CONFIG); expect(mocks.MockRedis).toHaveBeenCalledOnce(); - const [, options] = mocks.MockRedis.mock.calls[0] as [string, Record]; - expect(options).toMatchObject({ connectTimeout: 5000 }); + expect(mocks.MockRedis).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ connectTimeout: 5000, lazyConnect: true }), + ); }); /* ---------------------------------------------------------------- */ @@ -269,7 +268,7 @@ describe('detectAndAssertTier', () => { /* ---------------------------------------------------------------- */ it('mentions CREATE permission or superuser in remediation for a generic pgvector error', async () => { - // SELECT 1 succeeds; CREATE EXTENSION fails with a permission error (not the library-missing message). + // SELECT 1 succeeds; CREATE EXTENSION fails with a permission error. mocks.mockSqlFn .mockResolvedValueOnce([]) // SELECT 1 (probePostgres) .mockRejectedValueOnce(new Error('permission denied to create extension')); @@ -293,15 +292,14 @@ describe('detectAndAssertTier', () => { /* ---------------------------------------------------------------- */ it('throws TierDetectionError with service=config for federated tier with queue.type !== bullmq', async () => { - const badConfig: MosaicConfig = { + const badConfig: TierConfig = { tier: 'federated', storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@db-host:5433/mosaic', enableVector: true, }, - queue: { type: 'local', dataDir: '.mosaic/queue' }, - memory: { type: 'pgvector' }, + queue: { type: 'local' }, }; try { @@ -342,3 +340,207 @@ describe('detectAndAssertTier', () => { expect(caught!.message).toContain('db-host:5432'); }); }); + +/* ------------------------------------------------------------------ */ +/* probeServiceHealth tests */ +/* ------------------------------------------------------------------ */ + +describe('probeServiceHealth', () => { + beforeEach(() => { + vi.clearAllMocks(); + + mocks.mockSqlFn.mockResolvedValue([]); + mocks.mockEnd.mockResolvedValue(undefined); + mocks.mockRedisConnect.mockResolvedValue(undefined); + mocks.mockRedisPing.mockResolvedValue('PONG'); + + mocks.mockPostgresConstructor.mockImplementation(() => { + const sql = mocks.mockSqlFn as ReturnType; + (sql as unknown as Record)['end'] = mocks.mockEnd; + return sql; + }); + mocks.MockRedis.mockImplementation(() => ({ + connect: mocks.mockRedisConnect, + ping: mocks.mockRedisPing, + disconnect: mocks.mockRedisDisconnect, + })); + }); + + /* ---------------------------------------------------------------- */ + /* 12. local tier — all skipped, green */ + /* ---------------------------------------------------------------- */ + + it('returns all services as skipped and overall green for local tier', async () => { + const report = await probeServiceHealth(LOCAL_CONFIG); + + expect(report.tier).toBe('local'); + expect(report.overall).toBe('green'); + expect(report.services).toHaveLength(3); + for (const svc of report.services) { + expect(svc.status).toBe('skipped'); + } + expect(mocks.mockPostgresConstructor).not.toHaveBeenCalled(); + expect(mocks.MockRedis).not.toHaveBeenCalled(); + }); + + /* ---------------------------------------------------------------- */ + /* 13. postgres fails, valkey ok → red */ + /* ---------------------------------------------------------------- */ + + it('returns red overall with postgres fail and valkey ok for standalone when postgres fails', async () => { + mocks.mockSqlFn.mockRejectedValue(new Error('connection refused')); + + const report = await probeServiceHealth(STANDALONE_CONFIG); + + expect(report.overall).toBe('red'); + + const pgCheck = report.services.find((s) => s.name === 'postgres'); + expect(pgCheck?.status).toBe('fail'); + expect(pgCheck?.error).toBeDefined(); + expect(pgCheck?.error?.remediation).toContain('docker compose'); + + const vkCheck = report.services.find((s) => s.name === 'valkey'); + expect(vkCheck?.status).toBe('ok'); + }); + + /* ---------------------------------------------------------------- */ + /* 14. federated all green → 3 services ok */ + /* ---------------------------------------------------------------- */ + + it('returns green overall with all 3 services ok for federated when all pass', async () => { + mocks.mockSqlFn.mockResolvedValue([]); + + const report = await probeServiceHealth(FEDERATED_CONFIG); + + expect(report.tier).toBe('federated'); + expect(report.overall).toBe('green'); + expect(report.services).toHaveLength(3); + + for (const svc of report.services) { + expect(svc.status).toBe('ok'); + } + }); + + /* ---------------------------------------------------------------- */ + /* 15. durationMs is a non-negative number for every service check */ + /* ---------------------------------------------------------------- */ + + it('sets durationMs as a non-negative number for every service check', async () => { + mocks.mockSqlFn.mockResolvedValue([]); + + const report = await probeServiceHealth(FEDERATED_CONFIG); + + for (const svc of report.services) { + expect(typeof svc.durationMs).toBe('number'); + expect(svc.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it('sets durationMs >= 0 even for skipped services (local tier)', async () => { + const report = await probeServiceHealth(LOCAL_CONFIG); + + for (const svc of report.services) { + expect(typeof svc.durationMs).toBe('number'); + expect(svc.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + /* ---------------------------------------------------------------- */ + /* 16. configPath is passed through to the report */ + /* ---------------------------------------------------------------- */ + + it('includes configPath in the report when provided', async () => { + const report = await probeServiceHealth(LOCAL_CONFIG, '/etc/mosaic/mosaic.config.json'); + expect(report.configPath).toBe('/etc/mosaic/mosaic.config.json'); + }); + + /* ---------------------------------------------------------------- */ + /* 17. standalone — valkey fails, postgres ok → red */ + /* ---------------------------------------------------------------- */ + + it('returns red with valkey fail and postgres ok for standalone when valkey fails', async () => { + mocks.mockSqlFn.mockResolvedValue([]); + mocks.mockRedisConnect.mockRejectedValue(new Error('ECONNREFUSED')); + + const report = await probeServiceHealth(STANDALONE_CONFIG); + + expect(report.overall).toBe('red'); + + const pgCheck = report.services.find((s) => s.name === 'postgres'); + expect(pgCheck?.status).toBe('ok'); + + const vkCheck = report.services.find((s) => s.name === 'valkey'); + expect(vkCheck?.status).toBe('fail'); + expect(vkCheck?.error).toBeDefined(); + }); + + /* ---------------------------------------------------------------- */ + /* 18. federated — pgvector fails → red with remediation */ + /* ---------------------------------------------------------------- */ + + it('returns red with pgvector fail for federated when pgvector probe fails', async () => { + mocks.mockSqlFn + .mockResolvedValueOnce([]) // postgres SELECT 1 + .mockRejectedValueOnce(new Error('extension "vector" is not available')); // pgvector + + const report = await probeServiceHealth(FEDERATED_CONFIG); + + expect(report.overall).toBe('red'); + + const pvCheck = report.services.find((s) => s.name === 'pgvector'); + expect(pvCheck?.status).toBe('fail'); + expect(pvCheck?.error?.remediation).toContain('pgvector/pgvector:pg17'); + }); + + /* ---------------------------------------------------------------- */ + /* 19. federated — non-bullmq queue → red config error, no network */ + /* ---------------------------------------------------------------- */ + + it('returns red overall with config error when federated tier has non-bullmq queue (no network call)', async () => { + const federatedBadQueueConfig: TierConfig = { + tier: 'federated', + storage: { + type: 'postgres', + url: 'postgresql://mosaic:mosaic@db-host:5433/mosaic', + enableVector: true, + }, + queue: { type: 'local' }, + }; + + const report = await probeServiceHealth(federatedBadQueueConfig); + + expect(report.overall).toBe('red'); + + const valkey = report.services.find((s) => s.name === 'valkey'); + expect(valkey?.status).toBe('fail'); + expect(valkey?.error?.remediation).toMatch(/bullmq/i); + + // Critically: no network call was made — MockRedis constructor must NOT have been called. + expect(mocks.MockRedis).not.toHaveBeenCalled(); + }); + + /* ---------------------------------------------------------------- */ + /* 20. durationMs actually measures real elapsed time */ + /* ---------------------------------------------------------------- */ + + it('measures real elapsed time for service probes', async () => { + const DELAY_MS = 25; + + // Make the postgres mock introduce a real wall-clock delay. + mocks.mockSqlFn.mockImplementation( + () => + new Promise((resolve) => + setTimeout(() => { + resolve([]); + }, DELAY_MS), + ), + ); + + const report = await probeServiceHealth(STANDALONE_CONFIG); + + const pgCheck = report.services.find((s) => s.name === 'postgres'); + expect(pgCheck).toBeDefined(); + // Must be >= 20ms (small slack for jitter). Would be 0 if timer were stubbed. + expect(pgCheck!.durationMs).toBeGreaterThanOrEqual(20); + }); +}); diff --git a/packages/storage/src/tier-detection.ts b/packages/storage/src/tier-detection.ts new file mode 100644 index 0000000..54324b2 --- /dev/null +++ b/packages/storage/src/tier-detection.ts @@ -0,0 +1,555 @@ +/** + * Tier Detection — pre-flight service reachability probes. + * + * Lifted from apps/gateway/src/bootstrap/tier-detector.ts so both the gateway + * and the mosaic CLI can share the same probe logic without duplicating code or + * creating circular workspace dependencies. + * + * Library choices: + * - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm). + * - Valkey: `ioredis` (compatible with Valkey; same URL convention used by bullmq). + */ + +import postgres from 'postgres'; +import { Redis } from 'ioredis'; + +/* ------------------------------------------------------------------ */ +/* Local structural type — avoids circular dependency */ +/* ------------------------------------------------------------------ */ + +/** + * Minimal structural shape required for tier detection. + * Mirrors the relevant fields of MosaicConfig (from @mosaicstack/config) without + * creating a dependency cycle (config depends on storage for StorageConfig). + * Any object that satisfies MosaicConfig also satisfies this type. + */ +export interface TierConfig { + tier: 'local' | 'standalone' | 'federated'; + storage: + | { type: 'pglite'; dataDir?: string } + | { type: 'postgres'; url: string; enableVector?: boolean } + | { type: 'files'; dataDir: string; format?: 'json' | 'md' }; + queue: { type: string; url?: string }; +} + +/* ------------------------------------------------------------------ */ +/* Public types */ +/* ------------------------------------------------------------------ */ + +export interface ServiceCheck { + name: 'postgres' | 'valkey' | 'pgvector'; + status: 'ok' | 'fail' | 'skipped'; + host?: string; + port?: number; + durationMs: number; + error?: { message: string; remediation: string }; +} + +export interface TierHealthReport { + tier: 'local' | 'standalone' | 'federated'; + configPath?: string; + overall: 'green' | 'yellow' | 'red'; + services: ServiceCheck[]; +} + +/* ------------------------------------------------------------------ */ +/* Structured error type */ +/* ------------------------------------------------------------------ */ + +export class TierDetectionError extends Error { + public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config'; + public readonly host: string; + public readonly port: number; + public readonly remediation: string; + + constructor(opts: { + service: 'postgres' | 'valkey' | 'pgvector' | 'config'; + host: string; + port: number; + remediation: string; + cause?: unknown; + }) { + const message = + `[tier-detector] ${opts.service} unreachable or unusable at ` + + `${opts.host}:${opts.port} — ${opts.remediation}`; + super(message, { cause: opts.cause }); + this.name = 'TierDetectionError'; + this.service = opts.service; + this.host = opts.host; + this.port = opts.port; + this.remediation = opts.remediation; + } +} + +/* ------------------------------------------------------------------ */ +/* URL helpers */ +/* ------------------------------------------------------------------ */ + +/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */ +function parseHostPort(url: string, defaultPort: number): { host: string; port: number } { + try { + const parsed = new URL(url); + const host = parsed.hostname || 'unknown'; + const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort; + return { host, port }; + } catch { + return { host: 'unknown', port: defaultPort }; + } +} + +/* ------------------------------------------------------------------ */ +/* Internal probe results */ +/* ------------------------------------------------------------------ */ + +interface ProbeResult { + host: string; + port: number; + durationMs: number; + error?: { message: string; remediation: string }; +} + +/* ------------------------------------------------------------------ */ +/* Postgres probe */ +/* ------------------------------------------------------------------ */ + +async function probePostgres(url: string): Promise { + const { host, port } = parseHostPort(url, 5432); + let sql: ReturnType | undefined; + try { + sql = postgres(url, { + max: 1, + connect_timeout: 5, + idle_timeout: 5, + }); + // Run a trivial query to confirm connectivity. + await sql`SELECT 1`; + } catch (cause) { + throw new TierDetectionError({ + service: 'postgres', + host, + port, + remediation: + 'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`', + cause, + }); + } finally { + if (sql) { + await sql.end({ timeout: 2 }).catch(() => { + // Ignore cleanup errors — we already have what we need. + }); + } + } +} + +async function probePostgresMeasured(url: string): Promise { + const { host, port } = parseHostPort(url, 5432); + const start = Date.now(); + let sql: ReturnType | undefined; + try { + sql = postgres(url, { + max: 1, + connect_timeout: 5, + idle_timeout: 5, + }); + await sql`SELECT 1`; + return { host, port, durationMs: Date.now() - start }; + } catch (cause) { + return { + host, + port, + durationMs: Date.now() - start, + error: { + message: cause instanceof Error ? cause.message : String(cause), + remediation: + 'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`', + }, + }; + } finally { + if (sql) { + await sql.end({ timeout: 2 }).catch(() => {}); + } + } +} + +/* ------------------------------------------------------------------ */ +/* pgvector probe */ +/* ------------------------------------------------------------------ */ + +async function probePgvector(url: string): Promise { + const { host, port } = parseHostPort(url, 5432); + let sql: ReturnType | undefined; + try { + sql = postgres(url, { + max: 1, + connect_timeout: 5, + idle_timeout: 5, + }); + // This succeeds whether the extension is already installed or freshly created. + // It errors only if the pgvector shared library is missing from the Postgres binary. + await sql`CREATE EXTENSION IF NOT EXISTS vector`; + } catch (cause) { + const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : ''; + const isLibraryMissing = causeMsg.includes('extension "vector" is not available'); + const remediation = isLibraryMissing + ? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.' + : 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.'; + throw new TierDetectionError({ + service: 'pgvector', + host, + port, + remediation, + cause, + }); + } finally { + if (sql) { + await sql.end({ timeout: 2 }).catch(() => { + // Ignore cleanup errors. + }); + } + } +} + +async function probePgvectorMeasured(url: string): Promise { + const { host, port } = parseHostPort(url, 5432); + const start = Date.now(); + let sql: ReturnType | undefined; + try { + sql = postgres(url, { + max: 1, + connect_timeout: 5, + idle_timeout: 5, + }); + await sql`CREATE EXTENSION IF NOT EXISTS vector`; + return { host, port, durationMs: Date.now() - start }; + } catch (cause) { + const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : ''; + const isLibraryMissing = causeMsg.includes('extension "vector" is not available'); + const remediation = isLibraryMissing + ? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.' + : 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.'; + return { + host, + port, + durationMs: Date.now() - start, + error: { message: cause instanceof Error ? cause.message : String(cause), remediation }, + }; + } finally { + if (sql) { + await sql.end({ timeout: 2 }).catch(() => {}); + } + } +} + +/* ------------------------------------------------------------------ */ +/* Valkey probe */ +/* ------------------------------------------------------------------ */ + +const DEFAULT_VALKEY_URL = 'redis://localhost:6380'; + +async function probeValkey(url: string): Promise { + const { host, port } = parseHostPort(url, 6380); + const client = new Redis(url, { + enableReadyCheck: false, + maxRetriesPerRequest: 0, + retryStrategy: () => null, // no retries — fail fast + lazyConnect: true, + connectTimeout: 5000, // fail-fast: 5-second hard cap on connection attempt + }); + + try { + await client.connect(); + const pong = await client.ping(); + if (pong !== 'PONG') { + throw new Error(`Unexpected PING response: ${pong}`); + } + } catch (cause) { + throw new TierDetectionError({ + service: 'valkey', + host, + port, + remediation: + 'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`', + cause, + }); + } finally { + client.disconnect(); + } +} + +async function probeValkeyMeasured(url: string): Promise { + const { host, port } = parseHostPort(url, 6380); + const start = Date.now(); + const client = new Redis(url, { + enableReadyCheck: false, + maxRetriesPerRequest: 0, + retryStrategy: () => null, + lazyConnect: true, + connectTimeout: 5000, + }); + try { + await client.connect(); + const pong = await client.ping(); + if (pong !== 'PONG') { + throw new Error(`Unexpected PING response: ${pong}`); + } + return { host, port, durationMs: Date.now() - start }; + } catch (cause) { + return { + host, + port, + durationMs: Date.now() - start, + error: { + message: cause instanceof Error ? cause.message : String(cause), + remediation: + 'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`', + }, + }; + } finally { + client.disconnect(); + } +} + +/* ------------------------------------------------------------------ */ +/* Public entry points */ +/* ------------------------------------------------------------------ */ + +/** + * Assert that all services required by `config.tier` are reachable. + * + * - `local` — no-op (PGlite is in-process; no external services). + * - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq'). + * - `federated` — assert Postgres + Valkey + pgvector installability. + * + * Throws `TierDetectionError` on the first failure with host:port and + * a remediation hint. + */ +export async function detectAndAssertTier(config: TierConfig): Promise { + if (config.tier === 'local') { + // PGlite runs in-process — nothing to probe. + return; + } + + const pgUrl = + config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic'; + + const valkeyUrl = + config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null; + + if (config.tier === 'standalone') { + await probePostgres(pgUrl); + if (valkeyUrl) { + await probeValkey(valkeyUrl); + } + return; + } + + // tier === 'federated' + // Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL. + if (config.queue.type !== 'bullmq') { + throw new TierDetectionError({ + service: 'config', + host: 'localhost', + port: 0, + remediation: + "Federated tier requires queue.type === 'bullmq'. " + + "Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.", + }); + } + const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL; + await probePostgres(pgUrl); + await probeValkey(federatedValkeyUrl); + await probePgvector(pgUrl); +} + +/** + * Non-throwing variant for `mosaic gateway doctor`. + * + * Probes ALL required services even if some fail, returning a structured report. + * Services not required for the current tier are reported as `skipped`. + * + * Overall status: + * - `green` — all required services OK + * - `yellow` — all required services OK, but a non-critical check failed + * (currently unused — reserved for future optional probes) + * - `red` — at least one required service failed + */ +export async function probeServiceHealth( + config: TierConfig, + configPath?: string, +): Promise { + const tier = config.tier; + + // local tier: PGlite is in-process, no external services needed. + if (tier === 'local') { + return { + tier, + configPath, + overall: 'green', + services: [ + { name: 'postgres', status: 'skipped', durationMs: 0 }, + { name: 'valkey', status: 'skipped', durationMs: 0 }, + { name: 'pgvector', status: 'skipped', durationMs: 0 }, + ], + }; + } + + const pgUrl = + config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic'; + + const valkeyUrl = + config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null; + + const services: ServiceCheck[] = []; + let hasFailure = false; + + if (tier === 'standalone') { + // Postgres — required + const pgResult = await probePostgresMeasured(pgUrl); + if (pgResult.error) { + hasFailure = true; + services.push({ + name: 'postgres', + status: 'fail', + host: pgResult.host, + port: pgResult.port, + durationMs: pgResult.durationMs, + error: pgResult.error, + }); + } else { + services.push({ + name: 'postgres', + status: 'ok', + host: pgResult.host, + port: pgResult.port, + durationMs: pgResult.durationMs, + }); + } + + // Valkey — required if bullmq + if (valkeyUrl) { + const vkResult = await probeValkeyMeasured(valkeyUrl); + if (vkResult.error) { + hasFailure = true; + services.push({ + name: 'valkey', + status: 'fail', + host: vkResult.host, + port: vkResult.port, + durationMs: vkResult.durationMs, + error: vkResult.error, + }); + } else { + services.push({ + name: 'valkey', + status: 'ok', + host: vkResult.host, + port: vkResult.port, + durationMs: vkResult.durationMs, + }); + } + } else { + services.push({ name: 'valkey', status: 'skipped', durationMs: 0 }); + } + + // pgvector — not required for standalone + services.push({ name: 'pgvector', status: 'skipped', durationMs: 0 }); + + return { + tier, + configPath, + overall: hasFailure ? 'red' : 'green', + services, + }; + } + + // tier === 'federated' + // Postgres — required + const pgResult = await probePostgresMeasured(pgUrl); + if (pgResult.error) { + hasFailure = true; + services.push({ + name: 'postgres', + status: 'fail', + host: pgResult.host, + port: pgResult.port, + durationMs: pgResult.durationMs, + error: pgResult.error, + }); + } else { + services.push({ + name: 'postgres', + status: 'ok', + host: pgResult.host, + port: pgResult.port, + durationMs: pgResult.durationMs, + }); + } + + // Valkey — required for federated (queue.type must be bullmq) + if (config.queue.type !== 'bullmq') { + hasFailure = true; + services.push({ + name: 'valkey', + status: 'fail', + host: 'localhost', + port: 0, + durationMs: 0, + error: { + message: "Federated tier requires queue.type === 'bullmq'", + remediation: + "Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.", + }, + }); + } else { + const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL; + const vkResult = await probeValkeyMeasured(federatedValkeyUrl); + if (vkResult.error) { + hasFailure = true; + services.push({ + name: 'valkey', + status: 'fail', + host: vkResult.host, + port: vkResult.port, + durationMs: vkResult.durationMs, + error: vkResult.error, + }); + } else { + services.push({ + name: 'valkey', + status: 'ok', + host: vkResult.host, + port: vkResult.port, + durationMs: vkResult.durationMs, + }); + } + } + + // pgvector — required for federated + const pvResult = await probePgvectorMeasured(pgUrl); + if (pvResult.error) { + hasFailure = true; + services.push({ + name: 'pgvector', + status: 'fail', + host: pvResult.host, + port: pvResult.port, + durationMs: pvResult.durationMs, + error: pvResult.error, + }); + } else { + services.push({ + name: 'pgvector', + status: 'ok', + host: pvResult.host, + port: pvResult.port, + durationMs: pvResult.durationMs, + }); + } + + return { + tier, + configPath, + overall: hasFailure ? 'red' : 'green', + services, + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7e90200..bc9da3f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -648,6 +648,9 @@ importers: commander: specifier: ^13.0.0 version: 13.1.0 + ioredis: + specifier: ^5.10.0 + version: 5.10.0 postgres: specifier: ^3.4.8 version: 3.4.8 @@ -698,10 +701,10 @@ importers: dependencies: '@mariozechner/pi-agent-core': specifier: ^0.63.1 - version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) + version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) '@mariozechner/pi-ai': specifier: ^0.63.1 - version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) + version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) '@sinclair/typebox': specifier: ^0.34.41 version: 0.34.48 @@ -7259,12 +7262,6 @@ snapshots: '@jridgewell/gen-mapping': 0.3.13 '@jridgewell/trace-mapping': 0.3.31 - '@anthropic-ai/sdk@0.73.0(zod@3.25.76)': - dependencies: - json-schema-to-ts: 3.1.1 - optionalDependencies: - zod: 3.25.76 - '@anthropic-ai/sdk@0.73.0(zod@4.3.6)': dependencies: json-schema-to-ts: 3.1.1 @@ -8606,18 +8603,6 @@ snapshots: - ws - zod - '@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)': - dependencies: - '@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) - transitivePeerDependencies: - - '@modelcontextprotocol/sdk' - - aws-crt - - bufferutil - - supports-color - - utf-8-validate - - ws - - zod - '@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': dependencies: '@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) @@ -8666,30 +8651,6 @@ snapshots: - ws - zod - '@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)': - dependencies: - '@anthropic-ai/sdk': 0.73.0(zod@3.25.76) - '@aws-sdk/client-bedrock-runtime': 3.1008.0 - '@google/genai': 1.45.0(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6)) - '@mistralai/mistralai': 1.14.1 - '@sinclair/typebox': 0.34.48 - ajv: 8.18.0 - ajv-formats: 3.0.1(ajv@8.18.0) - chalk: 5.6.2 - openai: 6.26.0(ws@8.20.0)(zod@3.25.76) - partial-json: 0.1.7 - proxy-agent: 6.5.0 - undici: 7.24.3 - zod-to-json-schema: 3.25.1(zod@3.25.76) - transitivePeerDependencies: - - '@modelcontextprotocol/sdk' - - aws-crt - - bufferutil - - supports-color - - utf-8-validate - - ws - - zod - '@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': dependencies: '@anthropic-ai/sdk': 0.73.0(zod@4.3.6) @@ -13185,11 +13146,6 @@ snapshots: dependencies: mimic-function: 5.0.1 - openai@6.26.0(ws@8.20.0)(zod@3.25.76): - optionalDependencies: - ws: 8.20.0 - zod: 3.25.76 - openai@6.26.0(ws@8.20.0)(zod@4.3.6): optionalDependencies: ws: 8.20.0