- Create LlmTelemetryTrackerService for non-blocking event emission - Normalize token usage across Anthropic, OpenAI, Ollama providers - Add cost table with per-token pricing in microdollars - Instrument chat, chatStream, and embed methods - Infer task type from calling context - Aggregate streaming tokens after stream ends with fallback estimation - Add 69 unit tests for tracker service, cost table, and LLM service Refs #371 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
272 lines · 9.3 KiB · TypeScript
import { Injectable, OnModuleInit, Logger, ServiceUnavailableException } from "@nestjs/common";
|
|
import { LlmManagerService } from "./llm-manager.service";
|
|
import { LlmTelemetryTrackerService, estimateTokens } from "./llm-telemetry-tracker.service";
|
|
import type { ChatRequestDto, ChatResponseDto, EmbedRequestDto, EmbedResponseDto } from "./dto";
|
|
import type { LlmProviderHealthStatus, LlmProviderType } from "./providers/llm-provider.interface";
|
|
|
|
/**
|
|
* LLM Service
|
|
*
|
|
* High-level service for LLM operations. Delegates to providers via LlmManagerService.
|
|
* Maintains backward compatibility with the original API while supporting multiple providers.
|
|
* Automatically tracks completions via Mosaic Telemetry (non-blocking).
|
|
*
|
|
* @example
|
|
* ```typescript
|
|
* // Chat completion
|
|
* const response = await llmService.chat({
|
|
* model: "llama3.2",
|
|
* messages: [{ role: "user", content: "Hello" }]
|
|
* });
|
|
*
|
|
* // Streaming chat
|
|
* for await (const chunk of llmService.chatStream(request)) {
|
|
* console.log(chunk.message.content);
|
|
* }
|
|
*
|
|
* // Generate embeddings
|
|
* const embeddings = await llmService.embed({
|
|
* model: "llama3.2",
|
|
* input: ["text to embed"]
|
|
* });
|
|
* ```
|
|
*/
|
|
@Injectable()
|
|
export class LlmService implements OnModuleInit {
|
|
private readonly logger = new Logger(LlmService.name);
|
|
|
|
constructor(
|
|
private readonly llmManager: LlmManagerService,
|
|
private readonly telemetryTracker: LlmTelemetryTrackerService
|
|
) {
|
|
this.logger.log("LLM service initialized");
|
|
}
|
|
|
|
/**
|
|
* Check health status on module initialization.
|
|
* Logs the status but does not fail if unhealthy.
|
|
*/
|
|
async onModuleInit(): Promise<void> {
|
|
const health = await this.checkHealth();
|
|
if (health.healthy) {
|
|
const endpoint = health.endpoint ?? "default endpoint";
|
|
this.logger.log(`LLM provider healthy: ${health.provider} at ${endpoint}`);
|
|
} else {
|
|
const errorMsg = health.error ?? "unknown error";
|
|
this.logger.warn(`LLM provider unhealthy: ${errorMsg}`);
|
|
}
|
|
}
|
|
/**
|
|
* Check health of the default LLM provider.
|
|
* Returns health status without throwing errors.
|
|
*
|
|
* @returns Health status of the default provider
|
|
*/
|
|
async checkHealth(): Promise<LlmProviderHealthStatus> {
|
|
try {
|
|
const provider = await this.llmManager.getDefaultProvider();
|
|
return await provider.checkHealth();
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Health check failed: ${errorMessage}`);
|
|
return {
|
|
healthy: false,
|
|
provider: "unknown",
|
|
error: errorMessage,
|
|
};
|
|
}
|
|
}
|
|
/**
|
|
* List all available models from the default provider.
|
|
*
|
|
* @returns Array of model names
|
|
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
|
*/
|
|
async listModels(): Promise<string[]> {
|
|
try {
|
|
const provider = await this.llmManager.getDefaultProvider();
|
|
return await provider.listModels();
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Failed to list models: ${errorMessage}`);
|
|
throw new ServiceUnavailableException(`Failed to list models: ${errorMessage}`);
|
|
}
|
|
}
|
|
/**
|
|
* Perform a synchronous chat completion.
|
|
*
|
|
* @param request - Chat request with messages and configuration
|
|
* @param callingContext - Optional context hint for telemetry task type inference
|
|
* @returns Complete chat response
|
|
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
|
*/
|
|
async chat(request: ChatRequestDto, callingContext?: string): Promise<ChatResponseDto> {
|
|
const startTime = Date.now();
|
|
let providerType: LlmProviderType = "ollama";
|
|
|
|
try {
|
|
const provider = await this.llmManager.getDefaultProvider();
|
|
providerType = provider.type;
|
|
const response = await provider.chat(request);
|
|
|
|
// Fire-and-forget telemetry tracking
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: response.model,
|
|
providerType,
|
|
operation: "chat",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens: response.promptEvalCount ?? 0,
|
|
outputTokens: response.evalCount ?? 0,
|
|
callingContext,
|
|
success: true,
|
|
});
|
|
|
|
return response;
|
|
} catch (error: unknown) {
|
|
// Track failure (fire-and-forget)
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: request.model,
|
|
providerType,
|
|
operation: "chat",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens: 0,
|
|
outputTokens: 0,
|
|
callingContext,
|
|
success: false,
|
|
});
|
|
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Chat failed: ${errorMessage}`);
|
|
throw new ServiceUnavailableException(`Chat completion failed: ${errorMessage}`);
|
|
}
|
|
}
|
|
/**
|
|
* Perform a streaming chat completion.
|
|
* Yields response chunks as they arrive from the provider.
|
|
* Aggregates token usage and tracks telemetry after the stream ends.
|
|
*
|
|
* @param request - Chat request with messages and configuration
|
|
* @param callingContext - Optional context hint for telemetry task type inference
|
|
* @yields Chat response chunks
|
|
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
|
*/
|
|
async *chatStream(
|
|
request: ChatRequestDto,
|
|
callingContext?: string
|
|
): AsyncGenerator<ChatResponseDto, void, unknown> {
|
|
const startTime = Date.now();
|
|
let providerType: LlmProviderType = "ollama";
|
|
let aggregatedContent = "";
|
|
let lastChunkInputTokens = 0;
|
|
let lastChunkOutputTokens = 0;
|
|
|
|
try {
|
|
const provider = await this.llmManager.getDefaultProvider();
|
|
providerType = provider.type;
|
|
const stream = provider.chatStream(request);
|
|
|
|
for await (const chunk of stream) {
|
|
// Accumulate content for token estimation
|
|
aggregatedContent += chunk.message.content;
|
|
|
|
// Some providers include token counts on the final chunk
|
|
if (chunk.promptEvalCount !== undefined) {
|
|
lastChunkInputTokens = chunk.promptEvalCount;
|
|
}
|
|
if (chunk.evalCount !== undefined) {
|
|
lastChunkOutputTokens = chunk.evalCount;
|
|
}
|
|
|
|
yield chunk;
|
|
}
|
|
|
|
// After stream completes, track telemetry
|
|
// Use actual token counts if available, otherwise estimate from content length
|
|
const inputTokens =
|
|
lastChunkInputTokens > 0
|
|
? lastChunkInputTokens
|
|
: estimateTokens(request.messages.map((m) => m.content).join(" "));
|
|
const outputTokens =
|
|
lastChunkOutputTokens > 0 ? lastChunkOutputTokens : estimateTokens(aggregatedContent);
|
|
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: request.model,
|
|
providerType,
|
|
operation: "chatStream",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens,
|
|
outputTokens,
|
|
callingContext,
|
|
success: true,
|
|
});
|
|
} catch (error: unknown) {
|
|
// Track failure (fire-and-forget)
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: request.model,
|
|
providerType,
|
|
operation: "chatStream",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens: 0,
|
|
outputTokens: 0,
|
|
callingContext,
|
|
success: false,
|
|
});
|
|
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Stream failed: ${errorMessage}`);
|
|
throw new ServiceUnavailableException(`Streaming failed: ${errorMessage}`);
|
|
}
|
|
}
|
|
/**
|
|
* Generate embeddings for the given input texts.
|
|
*
|
|
* @param request - Embedding request with model and input texts
|
|
* @param callingContext - Optional context hint for telemetry task type inference
|
|
* @returns Embeddings response with vector arrays
|
|
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
|
*/
|
|
async embed(request: EmbedRequestDto, callingContext?: string): Promise<EmbedResponseDto> {
|
|
const startTime = Date.now();
|
|
let providerType: LlmProviderType = "ollama";
|
|
|
|
try {
|
|
const provider = await this.llmManager.getDefaultProvider();
|
|
providerType = provider.type;
|
|
const response = await provider.embed(request);
|
|
|
|
// Estimate input tokens from the input text
|
|
const inputTokens = estimateTokens(request.input.join(" "));
|
|
|
|
// Fire-and-forget telemetry tracking
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: response.model,
|
|
providerType,
|
|
operation: "embed",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens,
|
|
outputTokens: 0, // Embeddings don't produce output tokens
|
|
callingContext,
|
|
success: true,
|
|
});
|
|
|
|
return response;
|
|
} catch (error: unknown) {
|
|
// Track failure (fire-and-forget)
|
|
this.telemetryTracker.trackLlmCompletion({
|
|
model: request.model,
|
|
providerType,
|
|
operation: "embed",
|
|
durationMs: Date.now() - startTime,
|
|
inputTokens: 0,
|
|
outputTokens: 0,
|
|
callingContext,
|
|
success: false,
|
|
});
|
|
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
this.logger.error(`Embed failed: ${errorMessage}`);
|
|
throw new ServiceUnavailableException(`Embedding failed: ${errorMessage}`);
|
|
}
|
|
}
|
|
}
|