Files
stack/apps/api/src/llm/llm.service.ts
Jason Woltje fcecf3654b feat(#371): track LLM task completions via Mosaic Telemetry
- Create LlmTelemetryTrackerService for non-blocking event emission
- Normalize token usage across Anthropic, OpenAI, Ollama providers
- Add cost table with per-token pricing in microdollars
- Instrument chat, chatStream, and embed methods
- Infer task type from calling context
- Aggregate streaming tokens after stream ends with fallback estimation
- Add 69 unit tests for tracker service, cost table, and LLM service

Refs #371

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 02:10:22 -06:00

272 lines
9.3 KiB
TypeScript

import { Injectable, OnModuleInit, Logger, ServiceUnavailableException } from "@nestjs/common";
import { LlmManagerService } from "./llm-manager.service";
import { LlmTelemetryTrackerService, estimateTokens } from "./llm-telemetry-tracker.service";
import type { ChatRequestDto, ChatResponseDto, EmbedRequestDto, EmbedResponseDto } from "./dto";
import type { LlmProviderHealthStatus, LlmProviderType } from "./providers/llm-provider.interface";
/**
* LLM Service
*
* High-level service for LLM operations. Delegates to providers via LlmManagerService.
* Maintains backward compatibility with the original API while supporting multiple providers.
* Automatically tracks completions via Mosaic Telemetry (non-blocking).
*
* @example
* ```typescript
* // Chat completion
* const response = await llmService.chat({
* model: "llama3.2",
* messages: [{ role: "user", content: "Hello" }]
* });
*
* // Streaming chat
* for await (const chunk of llmService.chatStream(request)) {
* console.log(chunk.message.content);
* }
*
* // Generate embeddings
* const embeddings = await llmService.embed({
* model: "llama3.2",
* input: ["text to embed"]
* });
* ```
*/
@Injectable()
export class LlmService implements OnModuleInit {
  private readonly logger = new Logger(LlmService.name);

  constructor(
    private readonly llmManager: LlmManagerService,
    private readonly telemetryTracker: LlmTelemetryTrackerService
  ) {
    this.logger.log("LLM service initialized");
  }

  /**
   * Check health status on module initialization.
   * Logs the status but does not fail if unhealthy.
   */
  async onModuleInit(): Promise<void> {
    const health = await this.checkHealth();
    if (health.healthy) {
      const endpoint = health.endpoint ?? "default endpoint";
      this.logger.log(`LLM provider healthy: ${health.provider} at ${endpoint}`);
    } else {
      const errorMsg = health.error ?? "unknown error";
      this.logger.warn(`LLM provider unhealthy: ${errorMsg}`);
    }
  }

  /**
   * Check health of the default LLM provider.
   * Returns health status without throwing errors.
   *
   * @returns Health status of the default provider
   */
  async checkHealth(): Promise<LlmProviderHealthStatus> {
    try {
      const provider = await this.llmManager.getDefaultProvider();
      return await provider.checkHealth();
    } catch (error: unknown) {
      const errorMessage = this.formatError(error);
      this.logger.error(`Health check failed: ${errorMessage}`);
      return {
        healthy: false,
        provider: "unknown",
        error: errorMessage,
      };
    }
  }

  /**
   * List all available models from the default provider.
   *
   * @returns Array of model names
   * @throws {ServiceUnavailableException} If provider is unavailable or request fails
   */
  async listModels(): Promise<string[]> {
    try {
      const provider = await this.llmManager.getDefaultProvider();
      return await provider.listModels();
    } catch (error: unknown) {
      const errorMessage = this.formatError(error);
      this.logger.error(`Failed to list models: ${errorMessage}`);
      throw new ServiceUnavailableException(`Failed to list models: ${errorMessage}`);
    }
  }

  /**
   * Perform a synchronous chat completion.
   *
   * @param request - Chat request with messages and configuration
   * @param callingContext - Optional context hint for telemetry task type inference
   * @returns Complete chat response
   * @throws {ServiceUnavailableException} If provider is unavailable or request fails
   */
  async chat(request: ChatRequestDto, callingContext?: string): Promise<ChatResponseDto> {
    const startTime = Date.now();
    // Default so a failure before provider resolution still reports a provider type.
    let providerType: LlmProviderType = "ollama";
    try {
      const provider = await this.llmManager.getDefaultProvider();
      providerType = provider.type;
      const response = await provider.chat(request);
      // Fire-and-forget telemetry tracking (non-blocking; tracker handles its own errors)
      this.telemetryTracker.trackLlmCompletion({
        model: response.model,
        providerType,
        operation: "chat",
        durationMs: Date.now() - startTime,
        inputTokens: response.promptEvalCount ?? 0,
        outputTokens: response.evalCount ?? 0,
        callingContext,
        success: true,
      });
      return response;
    } catch (error: unknown) {
      this.trackFailure(request.model, providerType, "chat", startTime, callingContext);
      const errorMessage = this.formatError(error);
      this.logger.error(`Chat failed: ${errorMessage}`);
      throw new ServiceUnavailableException(`Chat completion failed: ${errorMessage}`);
    }
  }

  /**
   * Perform a streaming chat completion.
   * Yields response chunks as they arrive from the provider.
   * Aggregates token usage and tracks telemetry after the stream ends.
   *
   * Note: if the consumer abandons the generator early (break / return),
   * the success telemetry event is not emitted — only fully-consumed
   * streams are tracked.
   *
   * @param request - Chat request with messages and configuration
   * @param callingContext - Optional context hint for telemetry task type inference
   * @yields Chat response chunks
   * @throws {ServiceUnavailableException} If provider is unavailable or request fails
   */
  async *chatStream(
    request: ChatRequestDto,
    callingContext?: string
  ): AsyncGenerator<ChatResponseDto, void, unknown> {
    const startTime = Date.now();
    let providerType: LlmProviderType = "ollama";
    let aggregatedContent = "";
    let lastChunkInputTokens = 0;
    let lastChunkOutputTokens = 0;
    try {
      const provider = await this.llmManager.getDefaultProvider();
      providerType = provider.type;
      const stream = provider.chatStream(request);
      for await (const chunk of stream) {
        // Accumulate content so we can estimate tokens if the provider
        // never reports real counts.
        aggregatedContent += chunk.message.content;
        // Some providers include token counts only on the final chunk.
        if (chunk.promptEvalCount !== undefined) {
          lastChunkInputTokens = chunk.promptEvalCount;
        }
        if (chunk.evalCount !== undefined) {
          lastChunkOutputTokens = chunk.evalCount;
        }
        yield chunk;
      }
      // After the stream completes, track telemetry.
      // Prefer actual token counts; fall back to length-based estimation.
      const inputTokens =
        lastChunkInputTokens > 0
          ? lastChunkInputTokens
          : estimateTokens(request.messages.map((m) => m.content).join(" "));
      const outputTokens =
        lastChunkOutputTokens > 0 ? lastChunkOutputTokens : estimateTokens(aggregatedContent);
      this.telemetryTracker.trackLlmCompletion({
        model: request.model,
        providerType,
        operation: "chatStream",
        durationMs: Date.now() - startTime,
        inputTokens,
        outputTokens,
        callingContext,
        success: true,
      });
    } catch (error: unknown) {
      this.trackFailure(request.model, providerType, "chatStream", startTime, callingContext);
      const errorMessage = this.formatError(error);
      this.logger.error(`Stream failed: ${errorMessage}`);
      throw new ServiceUnavailableException(`Streaming failed: ${errorMessage}`);
    }
  }

  /**
   * Generate embeddings for the given input texts.
   *
   * @param request - Embedding request with model and input texts
   * @param callingContext - Optional context hint for telemetry task type inference
   * @returns Embeddings response with vector arrays
   * @throws {ServiceUnavailableException} If provider is unavailable or request fails
   */
  async embed(request: EmbedRequestDto, callingContext?: string): Promise<EmbedResponseDto> {
    const startTime = Date.now();
    let providerType: LlmProviderType = "ollama";
    try {
      const provider = await this.llmManager.getDefaultProvider();
      providerType = provider.type;
      const response = await provider.embed(request);
      // Providers don't report usage for embeddings; estimate from input text.
      const inputTokens = estimateTokens(request.input.join(" "));
      // Fire-and-forget telemetry tracking
      this.telemetryTracker.trackLlmCompletion({
        model: response.model,
        providerType,
        operation: "embed",
        durationMs: Date.now() - startTime,
        inputTokens,
        outputTokens: 0, // Embeddings don't produce output tokens
        callingContext,
        success: true,
      });
      return response;
    } catch (error: unknown) {
      this.trackFailure(request.model, providerType, "embed", startTime, callingContext);
      const errorMessage = this.formatError(error);
      this.logger.error(`Embed failed: ${errorMessage}`);
      throw new ServiceUnavailableException(`Embedding failed: ${errorMessage}`);
    }
  }

  /**
   * Emit a failure telemetry event with zeroed token counts.
   * Shared by all operation catch blocks; fire-and-forget.
   *
   * @param model - Requested model name (response model is unavailable on failure)
   * @param providerType - Provider type resolved before the failure, or the default
   * @param operation - Which LLM operation failed
   * @param startTime - Epoch millis captured at operation start, for duration
   * @param callingContext - Optional context hint for telemetry task type inference
   */
  private trackFailure(
    model: string,
    providerType: LlmProviderType,
    operation: "chat" | "chatStream" | "embed",
    startTime: number,
    callingContext?: string
  ): void {
    this.telemetryTracker.trackLlmCompletion({
      model,
      providerType,
      operation,
      durationMs: Date.now() - startTime,
      inputTokens: 0,
      outputTokens: 0,
      callingContext,
      success: false,
    });
  }

  /**
   * Normalize an unknown caught value into a log-safe message string.
   * Non-Error throwables are stringified rather than assumed to have .message.
   */
  private formatError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}