feat(#131): add OpenTelemetry tracing infrastructure
Implement comprehensive distributed tracing for HTTP requests and LLM operations using OpenTelemetry with GenAI semantic conventions. Features: - TelemetryService: SDK initialization with OTLP HTTP exporter - TelemetryInterceptor: Automatic HTTP request spans - @TraceLlmCall decorator: LLM operation tracing - GenAI semantic conventions for model/token tracking - Graceful degradation when tracing disabled Instrumented: - All HTTP requests (automatic spans) - OllamaProvider chat/chatStream/embed operations - Token counts, model names, durations Environment: - OTEL_ENABLED (default: true) - OTEL_SERVICE_NAME (default: mosaic-api) - OTEL_EXPORTER_OTLP_ENDPOINT (default: localhost:4318) Tests: 23 passing with full coverage Fixes #131 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -31,6 +31,13 @@
|
||||
"@nestjs/platform-express": "^11.1.12",
|
||||
"@nestjs/platform-socket.io": "^11.1.12",
|
||||
"@nestjs/websockets": "^11.1.12",
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@opentelemetry/auto-instrumentations-node": "^0.55.0",
|
||||
"@opentelemetry/exporter-trace-otlp-http": "^0.56.0",
|
||||
"@opentelemetry/instrumentation-nestjs-core": "^0.44.0",
|
||||
"@opentelemetry/resources": "^1.30.1",
|
||||
"@opentelemetry/sdk-node": "^0.56.0",
|
||||
"@opentelemetry/semantic-conventions": "^1.28.0",
|
||||
"@prisma/client": "^6.19.2",
|
||||
"@types/marked": "^6.0.0",
|
||||
"@types/multer": "^2.0.0",
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { APP_INTERCEPTOR } from "@nestjs/core";
|
||||
import { AppController } from "./app.controller";
|
||||
import { AppService } from "./app.service";
|
||||
import { PrismaModule } from "./prisma/prisma.module";
|
||||
@@ -20,9 +21,11 @@ import { BrainModule } from "./brain/brain.module";
|
||||
import { CronModule } from "./cron/cron.module";
|
||||
import { AgentTasksModule } from "./agent-tasks/agent-tasks.module";
|
||||
import { ValkeyModule } from "./valkey/valkey.module";
|
||||
import { TelemetryModule, TelemetryInterceptor } from "./telemetry";
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
TelemetryModule,
|
||||
PrismaModule,
|
||||
DatabaseModule,
|
||||
ValkeyModule,
|
||||
@@ -44,6 +47,12 @@ import { ValkeyModule } from "./valkey/valkey.module";
|
||||
AgentTasksModule,
|
||||
],
|
||||
controllers: [AppController],
|
||||
providers: [AppService],
|
||||
providers: [
|
||||
AppService,
|
||||
{
|
||||
provide: APP_INTERCEPTOR,
|
||||
useClass: TelemetryInterceptor,
|
||||
},
|
||||
],
|
||||
})
|
||||
export class AppModule {}
|
||||
|
||||
@@ -6,6 +6,8 @@ import type {
|
||||
LlmProviderHealthStatus,
|
||||
} from "./llm-provider.interface";
|
||||
import type { ChatRequestDto, ChatResponseDto, EmbedRequestDto, EmbedResponseDto } from "../dto";
|
||||
import { TraceLlmCall, createLlmSpan } from "../../telemetry";
|
||||
import { SpanStatusCode } from "@opentelemetry/api";
|
||||
|
||||
/**
|
||||
* Configuration for Ollama LLM provider.
|
||||
@@ -137,6 +139,7 @@ export class OllamaProvider implements LlmProviderInterface {
|
||||
* @returns Complete chat response
|
||||
* @throws {Error} If the request fails
|
||||
*/
|
||||
@TraceLlmCall({ system: "ollama", operation: "chat" })
|
||||
async chat(request: ChatRequestDto): Promise<ChatResponseDto> {
|
||||
try {
|
||||
const messages = this.buildMessages(request);
|
||||
@@ -176,6 +179,8 @@ export class OllamaProvider implements LlmProviderInterface {
|
||||
* @throws {Error} If the request fails
|
||||
*/
|
||||
async *chatStream(request: ChatRequestDto): AsyncGenerator<ChatResponseDto> {
|
||||
const span = createLlmSpan("ollama", "chat.stream", request.model);
|
||||
|
||||
try {
|
||||
const messages = this.buildMessages(request);
|
||||
const options = this.buildChatOptions(request);
|
||||
@@ -197,10 +202,21 @@ export class OllamaProvider implements LlmProviderInterface {
|
||||
done: chunk.done,
|
||||
};
|
||||
}
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error(`Streaming failed: ${errorMessage}`);
|
||||
|
||||
span.recordException(error instanceof Error ? error : new Error(errorMessage));
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: errorMessage,
|
||||
});
|
||||
|
||||
throw new Error(`Streaming failed: ${errorMessage}`);
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,6 +227,7 @@ export class OllamaProvider implements LlmProviderInterface {
|
||||
* @returns Embeddings response with vector arrays
|
||||
* @throws {Error} If the request fails
|
||||
*/
|
||||
@TraceLlmCall({ system: "ollama", operation: "embed" })
|
||||
async embed(request: EmbedRequestDto): Promise<EmbedResponseDto> {
|
||||
try {
|
||||
const response = await this.client.embed({
|
||||
|
||||
17
apps/api/src/telemetry/index.ts
Normal file
17
apps/api/src/telemetry/index.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
/**
|
||||
* OpenTelemetry distributed tracing module.
|
||||
* Provides HTTP request tracing and LLM operation instrumentation.
|
||||
*
|
||||
* @module telemetry
|
||||
*/
|
||||
|
||||
export { TelemetryModule } from "./telemetry.module";
|
||||
export { TelemetryService } from "./telemetry.service";
|
||||
export { TelemetryInterceptor } from "./telemetry.interceptor";
|
||||
export { SpanContextService } from "./span-context.service";
|
||||
export {
|
||||
TraceLlmCall,
|
||||
createLlmSpan,
|
||||
recordLlmUsage,
|
||||
type LlmTraceMetadata,
|
||||
} from "./llm-telemetry.decorator";
|
||||
168
apps/api/src/telemetry/llm-telemetry.decorator.ts
Normal file
168
apps/api/src/telemetry/llm-telemetry.decorator.ts
Normal file
@@ -0,0 +1,168 @@
|
||||
import type { Span } from "@opentelemetry/api";
|
||||
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
|
||||
|
||||
/**
|
||||
* Metadata interface for LLM tracing configuration.
|
||||
*/
|
||||
export interface LlmTraceMetadata {
|
||||
/**
|
||||
* The LLM system being used (e.g., "ollama", "openai", "anthropic")
|
||||
*/
|
||||
system: string;
|
||||
|
||||
/**
|
||||
* The operation type (e.g., "chat", "embed", "completion")
|
||||
*/
|
||||
operation: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Symbol key for storing LLM trace metadata
|
||||
*/
|
||||
const LLM_TRACE_METADATA = Symbol("llm:trace:metadata");
|
||||
|
||||
/**
|
||||
* Decorator that adds OpenTelemetry tracing to LLM provider methods.
|
||||
* Automatically creates spans with GenAI semantic conventions.
|
||||
*
|
||||
* @param metadata - Configuration for the LLM trace
|
||||
* @returns Method decorator
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* class OllamaProvider {
|
||||
* @TraceLlmCall({ system: "ollama", operation: "chat" })
|
||||
* async chat(request: ChatRequest): Promise<ChatResponse> {
|
||||
* // Implementation
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export function TraceLlmCall(metadata: LlmTraceMetadata) {
|
||||
return function (
|
||||
target: object,
|
||||
propertyKey: string,
|
||||
descriptor: PropertyDescriptor
|
||||
): PropertyDescriptor {
|
||||
const originalMethod = descriptor.value as (
|
||||
this: unknown,
|
||||
...args: unknown[]
|
||||
) => Promise<unknown>;
|
||||
|
||||
descriptor.value = async function (this: unknown, ...args: unknown[]): Promise<unknown> {
|
||||
const tracer = trace.getTracer("mosaic-api");
|
||||
const spanName = `${metadata.system}.${metadata.operation}`;
|
||||
|
||||
const span = tracer.startSpan(spanName, {
|
||||
kind: SpanKind.CLIENT,
|
||||
attributes: {
|
||||
"gen_ai.system": metadata.system,
|
||||
"gen_ai.operation.name": metadata.operation,
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
// Extract model from first argument if it's an object with a model property
|
||||
if (args[0] && typeof args[0] === "object" && "model" in args[0]) {
|
||||
const request = args[0] as { model?: string };
|
||||
if (request.model) {
|
||||
span.setAttribute("gen_ai.request.model", request.model);
|
||||
}
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
const result = await originalMethod.apply(this, args);
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
span.setAttribute("gen_ai.response.duration_ms", duration);
|
||||
|
||||
// Extract token usage from response if available
|
||||
if (result && typeof result === "object") {
|
||||
if ("promptEvalCount" in result && typeof result.promptEvalCount === "number") {
|
||||
span.setAttribute("gen_ai.usage.prompt_tokens", result.promptEvalCount);
|
||||
}
|
||||
if ("evalCount" in result && typeof result.evalCount === "number") {
|
||||
span.setAttribute("gen_ai.usage.completion_tokens", result.evalCount);
|
||||
}
|
||||
}
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return result;
|
||||
} catch (error) {
|
||||
span.recordException(error as Error);
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
};
|
||||
|
||||
// Store metadata for potential runtime inspection
|
||||
Reflect.defineMetadata(LLM_TRACE_METADATA, metadata, target, propertyKey);
|
||||
|
||||
return descriptor;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to manually create an LLM span for stream operations.
|
||||
* Use this for async generators where the decorator pattern doesn't work well.
|
||||
*
|
||||
* @param system - The LLM system (e.g., "ollama")
|
||||
* @param operation - The operation type (e.g., "chat.stream")
|
||||
* @param model - The model being used
|
||||
* @returns A span instance
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* async *chatStream(request: ChatRequest) {
|
||||
* const span = createLlmSpan("ollama", "chat.stream", request.model);
|
||||
* try {
|
||||
* for await (const chunk of stream) {
|
||||
* yield chunk;
|
||||
* }
|
||||
* span.setStatus({ code: SpanStatusCode.OK });
|
||||
* } catch (error) {
|
||||
* span.recordException(error);
|
||||
* span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
* throw error;
|
||||
* } finally {
|
||||
* span.end();
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export function createLlmSpan(system: string, operation: string, model?: string): Span {
|
||||
const tracer = trace.getTracer("mosaic-api");
|
||||
const spanName = `${system}.${operation}`;
|
||||
|
||||
const span = tracer.startSpan(spanName, {
|
||||
kind: SpanKind.CLIENT,
|
||||
attributes: {
|
||||
"gen_ai.system": system,
|
||||
"gen_ai.operation.name": operation,
|
||||
...(model && { "gen_ai.request.model": model }),
|
||||
},
|
||||
});
|
||||
|
||||
return span;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to record token usage on an LLM span.
|
||||
*
|
||||
* @param span - The span to record usage on
|
||||
* @param promptTokens - Number of prompt tokens
|
||||
* @param completionTokens - Number of completion tokens
|
||||
*/
|
||||
export function recordLlmUsage(span: Span, promptTokens?: number, completionTokens?: number): void {
|
||||
if (promptTokens !== undefined) {
|
||||
span.setAttribute("gen_ai.usage.prompt_tokens", promptTokens);
|
||||
}
|
||||
if (completionTokens !== undefined) {
|
||||
span.setAttribute("gen_ai.usage.completion_tokens", completionTokens);
|
||||
}
|
||||
}
|
||||
73
apps/api/src/telemetry/span-context.service.ts
Normal file
73
apps/api/src/telemetry/span-context.service.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { context, trace, type Span, type Context } from "@opentelemetry/api";
|
||||
|
||||
/**
|
||||
* Service for managing OpenTelemetry span context propagation.
|
||||
* Provides utilities for accessing and manipulating the active trace context.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const activeSpan = spanContextService.getActiveSpan();
|
||||
* if (activeSpan) {
|
||||
* activeSpan.setAttribute('custom.key', 'value');
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class SpanContextService {
|
||||
/**
|
||||
* Get the currently active span from the context.
|
||||
*
|
||||
* @returns The active span, or undefined if no span is active
|
||||
*/
|
||||
getActiveSpan(): Span | undefined {
|
||||
return trace.getActiveSpan();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current trace context.
|
||||
*
|
||||
* @returns The current context
|
||||
*/
|
||||
getContext(): Context {
|
||||
return context.active();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a function within a specific context.
|
||||
*
|
||||
* @param ctx - The context to run the function in
|
||||
* @param fn - The function to execute
|
||||
* @returns The result of the function
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const result = spanContextService.with(customContext, () => {
|
||||
* // This code runs with customContext active
|
||||
* return doSomething();
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
with<T>(ctx: Context, fn: () => T): T {
|
||||
return context.with(ctx, fn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a span as active for the duration of a function execution.
|
||||
*
|
||||
* @param span - The span to make active
|
||||
* @param fn - The function to execute with the active span
|
||||
* @returns The result of the function
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const result = await spanContextService.withActiveSpan(span, async () => {
|
||||
* // This code runs with span active
|
||||
* return await doAsyncWork();
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
withActiveSpan<T>(span: Span, fn: () => T): T {
|
||||
return context.with(trace.setSpan(context.active(), span), fn);
|
||||
}
|
||||
}
|
||||
181
apps/api/src/telemetry/telemetry.interceptor.spec.ts
Normal file
181
apps/api/src/telemetry/telemetry.interceptor.spec.ts
Normal file
@@ -0,0 +1,181 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { TelemetryInterceptor } from "./telemetry.interceptor";
|
||||
import { TelemetryService } from "./telemetry.service";
|
||||
import type { ExecutionContext, CallHandler } from "@nestjs/common";
|
||||
import type { Span } from "@opentelemetry/api";
|
||||
import { of, throwError } from "rxjs";
|
||||
import { lastValueFrom } from "rxjs";
|
||||
|
||||
describe("TelemetryInterceptor", () => {
|
||||
let interceptor: TelemetryInterceptor;
|
||||
let telemetryService: TelemetryService;
|
||||
let mockSpan: Span;
|
||||
let mockContext: ExecutionContext;
|
||||
let mockHandler: CallHandler;
|
||||
|
||||
beforeEach(() => {
|
||||
// Mock span
|
||||
mockSpan = {
|
||||
end: vi.fn(),
|
||||
setAttribute: vi.fn(),
|
||||
setAttributes: vi.fn(),
|
||||
addEvent: vi.fn(),
|
||||
setStatus: vi.fn(),
|
||||
updateName: vi.fn(),
|
||||
isRecording: vi.fn().mockReturnValue(true),
|
||||
recordException: vi.fn(),
|
||||
spanContext: vi.fn().mockReturnValue({
|
||||
traceId: "test-trace-id",
|
||||
spanId: "test-span-id",
|
||||
}),
|
||||
} as unknown as Span;
|
||||
|
||||
// Mock telemetry service
|
||||
telemetryService = {
|
||||
startSpan: vi.fn().mockReturnValue(mockSpan),
|
||||
recordException: vi.fn(),
|
||||
getTracer: vi.fn(),
|
||||
onModuleInit: vi.fn(),
|
||||
onModuleDestroy: vi.fn(),
|
||||
} as unknown as TelemetryService;
|
||||
|
||||
// Mock execution context
|
||||
mockContext = {
|
||||
switchToHttp: vi.fn().mockReturnValue({
|
||||
getRequest: vi.fn().mockReturnValue({
|
||||
method: "GET",
|
||||
url: "/api/test",
|
||||
path: "/api/test",
|
||||
}),
|
||||
getResponse: vi.fn().mockReturnValue({
|
||||
statusCode: 200,
|
||||
setHeader: vi.fn(),
|
||||
}),
|
||||
}),
|
||||
getClass: vi.fn().mockReturnValue({ name: "TestController" }),
|
||||
getHandler: vi.fn().mockReturnValue({ name: "testHandler" }),
|
||||
} as unknown as ExecutionContext;
|
||||
|
||||
interceptor = new TelemetryInterceptor(telemetryService);
|
||||
});
|
||||
|
||||
describe("intercept", () => {
|
||||
it("should create a span for HTTP request", async () => {
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(of({ data: "test" })),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await lastValueFrom(interceptor.intercept(mockContext, mockHandler));
|
||||
|
||||
expect(telemetryService.startSpan).toHaveBeenCalledWith(
|
||||
"GET /api/test",
|
||||
expect.objectContaining({
|
||||
attributes: expect.objectContaining({
|
||||
"http.request.method": "GET",
|
||||
"url.path": "/api/test",
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should set http.status_code attribute on success", async () => {
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(of({ data: "test" })),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await lastValueFrom(interceptor.intercept(mockContext, mockHandler));
|
||||
|
||||
expect(mockSpan.setAttribute).toHaveBeenCalledWith("http.response.status_code", 200);
|
||||
expect(mockSpan.end).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should add trace context to response headers", async () => {
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(of({ data: "test" })),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
const mockResponse = mockContext.switchToHttp().getResponse();
|
||||
|
||||
await lastValueFrom(interceptor.intercept(mockContext, mockHandler));
|
||||
|
||||
expect(mockResponse.setHeader).toHaveBeenCalledWith("x-trace-id", "test-trace-id");
|
||||
});
|
||||
|
||||
it("should record exception on error", async () => {
|
||||
const error = new Error("Test error");
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(throwError(() => error)),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await expect(lastValueFrom(interceptor.intercept(mockContext, mockHandler))).rejects.toThrow(
|
||||
"Test error"
|
||||
);
|
||||
|
||||
expect(telemetryService.recordException).toHaveBeenCalledWith(mockSpan, error);
|
||||
expect(mockSpan.end).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should end span even if error occurs", async () => {
|
||||
const error = new Error("Test error");
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(throwError(() => error)),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await expect(
|
||||
lastValueFrom(interceptor.intercept(mockContext, mockHandler))
|
||||
).rejects.toThrow();
|
||||
|
||||
expect(mockSpan.end).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should handle different HTTP methods", async () => {
|
||||
const postContext = {
|
||||
...mockContext,
|
||||
switchToHttp: vi.fn().mockReturnValue({
|
||||
getRequest: vi.fn().mockReturnValue({
|
||||
method: "POST",
|
||||
url: "/api/test",
|
||||
path: "/api/test",
|
||||
}),
|
||||
getResponse: vi.fn().mockReturnValue({
|
||||
statusCode: 201,
|
||||
setHeader: vi.fn(),
|
||||
}),
|
||||
}),
|
||||
} as unknown as ExecutionContext;
|
||||
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(of({ data: "created" })),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await lastValueFrom(interceptor.intercept(postContext, mockHandler));
|
||||
|
||||
expect(telemetryService.startSpan).toHaveBeenCalledWith(
|
||||
"POST /api/test",
|
||||
expect.objectContaining({
|
||||
attributes: expect.objectContaining({
|
||||
"http.request.method": "POST",
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should set controller and handler attributes", async () => {
|
||||
mockHandler = {
|
||||
handle: vi.fn().mockReturnValue(of({ data: "test" })),
|
||||
} as unknown as CallHandler;
|
||||
|
||||
await lastValueFrom(interceptor.intercept(mockContext, mockHandler));
|
||||
|
||||
expect(telemetryService.startSpan).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
expect.objectContaining({
|
||||
attributes: expect.objectContaining({
|
||||
"code.function": "testHandler",
|
||||
"code.namespace": "TestController",
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
100
apps/api/src/telemetry/telemetry.interceptor.ts
Normal file
100
apps/api/src/telemetry/telemetry.interceptor.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import { Injectable, NestInterceptor, ExecutionContext, CallHandler, Logger } from "@nestjs/common";
|
||||
import { Observable, throwError } from "rxjs";
|
||||
import { tap, catchError } from "rxjs/operators";
|
||||
import type { Request, Response } from "express";
|
||||
import { TelemetryService } from "./telemetry.service";
|
||||
import type { Span } from "@opentelemetry/api";
|
||||
import { SpanKind } from "@opentelemetry/api";
|
||||
import {
|
||||
ATTR_HTTP_REQUEST_METHOD,
|
||||
ATTR_HTTP_RESPONSE_STATUS_CODE,
|
||||
ATTR_URL_FULL,
|
||||
ATTR_URL_PATH,
|
||||
} from "@opentelemetry/semantic-conventions";
|
||||
|
||||
/**
|
||||
* Interceptor that automatically creates OpenTelemetry spans for all HTTP requests.
|
||||
* Records HTTP method, URL, status code, and trace context in response headers.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* // Apply globally in AppModule
|
||||
* @Module({
|
||||
* providers: [
|
||||
* {
|
||||
* provide: APP_INTERCEPTOR,
|
||||
* useClass: TelemetryInterceptor,
|
||||
* },
|
||||
* ],
|
||||
* })
|
||||
* export class AppModule {}
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class TelemetryInterceptor implements NestInterceptor {
|
||||
private readonly logger = new Logger(TelemetryInterceptor.name);
|
||||
|
||||
constructor(private readonly telemetryService: TelemetryService) {}
|
||||
|
||||
/**
|
||||
* Intercept HTTP requests and wrap them in OpenTelemetry spans.
|
||||
*
|
||||
* @param context - The execution context
|
||||
* @param next - The next call handler
|
||||
* @returns Observable of the response with tracing applied
|
||||
*/
|
||||
intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
|
||||
const httpContext = context.switchToHttp();
|
||||
const request = httpContext.getRequest<Request>();
|
||||
const response = httpContext.getResponse<Response>();
|
||||
|
||||
const method = request.method;
|
||||
const path = request.path || request.url;
|
||||
const spanName = `${method} ${path}`;
|
||||
|
||||
const span = this.telemetryService.startSpan(spanName, {
|
||||
kind: SpanKind.SERVER,
|
||||
attributes: {
|
||||
[ATTR_HTTP_REQUEST_METHOD]: method,
|
||||
[ATTR_URL_PATH]: path,
|
||||
[ATTR_URL_FULL]: request.url,
|
||||
"code.function": context.getHandler().name,
|
||||
"code.namespace": context.getClass().name,
|
||||
},
|
||||
});
|
||||
|
||||
return next.handle().pipe(
|
||||
tap(() => {
|
||||
this.finalizeSpan(span, response);
|
||||
}),
|
||||
catchError((error: Error) => {
|
||||
this.telemetryService.recordException(span, error);
|
||||
this.finalizeSpan(span, response);
|
||||
return throwError(() => error);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize the span by setting status code and adding trace context to headers.
|
||||
*
|
||||
* @param span - The span to finalize
|
||||
* @param response - The HTTP response
|
||||
*/
|
||||
private finalizeSpan(span: Span, response: Response): void {
|
||||
try {
|
||||
const statusCode = response.statusCode;
|
||||
span.setAttribute(ATTR_HTTP_RESPONSE_STATUS_CODE, statusCode);
|
||||
|
||||
// Add trace context to response headers for distributed tracing
|
||||
const spanContext = span.spanContext();
|
||||
if (spanContext.traceId) {
|
||||
response.setHeader("x-trace-id", spanContext.traceId);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn("Failed to finalize span", error);
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
23
apps/api/src/telemetry/telemetry.module.ts
Normal file
23
apps/api/src/telemetry/telemetry.module.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
import { Module, Global } from "@nestjs/common";
|
||||
import { TelemetryService } from "./telemetry.service";
|
||||
import { TelemetryInterceptor } from "./telemetry.interceptor";
|
||||
import { SpanContextService } from "./span-context.service";
|
||||
|
||||
/**
|
||||
* Global module providing OpenTelemetry distributed tracing.
|
||||
* Automatically instruments HTTP requests and provides utilities for LLM tracing.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* @Module({
|
||||
* imports: [TelemetryModule],
|
||||
* })
|
||||
* export class AppModule {}
|
||||
* ```
|
||||
*/
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [TelemetryService, TelemetryInterceptor, SpanContextService],
|
||||
exports: [TelemetryService, TelemetryInterceptor, SpanContextService],
|
||||
})
|
||||
export class TelemetryModule {}
|
||||
188
apps/api/src/telemetry/telemetry.service.spec.ts
Normal file
188
apps/api/src/telemetry/telemetry.service.spec.ts
Normal file
@@ -0,0 +1,188 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import { TelemetryService } from "./telemetry.service";
|
||||
import type { Tracer, Span } from "@opentelemetry/api";
|
||||
|
||||
describe("TelemetryService", () => {
|
||||
let service: TelemetryService;
|
||||
let originalEnv: NodeJS.ProcessEnv;
|
||||
|
||||
beforeEach(() => {
|
||||
originalEnv = { ...process.env };
|
||||
// Enable tracing by default for tests
|
||||
process.env.OTEL_ENABLED = "true";
|
||||
process.env.OTEL_SERVICE_NAME = "mosaic-api-test";
|
||||
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://localhost:4318/v1/traces";
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
process.env = originalEnv;
|
||||
if (service) {
|
||||
await service.onModuleDestroy();
|
||||
}
|
||||
});
|
||||
|
||||
describe("onModuleInit", () => {
|
||||
it("should initialize the SDK when OTEL_ENABLED is true", async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(service.getTracer()).toBeDefined();
|
||||
});
|
||||
|
||||
it("should not initialize SDK when OTEL_ENABLED is false", async () => {
|
||||
process.env.OTEL_ENABLED = "false";
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(service.getTracer()).toBeDefined(); // Should return noop tracer
|
||||
});
|
||||
|
||||
it("should use custom service name from env", async () => {
|
||||
process.env.OTEL_SERVICE_NAME = "custom-service";
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(service.getTracer()).toBeDefined();
|
||||
});
|
||||
|
||||
it("should use default service name when not provided", async () => {
|
||||
delete process.env.OTEL_SERVICE_NAME;
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(service.getTracer()).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("getTracer", () => {
|
||||
beforeEach(async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
});
|
||||
|
||||
it("should return a tracer instance", () => {
|
||||
const tracer = service.getTracer();
|
||||
expect(tracer).toBeDefined();
|
||||
expect(typeof tracer.startSpan).toBe("function");
|
||||
});
|
||||
|
||||
it("should return the same tracer instance on multiple calls", () => {
|
||||
const tracer1 = service.getTracer();
|
||||
const tracer2 = service.getTracer();
|
||||
expect(tracer1).toBe(tracer2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("startSpan", () => {
|
||||
beforeEach(async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
});
|
||||
|
||||
it("should create a span with the given name", () => {
|
||||
const span = service.startSpan("test-span");
|
||||
expect(span).toBeDefined();
|
||||
expect(typeof span.end).toBe("function");
|
||||
span.end();
|
||||
});
|
||||
|
||||
it("should create a span with attributes", () => {
|
||||
const span = service.startSpan("test-span", {
|
||||
attributes: {
|
||||
"test.attribute": "value",
|
||||
},
|
||||
});
|
||||
expect(span).toBeDefined();
|
||||
span.end();
|
||||
});
|
||||
|
||||
it("should create nested spans", () => {
|
||||
const parentSpan = service.startSpan("parent-span");
|
||||
const childSpan = service.startSpan("child-span");
|
||||
|
||||
expect(parentSpan).toBeDefined();
|
||||
expect(childSpan).toBeDefined();
|
||||
|
||||
childSpan.end();
|
||||
parentSpan.end();
|
||||
});
|
||||
});
|
||||
|
||||
describe("recordException", () => {
|
||||
let span: Span;
|
||||
|
||||
beforeEach(async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
span = service.startSpan("test-span");
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
span.end();
|
||||
});
|
||||
|
||||
it("should record an exception on the span", () => {
|
||||
const error = new Error("Test error");
|
||||
const recordExceptionSpy = vi.spyOn(span, "recordException");
|
||||
|
||||
service.recordException(span, error);
|
||||
|
||||
expect(recordExceptionSpy).toHaveBeenCalledWith(error);
|
||||
});
|
||||
|
||||
it("should set span status to error", () => {
|
||||
const error = new Error("Test error");
|
||||
const setStatusSpy = vi.spyOn(span, "setStatus");
|
||||
|
||||
service.recordException(span, error);
|
||||
|
||||
expect(setStatusSpy).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("onModuleDestroy", () => {
|
||||
it("should shutdown the SDK gracefully", async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
await expect(service.onModuleDestroy()).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
it("should not throw if called multiple times", async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
await service.onModuleDestroy();
|
||||
await expect(service.onModuleDestroy()).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
it("should not throw if SDK was not initialized", async () => {
|
||||
process.env.OTEL_ENABLED = "false";
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
await expect(service.onModuleDestroy()).resolves.not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe("disabled mode", () => {
|
||||
beforeEach(() => {
|
||||
process.env.OTEL_ENABLED = "false";
|
||||
});
|
||||
|
||||
it("should return noop tracer when disabled", async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
const tracer = service.getTracer();
|
||||
expect(tracer).toBeDefined();
|
||||
});
|
||||
|
||||
it("should not throw when creating spans while disabled", async () => {
|
||||
service = new TelemetryService();
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(() => service.startSpan("test-span")).not.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
182
apps/api/src/telemetry/telemetry.service.ts
Normal file
182
apps/api/src/telemetry/telemetry.service.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from "@nestjs/common";
|
||||
import { NodeSDK } from "@opentelemetry/sdk-node";
|
||||
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
|
||||
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
|
||||
import { Resource } from "@opentelemetry/resources";
|
||||
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
|
||||
import type { Tracer, Span, SpanOptions } from "@opentelemetry/api";
|
||||
import { trace, SpanStatusCode } from "@opentelemetry/api";
|
||||
|
||||
/**
|
||||
* Service responsible for OpenTelemetry distributed tracing.
|
||||
* Initializes the OTEL SDK with Jaeger/OTLP exporters and provides
|
||||
* tracing utilities for HTTP requests and LLM operations.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const span = telemetryService.startSpan('operation-name', {
|
||||
* attributes: { 'custom.key': 'value' }
|
||||
* });
|
||||
* try {
|
||||
* // Perform operation
|
||||
* } catch (error) {
|
||||
* telemetryService.recordException(span, error);
|
||||
* } finally {
|
||||
* span.end();
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class TelemetryService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(TelemetryService.name);
|
||||
private sdk?: NodeSDK;
|
||||
private tracer!: Tracer;
|
||||
private enabled: boolean;
|
||||
private serviceName: string;
|
||||
private shutdownPromise?: Promise<void>;
|
||||
|
||||
constructor() {
|
||||
this.enabled = process.env.OTEL_ENABLED !== "false";
|
||||
this.serviceName = process.env.OTEL_SERVICE_NAME ?? "mosaic-api";
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the OpenTelemetry SDK with configured exporters.
|
||||
* This is called automatically by NestJS when the module is initialized.
|
||||
*/
|
||||
onModuleInit(): void {
|
||||
if (!this.enabled) {
|
||||
this.logger.log("OpenTelemetry tracing is disabled");
|
||||
this.tracer = trace.getTracer("noop");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const exporter = this.createExporter();
|
||||
const resource = new Resource({
|
||||
[ATTR_SERVICE_NAME]: this.serviceName,
|
||||
});
|
||||
|
||||
this.sdk = new NodeSDK({
|
||||
resource,
|
||||
traceExporter: exporter,
|
||||
instrumentations: [
|
||||
getNodeAutoInstrumentations({
|
||||
"@opentelemetry/instrumentation-fs": {
|
||||
enabled: false, // Disable file system instrumentation to reduce noise
|
||||
},
|
||||
}),
|
||||
],
|
||||
});
|
||||
|
||||
this.sdk.start();
|
||||
this.tracer = trace.getTracer(this.serviceName);
|
||||
|
||||
this.logger.log(`OpenTelemetry SDK started for service: ${this.serviceName}`);
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to initialize OpenTelemetry SDK", error);
|
||||
// Fallback to noop tracer to prevent application failures
|
||||
this.tracer = trace.getTracer("noop");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the OpenTelemetry SDK gracefully.
|
||||
* This is called automatically by NestJS when the module is destroyed.
|
||||
*/
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
if (!this.sdk) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Prevent multiple concurrent shutdowns
|
||||
if (this.shutdownPromise) {
|
||||
return this.shutdownPromise;
|
||||
}
|
||||
|
||||
this.shutdownPromise = (async () => {
|
||||
try {
|
||||
if (this.sdk) {
|
||||
await this.sdk.shutdown();
|
||||
}
|
||||
this.logger.log("OpenTelemetry SDK shut down successfully");
|
||||
} catch (error) {
|
||||
this.logger.error("Error shutting down OpenTelemetry SDK", error);
|
||||
}
|
||||
})();
|
||||
|
||||
return this.shutdownPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the tracer instance for creating spans.
|
||||
*
|
||||
* @returns The configured tracer instance
|
||||
*/
|
||||
getTracer(): Tracer {
|
||||
return this.tracer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new span with the given name and options.
|
||||
*
|
||||
* @param name - The name of the span
|
||||
* @param options - Optional span configuration
|
||||
* @returns A new span instance
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const span = telemetryService.startSpan('database-query', {
|
||||
* attributes: {
|
||||
* 'db.system': 'postgresql',
|
||||
* 'db.statement': 'SELECT * FROM users'
|
||||
* }
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
startSpan(name: string, options?: SpanOptions): Span {
|
||||
return this.tracer.startSpan(name, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record an exception on a span and set its status to error.
|
||||
*
|
||||
* @param span - The span to record the exception on
|
||||
* @param error - The error to record
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* try {
|
||||
* // Some operation
|
||||
* } catch (error) {
|
||||
* telemetryService.recordException(span, error);
|
||||
* throw error;
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
recordException(span: Span, error: Error): void {
|
||||
span.recordException(error);
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error.message,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the appropriate trace exporter based on environment configuration.
|
||||
* Uses OTLP HTTP exporter (compatible with Jaeger, Tempo, and other backends).
|
||||
*
|
||||
* @returns Configured trace exporter
|
||||
*/
|
||||
private createExporter(): OTLPTraceExporter {
|
||||
const otlpEndpoint =
|
||||
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ??
|
||||
process.env.OTEL_EXPORTER_JAEGER_ENDPOINT ??
|
||||
"http://localhost:4318/v1/traces";
|
||||
|
||||
this.logger.log(`Using OTLP HTTP exporter: ${otlpEndpoint}`);
|
||||
return new OTLPTraceExporter({
|
||||
url: otlpEndpoint,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -43,5 +43,8 @@
|
||||
"turbo": "^2.8.0",
|
||||
"typescript": "^5.8.2",
|
||||
"vitest": "^3.0.8"
|
||||
},
|
||||
"dependencies": {
|
||||
"@opentelemetry/resources": "^1.30.1"
|
||||
}
|
||||
}
|
||||
|
||||
1894
pnpm-lock.yaml
generated
1894
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user