feat(gateway,storage): mosaic gateway doctor with tier health JSON (FED-M1-06) #475

Merged
jason.woltje merged 1 commits from feat/federation-m1-doctor into main 2026-04-20 01:00:40 +00:00
11 changed files with 1233 additions and 289 deletions

View File

@@ -1,220 +0,0 @@
/**
* Tier Detector — pre-flight service reachability assertions.
*
* Runs BEFORE NestFactory.create() to surface actionable errors immediately
* rather than crashing mid-boot with an opaque stack trace.
*
* Library choices:
* - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm).
* The spec mentions `pg`, but only `postgres` is installed in this monorepo.
* - Valkey: `ioredis` (already a dep via @mosaicstack/queue → bullmq; same URL
* convention used by the bullmq adapter).
*/
import postgres from 'postgres';
import { Redis } from 'ioredis';
import type { MosaicConfig } from '@mosaicstack/config';
/* ------------------------------------------------------------------ */
/* Structured error type */
/* ------------------------------------------------------------------ */
export class TierDetectionError extends Error {
public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config';
public readonly host: string;
public readonly port: number;
public readonly remediation: string;
constructor(opts: {
service: 'postgres' | 'valkey' | 'pgvector' | 'config';
host: string;
port: number;
remediation: string;
cause?: unknown;
}) {
const message =
`[tier-detector] ${opts.service} unreachable or unusable at ` +
`${opts.host}:${opts.port}${opts.remediation}`;
super(message, { cause: opts.cause });
this.name = 'TierDetectionError';
this.service = opts.service;
this.host = opts.host;
this.port = opts.port;
this.remediation = opts.remediation;
}
}
/* ------------------------------------------------------------------ */
/* URL helpers */
/* ------------------------------------------------------------------ */
/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */
function parseHostPort(url: string, defaultPort: number): { host: string; port: number } {
try {
const parsed = new URL(url);
const host = parsed.hostname || 'unknown';
const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort;
return { host, port };
} catch {
return { host: 'unknown', port: defaultPort };
}
}
/* ------------------------------------------------------------------ */
/* Postgres probe */
/* ------------------------------------------------------------------ */
async function probePostgres(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// Run a trivial query to confirm connectivity.
await sql`SELECT 1`;
} catch (cause) {
throw new TierDetectionError({
service: 'postgres',
host,
port,
remediation:
'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`',
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors — we already have what we need.
});
}
}
}
/* ------------------------------------------------------------------ */
/* pgvector probe */
/* ------------------------------------------------------------------ */
async function probePgvector(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// This succeeds whether the extension is already installed or freshly created.
// It errors only if the pgvector shared library is missing from the Postgres binary.
await sql`CREATE EXTENSION IF NOT EXISTS vector`;
} catch (cause) {
const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
const remediation = isLibraryMissing
? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.'
: 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';
throw new TierDetectionError({
service: 'pgvector',
host,
port,
remediation,
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors.
});
}
}
}
/* ------------------------------------------------------------------ */
/* Valkey probe */
/* ------------------------------------------------------------------ */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
async function probeValkey(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 6380);
const client = new Redis(url, {
enableReadyCheck: false,
maxRetriesPerRequest: 0,
retryStrategy: () => null, // no retries — fail fast
lazyConnect: true,
connectTimeout: 5000, // fail-fast: 5-second hard cap on connection attempt
});
try {
await client.connect();
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Unexpected PING response: ${pong}`);
}
} catch (cause) {
throw new TierDetectionError({
service: 'valkey',
host,
port,
remediation:
'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`',
cause,
});
} finally {
client.disconnect();
}
}
/* ------------------------------------------------------------------ */
/* Public entry point */
/* ------------------------------------------------------------------ */
/**
* Assert that all services required by `config.tier` are reachable.
*
* - `local` — no-op (PGlite is in-process; no external services).
* - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq').
* - `federated` — assert Postgres + Valkey + pgvector installability.
*
* Throws `TierDetectionError` on the first failure with host:port and
* a remediation hint.
*/
export async function detectAndAssertTier(config: MosaicConfig): Promise<void> {
if (config.tier === 'local') {
// PGlite runs in-process — nothing to probe.
return;
}
const pgUrl =
config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic';
const valkeyUrl =
config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
if (config.tier === 'standalone') {
await probePostgres(pgUrl);
if (valkeyUrl) {
await probeValkey(valkeyUrl);
}
return;
}
// tier === 'federated'
// Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL.
if (config.queue.type !== 'bullmq') {
throw new TierDetectionError({
service: 'config',
host: 'localhost',
port: 0,
remediation:
"Federated tier requires queue.type === 'bullmq'. " +
"Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
});
}
const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
await probePostgres(pgUrl);
await probeValkey(federatedValkeyUrl);
await probePgvector(pgUrl);
}

View File

@@ -25,7 +25,7 @@ import { AppModule } from './app.module.js';
import { mountAuthHandler } from './auth/auth.controller.js'; import { mountAuthHandler } from './auth/auth.controller.js';
import { mountMcpHandler } from './mcp/mcp.controller.js'; import { mountMcpHandler } from './mcp/mcp.controller.js';
import { McpService } from './mcp/mcp.service.js'; import { McpService } from './mcp/mcp.service.js';
import { detectAndAssertTier, TierDetectionError } from './bootstrap/tier-detector.js'; import { detectAndAssertTier, TierDetectionError } from '@mosaicstack/storage';
async function bootstrap(): Promise<void> { async function bootstrap(): Promise<void> {
const logger = new Logger('Bootstrap'); const logger = new Logger('Bootstrap');

View File

@@ -21,8 +21,8 @@ Goal: Gateway runs in `federated` tier with containerized PG+pgvector+Valkey. No
| FED-M1-02 | done | Author `docker-compose.federated.yml` as an overlay profile: Postgres 17 + pgvector extension (port 5433), Valkey (6380), named volumes, healthchecks. Compose-up should boot cleanly on a clean machine. | #460 | sonnet | feat/federation-m1-compose | FED-M1-01 | 5K | Shipped in PR #471. Overlay defines `postgres-federated`/`valkey-federated`, profile-gated, with pg-init for pgvector extension. | | FED-M1-02 | done | Author `docker-compose.federated.yml` as an overlay profile: Postgres 17 + pgvector extension (port 5433), Valkey (6380), named volumes, healthchecks. Compose-up should boot cleanly on a clean machine. | #460 | sonnet | feat/federation-m1-compose | FED-M1-01 | 5K | Shipped in PR #471. Overlay defines `postgres-federated`/`valkey-federated`, profile-gated, with pg-init for pgvector extension. |
| FED-M1-03 | done | Add pgvector support to `packages/storage/src/adapters/postgres.ts`: create extension on init (idempotent), expose vector column type in schema helpers. No adapter changes for non-federated tiers. | #460 | sonnet | feat/federation-m1-pgvector | FED-M1-02 | 8K | Shipped in PR #472. `enableVector` flag on postgres StorageConfig; idempotent CREATE EXTENSION before migrations. | | FED-M1-03 | done | Add pgvector support to `packages/storage/src/adapters/postgres.ts`: create extension on init (idempotent), expose vector column type in schema helpers. No adapter changes for non-federated tiers. | #460 | sonnet | feat/federation-m1-pgvector | FED-M1-02 | 8K | Shipped in PR #472. `enableVector` flag on postgres StorageConfig; idempotent CREATE EXTENSION before migrations. |
| FED-M1-04 | done | Implement `apps/gateway/src/bootstrap/tier-detector.ts`: reads config, asserts PG/Valkey/pgvector reachable for `federated`, fail-fast with actionable error message on failure. Unit tests for each failure mode. | #460 | sonnet | feat/federation-m1-detector | FED-M1-03 | 8K | Shipped in PR #473. 12 tests; 5s timeouts on probes; pgvector library/permission discrimination; rejects non-bullmq for federated. | | FED-M1-04 | done | Implement `apps/gateway/src/bootstrap/tier-detector.ts`: reads config, asserts PG/Valkey/pgvector reachable for `federated`, fail-fast with actionable error message on failure. Unit tests for each failure mode. | #460 | sonnet | feat/federation-m1-detector | FED-M1-03 | 8K | Shipped in PR #473. 12 tests; 5s timeouts on probes; pgvector library/permission discrimination; rejects non-bullmq for federated. |
| FED-M1-05 | in-progress | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | sonnet | feat/federation-m1-migrate | FED-M1-04 | 10K | Do NOT run automatically. CLI subcommand `mosaic migrate tier --to federated --dry-run`. Safety rails. | | FED-M1-05 | done | Write `scripts/migrate-to-federated.ts`: one-way migration from `local` (PGlite) / `standalone` (PG without pgvector) → `federated`. Dumps, transforms, loads; dry-run + confirm UX. Idempotent on re-run. | #460 | sonnet | feat/federation-m1-migrate | FED-M1-04 | 10K | Shipped in PR #474. `mosaic storage migrate-tier`; DrizzleMigrationSource (corrects P0 found in review); 32 tests; idempotent. |
| FED-M1-06 | not-started | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. | #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. | | FED-M1-06 | in-progress | Update `mosaic doctor`: report current tier, required services, actual health per service, pgvector presence, overall green/yellow/red. Machine-readable JSON output flag for CI use. | #460 | sonnet | feat/federation-m1-doctor | FED-M1-04 | 6K | Existing doctor output evolves; add `--json` flag. Green/yellow/red + remediation suggestions per issue. |
| FED-M1-07 | not-started | Integration test: gateway boots in `federated` tier with docker-compose `federated` profile; refuses to boot when PG unreachable (asserts fail-fast); pgvector extension query succeeds. | #460 | sonnet | feat/federation-m1-integration | FED-M1-04 | 8K | Vitest + docker-compose test profile. One test file per assertion; real services, no mocks. | | FED-M1-07 | not-started | Integration test: gateway boots in `federated` tier with docker-compose `federated` profile; refuses to boot when PG unreachable (asserts fail-fast); pgvector extension query succeeds. | #460 | sonnet | feat/federation-m1-integration | FED-M1-04 | 8K | Vitest + docker-compose test profile. One test file per assertion; real services, no mocks. |
| FED-M1-08 | not-started | Integration test for migration script: seed a local PGlite with representative data (tasks, notes, users, teams), run migration, assert row counts + key samples equal on federated PG. | #460 | sonnet | feat/federation-m1-migrate-test | FED-M1-05 | 6K | Runs against docker-compose federated profile; uses temp PGlite file; deterministic seed. | | FED-M1-08 | not-started | Integration test for migration script: seed a local PGlite with representative data (tasks, notes, users, teams), run migration, assert row counts + key samples equal on federated PG. | #460 | sonnet | feat/federation-m1-migrate-test | FED-M1-05 | 6K | Runs against docker-compose federated profile; uses temp PGlite file; deterministic seed. |
| FED-M1-09 | not-started | Standalone regression: full agent-session E2E on existing `standalone` tier with a gateway built from this branch. Must pass without referencing any federation module. | #460 | haiku | feat/federation-m1-regression | FED-M1-07 | 4K | Reuse existing e2e harness; just re-point at the federation branch build. Canary that we didn't break it. | | FED-M1-09 | not-started | Standalone regression: full agent-session E2E on existing `standalone` tier with a gateway built from this branch. Must pass without referencing any federation module. | #460 | haiku | feat/federation-m1-regression | FED-M1-07 | 4K | Reuse existing e2e harness; just re-point at the federation branch build. Canary that we didn't break it. |

View File

@@ -0,0 +1,294 @@
/**
* Unit tests for gateway-doctor.ts (mosaic gateway doctor).
*
* All external I/O is mocked — no live services required.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { TierHealthReport } from '@mosaicstack/storage';
/* ------------------------------------------------------------------ */
/* Shared mock state */
/* ------------------------------------------------------------------ */
const mocks = vi.hoisted(() => {
const mockLoadConfig = vi.fn();
const mockProbeServiceHealth = vi.fn();
const mockExistsSync = vi.fn();
return { mockLoadConfig, mockProbeServiceHealth, mockExistsSync };
});
/* ------------------------------------------------------------------ */
/* Module mocks */
/* ------------------------------------------------------------------ */
vi.mock('@mosaicstack/config', () => ({
loadConfig: mocks.mockLoadConfig,
}));
vi.mock('@mosaicstack/storage', () => ({
probeServiceHealth: mocks.mockProbeServiceHealth,
}));
vi.mock('node:fs', () => ({
existsSync: mocks.mockExistsSync,
}));
/* ------------------------------------------------------------------ */
/* Import SUT */
/* ------------------------------------------------------------------ */
import { runGatewayDoctor } from './gateway-doctor.js';
import type { MosaicConfig } from '@mosaicstack/config';
/* ------------------------------------------------------------------ */
/* Fixtures */
/* ------------------------------------------------------------------ */
const STANDALONE_CONFIG: MosaicConfig = {
tier: 'standalone',
storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@localhost:5432/mosaic' },
queue: { type: 'bullmq', url: 'redis://localhost:6380' },
memory: { type: 'keyword' },
};
const GREEN_REPORT: TierHealthReport = {
tier: 'standalone',
configPath: '/some/mosaic.config.json',
overall: 'green',
services: [
{ name: 'postgres', status: 'ok', host: 'localhost', port: 5432, durationMs: 42 },
{ name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 10 },
{ name: 'pgvector', status: 'skipped', durationMs: 0 },
],
};
const RED_REPORT: TierHealthReport = {
tier: 'standalone',
configPath: '/some/mosaic.config.json',
overall: 'red',
services: [
{
name: 'postgres',
status: 'fail',
host: 'localhost',
port: 5432,
durationMs: 5001,
error: {
message: 'connection refused',
remediation: 'Start Postgres: `docker compose ...`',
},
},
{ name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 8 },
{ name: 'pgvector', status: 'skipped', durationMs: 0 },
],
};
const FEDERATED_GREEN_REPORT: TierHealthReport = {
tier: 'federated',
configPath: '/some/mosaic.config.json',
overall: 'green',
services: [
{ name: 'postgres', status: 'ok', host: 'localhost', port: 5433, durationMs: 30 },
{ name: 'valkey', status: 'ok', host: 'localhost', port: 6380, durationMs: 5 },
{ name: 'pgvector', status: 'ok', host: 'localhost', port: 5433, durationMs: 25 },
],
};
/* ------------------------------------------------------------------ */
/* Process helpers */
/* ------------------------------------------------------------------ */
let stdoutCapture = '';
let exitCode: number | undefined;
function captureOutput(): void {
stdoutCapture = '';
exitCode = undefined;
vi.spyOn(process.stdout, 'write').mockImplementation((chunk) => {
stdoutCapture += typeof chunk === 'string' ? chunk : chunk.toString();
return true;
});
vi.spyOn(process.stderr, 'write').mockImplementation(() => true);
vi.spyOn(process, 'exit').mockImplementation((code?: string | number | null) => {
exitCode = typeof code === 'number' ? code : code != null ? Number(code) : undefined;
throw new Error(`process.exit(${String(code)})`);
});
vi.spyOn(console, 'log').mockImplementation((...args: unknown[]) => {
stdoutCapture += args.join(' ') + '\n';
});
}
/* ------------------------------------------------------------------ */
/* Tests */
/* ------------------------------------------------------------------ */
describe('runGatewayDoctor', () => {
beforeEach(() => {
vi.clearAllMocks();
captureOutput();
// By default: no config file on disk (env-detection path)
mocks.mockExistsSync.mockReturnValue(false);
mocks.mockLoadConfig.mockReturnValue(STANDALONE_CONFIG);
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
});
afterEach(() => {
vi.restoreAllMocks();
});
/* ---------------------------------------------------------------- */
/* 1. JSON mode: parseable JSON matching the schema */
/* ---------------------------------------------------------------- */
it('JSON mode emits parseable JSON matching TierHealthReport schema', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
await runGatewayDoctor({ json: true });
const parsed = JSON.parse(stdoutCapture) as TierHealthReport;
expect(parsed.tier).toBe('standalone');
expect(parsed.overall).toBe('green');
expect(Array.isArray(parsed.services)).toBe(true);
expect(parsed.services).toHaveLength(3);
// Validate shape of each service check
for (const svc of parsed.services) {
expect(['postgres', 'valkey', 'pgvector']).toContain(svc.name);
expect(['ok', 'fail', 'skipped']).toContain(svc.status);
expect(typeof svc.durationMs).toBe('number');
}
// JSON mode must be silent on console.log — output goes to process.stdout only.
expect(console.log).not.toHaveBeenCalled();
});
it('JSON mode for federated with 3 ok services', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(FEDERATED_GREEN_REPORT);
await runGatewayDoctor({ json: true });
const parsed = JSON.parse(stdoutCapture) as TierHealthReport;
expect(parsed.tier).toBe('federated');
expect(parsed.overall).toBe('green');
expect(parsed.services.every((s) => s.status === 'ok')).toBe(true);
// JSON mode must be silent on console.log — output goes to process.stdout only.
expect(console.log).not.toHaveBeenCalled();
});
/* ---------------------------------------------------------------- */
/* 2. Plain text mode: service lines and overall verdict */
/* ---------------------------------------------------------------- */
it('plain text mode includes service lines for each service', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
await runGatewayDoctor({});
expect(stdoutCapture).toContain('postgres');
expect(stdoutCapture).toContain('valkey');
expect(stdoutCapture).toContain('pgvector');
});
it('plain text mode includes Overall verdict', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
await runGatewayDoctor({});
expect(stdoutCapture).toContain('Overall: GREEN');
});
it('plain text mode shows tier and config path in header', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
await runGatewayDoctor({});
expect(stdoutCapture).toContain('Tier: standalone');
});
it('plain text mode shows remediation for failed services', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT);
try {
await runGatewayDoctor({});
} catch {
// process.exit throws in test
}
expect(stdoutCapture).toContain('Remediations:');
expect(stdoutCapture).toContain('Start Postgres');
});
/* ---------------------------------------------------------------- */
/* 3. Exit codes */
/* ---------------------------------------------------------------- */
it('exits with code 1 when overall is red', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT);
await expect(runGatewayDoctor({})).rejects.toThrow('process.exit(1)');
expect(exitCode).toBe(1);
});
it('exits with code 0 (no exit call) when overall is green', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(GREEN_REPORT);
await runGatewayDoctor({});
// process.exit should NOT have been called for green.
expect(exitCode).toBeUndefined();
});
it('JSON mode exits with code 1 when overall is red', async () => {
mocks.mockProbeServiceHealth.mockResolvedValue(RED_REPORT);
await expect(runGatewayDoctor({ json: true })).rejects.toThrow('process.exit(1)');
expect(exitCode).toBe(1);
});
/* ---------------------------------------------------------------- */
/* 4. --config path override is honored */
/* ---------------------------------------------------------------- */
it('passes --config path to loadConfig when provided', async () => {
const customPath = '/custom/path/mosaic.config.json';
await runGatewayDoctor({ config: customPath });
// loadConfig should have been called with the resolved custom path.
expect(mocks.mockLoadConfig).toHaveBeenCalledWith(
expect.stringContaining('mosaic.config.json'),
);
// The exact call should include the custom path (resolved).
const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined];
expect(calledPath).toContain('custom/path/mosaic.config.json');
});
it('calls loadConfig without path when no --config and no file on disk', async () => {
mocks.mockExistsSync.mockReturnValue(false);
await runGatewayDoctor({});
const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined];
// When no file found, resolveConfigPath returns undefined, so loadConfig is called with undefined
expect(calledPath).toBeUndefined();
});
it('finds config from cwd when mosaic.config.json exists there', async () => {
// First candidate (cwd/mosaic.config.json) exists.
mocks.mockExistsSync.mockImplementation((p: unknown) => {
return typeof p === 'string' && p.endsWith('mosaic.config.json');
});
await runGatewayDoctor({});
const [calledPath] = mocks.mockLoadConfig.mock.calls[0] as [string | undefined];
expect(calledPath).toBeDefined();
expect(typeof calledPath).toBe('string');
expect(calledPath!.endsWith('mosaic.config.json')).toBe(true);
});
});

View File

@@ -0,0 +1,143 @@
/**
* gateway-doctor.ts — `mosaic gateway doctor` implementation.
*
* Reports current tier and per-service health (PG, Valkey, pgvector) for the
* Mosaic gateway. Supports machine-readable JSON output for CI.
*
* Exit codes:
* 0 — overall green or yellow
* 1 — overall red (at least one required service failed)
*/
import { existsSync } from 'node:fs';
import { resolve, join } from 'node:path';
import { homedir } from 'node:os';
import { loadConfig } from '@mosaicstack/config';
import { probeServiceHealth } from '@mosaicstack/storage';
import type { TierHealthReport, ServiceCheck } from '@mosaicstack/storage';
/* ------------------------------------------------------------------ */
/* Config resolution */
/* ------------------------------------------------------------------ */
const CONFIG_CANDIDATES = [
resolve(process.cwd(), 'mosaic.config.json'),
join(homedir(), '.mosaic', 'mosaic.config.json'),
];
/**
* Resolve the config path to report in output.
*
* Priority:
* 1. Explicit `--config <path>` flag
* 2. `./mosaic.config.json` (cwd)
* 3. `~/.mosaic/mosaic.config.json`
* 4. undefined — `loadConfig()` falls back to env-var detection
*
* `loadConfig()` itself already handles priority 1-3 when passed an explicit
* path, and falls back to env-detection when none exists. We resolve here
* only so we can surface the path in the health report.
*/
function resolveConfigPath(explicit?: string): string | undefined {
if (explicit) return resolve(explicit);
for (const candidate of CONFIG_CANDIDATES) {
if (existsSync(candidate)) return candidate;
}
return undefined;
}
/* ------------------------------------------------------------------ */
/* Output helpers */
/* ------------------------------------------------------------------ */
const TICK = '\u2713'; // ✓
const CROSS = '\u2717'; // ✗
const SKIP = '-';
function padRight(s: string, n: number): string {
return s + ' '.repeat(Math.max(0, n - s.length));
}
function serviceLabel(svc: ServiceCheck): string {
const hostPort =
svc.host !== undefined && svc.port !== undefined ? `${svc.host}:${svc.port.toString()}` : '';
const duration = `(${svc.durationMs.toString()}ms)`;
switch (svc.status) {
case 'ok':
return ` ${TICK} ${padRight(svc.name, 10)} ${padRight(hostPort, 22)} ${duration}`;
case 'fail': {
const errMsg = svc.error?.message ?? 'unknown error';
return ` ${CROSS} ${padRight(svc.name, 10)} ${padRight(hostPort, 22)} ${duration} \u2192 ${errMsg}`;
}
case 'skipped':
return ` ${SKIP} ${padRight(svc.name, 10)} (skipped)`;
}
}
function printReport(report: TierHealthReport): void {
const configDisplay = report.configPath ?? '(auto-detected)';
console.log(`Tier: ${report.tier} Config: ${configDisplay}`);
console.log('');
for (const svc of report.services) {
console.log(serviceLabel(svc));
}
console.log('');
// Print remediations for failed services.
const failed = report.services.filter((s) => s.status === 'fail' && s.error);
if (failed.length > 0) {
console.log('Remediations:');
for (const svc of failed) {
if (svc.error) {
console.log(` ${svc.name}: ${svc.error.remediation}`);
}
}
console.log('');
}
console.log(`Overall: ${report.overall.toUpperCase()}`);
}
/* ------------------------------------------------------------------ */
/* Main runner */
/* ------------------------------------------------------------------ */
export interface GatewayDoctorOptions {
json?: boolean;
config?: string;
}
export async function runGatewayDoctor(opts: GatewayDoctorOptions): Promise<void> {
const configPath = resolveConfigPath(opts.config);
let mosaicConfig;
try {
mosaicConfig = loadConfig(configPath);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (opts.json) {
process.stdout.write(
JSON.stringify({ error: `Failed to load config: ${msg}` }, null, 2) + '\n',
);
} else {
process.stderr.write(`Error: Failed to load config: ${msg}\n`);
}
process.exit(1);
}
const report = await probeServiceHealth(mosaicConfig, configPath);
if (opts.json) {
process.stdout.write(JSON.stringify(report, null, 2) + '\n');
} else {
printReport(report);
}
// Exit 1 if overall is red.
if (report.overall === 'red') {
process.exit(1);
}
}

View File

@@ -206,4 +206,15 @@ export function registerGatewayCommand(program: Command): void {
const { runUninstall } = await import('./gateway/uninstall.js'); const { runUninstall } = await import('./gateway/uninstall.js');
await runUninstall(); await runUninstall();
}); });
// ─── doctor ─────────────────────────────────────────────────────────────────
gw.command('doctor')
.description('Check gateway tier and per-service health (PG, Valkey, pgvector)')
.option('--json', 'Emit TierHealthReport as JSON to stdout (suppresses all other output)')
.option('--config <path>', 'Path to mosaic.config.json (defaults to cwd or ~/.mosaic/)')
.action(async (cmdOpts: { json?: boolean; config?: string }) => {
const { runGatewayDoctor } = await import('./gateway-doctor.js');
await runGatewayDoctor({ json: cmdOpts.json, config: cmdOpts.config });
});
} }

View File

@@ -25,6 +25,7 @@
"@mosaicstack/db": "workspace:^", "@mosaicstack/db": "workspace:^",
"@mosaicstack/types": "workspace:*", "@mosaicstack/types": "workspace:*",
"commander": "^13.0.0", "commander": "^13.0.0",
"ioredis": "^5.10.0",
"postgres": "^3.4.8" "postgres": "^3.4.8"
}, },
"devDependencies": { "devDependencies": {

View File

@@ -1,4 +1,6 @@
export type { StorageAdapter, StorageConfig } from './types.js'; export type { StorageAdapter, StorageConfig } from './types.js';
export { TierDetectionError, detectAndAssertTier, probeServiceHealth } from './tier-detection.js';
export type { ServiceCheck, TierHealthReport } from './tier-detection.js';
export { createStorageAdapter, registerStorageAdapter } from './factory.js'; export { createStorageAdapter, registerStorageAdapter } from './factory.js';
export { PostgresAdapter } from './adapters/postgres.js'; export { PostgresAdapter } from './adapters/postgres.js';
export { PgliteAdapter } from './adapters/pglite.js'; export { PgliteAdapter } from './adapters/pglite.js';

View File

@@ -1,5 +1,5 @@
/** /**
* Unit tests for tier-detector.ts. * Unit tests for tier-detection.ts.
* *
* All external I/O (postgres, ioredis) is mocked no live services required. * All external I/O (postgres, ioredis) is mocked no live services required.
* *
@@ -59,28 +59,26 @@ vi.mock('ioredis', () => ({
/* Import SUT after mocks are registered */ /* Import SUT after mocks are registered */
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
import { detectAndAssertTier, TierDetectionError } from './tier-detector.js'; import { detectAndAssertTier, probeServiceHealth, TierDetectionError } from './tier-detection.js';
import type { MosaicConfig } from '@mosaicstack/config'; import type { TierConfig } from './tier-detection.js';
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
/* Config fixtures */ /* Config fixtures */
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
const LOCAL_CONFIG: MosaicConfig = { const LOCAL_CONFIG: TierConfig = {
tier: 'local', tier: 'local',
storage: { type: 'pglite', dataDir: '.mosaic/pglite' }, storage: { type: 'pglite', dataDir: '.mosaic/pglite' },
queue: { type: 'local', dataDir: '.mosaic/queue' }, queue: { type: 'local' },
memory: { type: 'keyword' },
}; };
const STANDALONE_CONFIG: MosaicConfig = { const STANDALONE_CONFIG: TierConfig = {
tier: 'standalone', tier: 'standalone',
storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@db-host:5432/mosaic' }, storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@db-host:5432/mosaic' },
queue: { type: 'bullmq', url: 'redis://valkey-host:6380' }, queue: { type: 'bullmq', url: 'redis://valkey-host:6380' },
memory: { type: 'keyword' },
}; };
const FEDERATED_CONFIG: MosaicConfig = { const FEDERATED_CONFIG: TierConfig = {
tier: 'federated', tier: 'federated',
storage: { storage: {
type: 'postgres', type: 'postgres',
@@ -88,7 +86,6 @@ const FEDERATED_CONFIG: MosaicConfig = {
enableVector: true, enableVector: true,
}, },
queue: { type: 'bullmq', url: 'redis://valkey-host:6380' }, queue: { type: 'bullmq', url: 'redis://valkey-host:6380' },
memory: { type: 'pgvector' },
}; };
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
@@ -232,15 +229,17 @@ describe('detectAndAssertTier', () => {
}); });
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
/* 7. probeValkey honors connectTimeout */ /* 7. probeValkey honors connectTimeout and lazyConnect */
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
it('constructs the ioredis Redis client with connectTimeout: 5000', async () => { it('constructs the ioredis Redis client with connectTimeout: 5000', async () => {
await detectAndAssertTier(STANDALONE_CONFIG); await detectAndAssertTier(STANDALONE_CONFIG);
expect(mocks.MockRedis).toHaveBeenCalledOnce(); expect(mocks.MockRedis).toHaveBeenCalledOnce();
const [, options] = mocks.MockRedis.mock.calls[0] as [string, Record<string, unknown>]; expect(mocks.MockRedis).toHaveBeenCalledWith(
expect(options).toMatchObject({ connectTimeout: 5000 }); expect.any(String),
expect.objectContaining({ connectTimeout: 5000, lazyConnect: true }),
);
}); });
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
@@ -269,7 +268,7 @@ describe('detectAndAssertTier', () => {
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
it('mentions CREATE permission or superuser in remediation for a generic pgvector error', async () => { it('mentions CREATE permission or superuser in remediation for a generic pgvector error', async () => {
// SELECT 1 succeeds; CREATE EXTENSION fails with a permission error (not the library-missing message). // SELECT 1 succeeds; CREATE EXTENSION fails with a permission error.
mocks.mockSqlFn mocks.mockSqlFn
.mockResolvedValueOnce([]) // SELECT 1 (probePostgres) .mockResolvedValueOnce([]) // SELECT 1 (probePostgres)
.mockRejectedValueOnce(new Error('permission denied to create extension')); .mockRejectedValueOnce(new Error('permission denied to create extension'));
@@ -293,15 +292,14 @@ describe('detectAndAssertTier', () => {
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
it('throws TierDetectionError with service=config for federated tier with queue.type !== bullmq', async () => { it('throws TierDetectionError with service=config for federated tier with queue.type !== bullmq', async () => {
const badConfig: MosaicConfig = { const badConfig: TierConfig = {
tier: 'federated', tier: 'federated',
storage: { storage: {
type: 'postgres', type: 'postgres',
url: 'postgresql://mosaic:mosaic@db-host:5433/mosaic', url: 'postgresql://mosaic:mosaic@db-host:5433/mosaic',
enableVector: true, enableVector: true,
}, },
queue: { type: 'local', dataDir: '.mosaic/queue' }, queue: { type: 'local' },
memory: { type: 'pgvector' },
}; };
try { try {
@@ -342,3 +340,207 @@ describe('detectAndAssertTier', () => {
expect(caught!.message).toContain('db-host:5432'); expect(caught!.message).toContain('db-host:5432');
}); });
}); });
/* ------------------------------------------------------------------ */
/* probeServiceHealth tests */
/* ------------------------------------------------------------------ */
describe('probeServiceHealth', () => {
beforeEach(() => {
vi.clearAllMocks();
mocks.mockSqlFn.mockResolvedValue([]);
mocks.mockEnd.mockResolvedValue(undefined);
mocks.mockRedisConnect.mockResolvedValue(undefined);
mocks.mockRedisPing.mockResolvedValue('PONG');
mocks.mockPostgresConstructor.mockImplementation(() => {
const sql = mocks.mockSqlFn as ReturnType<typeof mocks.mockSqlFn>;
(sql as unknown as Record<string, unknown>)['end'] = mocks.mockEnd;
return sql;
});
mocks.MockRedis.mockImplementation(() => ({
connect: mocks.mockRedisConnect,
ping: mocks.mockRedisPing,
disconnect: mocks.mockRedisDisconnect,
}));
});
/* ---------------------------------------------------------------- */
/* 12. local tier — all skipped, green */
/* ---------------------------------------------------------------- */
it('returns all services as skipped and overall green for local tier', async () => {
const report = await probeServiceHealth(LOCAL_CONFIG);
expect(report.tier).toBe('local');
expect(report.overall).toBe('green');
expect(report.services).toHaveLength(3);
for (const svc of report.services) {
expect(svc.status).toBe('skipped');
}
expect(mocks.mockPostgresConstructor).not.toHaveBeenCalled();
expect(mocks.MockRedis).not.toHaveBeenCalled();
});
/* ---------------------------------------------------------------- */
/* 13. postgres fails, valkey ok → red */
/* ---------------------------------------------------------------- */
it('returns red overall with postgres fail and valkey ok for standalone when postgres fails', async () => {
mocks.mockSqlFn.mockRejectedValue(new Error('connection refused'));
const report = await probeServiceHealth(STANDALONE_CONFIG);
expect(report.overall).toBe('red');
const pgCheck = report.services.find((s) => s.name === 'postgres');
expect(pgCheck?.status).toBe('fail');
expect(pgCheck?.error).toBeDefined();
expect(pgCheck?.error?.remediation).toContain('docker compose');
const vkCheck = report.services.find((s) => s.name === 'valkey');
expect(vkCheck?.status).toBe('ok');
});
/* ---------------------------------------------------------------- */
/* 14. federated all green → 3 services ok */
/* ---------------------------------------------------------------- */
it('returns green overall with all 3 services ok for federated when all pass', async () => {
mocks.mockSqlFn.mockResolvedValue([]);
const report = await probeServiceHealth(FEDERATED_CONFIG);
expect(report.tier).toBe('federated');
expect(report.overall).toBe('green');
expect(report.services).toHaveLength(3);
for (const svc of report.services) {
expect(svc.status).toBe('ok');
}
});
/* ---------------------------------------------------------------- */
/* 15. durationMs is a non-negative number for every service check */
/* ---------------------------------------------------------------- */
it('sets durationMs as a non-negative number for every service check', async () => {
mocks.mockSqlFn.mockResolvedValue([]);
const report = await probeServiceHealth(FEDERATED_CONFIG);
for (const svc of report.services) {
expect(typeof svc.durationMs).toBe('number');
expect(svc.durationMs).toBeGreaterThanOrEqual(0);
}
});
it('sets durationMs >= 0 even for skipped services (local tier)', async () => {
const report = await probeServiceHealth(LOCAL_CONFIG);
for (const svc of report.services) {
expect(typeof svc.durationMs).toBe('number');
expect(svc.durationMs).toBeGreaterThanOrEqual(0);
}
});
/* ---------------------------------------------------------------- */
/* 16. configPath is passed through to the report */
/* ---------------------------------------------------------------- */
it('includes configPath in the report when provided', async () => {
const report = await probeServiceHealth(LOCAL_CONFIG, '/etc/mosaic/mosaic.config.json');
expect(report.configPath).toBe('/etc/mosaic/mosaic.config.json');
});
/* ---------------------------------------------------------------- */
/* 17. standalone — valkey fails, postgres ok → red */
/* ---------------------------------------------------------------- */
it('returns red with valkey fail and postgres ok for standalone when valkey fails', async () => {
mocks.mockSqlFn.mockResolvedValue([]);
mocks.mockRedisConnect.mockRejectedValue(new Error('ECONNREFUSED'));
const report = await probeServiceHealth(STANDALONE_CONFIG);
expect(report.overall).toBe('red');
const pgCheck = report.services.find((s) => s.name === 'postgres');
expect(pgCheck?.status).toBe('ok');
const vkCheck = report.services.find((s) => s.name === 'valkey');
expect(vkCheck?.status).toBe('fail');
expect(vkCheck?.error).toBeDefined();
});
/* ---------------------------------------------------------------- */
/* 18. federated — pgvector fails → red with remediation */
/* ---------------------------------------------------------------- */
it('returns red with pgvector fail for federated when pgvector probe fails', async () => {
mocks.mockSqlFn
.mockResolvedValueOnce([]) // postgres SELECT 1
.mockRejectedValueOnce(new Error('extension "vector" is not available')); // pgvector
const report = await probeServiceHealth(FEDERATED_CONFIG);
expect(report.overall).toBe('red');
const pvCheck = report.services.find((s) => s.name === 'pgvector');
expect(pvCheck?.status).toBe('fail');
expect(pvCheck?.error?.remediation).toContain('pgvector/pgvector:pg17');
});
/* ---------------------------------------------------------------- */
/* 19. federated — non-bullmq queue → red config error, no network */
/* ---------------------------------------------------------------- */
it('returns red overall with config error when federated tier has non-bullmq queue (no network call)', async () => {
const federatedBadQueueConfig: TierConfig = {
tier: 'federated',
storage: {
type: 'postgres',
url: 'postgresql://mosaic:mosaic@db-host:5433/mosaic',
enableVector: true,
},
queue: { type: 'local' },
};
const report = await probeServiceHealth(federatedBadQueueConfig);
expect(report.overall).toBe('red');
const valkey = report.services.find((s) => s.name === 'valkey');
expect(valkey?.status).toBe('fail');
expect(valkey?.error?.remediation).toMatch(/bullmq/i);
// Critically: no network call was made — MockRedis constructor must NOT have been called.
expect(mocks.MockRedis).not.toHaveBeenCalled();
});
/* ---------------------------------------------------------------- */
/* 20. durationMs actually measures real elapsed time */
/* ---------------------------------------------------------------- */
it('measures real elapsed time for service probes', async () => {
const DELAY_MS = 25;
// Make the postgres mock introduce a real wall-clock delay.
mocks.mockSqlFn.mockImplementation(
() =>
new Promise((resolve) =>
setTimeout(() => {
resolve([]);
}, DELAY_MS),
),
);
const report = await probeServiceHealth(STANDALONE_CONFIG);
const pgCheck = report.services.find((s) => s.name === 'postgres');
expect(pgCheck).toBeDefined();
// Must be >= 20ms (small slack for jitter). Would be 0 if timer were stubbed.
expect(pgCheck!.durationMs).toBeGreaterThanOrEqual(20);
});
});

View File

@@ -0,0 +1,555 @@
/**
* Tier Detection — pre-flight service reachability probes.
*
* Lifted from apps/gateway/src/bootstrap/tier-detector.ts so both the gateway
* and the mosaic CLI can share the same probe logic without duplicating code or
* creating circular workspace dependencies.
*
* Library choices:
* - Postgres: `postgres` npm package (already a dep via @mosaicstack/db / drizzle-orm).
* - Valkey: `ioredis` (compatible with Valkey; same URL convention used by bullmq).
*/
import postgres from 'postgres';
import { Redis } from 'ioredis';
/* ------------------------------------------------------------------ */
/* Local structural type — avoids circular dependency */
/* ------------------------------------------------------------------ */
/**
* Minimal structural shape required for tier detection.
* Mirrors the relevant fields of MosaicConfig (from @mosaicstack/config) without
* creating a dependency cycle (config depends on storage for StorageConfig).
* Any object that satisfies MosaicConfig also satisfies this type.
*/
export interface TierConfig {
tier: 'local' | 'standalone' | 'federated';
storage:
| { type: 'pglite'; dataDir?: string }
| { type: 'postgres'; url: string; enableVector?: boolean }
| { type: 'files'; dataDir: string; format?: 'json' | 'md' };
queue: { type: string; url?: string };
}
/* ------------------------------------------------------------------ */
/* Public types */
/* ------------------------------------------------------------------ */
export interface ServiceCheck {
name: 'postgres' | 'valkey' | 'pgvector';
status: 'ok' | 'fail' | 'skipped';
host?: string;
port?: number;
durationMs: number;
error?: { message: string; remediation: string };
}
export interface TierHealthReport {
tier: 'local' | 'standalone' | 'federated';
configPath?: string;
overall: 'green' | 'yellow' | 'red';
services: ServiceCheck[];
}
/* ------------------------------------------------------------------ */
/* Structured error type */
/* ------------------------------------------------------------------ */
export class TierDetectionError extends Error {
public readonly service: 'postgres' | 'valkey' | 'pgvector' | 'config';
public readonly host: string;
public readonly port: number;
public readonly remediation: string;
constructor(opts: {
service: 'postgres' | 'valkey' | 'pgvector' | 'config';
host: string;
port: number;
remediation: string;
cause?: unknown;
}) {
const message =
`[tier-detector] ${opts.service} unreachable or unusable at ` +
`${opts.host}:${opts.port}${opts.remediation}`;
super(message, { cause: opts.cause });
this.name = 'TierDetectionError';
this.service = opts.service;
this.host = opts.host;
this.port = opts.port;
this.remediation = opts.remediation;
}
}
/* ------------------------------------------------------------------ */
/* URL helpers */
/* ------------------------------------------------------------------ */
/** Extract host and port from a URL string, returning safe fallbacks on parse failure. */
function parseHostPort(url: string, defaultPort: number): { host: string; port: number } {
try {
const parsed = new URL(url);
const host = parsed.hostname || 'unknown';
const port = parsed.port ? parseInt(parsed.port, 10) : defaultPort;
return { host, port };
} catch {
return { host: 'unknown', port: defaultPort };
}
}
/* ------------------------------------------------------------------ */
/* Internal probe results */
/* ------------------------------------------------------------------ */
interface ProbeResult {
host: string;
port: number;
durationMs: number;
error?: { message: string; remediation: string };
}
/* ------------------------------------------------------------------ */
/* Postgres probe */
/* ------------------------------------------------------------------ */
async function probePostgres(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// Run a trivial query to confirm connectivity.
await sql`SELECT 1`;
} catch (cause) {
throw new TierDetectionError({
service: 'postgres',
host,
port,
remediation:
'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`',
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors — we already have what we need.
});
}
}
}
async function probePostgresMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 5432);
const start = Date.now();
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
await sql`SELECT 1`;
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
return {
host,
port,
durationMs: Date.now() - start,
error: {
message: cause instanceof Error ? cause.message : String(cause),
remediation:
'Start Postgres: `docker compose -f docker-compose.federated.yml --profile federated up -d postgres-federated`',
},
};
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {});
}
}
}
/* ------------------------------------------------------------------ */
/* pgvector probe */
/* ------------------------------------------------------------------ */
async function probePgvector(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 5432);
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
// This succeeds whether the extension is already installed or freshly created.
// It errors only if the pgvector shared library is missing from the Postgres binary.
await sql`CREATE EXTENSION IF NOT EXISTS vector`;
} catch (cause) {
const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
const remediation = isLibraryMissing
? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.'
: 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';
throw new TierDetectionError({
service: 'pgvector',
host,
port,
remediation,
cause,
});
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {
// Ignore cleanup errors.
});
}
}
}
async function probePgvectorMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 5432);
const start = Date.now();
let sql: ReturnType<typeof postgres> | undefined;
try {
sql = postgres(url, {
max: 1,
connect_timeout: 5,
idle_timeout: 5,
});
await sql`CREATE EXTENSION IF NOT EXISTS vector`;
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
const causeMsg = cause instanceof Error ? cause.message.toLowerCase() : '';
const isLibraryMissing = causeMsg.includes('extension "vector" is not available');
const remediation = isLibraryMissing
? 'Use the `pgvector/pgvector:pg17` image, not the stock `postgres:17` image. See `docker-compose.federated.yml`.'
: 'The database role lacks permission to CREATE EXTENSION. Grant `CREATE` on the database, or run as a superuser.';
return {
host,
port,
durationMs: Date.now() - start,
error: { message: cause instanceof Error ? cause.message : String(cause), remediation },
};
} finally {
if (sql) {
await sql.end({ timeout: 2 }).catch(() => {});
}
}
}
/* ------------------------------------------------------------------ */
/* Valkey probe */
/* ------------------------------------------------------------------ */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
async function probeValkey(url: string): Promise<void> {
const { host, port } = parseHostPort(url, 6380);
const client = new Redis(url, {
enableReadyCheck: false,
maxRetriesPerRequest: 0,
retryStrategy: () => null, // no retries — fail fast
lazyConnect: true,
connectTimeout: 5000, // fail-fast: 5-second hard cap on connection attempt
});
try {
await client.connect();
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Unexpected PING response: ${pong}`);
}
} catch (cause) {
throw new TierDetectionError({
service: 'valkey',
host,
port,
remediation:
'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`',
cause,
});
} finally {
client.disconnect();
}
}
async function probeValkeyMeasured(url: string): Promise<ProbeResult> {
const { host, port } = parseHostPort(url, 6380);
const start = Date.now();
const client = new Redis(url, {
enableReadyCheck: false,
maxRetriesPerRequest: 0,
retryStrategy: () => null,
lazyConnect: true,
connectTimeout: 5000,
});
try {
await client.connect();
const pong = await client.ping();
if (pong !== 'PONG') {
throw new Error(`Unexpected PING response: ${pong}`);
}
return { host, port, durationMs: Date.now() - start };
} catch (cause) {
return {
host,
port,
durationMs: Date.now() - start,
error: {
message: cause instanceof Error ? cause.message : String(cause),
remediation:
'Start Valkey: `docker compose -f docker-compose.federated.yml --profile federated up -d valkey-federated`',
},
};
} finally {
client.disconnect();
}
}
/* ------------------------------------------------------------------ */
/* Public entry points */
/* ------------------------------------------------------------------ */
/**
* Assert that all services required by `config.tier` are reachable.
*
* - `local` — no-op (PGlite is in-process; no external services).
* - `standalone` — assert Postgres + Valkey (if queue.type === 'bullmq').
* - `federated` — assert Postgres + Valkey + pgvector installability.
*
* Throws `TierDetectionError` on the first failure with host:port and
* a remediation hint.
*/
export async function detectAndAssertTier(config: TierConfig): Promise<void> {
if (config.tier === 'local') {
// PGlite runs in-process — nothing to probe.
return;
}
const pgUrl =
config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic';
const valkeyUrl =
config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
if (config.tier === 'standalone') {
await probePostgres(pgUrl);
if (valkeyUrl) {
await probeValkey(valkeyUrl);
}
return;
}
// tier === 'federated'
// Reject misconfigured queue upfront — federated requires bullmq + a Valkey URL.
if (config.queue.type !== 'bullmq') {
throw new TierDetectionError({
service: 'config',
host: 'localhost',
port: 0,
remediation:
"Federated tier requires queue.type === 'bullmq'. " +
"Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
});
}
const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
await probePostgres(pgUrl);
await probeValkey(federatedValkeyUrl);
await probePgvector(pgUrl);
}
/**
* Non-throwing variant for `mosaic gateway doctor`.
*
* Probes ALL required services even if some fail, returning a structured report.
* Services not required for the current tier are reported as `skipped`.
*
* Overall status:
* - `green` — all required services OK
* - `yellow` — all required services OK, but a non-critical check failed
* (currently unused — reserved for future optional probes)
* - `red` — at least one required service failed
*/
export async function probeServiceHealth(
config: TierConfig,
configPath?: string,
): Promise<TierHealthReport> {
const tier = config.tier;
// local tier: PGlite is in-process, no external services needed.
if (tier === 'local') {
return {
tier,
configPath,
overall: 'green',
services: [
{ name: 'postgres', status: 'skipped', durationMs: 0 },
{ name: 'valkey', status: 'skipped', durationMs: 0 },
{ name: 'pgvector', status: 'skipped', durationMs: 0 },
],
};
}
const pgUrl =
config.storage.type === 'postgres' ? config.storage.url : 'postgresql://localhost:5432/mosaic';
const valkeyUrl =
config.queue.type === 'bullmq' ? (config.queue.url ?? DEFAULT_VALKEY_URL) : null;
const services: ServiceCheck[] = [];
let hasFailure = false;
if (tier === 'standalone') {
// Postgres — required
const pgResult = await probePostgresMeasured(pgUrl);
if (pgResult.error) {
hasFailure = true;
services.push({
name: 'postgres',
status: 'fail',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
error: pgResult.error,
});
} else {
services.push({
name: 'postgres',
status: 'ok',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
});
}
// Valkey — required if bullmq
if (valkeyUrl) {
const vkResult = await probeValkeyMeasured(valkeyUrl);
if (vkResult.error) {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
error: vkResult.error,
});
} else {
services.push({
name: 'valkey',
status: 'ok',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
});
}
} else {
services.push({ name: 'valkey', status: 'skipped', durationMs: 0 });
}
// pgvector — not required for standalone
services.push({ name: 'pgvector', status: 'skipped', durationMs: 0 });
return {
tier,
configPath,
overall: hasFailure ? 'red' : 'green',
services,
};
}
// tier === 'federated'
// Postgres — required
const pgResult = await probePostgresMeasured(pgUrl);
if (pgResult.error) {
hasFailure = true;
services.push({
name: 'postgres',
status: 'fail',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
error: pgResult.error,
});
} else {
services.push({
name: 'postgres',
status: 'ok',
host: pgResult.host,
port: pgResult.port,
durationMs: pgResult.durationMs,
});
}
// Valkey — required for federated (queue.type must be bullmq)
if (config.queue.type !== 'bullmq') {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: 'localhost',
port: 0,
durationMs: 0,
error: {
message: "Federated tier requires queue.type === 'bullmq'",
remediation:
"Set queue: { type: 'bullmq', url: 'redis://...' } in your mosaic.config.json.",
},
});
} else {
const federatedValkeyUrl = config.queue.url ?? DEFAULT_VALKEY_URL;
const vkResult = await probeValkeyMeasured(federatedValkeyUrl);
if (vkResult.error) {
hasFailure = true;
services.push({
name: 'valkey',
status: 'fail',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
error: vkResult.error,
});
} else {
services.push({
name: 'valkey',
status: 'ok',
host: vkResult.host,
port: vkResult.port,
durationMs: vkResult.durationMs,
});
}
}
// pgvector — required for federated
const pvResult = await probePgvectorMeasured(pgUrl);
if (pvResult.error) {
hasFailure = true;
services.push({
name: 'pgvector',
status: 'fail',
host: pvResult.host,
port: pvResult.port,
durationMs: pvResult.durationMs,
error: pvResult.error,
});
} else {
services.push({
name: 'pgvector',
status: 'ok',
host: pvResult.host,
port: pvResult.port,
durationMs: pvResult.durationMs,
});
}
return {
tier,
configPath,
overall: hasFailure ? 'red' : 'green',
services,
};
}

54
pnpm-lock.yaml generated
View File

@@ -648,6 +648,9 @@ importers:
commander: commander:
specifier: ^13.0.0 specifier: ^13.0.0
version: 13.1.0 version: 13.1.0
ioredis:
specifier: ^5.10.0
version: 5.10.0
postgres: postgres:
specifier: ^3.4.8 specifier: ^3.4.8
version: 3.4.8 version: 3.4.8
@@ -698,10 +701,10 @@ importers:
dependencies: dependencies:
'@mariozechner/pi-agent-core': '@mariozechner/pi-agent-core':
specifier: ^0.63.1 specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
'@mariozechner/pi-ai': '@mariozechner/pi-ai':
specifier: ^0.63.1 specifier: ^0.63.1
version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76) version: 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
'@sinclair/typebox': '@sinclair/typebox':
specifier: ^0.34.41 specifier: ^0.34.41
version: 0.34.48 version: 0.34.48
@@ -7259,12 +7262,6 @@ snapshots:
'@jridgewell/gen-mapping': 0.3.13 '@jridgewell/gen-mapping': 0.3.13
'@jridgewell/trace-mapping': 0.3.31 '@jridgewell/trace-mapping': 0.3.31
'@anthropic-ai/sdk@0.73.0(zod@3.25.76)':
dependencies:
json-schema-to-ts: 3.1.1
optionalDependencies:
zod: 3.25.76
'@anthropic-ai/sdk@0.73.0(zod@4.3.6)': '@anthropic-ai/sdk@0.73.0(zod@4.3.6)':
dependencies: dependencies:
json-schema-to-ts: 3.1.1 json-schema-to-ts: 3.1.1
@@ -8606,18 +8603,6 @@ snapshots:
- ws - ws
- zod - zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': '@mariozechner/pi-agent-core@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies: dependencies:
'@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) '@mariozechner/pi-ai': 0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
@@ -8666,30 +8651,6 @@ snapshots:
- ws - ws
- zod - zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@3.25.76)':
dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@3.25.76)
'@aws-sdk/client-bedrock-runtime': 3.1008.0
'@google/genai': 1.45.0(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))
'@mistralai/mistralai': 1.14.1
'@sinclair/typebox': 0.34.48
ajv: 8.18.0
ajv-formats: 3.0.1(ajv@8.18.0)
chalk: 5.6.2
openai: 6.26.0(ws@8.20.0)(zod@3.25.76)
partial-json: 0.1.7
proxy-agent: 6.5.0
undici: 7.24.3
zod-to-json-schema: 3.25.1(zod@3.25.76)
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- aws-crt
- bufferutil
- supports-color
- utf-8-validate
- ws
- zod
'@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)': '@mariozechner/pi-ai@0.63.2(@modelcontextprotocol/sdk@1.28.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)':
dependencies: dependencies:
'@anthropic-ai/sdk': 0.73.0(zod@4.3.6) '@anthropic-ai/sdk': 0.73.0(zod@4.3.6)
@@ -13185,11 +13146,6 @@ snapshots:
dependencies: dependencies:
mimic-function: 5.0.1 mimic-function: 5.0.1
openai@6.26.0(ws@8.20.0)(zod@3.25.76):
optionalDependencies:
ws: 8.20.0
zod: 3.25.76
openai@6.26.0(ws@8.20.0)(zod@4.3.6): openai@6.26.0(ws@8.20.0)(zod@4.3.6):
optionalDependencies: optionalDependencies:
ws: 8.20.0 ws: 8.20.0