feat(#371): track LLM task completions via Mosaic Telemetry
- Create LlmTelemetryTrackerService for non-blocking event emission
- Normalize token usage across Anthropic, OpenAI, Ollama providers
- Add cost table with per-token pricing in microdollars
- Instrument chat, chatStream, and embed methods
- Infer task type from calling context
- Aggregate streaming tokens after stream ends with fallback estimation
- Add 69 unit tests for tracker service, cost table, and LLM service

Refs #371

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
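For orientation, this is roughly the tracker surface the instrumented call sites below rely on. The event field names are taken from this diff; the interface name, the provider union, and the ~4-characters-per-token fallback heuristic are illustrative assumptions rather than the shipped implementation.

```typescript
// Sketch of the tracker surface assumed by the call sites in this diff.
// Field names come from the diff; LlmCompletionEvent, the provider union,
// and the chars/4 heuristic are illustrative assumptions.
import { Injectable, Logger } from "@nestjs/common";

type LlmProviderType = "anthropic" | "openai" | "ollama";

interface LlmCompletionEvent {
  model: string;
  providerType: LlmProviderType;
  operation: "chat" | "chatStream" | "embed";
  durationMs: number;
  inputTokens: number;
  outputTokens: number;
  callingContext?: string;
  success: boolean;
}

/** Fallback token estimate when a provider omits usage counts (assumed heuristic). */
export function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

@Injectable()
export class LlmTelemetryTrackerService {
  private readonly logger = new Logger(LlmTelemetryTrackerService.name);

  /** Fire-and-forget: never throws and never blocks the calling request path. */
  trackLlmCompletion(event: LlmCompletionEvent): void {
    setImmediate(() => {
      try {
        // Emit the event to Mosaic Telemetry; cost is derived from the
        // per-token pricing table (microdollars) before emission.
        this.logger.debug(`LLM ${event.operation} tracked for ${event.model}`);
      } catch {
        // Telemetry failures must never affect the caller.
      }
    });
  }
}
```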
@@ -1,13 +1,15 @@
 import { Injectable, OnModuleInit, Logger, ServiceUnavailableException } from "@nestjs/common";
 import { LlmManagerService } from "./llm-manager.service";
+import { LlmTelemetryTrackerService, estimateTokens } from "./llm-telemetry-tracker.service";
 import type { ChatRequestDto, ChatResponseDto, EmbedRequestDto, EmbedResponseDto } from "./dto";
-import type { LlmProviderHealthStatus } from "./providers/llm-provider.interface";
+import type { LlmProviderHealthStatus, LlmProviderType } from "./providers/llm-provider.interface";
 
 /**
  * LLM Service
  *
  * High-level service for LLM operations. Delegates to providers via LlmManagerService.
  * Maintains backward compatibility with the original API while supporting multiple providers.
+ * Automatically tracks completions via Mosaic Telemetry (non-blocking).
  *
  * @example
  * ```typescript
@@ -33,7 +35,10 @@ import type { LlmProviderHealthStatus } from "./providers/llm-provider.interface
 export class LlmService implements OnModuleInit {
   private readonly logger = new Logger(LlmService.name);
 
-  constructor(private readonly llmManager: LlmManagerService) {
+  constructor(
+    private readonly llmManager: LlmManagerService,
+    private readonly telemetryTracker: LlmTelemetryTrackerService
+  ) {
     this.logger.log("LLM service initialized");
   }
 
@@ -91,14 +96,45 @@ export class LlmService implements OnModuleInit {
    * Perform a synchronous chat completion.
    *
    * @param request - Chat request with messages and configuration
+   * @param callingContext - Optional context hint for telemetry task type inference
    * @returns Complete chat response
    * @throws {ServiceUnavailableException} If provider is unavailable or request fails
    */
-  async chat(request: ChatRequestDto): Promise<ChatResponseDto> {
+  async chat(request: ChatRequestDto, callingContext?: string): Promise<ChatResponseDto> {
+    const startTime = Date.now();
+    let providerType: LlmProviderType = "ollama";
+
     try {
       const provider = await this.llmManager.getDefaultProvider();
-      return await provider.chat(request);
+      providerType = provider.type;
+      const response = await provider.chat(request);
+
+      // Fire-and-forget telemetry tracking
+      this.telemetryTracker.trackLlmCompletion({
+        model: response.model,
+        providerType,
+        operation: "chat",
+        durationMs: Date.now() - startTime,
+        inputTokens: response.promptEvalCount ?? 0,
+        outputTokens: response.evalCount ?? 0,
+        callingContext,
+        success: true,
+      });
+
+      return response;
     } catch (error: unknown) {
+      // Track failure (fire-and-forget)
+      this.telemetryTracker.trackLlmCompletion({
+        model: request.model,
+        providerType,
+        operation: "chat",
+        durationMs: Date.now() - startTime,
+        inputTokens: 0,
+        outputTokens: 0,
+        callingContext,
+        success: false,
+      });
+
       const errorMessage = error instanceof Error ? error.message : String(error);
       this.logger.error(`Chat failed: ${errorMessage}`);
       throw new ServiceUnavailableException(`Chat completion failed: ${errorMessage}`);
@@ -107,20 +143,75 @@ export class LlmService implements OnModuleInit {
   /**
    * Perform a streaming chat completion.
    * Yields response chunks as they arrive from the provider.
+   * Aggregates token usage and tracks telemetry after the stream ends.
    *
    * @param request - Chat request with messages and configuration
+   * @param callingContext - Optional context hint for telemetry task type inference
    * @yields Chat response chunks
    * @throws {ServiceUnavailableException} If provider is unavailable or request fails
    */
-  async *chatStream(request: ChatRequestDto): AsyncGenerator<ChatResponseDto, void, unknown> {
+  async *chatStream(
+    request: ChatRequestDto,
+    callingContext?: string
+  ): AsyncGenerator<ChatResponseDto, void, unknown> {
+    const startTime = Date.now();
+    let providerType: LlmProviderType = "ollama";
+    let aggregatedContent = "";
+    let lastChunkInputTokens = 0;
+    let lastChunkOutputTokens = 0;
+
     try {
       const provider = await this.llmManager.getDefaultProvider();
+      providerType = provider.type;
       const stream = provider.chatStream(request);
+
       for await (const chunk of stream) {
+        // Accumulate content for token estimation
+        aggregatedContent += chunk.message.content;
+
+        // Some providers include token counts on the final chunk
+        if (chunk.promptEvalCount !== undefined) {
+          lastChunkInputTokens = chunk.promptEvalCount;
+        }
+        if (chunk.evalCount !== undefined) {
+          lastChunkOutputTokens = chunk.evalCount;
+        }
+
         yield chunk;
       }
 
+      // After stream completes, track telemetry
+      // Use actual token counts if available, otherwise estimate from content length
+      const inputTokens =
+        lastChunkInputTokens > 0
+          ? lastChunkInputTokens
+          : estimateTokens(request.messages.map((m) => m.content).join(" "));
+      const outputTokens =
+        lastChunkOutputTokens > 0 ? lastChunkOutputTokens : estimateTokens(aggregatedContent);
+
+      this.telemetryTracker.trackLlmCompletion({
+        model: request.model,
+        providerType,
+        operation: "chatStream",
+        durationMs: Date.now() - startTime,
+        inputTokens,
+        outputTokens,
+        callingContext,
+        success: true,
+      });
     } catch (error: unknown) {
+      // Track failure (fire-and-forget)
+      this.telemetryTracker.trackLlmCompletion({
+        model: request.model,
+        providerType,
+        operation: "chatStream",
+        durationMs: Date.now() - startTime,
+        inputTokens: 0,
+        outputTokens: 0,
+        callingContext,
+        success: false,
+      });
+
       const errorMessage = error instanceof Error ? error.message : String(error);
       this.logger.error(`Stream failed: ${errorMessage}`);
       throw new ServiceUnavailableException(`Streaming failed: ${errorMessage}`);
@@ -130,14 +221,48 @@ export class LlmService implements OnModuleInit {
    * Generate embeddings for the given input texts.
    *
    * @param request - Embedding request with model and input texts
+   * @param callingContext - Optional context hint for telemetry task type inference
    * @returns Embeddings response with vector arrays
    * @throws {ServiceUnavailableException} If provider is unavailable or request fails
    */
-  async embed(request: EmbedRequestDto): Promise<EmbedResponseDto> {
+  async embed(request: EmbedRequestDto, callingContext?: string): Promise<EmbedResponseDto> {
+    const startTime = Date.now();
+    let providerType: LlmProviderType = "ollama";
+
     try {
       const provider = await this.llmManager.getDefaultProvider();
-      return await provider.embed(request);
+      providerType = provider.type;
+      const response = await provider.embed(request);
+
+      // Estimate input tokens from the input text
+      const inputTokens = estimateTokens(request.input.join(" "));
+
+      // Fire-and-forget telemetry tracking
+      this.telemetryTracker.trackLlmCompletion({
+        model: response.model,
+        providerType,
+        operation: "embed",
+        durationMs: Date.now() - startTime,
+        inputTokens,
+        outputTokens: 0, // Embeddings don't produce output tokens
+        callingContext,
+        success: true,
+      });
+
+      return response;
     } catch (error: unknown) {
+      // Track failure (fire-and-forget)
+      this.telemetryTracker.trackLlmCompletion({
+        model: request.model,
+        providerType,
+        operation: "embed",
+        durationMs: Date.now() - startTime,
+        inputTokens: 0,
+        outputTokens: 0,
+        callingContext,
+        success: false,
+      });
+
       const errorMessage = error instanceof Error ? error.message : String(error);
       this.logger.error(`Embed failed: ${errorMessage}`);
       throw new ServiceUnavailableException(`Embedding failed: ${errorMessage}`);
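For reference, a hypothetical caller passing a context hint might look like the sketch below. The consumer class, the import paths, the model name, and the message `role` field are illustrative assumptions; only `model`, `messages[].content`, and the optional second argument are taken from this diff.

```typescript
import { Injectable } from "@nestjs/common";
import { LlmService } from "./llm.service";
import type { ChatResponseDto } from "./dto";

// Hypothetical consumer of the instrumented service; values are illustrative.
@Injectable()
export class ReviewSummaryService {
  constructor(private readonly llm: LlmService) {}

  async summarize(diffText: string): Promise<ChatResponseDto> {
    // The optional second argument is only a telemetry hint used for task
    // type inference; it does not change how the completion is produced.
    return this.llm.chat(
      { model: "llama3", messages: [{ role: "user", content: `Summarize:\n${diffText}` }] },
      "code-review"
    );
  }
}
```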