From 6c6bcbdb7fdb7f540e799a1c4f91794174f5bf41 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 21 Mar 2026 21:30:15 +0000 Subject: [PATCH] feat(M3-007,M3-009): provider health check scheduler and Ollama embedding default (#308) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- apps/gateway/src/agent/provider.service.ts | 161 +++++++++++------ .../gateway/src/agent/providers.controller.ts | 5 + apps/gateway/src/memory/embedding.service.ts | 164 ++++++++++++++++-- 3 files changed, 263 insertions(+), 67 deletions(-) diff --git a/apps/gateway/src/agent/provider.service.ts b/apps/gateway/src/agent/provider.service.ts index 43cc6d1..39fb9af 100644 --- a/apps/gateway/src/agent/provider.service.ts +++ b/apps/gateway/src/agent/provider.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, type OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common'; import { ModelRegistry, AuthStorage } from '@mariozechner/pi-coding-agent'; import { getModel, type Model, type Api } from '@mariozechner/pi-ai'; import type { @@ -8,14 +8,17 @@ import type { ProviderHealth, ProviderInfo, } from '@mosaic/types'; -import { OllamaAdapter } from './adapters/index.js'; +import { AnthropicAdapter, OllamaAdapter, OpenAIAdapter } from './adapters/index.js'; import type { TestConnectionResultDto } from './provider.dto.js'; +/** Default health check interval in seconds */ +const DEFAULT_HEALTH_INTERVAL_SECS = 60; + /** DI injection token for the provider adapter array. 
*/ export const PROVIDER_ADAPTERS = Symbol('PROVIDER_ADAPTERS'); @Injectable() -export class ProviderService implements OnModuleInit { +export class ProviderService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(ProviderService.name); private registry!: ModelRegistry; @@ -26,25 +29,123 @@ export class ProviderService implements OnModuleInit { */ private adapters: IProviderAdapter[] = []; + /** + * Cached health status per provider, updated by the health check scheduler. + */ + private healthCache: Map<string, ProviderHealth & { modelCount: number }> = new Map(); + + /** Timer handle for the periodic health check scheduler */ + private healthCheckTimer: ReturnType<typeof setInterval> | null = null; + async onModuleInit(): Promise<void> { const authStorage = AuthStorage.inMemory(); this.registry = new ModelRegistry(authStorage); // Build the default set of adapters that rely on the registry - this.adapters = [new OllamaAdapter(this.registry)]; + this.adapters = [ + new OllamaAdapter(this.registry), + new AnthropicAdapter(this.registry), + new OpenAIAdapter(this.registry), + ]; - // Run all adapter registrations first (Ollama, and any future adapters) + // Run all adapter registrations first (Ollama, Anthropic, and any future adapters) await this.registerAll(); - // Register API-key providers directly (Anthropic, OpenAI, Z.ai, custom) - // These do not yet have dedicated adapter classes (M3-002 through M3-005). + // Register API-key providers directly (Z.ai, custom) + // OpenAI now has a dedicated adapter (M3-003). 
this.registerZaiProvider(); this.registerCustomProviders(); const available = this.registry.getAvailable(); this.logger.log(`Providers initialized: ${available.length} models available`); + + // Kick off the health check scheduler + this.startHealthCheckScheduler(); + } + + onModuleDestroy(): void { + if (this.healthCheckTimer !== null) { + clearInterval(this.healthCheckTimer); + this.healthCheckTimer = null; + } + } + + // --------------------------------------------------------------------------- + // Health check scheduler + // --------------------------------------------------------------------------- + + /** + * Start periodic health checks on all adapters. + * Interval is configurable via PROVIDER_HEALTH_INTERVAL env (seconds, default 60). + */ + private startHealthCheckScheduler(): void { + const intervalSecs = + parseInt(process.env['PROVIDER_HEALTH_INTERVAL'] ?? '', 10) || DEFAULT_HEALTH_INTERVAL_SECS; + const intervalMs = intervalSecs * 1000; + + // Run an initial check immediately (non-blocking) + void this.runScheduledHealthChecks(); + + this.healthCheckTimer = setInterval(() => { + void this.runScheduledHealthChecks(); + }, intervalMs); + + this.logger.log(`Provider health check scheduler started (interval: ${intervalSecs}s)`); + } + + private async runScheduledHealthChecks(): Promise<void> { + for (const adapter of this.adapters) { + try { + const health = await adapter.healthCheck(); + const modelCount = adapter.listModels().length; + this.healthCache.set(adapter.name, { ...health, modelCount }); + this.logger.debug( + `Health check [${adapter.name}]: ${health.status} (${health.latencyMs ?? 'n/a'}ms)`, + ); + } catch (err) { + const modelCount = adapter.listModels().length; + this.healthCache.set(adapter.name, { + status: 'down', + lastChecked: new Date().toISOString(), + error: err instanceof Error ? err.message : String(err), + modelCount, + }); + } + } + } + + /** + * Return the cached health status for all adapters. 
+ * Format: array of { name, status, latencyMs, lastChecked, modelCount } + */ + getProvidersHealth(): Array<{ + name: string; + status: string; + latencyMs?: number; + lastChecked: string; + modelCount: number; + error?: string; + }> { + return this.adapters.map((adapter) => { + const cached = this.healthCache.get(adapter.name); + if (cached) { + return { + name: adapter.name, + status: cached.status, + latencyMs: cached.latencyMs, + lastChecked: cached.lastChecked, + modelCount: cached.modelCount, + error: cached.error, + }; + } + // Not yet checked — return a pending placeholder + return { + name: adapter.name, + status: 'unknown', + lastChecked: new Date().toISOString(), + modelCount: adapter.listModels().length, + }; + }); } // --------------------------------------------------------------------------- @@ -234,49 +335,9 @@ export class ProviderService implements OnModuleInit { // --------------------------------------------------------------------------- // Private helpers — direct registry registration for providers without adapters yet - // (Anthropic, OpenAI, Z.ai will move to adapters in M3-002 through M3-005) + // (Z.ai will move to an adapter in M3-005) // --------------------------------------------------------------------------- - private registerAnthropicProvider(): void { - const apiKey = process.env['ANTHROPIC_API_KEY']; - if (!apiKey) { - this.logger.debug('Skipping Anthropic provider registration: ANTHROPIC_API_KEY not set'); - return; - } - - const models = ['claude-sonnet-4-6', 'claude-opus-4-6', 'claude-haiku-4-5'].map((id) => - this.cloneBuiltInModel('anthropic', id, { maxTokens: 8192 }), - ); - - this.registry.registerProvider('anthropic', { - apiKey, - baseUrl: 'https://api.anthropic.com', - models, - }); - - this.logger.log('Anthropic provider registered with 3 models'); - } - - private registerOpenAIProvider(): void { - const apiKey = process.env['OPENAI_API_KEY']; - if (!apiKey) { - this.logger.debug('Skipping OpenAI provider 
registration: OPENAI_API_KEY not set'); - return; - } - - const models = ['gpt-4o', 'gpt-4o-mini', 'o3-mini'].map((id) => - this.cloneBuiltInModel('openai', id), - ); - - this.registry.registerProvider('openai', { - apiKey, - baseUrl: 'https://api.openai.com/v1', - models, - }); - - this.logger.log('OpenAI provider registered with 3 models'); - } - private registerZaiProvider(): void { const apiKey = process.env['ZAI_API_KEY']; if (!apiKey) { diff --git a/apps/gateway/src/agent/providers.controller.ts b/apps/gateway/src/agent/providers.controller.ts index 8a07aa4..f5cd882 100644 --- a/apps/gateway/src/agent/providers.controller.ts +++ b/apps/gateway/src/agent/providers.controller.ts @@ -23,6 +23,11 @@ export class ProvidersController { return this.providerService.listAvailableModels(); } + @Get('health') + health() { + return { providers: this.providerService.getProvidersHealth() }; + } + @Post('test') testConnection(@Body() body: TestConnectionDto): Promise<TestConnectionResultDto> { return this.providerService.testConnection(body.providerId, body.baseUrl); diff --git a/apps/gateway/src/memory/embedding.service.ts b/apps/gateway/src/memory/embedding.service.ts index 14f8551..42e77b6 100644 --- a/apps/gateway/src/memory/embedding.service.ts +++ b/apps/gateway/src/memory/embedding.service.ts @@ -1,36 +1,122 @@ import { Injectable, Logger } from '@nestjs/common'; import type { EmbeddingProvider } from '@mosaic/memory'; -const DEFAULT_MODEL = 'text-embedding-3-small'; -const DEFAULT_DIMENSIONS = 1536; +// --------------------------------------------------------------------------- +// Environment-driven configuration +// +// EMBEDDING_PROVIDER — 'ollama' (default) | 'openai' +// EMBEDDING_MODEL — model id, defaults differ per provider +// EMBEDDING_DIMENSIONS — integer, defaults differ per provider +// OLLAMA_BASE_URL — base URL for Ollama (used when provider=ollama) +// EMBEDDING_API_URL — full base URL for OpenAI-compatible API +// OPENAI_API_KEY — required for OpenAI provider +// 
--------------------------------------------------------------------------- -interface EmbeddingResponse { +const OLLAMA_DEFAULT_MODEL = 'nomic-embed-text'; +const OLLAMA_DEFAULT_DIMENSIONS = 768; + +const OPENAI_DEFAULT_MODEL = 'text-embedding-3-small'; +const OPENAI_DEFAULT_DIMENSIONS = 1536; + +/** Known dimension mismatch: warn if pgvector column likely has wrong size */ +const PGVECTOR_SCHEMA_DIMENSIONS = 1536; + +type EmbeddingBackend = 'ollama' | 'openai'; + +interface OllamaEmbeddingResponse { + embedding: number[]; +} + +interface OpenAIEmbeddingResponse { data: Array<{ embedding: number[]; index: number }>; model: string; usage: { prompt_tokens: number; total_tokens: number }; } /** - * Generates embeddings via the OpenAI-compatible embeddings API. - * Supports OpenAI, Azure OpenAI, and any provider with a compatible endpoint. + * Provider-agnostic embedding service. + * + * Defaults to Ollama's native embedding API using nomic-embed-text (768 dims). + * Falls back to the OpenAI-compatible API when EMBEDDING_PROVIDER=openai or + * when OPENAI_API_KEY is set and EMBEDDING_PROVIDER is not explicitly set to ollama. + * + * Dimension mismatch detection: if the configured dimensions differ from the + * pgvector schema (1536), a warning is logged with re-embedding instructions. */ @Injectable() export class EmbeddingService implements EmbeddingProvider { private readonly logger = new Logger(EmbeddingService.name); - private readonly apiKey: string | undefined; - private readonly baseUrl: string; + private readonly backend: EmbeddingBackend; private readonly model: string; + readonly dimensions: number; - readonly dimensions = DEFAULT_DIMENSIONS; + // Ollama-specific + private readonly ollamaBaseUrl: string | undefined; + + // OpenAI-compatible + private readonly openaiApiKey: string | undefined; + private readonly openaiBaseUrl: string; constructor() { - this.apiKey = process.env['OPENAI_API_KEY']; - this.baseUrl = process.env['EMBEDDING_API_URL'] ?? 
'https://api.openai.com/v1'; - this.model = process.env['EMBEDDING_MODEL'] ?? DEFAULT_MODEL; + // Determine backend + const providerEnv = process.env['EMBEDDING_PROVIDER']; + const openaiKey = process.env['OPENAI_API_KEY']; + const ollamaUrl = process.env['OLLAMA_BASE_URL'] ?? process.env['OLLAMA_HOST']; + + if (providerEnv === 'openai') { + this.backend = 'openai'; + } else if (providerEnv === 'ollama') { + this.backend = 'ollama'; + } else if (process.env['EMBEDDING_API_URL']) { + // Legacy: explicit API URL configured → use openai-compat path + this.backend = 'openai'; + } else if (ollamaUrl) { + // Ollama available and no explicit override → prefer Ollama + this.backend = 'ollama'; + } else if (openaiKey) { + // OpenAI key present → use OpenAI + this.backend = 'openai'; + } else { + // Nothing configured — default to ollama (will return zeros when unavailable) + this.backend = 'ollama'; + } + + // Set model and dimension defaults based on backend + if (this.backend === 'ollama') { + this.model = process.env['EMBEDDING_MODEL'] ?? OLLAMA_DEFAULT_MODEL; + this.dimensions = + parseInt(process.env['EMBEDDING_DIMENSIONS'] ?? '', 10) || OLLAMA_DEFAULT_DIMENSIONS; + this.ollamaBaseUrl = ollamaUrl; + this.openaiApiKey = undefined; + this.openaiBaseUrl = ''; + } else { + this.model = process.env['EMBEDDING_MODEL'] ?? OPENAI_DEFAULT_MODEL; + this.dimensions = + parseInt(process.env['EMBEDDING_DIMENSIONS'] ?? '', 10) || OPENAI_DEFAULT_DIMENSIONS; + this.ollamaBaseUrl = undefined; + this.openaiApiKey = openaiKey; + this.openaiBaseUrl = process.env['EMBEDDING_API_URL'] ?? 'https://api.openai.com/v1'; + } + + // Warn on dimension mismatch with the current schema + if (this.dimensions !== PGVECTOR_SCHEMA_DIMENSIONS) { + this.logger.warn( + `Embedding dimensions (${this.dimensions}) differ from pgvector schema (${PGVECTOR_SCHEMA_DIMENSIONS}). ` + + `If insights already contain ${PGVECTOR_SCHEMA_DIMENSIONS}-dim vectors, similarity search will fail. 
` + `To fix: truncate the insights table and re-embed, or run a migration to ALTER COLUMN embedding TYPE vector(${this.dimensions}).`, ); } + + this.logger.log( + `EmbeddingService initialized: backend=${this.backend}, model=${this.model}, dimensions=${this.dimensions}`, + ); } get available(): boolean { - return !!this.apiKey; + if (this.backend === 'ollama') { + return !!this.ollamaBaseUrl; + } + return !!this.openaiApiKey; } async embed(text: string): Promise<number[]> { @@ -39,16 +125,60 @@ export class EmbeddingService implements EmbeddingProvider { } async embedBatch(texts: string[]): Promise<number[][]> { - if (!this.apiKey) { - this.logger.warn('No OPENAI_API_KEY configured — returning zero vectors'); + if (!this.available) { + const reason = + this.backend === 'ollama' + ? 'OLLAMA_BASE_URL not configured' + : 'No OPENAI_API_KEY configured'; + this.logger.warn(`${reason} — returning zero vectors`); return texts.map(() => new Array(this.dimensions).fill(0)); } - const response = await fetch(`${this.baseUrl}/embeddings`, { + if (this.backend === 'ollama') { + return this.embedBatchOllama(texts); + } + return this.embedBatchOpenAI(texts); + } + + // --------------------------------------------------------------------------- + // Ollama backend + // --------------------------------------------------------------------------- + + private async embedBatchOllama(texts: string[]): Promise<number[][]> { + const baseUrl = this.ollamaBaseUrl!; + const results: number[][] = []; + + // Ollama's /api/embeddings endpoint processes one text at a time + for (const text of texts) { + const response = await fetch(`${baseUrl}/api/embeddings`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ model: this.model, prompt: text }), + }); + + if (!response.ok) { + const body = await response.text(); + this.logger.error(`Ollama embedding API error: ${response.status} ${body}`); + throw new Error(`Ollama embedding API returned ${response.status}`); + } + + const json 
= (await response.json()) as OllamaEmbeddingResponse; + results.push(json.embedding); + } + + return results; + } + + // --------------------------------------------------------------------------- + // OpenAI-compatible backend + // --------------------------------------------------------------------------- + + private async embedBatchOpenAI(texts: string[]): Promise<number[][]> { + const response = await fetch(`${this.openaiBaseUrl}/embeddings`, { method: 'POST', headers: { 'Content-Type': 'application/json', - Authorization: `Bearer ${this.apiKey}`, + Authorization: `Bearer ${this.openaiApiKey}`, }, body: JSON.stringify({ model: this.model, @@ -63,7 +193,7 @@ export class EmbeddingService implements EmbeddingProvider { throw new Error(`Embedding API returned ${response.status}`); } - const json = (await response.json()) as EmbeddingResponse; + const json = (await response.json()) as OpenAIEmbeddingResponse; return json.data.sort((a, b) => a.index - b.index).map((d) => d.embedding); } }