merge: resolve conflicts with develop (telemetry + lockfile)
Keep both Mosaic Telemetry section (from develop) and Matrix Dev Environment section (from feature branch) in .env.example. Regenerate pnpm-lock.yaml with both dependency trees merged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -27,6 +27,7 @@
|
||||
"dependencies": {
|
||||
"@anthropic-ai/sdk": "^0.72.1",
|
||||
"@mosaic/shared": "workspace:*",
|
||||
"@mosaicstack/telemetry-client": "^0.1.0",
|
||||
"@nestjs/axios": "^4.0.1",
|
||||
"@nestjs/bullmq": "^11.0.4",
|
||||
"@nestjs/common": "^11.1.12",
|
||||
|
||||
@@ -37,6 +37,7 @@ import { JobStepsModule } from "./job-steps/job-steps.module";
|
||||
import { CoordinatorIntegrationModule } from "./coordinator-integration/coordinator-integration.module";
|
||||
import { FederationModule } from "./federation/federation.module";
|
||||
import { CredentialsModule } from "./credentials/credentials.module";
|
||||
import { MosaicTelemetryModule } from "./mosaic-telemetry";
|
||||
import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor";
|
||||
|
||||
@Module({
|
||||
@@ -97,6 +98,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce
|
||||
CoordinatorIntegrationModule,
|
||||
FederationModule,
|
||||
CredentialsModule,
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
controllers: [AppController, CsrfController],
|
||||
providers: [
|
||||
|
||||
109
apps/api/src/llm/llm-cost-table.ts
Normal file
109
apps/api/src/llm/llm-cost-table.ts
Normal file
@@ -0,0 +1,109 @@
|
||||
/**
|
||||
* LLM Cost Table
|
||||
*
|
||||
* Maps model names to per-token costs in microdollars (USD * 1,000,000).
|
||||
* For example, $0.003 per 1K tokens = 3,000 microdollars per 1K tokens = 3 microdollars per token.
|
||||
*
|
||||
* Costs are split into input (prompt) and output (completion) pricing.
|
||||
* Ollama models run locally and are free (0 cost).
|
||||
*/
|
||||
|
||||
/**
|
||||
* Per-token cost in microdollars for a single model.
|
||||
*/
|
||||
export interface ModelCost {
|
||||
/** Cost per input token in microdollars */
|
||||
inputPerToken: number;
|
||||
/** Cost per output token in microdollars */
|
||||
outputPerToken: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cost table mapping model name prefixes to per-token pricing.
|
||||
*
|
||||
* Model matching is prefix-based: "claude-sonnet-4-5" matches "claude-sonnet-4-5-20250929".
|
||||
* More specific prefixes are checked first (longest match wins).
|
||||
*
|
||||
* Prices sourced from provider pricing pages as of 2026-02.
|
||||
*/
|
||||
const MODEL_COSTS: Record<string, ModelCost> = {
|
||||
// Anthropic Claude models (per-token microdollars)
|
||||
// claude-sonnet-4-5: $3/M input, $15/M output
|
||||
"claude-sonnet-4-5": { inputPerToken: 3, outputPerToken: 15 },
|
||||
// claude-opus-4: $15/M input, $75/M output
|
||||
"claude-opus-4": { inputPerToken: 15, outputPerToken: 75 },
|
||||
// claude-3-5-haiku / claude-haiku-4-5: $0.80/M input, $4/M output
|
||||
"claude-haiku-4-5": { inputPerToken: 0.8, outputPerToken: 4 },
|
||||
"claude-3-5-haiku": { inputPerToken: 0.8, outputPerToken: 4 },
|
||||
// claude-3-5-sonnet: $3/M input, $15/M output
|
||||
"claude-3-5-sonnet": { inputPerToken: 3, outputPerToken: 15 },
|
||||
// claude-3-opus: $15/M input, $75/M output
|
||||
"claude-3-opus": { inputPerToken: 15, outputPerToken: 75 },
|
||||
// claude-3-sonnet: $3/M input, $15/M output
|
||||
"claude-3-sonnet": { inputPerToken: 3, outputPerToken: 15 },
|
||||
// claude-3-haiku: $0.25/M input, $1.25/M output
|
||||
"claude-3-haiku": { inputPerToken: 0.25, outputPerToken: 1.25 },
|
||||
|
||||
// OpenAI models (per-token microdollars)
|
||||
// gpt-4o: $2.50/M input, $10/M output
|
||||
"gpt-4o-mini": { inputPerToken: 0.15, outputPerToken: 0.6 },
|
||||
"gpt-4o": { inputPerToken: 2.5, outputPerToken: 10 },
|
||||
// gpt-4-turbo: $10/M input, $30/M output
|
||||
"gpt-4-turbo": { inputPerToken: 10, outputPerToken: 30 },
|
||||
// gpt-4: $30/M input, $60/M output
|
||||
"gpt-4": { inputPerToken: 30, outputPerToken: 60 },
|
||||
// gpt-3.5-turbo: $0.50/M input, $1.50/M output
|
||||
"gpt-3.5-turbo": { inputPerToken: 0.5, outputPerToken: 1.5 },
|
||||
|
||||
// Ollama / local models: free
|
||||
// These are catch-all entries; any model not matched above falls through to getModelCost default
|
||||
};
|
||||
|
||||
/**
|
||||
* Sorted model prefixes from longest to shortest for greedy prefix matching.
|
||||
* Ensures "gpt-4o-mini" matches before "gpt-4o" and "claude-3-5-haiku" before "claude-3-haiku".
|
||||
*/
|
||||
const SORTED_PREFIXES = Object.keys(MODEL_COSTS).sort((a, b) => b.length - a.length);
|
||||
|
||||
/**
|
||||
* Look up per-token cost for a given model name.
|
||||
*
|
||||
* Uses longest-prefix matching: the model name is compared against known
|
||||
* prefixes from longest to shortest. If no prefix matches, returns zero cost
|
||||
* (assumes local/free model).
|
||||
*
|
||||
* @param modelName - Full model name (e.g. "claude-sonnet-4-5-20250929", "gpt-4o")
|
||||
* @returns Per-token cost in microdollars
|
||||
*/
|
||||
export function getModelCost(modelName: string): ModelCost {
|
||||
const normalized = modelName.toLowerCase();
|
||||
|
||||
for (const prefix of SORTED_PREFIXES) {
|
||||
if (normalized.startsWith(prefix)) {
|
||||
const cost = MODEL_COSTS[prefix];
|
||||
if (cost !== undefined) {
|
||||
return cost;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Unknown or local model — assume free
|
||||
return { inputPerToken: 0, outputPerToken: 0 };
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate total cost in microdollars for a given model and token counts.
|
||||
*
|
||||
* @param modelName - Full model name
|
||||
* @param inputTokens - Number of input (prompt) tokens
|
||||
* @param outputTokens - Number of output (completion) tokens
|
||||
* @returns Total cost in microdollars (USD * 1,000,000)
|
||||
*/
|
||||
export function calculateCostMicrodollars(
|
||||
modelName: string,
|
||||
inputTokens: number,
|
||||
outputTokens: number
|
||||
): number {
|
||||
const cost = getModelCost(modelName);
|
||||
return Math.round(cost.inputPerToken * inputTokens + cost.outputPerToken * outputTokens);
|
||||
}
|
||||
487
apps/api/src/llm/llm-telemetry-tracker.service.spec.ts
Normal file
487
apps/api/src/llm/llm-telemetry-tracker.service.spec.ts
Normal file
@@ -0,0 +1,487 @@
|
||||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { TaskType, Complexity, Harness, Provider, Outcome } from "@mosaicstack/telemetry-client";
|
||||
import type { TaskCompletionEvent, EventBuilderParams } from "@mosaicstack/telemetry-client";
|
||||
import { MosaicTelemetryService } from "../mosaic-telemetry/mosaic-telemetry.service";
|
||||
import {
|
||||
LlmTelemetryTrackerService,
|
||||
estimateTokens,
|
||||
mapProviderType,
|
||||
mapHarness,
|
||||
inferTaskType,
|
||||
} from "./llm-telemetry-tracker.service";
|
||||
import type { LlmCompletionParams } from "./llm-telemetry-tracker.service";
|
||||
import { getModelCost, calculateCostMicrodollars } from "./llm-cost-table";
|
||||
|
||||
// ---------- Cost Table Tests ----------
// Covers prefix matching (longest-first), case folding, the free fallback for
// unknown/local models, and integer rounding of the total-cost calculation.

describe("llm-cost-table", () => {
  describe("getModelCost", () => {
    it("should return cost for claude-sonnet-4-5 models", () => {
      const cost = getModelCost("claude-sonnet-4-5-20250929");
      expect(cost.inputPerToken).toBe(3);
      expect(cost.outputPerToken).toBe(15);
    });

    it("should return cost for claude-opus-4 models", () => {
      const cost = getModelCost("claude-opus-4-6");
      expect(cost.inputPerToken).toBe(15);
      expect(cost.outputPerToken).toBe(75);
    });

    it("should return cost for claude-haiku-4-5 models", () => {
      const cost = getModelCost("claude-haiku-4-5-20251001");
      expect(cost.inputPerToken).toBe(0.8);
      expect(cost.outputPerToken).toBe(4);
    });

    it("should return cost for gpt-4o", () => {
      const cost = getModelCost("gpt-4o");
      expect(cost.inputPerToken).toBe(2.5);
      expect(cost.outputPerToken).toBe(10);
    });

    it("should return cost for gpt-4o-mini (longer prefix matches first)", () => {
      const cost = getModelCost("gpt-4o-mini");
      expect(cost.inputPerToken).toBe(0.15);
      expect(cost.outputPerToken).toBe(0.6);
    });

    it("should return zero cost for unknown/local models", () => {
      const cost = getModelCost("llama3.2");
      expect(cost.inputPerToken).toBe(0);
      expect(cost.outputPerToken).toBe(0);
    });

    it("should return zero cost for ollama models", () => {
      const cost = getModelCost("mistral:7b");
      expect(cost.inputPerToken).toBe(0);
      expect(cost.outputPerToken).toBe(0);
    });

    it("should be case-insensitive", () => {
      const cost = getModelCost("Claude-Sonnet-4-5-20250929");
      expect(cost.inputPerToken).toBe(3);
    });
  });

  describe("calculateCostMicrodollars", () => {
    it("should calculate cost for claude-sonnet-4-5 with token counts", () => {
      // 1000 input tokens * 3 + 500 output tokens * 15 = 3000 + 7500 = 10500
      const cost = calculateCostMicrodollars("claude-sonnet-4-5-20250929", 1000, 500);
      expect(cost).toBe(10500);
    });

    it("should return 0 for local models", () => {
      const cost = calculateCostMicrodollars("llama3.2", 1000, 500);
      expect(cost).toBe(0);
    });

    it("should return 0 when token counts are 0", () => {
      const cost = calculateCostMicrodollars("claude-opus-4-6", 0, 0);
      expect(cost).toBe(0);
    });

    it("should round the result to integer microdollars", () => {
      // gpt-4o-mini: 0.15 * 3 + 0.6 * 7 = 0.45 + 4.2 = 4.65 -> rounds to 5
      const cost = calculateCostMicrodollars("gpt-4o-mini", 3, 7);
      expect(cost).toBe(5);
    });
  });
});
|
||||
|
||||
// ---------- Helper Function Tests ----------
// Pure-function coverage: token estimation heuristic, provider/harness enum
// mapping, and keyword-based task-type inference from the calling context.

describe("helper functions", () => {
  describe("estimateTokens", () => {
    it("should estimate ~1 token per 4 characters", () => {
      expect(estimateTokens("abcd")).toBe(1);
      expect(estimateTokens("abcdefgh")).toBe(2);
    });

    it("should round up for partial tokens", () => {
      expect(estimateTokens("abc")).toBe(1);
      expect(estimateTokens("abcde")).toBe(2);
    });

    it("should return 0 for empty string", () => {
      expect(estimateTokens("")).toBe(0);
    });
  });

  describe("mapProviderType", () => {
    it("should map claude to ANTHROPIC", () => {
      expect(mapProviderType("claude")).toBe(Provider.ANTHROPIC);
    });

    it("should map openai to OPENAI", () => {
      expect(mapProviderType("openai")).toBe(Provider.OPENAI);
    });

    it("should map ollama to OLLAMA", () => {
      expect(mapProviderType("ollama")).toBe(Provider.OLLAMA);
    });
  });

  describe("mapHarness", () => {
    it("should map ollama to OLLAMA_LOCAL", () => {
      expect(mapHarness("ollama")).toBe(Harness.OLLAMA_LOCAL);
    });

    it("should map claude to API_DIRECT", () => {
      expect(mapHarness("claude")).toBe(Harness.API_DIRECT);
    });

    it("should map openai to API_DIRECT", () => {
      expect(mapHarness("openai")).toBe(Harness.API_DIRECT);
    });
  });

  describe("inferTaskType", () => {
    it("should return IMPLEMENTATION for embed operation", () => {
      expect(inferTaskType("embed")).toBe(TaskType.IMPLEMENTATION);
    });

    it("should return UNKNOWN when no context provided for chat", () => {
      expect(inferTaskType("chat")).toBe(TaskType.UNKNOWN);
    });

    it("should return PLANNING for brain context", () => {
      expect(inferTaskType("chat", "brain")).toBe(TaskType.PLANNING);
    });

    it("should return PLANNING for planning context", () => {
      expect(inferTaskType("chat", "planning")).toBe(TaskType.PLANNING);
    });

    it("should return CODE_REVIEW for review context", () => {
      expect(inferTaskType("chat", "code-review")).toBe(TaskType.CODE_REVIEW);
    });

    it("should return TESTING for test context", () => {
      expect(inferTaskType("chat", "test-generation")).toBe(TaskType.TESTING);
    });

    it("should return DEBUGGING for debug context", () => {
      expect(inferTaskType("chatStream", "debug-session")).toBe(TaskType.DEBUGGING);
    });

    it("should return REFACTORING for refactor context", () => {
      expect(inferTaskType("chat", "refactor")).toBe(TaskType.REFACTORING);
    });

    it("should return DOCUMENTATION for doc context", () => {
      expect(inferTaskType("chat", "documentation")).toBe(TaskType.DOCUMENTATION);
    });

    it("should return CONFIGURATION for config context", () => {
      expect(inferTaskType("chat", "config-update")).toBe(TaskType.CONFIGURATION);
    });

    it("should return SECURITY_AUDIT for security context", () => {
      expect(inferTaskType("chat", "security-check")).toBe(TaskType.SECURITY_AUDIT);
    });

    it("should return IMPLEMENTATION for chat context", () => {
      expect(inferTaskType("chat", "chat")).toBe(TaskType.IMPLEMENTATION);
    });

    it("should be case-insensitive", () => {
      expect(inferTaskType("chat", "BRAIN")).toBe(TaskType.PLANNING);
    });

    it("should return UNKNOWN for unrecognized context", () => {
      expect(inferTaskType("chat", "something-else")).toBe(TaskType.UNKNOWN);
    });
  });
});
|
||||
|
||||
// ---------- LlmTelemetryTrackerService Tests ----------
// Exercises the service through a Nest testing module with a hand-rolled
// MosaicTelemetryService mock. Covers event construction per provider, cost
// calculation, outcome/task-type mapping, and the fire-and-forget error
// guarantees (disabled telemetry, throwing builder, throwing tracker).

describe("LlmTelemetryTrackerService", () => {
  let service: LlmTelemetryTrackerService;
  // eventBuilder is nullable so tests can simulate the disabled-telemetry state.
  let mockTelemetryService: {
    eventBuilder: { build: ReturnType<typeof vi.fn> } | null;
    trackTaskCompletion: ReturnType<typeof vi.fn>;
    isEnabled: boolean;
  };

  // Canonical event returned by the mocked builder in every test.
  const mockEvent: TaskCompletionEvent = {
    instance_id: "test-instance",
    event_id: "test-event",
    schema_version: "1.0.0",
    timestamp: new Date().toISOString(),
    task_duration_ms: 1000,
    task_type: TaskType.IMPLEMENTATION,
    complexity: Complexity.LOW,
    harness: Harness.API_DIRECT,
    model: "claude-sonnet-4-5-20250929",
    provider: Provider.ANTHROPIC,
    estimated_input_tokens: 100,
    estimated_output_tokens: 200,
    actual_input_tokens: 100,
    actual_output_tokens: 200,
    estimated_cost_usd_micros: 3300,
    actual_cost_usd_micros: 3300,
    quality_gate_passed: true,
    quality_gates_run: [],
    quality_gates_failed: [],
    context_compactions: 0,
    context_rotations: 0,
    context_utilization_final: 0,
    outcome: Outcome.SUCCESS,
    retry_count: 0,
  };

  beforeEach(async () => {
    mockTelemetryService = {
      eventBuilder: {
        build: vi.fn().mockReturnValue(mockEvent),
      },
      trackTaskCompletion: vi.fn(),
      isEnabled: true,
    };

    const module: TestingModule = await Test.createTestingModule({
      providers: [
        LlmTelemetryTrackerService,
        {
          provide: MosaicTelemetryService,
          useValue: mockTelemetryService,
        },
      ],
    }).compile();

    service = module.get<LlmTelemetryTrackerService>(LlmTelemetryTrackerService);
  });

  it("should be defined", () => {
    expect(service).toBeDefined();
  });

  describe("trackLlmCompletion", () => {
    // Baseline successful Anthropic chat call; individual tests spread-override
    // the fields they care about.
    const baseParams: LlmCompletionParams = {
      model: "claude-sonnet-4-5-20250929",
      providerType: "claude",
      operation: "chat",
      durationMs: 1200,
      inputTokens: 150,
      outputTokens: 300,
      callingContext: "chat",
      success: true,
    };

    it("should build and track a telemetry event for Anthropic provider", () => {
      service.trackLlmCompletion(baseParams);

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          task_duration_ms: 1200,
          task_type: TaskType.IMPLEMENTATION,
          complexity: Complexity.LOW,
          harness: Harness.API_DIRECT,
          model: "claude-sonnet-4-5-20250929",
          provider: Provider.ANTHROPIC,
          actual_input_tokens: 150,
          actual_output_tokens: 300,
          outcome: Outcome.SUCCESS,
        })
      );

      expect(mockTelemetryService.trackTaskCompletion).toHaveBeenCalledWith(mockEvent);
    });

    it("should build and track a telemetry event for OpenAI provider", () => {
      service.trackLlmCompletion({
        ...baseParams,
        model: "gpt-4o",
        providerType: "openai",
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          model: "gpt-4o",
          provider: Provider.OPENAI,
          harness: Harness.API_DIRECT,
        })
      );
    });

    it("should build and track a telemetry event for Ollama provider", () => {
      service.trackLlmCompletion({
        ...baseParams,
        model: "llama3.2",
        providerType: "ollama",
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          model: "llama3.2",
          provider: Provider.OLLAMA,
          harness: Harness.OLLAMA_LOCAL,
        })
      );
    });

    it("should calculate cost in microdollars correctly", () => {
      service.trackLlmCompletion(baseParams);

      // claude-sonnet-4-5: 150 * 3 + 300 * 15 = 450 + 4500 = 4950
      const expectedActualCost = 4950;

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          // Estimated values are 0 when no PredictionService is injected
          estimated_cost_usd_micros: 0,
          actual_cost_usd_micros: expectedActualCost,
        })
      );
    });

    it("should calculate zero cost for ollama models", () => {
      service.trackLlmCompletion({
        ...baseParams,
        model: "llama3.2",
        providerType: "ollama",
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          estimated_cost_usd_micros: 0,
          actual_cost_usd_micros: 0,
        })
      );
    });

    it("should track FAILURE outcome when success is false", () => {
      service.trackLlmCompletion({
        ...baseParams,
        success: false,
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          outcome: Outcome.FAILURE,
        })
      );
    });

    it("should infer task type from calling context", () => {
      service.trackLlmCompletion({
        ...baseParams,
        callingContext: "brain",
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          task_type: TaskType.PLANNING,
        })
      );
    });

    it("should set empty quality gates arrays for direct LLM calls", () => {
      service.trackLlmCompletion(baseParams);

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          quality_gate_passed: true,
          quality_gates_run: [],
          quality_gates_failed: [],
        })
      );
    });

    it("should silently skip when telemetry is disabled (eventBuilder is null)", () => {
      mockTelemetryService.eventBuilder = null;

      // Should not throw
      service.trackLlmCompletion(baseParams);

      expect(mockTelemetryService.trackTaskCompletion).not.toHaveBeenCalled();
    });

    it("should not throw when eventBuilder.build throws an error", () => {
      mockTelemetryService.eventBuilder = {
        build: vi.fn().mockImplementation(() => {
          throw new Error("Build failed");
        }),
      };

      // Should not throw
      expect(() => service.trackLlmCompletion(baseParams)).not.toThrow();
    });

    it("should not throw when trackTaskCompletion throws an error", () => {
      mockTelemetryService.trackTaskCompletion.mockImplementation(() => {
        throw new Error("Track failed");
      });

      // Should not throw
      expect(() => service.trackLlmCompletion(baseParams)).not.toThrow();
    });

    it("should handle streaming operation with estimated tokens", () => {
      service.trackLlmCompletion({
        ...baseParams,
        operation: "chatStream",
        inputTokens: 50,
        outputTokens: 100,
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          actual_input_tokens: 50,
          actual_output_tokens: 100,
          // Estimated values are 0 when no PredictionService is injected
          estimated_input_tokens: 0,
          estimated_output_tokens: 0,
        })
      );
    });

    it("should handle embed operation", () => {
      service.trackLlmCompletion({
        ...baseParams,
        operation: "embed",
        outputTokens: 0,
        callingContext: undefined,
      });

      expect(mockTelemetryService.eventBuilder?.build).toHaveBeenCalledWith(
        expect.objectContaining({
          task_type: TaskType.IMPLEMENTATION,
          actual_output_tokens: 0,
        })
      );
    });

    it("should pass all required EventBuilderParams fields", () => {
      service.trackLlmCompletion(baseParams);

      const buildCall = (mockTelemetryService.eventBuilder?.build as ReturnType<typeof vi.fn>).mock
        .calls[0][0] as EventBuilderParams;

      // Verify all required fields are present
      expect(buildCall).toHaveProperty("task_duration_ms");
      expect(buildCall).toHaveProperty("task_type");
      expect(buildCall).toHaveProperty("complexity");
      expect(buildCall).toHaveProperty("harness");
      expect(buildCall).toHaveProperty("model");
      expect(buildCall).toHaveProperty("provider");
      expect(buildCall).toHaveProperty("estimated_input_tokens");
      expect(buildCall).toHaveProperty("estimated_output_tokens");
      expect(buildCall).toHaveProperty("actual_input_tokens");
      expect(buildCall).toHaveProperty("actual_output_tokens");
      expect(buildCall).toHaveProperty("estimated_cost_usd_micros");
      expect(buildCall).toHaveProperty("actual_cost_usd_micros");
      expect(buildCall).toHaveProperty("quality_gate_passed");
      expect(buildCall).toHaveProperty("quality_gates_run");
      expect(buildCall).toHaveProperty("quality_gates_failed");
      expect(buildCall).toHaveProperty("context_compactions");
      expect(buildCall).toHaveProperty("context_rotations");
      expect(buildCall).toHaveProperty("context_utilization_final");
      expect(buildCall).toHaveProperty("outcome");
      expect(buildCall).toHaveProperty("retry_count");
    });
  });
});
|
||||
224
apps/api/src/llm/llm-telemetry-tracker.service.ts
Normal file
224
apps/api/src/llm/llm-telemetry-tracker.service.ts
Normal file
@@ -0,0 +1,224 @@
|
||||
import { Injectable, Logger, Optional } from "@nestjs/common";
|
||||
import { MosaicTelemetryService } from "../mosaic-telemetry/mosaic-telemetry.service";
|
||||
import { PredictionService } from "../mosaic-telemetry/prediction.service";
|
||||
import { TaskType, Complexity, Harness, Provider, Outcome } from "@mosaicstack/telemetry-client";
|
||||
import type { LlmProviderType } from "./providers/llm-provider.interface";
|
||||
import { calculateCostMicrodollars } from "./llm-cost-table";
|
||||
|
||||
/**
 * Parameters for tracking an LLM completion event.
 *
 * Consumed by LlmTelemetryTrackerService.trackLlmCompletion after each
 * provider call completes (successfully or not).
 */
export interface LlmCompletionParams {
  /** Full model name (e.g. "claude-sonnet-4-5-20250929") */
  model: string;
  /** Provider type discriminator (e.g. "claude", "openai", "ollama") */
  providerType: LlmProviderType;
  /** Operation type that was performed */
  operation: "chat" | "chatStream" | "embed";
  /** Duration of the LLM call in milliseconds */
  durationMs: number;
  /** Number of input (prompt) tokens consumed */
  inputTokens: number;
  /** Number of output (completion) tokens generated */
  outputTokens: number;
  /**
   * Optional calling context hint for task type inference.
   * Examples: "brain", "chat", "embed", "planning", "code-review"
   * Explicit `| undefined` keeps the field assignable under exactOptionalPropertyTypes.
   */
  callingContext?: string | undefined;
  /** Whether the call succeeded or failed (maps to Outcome SUCCESS/FAILURE) */
  success: boolean;
}
|
||||
|
||||
/**
|
||||
* Estimated token count from text length.
|
||||
* Uses a rough approximation of ~4 characters per token (GPT/Claude average).
|
||||
*/
|
||||
export function estimateTokens(text: string): number {
|
||||
return Math.ceil(text.length / 4);
|
||||
}
|
||||
|
||||
/** Map LLM provider type to telemetry Provider enum */
|
||||
export function mapProviderType(providerType: LlmProviderType): Provider {
|
||||
switch (providerType) {
|
||||
case "claude":
|
||||
return Provider.ANTHROPIC;
|
||||
case "openai":
|
||||
return Provider.OPENAI;
|
||||
case "ollama":
|
||||
return Provider.OLLAMA;
|
||||
default:
|
||||
return Provider.UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
/** Map LLM provider type to telemetry Harness enum */
|
||||
export function mapHarness(providerType: LlmProviderType): Harness {
|
||||
switch (providerType) {
|
||||
case "ollama":
|
||||
return Harness.OLLAMA_LOCAL;
|
||||
default:
|
||||
return Harness.API_DIRECT;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Infer the task type from calling context and operation.
|
||||
*
|
||||
* @param operation - The LLM operation (chat, chatStream, embed)
|
||||
* @param callingContext - Optional hint about the caller's purpose
|
||||
* @returns Inferred TaskType
|
||||
*/
|
||||
export function inferTaskType(
|
||||
operation: "chat" | "chatStream" | "embed",
|
||||
callingContext?: string
|
||||
): TaskType {
|
||||
// Embedding operations are typically for indexing/search
|
||||
if (operation === "embed") {
|
||||
return TaskType.IMPLEMENTATION;
|
||||
}
|
||||
|
||||
if (!callingContext) {
|
||||
return TaskType.UNKNOWN;
|
||||
}
|
||||
|
||||
const ctx = callingContext.toLowerCase();
|
||||
|
||||
if (ctx.includes("brain") || ctx.includes("planning") || ctx.includes("plan")) {
|
||||
return TaskType.PLANNING;
|
||||
}
|
||||
if (ctx.includes("review") || ctx.includes("code-review")) {
|
||||
return TaskType.CODE_REVIEW;
|
||||
}
|
||||
if (ctx.includes("test")) {
|
||||
return TaskType.TESTING;
|
||||
}
|
||||
if (ctx.includes("debug")) {
|
||||
return TaskType.DEBUGGING;
|
||||
}
|
||||
if (ctx.includes("refactor")) {
|
||||
return TaskType.REFACTORING;
|
||||
}
|
||||
if (ctx.includes("doc")) {
|
||||
return TaskType.DOCUMENTATION;
|
||||
}
|
||||
if (ctx.includes("config")) {
|
||||
return TaskType.CONFIGURATION;
|
||||
}
|
||||
if (ctx.includes("security") || ctx.includes("audit")) {
|
||||
return TaskType.SECURITY_AUDIT;
|
||||
}
|
||||
if (ctx.includes("chat") || ctx.includes("implement")) {
|
||||
return TaskType.IMPLEMENTATION;
|
||||
}
|
||||
|
||||
return TaskType.UNKNOWN;
|
||||
}
|
||||
|
||||
/**
|
||||
* LLM Telemetry Tracker Service
|
||||
*
|
||||
* Builds and submits telemetry events for LLM completions.
|
||||
* All tracking is non-blocking and fire-and-forget; telemetry errors
|
||||
* never propagate to the caller.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* // After a successful chat completion
|
||||
* this.telemetryTracker.trackLlmCompletion({
|
||||
* model: "claude-sonnet-4-5-20250929",
|
||||
* providerType: "claude",
|
||||
* operation: "chat",
|
||||
* durationMs: 1200,
|
||||
* inputTokens: 150,
|
||||
* outputTokens: 300,
|
||||
* callingContext: "chat",
|
||||
* success: true,
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class LlmTelemetryTrackerService {
|
||||
private readonly logger = new Logger(LlmTelemetryTrackerService.name);
|
||||
|
||||
constructor(
|
||||
private readonly telemetry: MosaicTelemetryService,
|
||||
@Optional() private readonly predictionService?: PredictionService
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Track an LLM completion event via Mosaic Telemetry.
|
||||
*
|
||||
* This method is intentionally fire-and-forget. It catches all errors
|
||||
* internally and logs them without propagating to the caller.
|
||||
*
|
||||
* @param params - LLM completion parameters
|
||||
*/
|
||||
trackLlmCompletion(params: LlmCompletionParams): void {
|
||||
try {
|
||||
const builder = this.telemetry.eventBuilder;
|
||||
if (!builder) {
|
||||
// Telemetry is disabled — silently skip
|
||||
return;
|
||||
}
|
||||
|
||||
const taskType = inferTaskType(params.operation, params.callingContext);
|
||||
const provider = mapProviderType(params.providerType);
|
||||
|
||||
const costMicrodollars = calculateCostMicrodollars(
|
||||
params.model,
|
||||
params.inputTokens,
|
||||
params.outputTokens
|
||||
);
|
||||
|
||||
// Query predictions for estimated fields (graceful degradation)
|
||||
let estimatedInputTokens = 0;
|
||||
let estimatedOutputTokens = 0;
|
||||
let estimatedCostMicros = 0;
|
||||
|
||||
if (this.predictionService) {
|
||||
const prediction = this.predictionService.getEstimate(
|
||||
taskType,
|
||||
params.model,
|
||||
provider,
|
||||
Complexity.LOW
|
||||
);
|
||||
|
||||
if (prediction?.prediction && prediction.metadata.confidence !== "none") {
|
||||
estimatedInputTokens = prediction.prediction.input_tokens.median;
|
||||
estimatedOutputTokens = prediction.prediction.output_tokens.median;
|
||||
estimatedCostMicros = prediction.prediction.cost_usd_micros.median ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
const event = builder.build({
|
||||
task_duration_ms: params.durationMs,
|
||||
task_type: taskType,
|
||||
complexity: Complexity.LOW,
|
||||
harness: mapHarness(params.providerType),
|
||||
model: params.model,
|
||||
provider,
|
||||
estimated_input_tokens: estimatedInputTokens,
|
||||
estimated_output_tokens: estimatedOutputTokens,
|
||||
actual_input_tokens: params.inputTokens,
|
||||
actual_output_tokens: params.outputTokens,
|
||||
estimated_cost_usd_micros: estimatedCostMicros,
|
||||
actual_cost_usd_micros: costMicrodollars,
|
||||
quality_gate_passed: true,
|
||||
quality_gates_run: [],
|
||||
quality_gates_failed: [],
|
||||
context_compactions: 0,
|
||||
context_rotations: 0,
|
||||
context_utilization_final: 0,
|
||||
outcome: params.success ? Outcome.SUCCESS : Outcome.FAILURE,
|
||||
retry_count: 0,
|
||||
});
|
||||
|
||||
this.telemetry.trackTaskCompletion(event);
|
||||
} catch (error: unknown) {
|
||||
// Never let telemetry errors propagate
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(`Failed to track LLM telemetry event: ${msg}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,13 +3,14 @@ import { LlmController } from "./llm.controller";
|
||||
import { LlmProviderAdminController } from "./llm-provider-admin.controller";
|
||||
import { LlmService } from "./llm.service";
|
||||
import { LlmManagerService } from "./llm-manager.service";
|
||||
import { LlmTelemetryTrackerService } from "./llm-telemetry-tracker.service";
|
||||
import { PrismaModule } from "../prisma/prisma.module";
|
||||
import { LlmUsageModule } from "../llm-usage/llm-usage.module";
|
||||
|
||||
@Module({
|
||||
imports: [PrismaModule, LlmUsageModule],
|
||||
controllers: [LlmController, LlmProviderAdminController],
|
||||
providers: [LlmService, LlmManagerService],
|
||||
providers: [LlmService, LlmManagerService, LlmTelemetryTrackerService],
|
||||
exports: [LlmService, LlmManagerService],
|
||||
})
|
||||
export class LlmModule {}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { ServiceUnavailableException } from "@nestjs/common";
|
||||
import { LlmService } from "./llm.service";
|
||||
import { LlmManagerService } from "./llm-manager.service";
|
||||
import { LlmTelemetryTrackerService } from "./llm-telemetry-tracker.service";
|
||||
import type { ChatRequestDto, EmbedRequestDto, ChatResponseDto, EmbedResponseDto } from "./dto";
|
||||
import type {
|
||||
LlmProviderInterface,
|
||||
@@ -14,6 +15,9 @@ describe("LlmService", () => {
|
||||
let mockManagerService: {
|
||||
getDefaultProvider: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let mockTelemetryTracker: {
|
||||
trackLlmCompletion: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let mockProvider: {
|
||||
chat: ReturnType<typeof vi.fn>;
|
||||
chatStream: ReturnType<typeof vi.fn>;
|
||||
@@ -41,6 +45,11 @@ describe("LlmService", () => {
|
||||
getDefaultProvider: vi.fn().mockResolvedValue(mockProvider),
|
||||
};
|
||||
|
||||
// Create mock telemetry tracker
|
||||
mockTelemetryTracker = {
|
||||
trackLlmCompletion: vi.fn(),
|
||||
};
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
LlmService,
|
||||
@@ -48,6 +57,10 @@ describe("LlmService", () => {
|
||||
provide: LlmManagerService,
|
||||
useValue: mockManagerService,
|
||||
},
|
||||
{
|
||||
provide: LlmTelemetryTrackerService,
|
||||
useValue: mockTelemetryTracker,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@@ -135,6 +148,45 @@ describe("LlmService", () => {
|
||||
expect(result).toEqual(response);
|
||||
});
|
||||
|
||||
it("should track telemetry on successful chat", async () => {
|
||||
const response: ChatResponseDto = {
|
||||
model: "llama3.2",
|
||||
message: { role: "assistant", content: "Hello" },
|
||||
done: true,
|
||||
promptEvalCount: 10,
|
||||
evalCount: 20,
|
||||
};
|
||||
mockProvider.chat.mockResolvedValue(response);
|
||||
|
||||
await service.chat(request, "chat");
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
model: "llama3.2",
|
||||
providerType: "ollama",
|
||||
operation: "chat",
|
||||
inputTokens: 10,
|
||||
outputTokens: 20,
|
||||
callingContext: "chat",
|
||||
success: true,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should track telemetry on failed chat", async () => {
|
||||
mockProvider.chat.mockRejectedValue(new Error("Chat failed"));
|
||||
|
||||
await expect(service.chat(request)).rejects.toThrow(ServiceUnavailableException);
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
model: "llama3.2",
|
||||
operation: "chat",
|
||||
success: false,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should throw ServiceUnavailableException on error", async () => {
|
||||
mockProvider.chat.mockRejectedValue(new Error("Chat failed"));
|
||||
|
||||
@@ -177,6 +229,94 @@ describe("LlmService", () => {
|
||||
expect(chunks[1].message.content).toBe(" world");
|
||||
});
|
||||
|
||||
it("should track telemetry after stream completes", async () => {
|
||||
async function* mockGenerator(): AsyncGenerator<ChatResponseDto> {
|
||||
yield {
|
||||
model: "llama3.2",
|
||||
message: { role: "assistant", content: "Hello" },
|
||||
done: false,
|
||||
};
|
||||
yield {
|
||||
model: "llama3.2",
|
||||
message: { role: "assistant", content: " world" },
|
||||
done: true,
|
||||
promptEvalCount: 5,
|
||||
evalCount: 10,
|
||||
};
|
||||
}
|
||||
|
||||
mockProvider.chatStream.mockReturnValue(mockGenerator());
|
||||
|
||||
const chunks: ChatResponseDto[] = [];
|
||||
for await (const chunk of service.chatStream(request, "brain")) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
model: "llama3.2",
|
||||
providerType: "ollama",
|
||||
operation: "chatStream",
|
||||
inputTokens: 5,
|
||||
outputTokens: 10,
|
||||
callingContext: "brain",
|
||||
success: true,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should estimate tokens when provider does not return counts in stream", async () => {
|
||||
async function* mockGenerator(): AsyncGenerator<ChatResponseDto> {
|
||||
yield {
|
||||
model: "llama3.2",
|
||||
message: { role: "assistant", content: "Hello world" },
|
||||
done: false,
|
||||
};
|
||||
yield {
|
||||
model: "llama3.2",
|
||||
message: { role: "assistant", content: "" },
|
||||
done: true,
|
||||
};
|
||||
}
|
||||
|
||||
mockProvider.chatStream.mockReturnValue(mockGenerator());
|
||||
|
||||
const chunks: ChatResponseDto[] = [];
|
||||
for await (const chunk of service.chatStream(request)) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
// Should use estimated tokens since no actual counts provided
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operation: "chatStream",
|
||||
success: true,
|
||||
// Input estimated from "Hi" -> ceil(2/4) = 1
|
||||
inputTokens: 1,
|
||||
// Output estimated from "Hello world" -> ceil(11/4) = 3
|
||||
outputTokens: 3,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should track telemetry on stream failure", async () => {
|
||||
async function* errorGenerator(): AsyncGenerator<ChatResponseDto> {
|
||||
throw new Error("Stream failed");
|
||||
}
|
||||
|
||||
mockProvider.chatStream.mockReturnValue(errorGenerator());
|
||||
|
||||
const generator = service.chatStream(request);
|
||||
await expect(generator.next()).rejects.toThrow(ServiceUnavailableException);
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operation: "chatStream",
|
||||
success: false,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should throw ServiceUnavailableException on error", async () => {
|
||||
async function* errorGenerator(): AsyncGenerator<ChatResponseDto> {
|
||||
throw new Error("Stream failed");
|
||||
@@ -210,6 +350,41 @@ describe("LlmService", () => {
|
||||
expect(result).toEqual(response);
|
||||
});
|
||||
|
||||
it("should track telemetry on successful embed", async () => {
|
||||
const response: EmbedResponseDto = {
|
||||
model: "llama3.2",
|
||||
embeddings: [[0.1, 0.2, 0.3]],
|
||||
totalDuration: 500,
|
||||
};
|
||||
mockProvider.embed.mockResolvedValue(response);
|
||||
|
||||
await service.embed(request, "embed");
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
model: "llama3.2",
|
||||
providerType: "ollama",
|
||||
operation: "embed",
|
||||
outputTokens: 0,
|
||||
callingContext: "embed",
|
||||
success: true,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should track telemetry on failed embed", async () => {
|
||||
mockProvider.embed.mockRejectedValue(new Error("Embedding failed"));
|
||||
|
||||
await expect(service.embed(request)).rejects.toThrow(ServiceUnavailableException);
|
||||
|
||||
expect(mockTelemetryTracker.trackLlmCompletion).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operation: "embed",
|
||||
success: false,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should throw ServiceUnavailableException on error", async () => {
|
||||
mockProvider.embed.mockRejectedValue(new Error("Embedding failed"));
|
||||
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import { Injectable, OnModuleInit, Logger, ServiceUnavailableException } from "@nestjs/common";
|
||||
import { LlmManagerService } from "./llm-manager.service";
|
||||
import { LlmTelemetryTrackerService, estimateTokens } from "./llm-telemetry-tracker.service";
|
||||
import type { ChatRequestDto, ChatResponseDto, EmbedRequestDto, EmbedResponseDto } from "./dto";
|
||||
import type { LlmProviderHealthStatus } from "./providers/llm-provider.interface";
|
||||
import type { LlmProviderHealthStatus, LlmProviderType } from "./providers/llm-provider.interface";
|
||||
|
||||
/**
|
||||
* LLM Service
|
||||
*
|
||||
* High-level service for LLM operations. Delegates to providers via LlmManagerService.
|
||||
* Maintains backward compatibility with the original API while supporting multiple providers.
|
||||
* Automatically tracks completions via Mosaic Telemetry (non-blocking).
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
@@ -33,7 +35,10 @@ import type { LlmProviderHealthStatus } from "./providers/llm-provider.interface
|
||||
export class LlmService implements OnModuleInit {
|
||||
private readonly logger = new Logger(LlmService.name);
|
||||
|
||||
constructor(private readonly llmManager: LlmManagerService) {
|
||||
constructor(
|
||||
private readonly llmManager: LlmManagerService,
|
||||
private readonly telemetryTracker: LlmTelemetryTrackerService
|
||||
) {
|
||||
this.logger.log("LLM service initialized");
|
||||
}
|
||||
|
||||
@@ -91,14 +96,45 @@ export class LlmService implements OnModuleInit {
|
||||
* Perform a synchronous chat completion.
|
||||
*
|
||||
* @param request - Chat request with messages and configuration
|
||||
* @param callingContext - Optional context hint for telemetry task type inference
|
||||
* @returns Complete chat response
|
||||
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
||||
*/
|
||||
async chat(request: ChatRequestDto): Promise<ChatResponseDto> {
|
||||
async chat(request: ChatRequestDto, callingContext?: string): Promise<ChatResponseDto> {
|
||||
const startTime = Date.now();
|
||||
let providerType: LlmProviderType = "ollama";
|
||||
|
||||
try {
|
||||
const provider = await this.llmManager.getDefaultProvider();
|
||||
return await provider.chat(request);
|
||||
providerType = provider.type;
|
||||
const response = await provider.chat(request);
|
||||
|
||||
// Fire-and-forget telemetry tracking
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: response.model,
|
||||
providerType,
|
||||
operation: "chat",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens: response.promptEvalCount ?? 0,
|
||||
outputTokens: response.evalCount ?? 0,
|
||||
callingContext,
|
||||
success: true,
|
||||
});
|
||||
|
||||
return response;
|
||||
} catch (error: unknown) {
|
||||
// Track failure (fire-and-forget)
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: request.model,
|
||||
providerType,
|
||||
operation: "chat",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
callingContext,
|
||||
success: false,
|
||||
});
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error(`Chat failed: ${errorMessage}`);
|
||||
throw new ServiceUnavailableException(`Chat completion failed: ${errorMessage}`);
|
||||
@@ -107,20 +143,75 @@ export class LlmService implements OnModuleInit {
|
||||
/**
|
||||
* Perform a streaming chat completion.
|
||||
* Yields response chunks as they arrive from the provider.
|
||||
* Aggregates token usage and tracks telemetry after the stream ends.
|
||||
*
|
||||
* @param request - Chat request with messages and configuration
|
||||
* @param callingContext - Optional context hint for telemetry task type inference
|
||||
* @yields Chat response chunks
|
||||
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
||||
*/
|
||||
async *chatStream(request: ChatRequestDto): AsyncGenerator<ChatResponseDto, void, unknown> {
|
||||
async *chatStream(
|
||||
request: ChatRequestDto,
|
||||
callingContext?: string
|
||||
): AsyncGenerator<ChatResponseDto, void, unknown> {
|
||||
const startTime = Date.now();
|
||||
let providerType: LlmProviderType = "ollama";
|
||||
let aggregatedContent = "";
|
||||
let lastChunkInputTokens = 0;
|
||||
let lastChunkOutputTokens = 0;
|
||||
|
||||
try {
|
||||
const provider = await this.llmManager.getDefaultProvider();
|
||||
providerType = provider.type;
|
||||
const stream = provider.chatStream(request);
|
||||
|
||||
for await (const chunk of stream) {
|
||||
// Accumulate content for token estimation
|
||||
aggregatedContent += chunk.message.content;
|
||||
|
||||
// Some providers include token counts on the final chunk
|
||||
if (chunk.promptEvalCount !== undefined) {
|
||||
lastChunkInputTokens = chunk.promptEvalCount;
|
||||
}
|
||||
if (chunk.evalCount !== undefined) {
|
||||
lastChunkOutputTokens = chunk.evalCount;
|
||||
}
|
||||
|
||||
yield chunk;
|
||||
}
|
||||
|
||||
// After stream completes, track telemetry
|
||||
// Use actual token counts if available, otherwise estimate from content length
|
||||
const inputTokens =
|
||||
lastChunkInputTokens > 0
|
||||
? lastChunkInputTokens
|
||||
: estimateTokens(request.messages.map((m) => m.content).join(" "));
|
||||
const outputTokens =
|
||||
lastChunkOutputTokens > 0 ? lastChunkOutputTokens : estimateTokens(aggregatedContent);
|
||||
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: request.model,
|
||||
providerType,
|
||||
operation: "chatStream",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
callingContext,
|
||||
success: true,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
// Track failure (fire-and-forget)
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: request.model,
|
||||
providerType,
|
||||
operation: "chatStream",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
callingContext,
|
||||
success: false,
|
||||
});
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error(`Stream failed: ${errorMessage}`);
|
||||
throw new ServiceUnavailableException(`Streaming failed: ${errorMessage}`);
|
||||
@@ -130,14 +221,48 @@ export class LlmService implements OnModuleInit {
|
||||
* Generate embeddings for the given input texts.
|
||||
*
|
||||
* @param request - Embedding request with model and input texts
|
||||
* @param callingContext - Optional context hint for telemetry task type inference
|
||||
* @returns Embeddings response with vector arrays
|
||||
* @throws {ServiceUnavailableException} If provider is unavailable or request fails
|
||||
*/
|
||||
async embed(request: EmbedRequestDto): Promise<EmbedResponseDto> {
|
||||
async embed(request: EmbedRequestDto, callingContext?: string): Promise<EmbedResponseDto> {
|
||||
const startTime = Date.now();
|
||||
let providerType: LlmProviderType = "ollama";
|
||||
|
||||
try {
|
||||
const provider = await this.llmManager.getDefaultProvider();
|
||||
return await provider.embed(request);
|
||||
providerType = provider.type;
|
||||
const response = await provider.embed(request);
|
||||
|
||||
// Estimate input tokens from the input text
|
||||
const inputTokens = estimateTokens(request.input.join(" "));
|
||||
|
||||
// Fire-and-forget telemetry tracking
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: response.model,
|
||||
providerType,
|
||||
operation: "embed",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens,
|
||||
outputTokens: 0, // Embeddings don't produce output tokens
|
||||
callingContext,
|
||||
success: true,
|
||||
});
|
||||
|
||||
return response;
|
||||
} catch (error: unknown) {
|
||||
// Track failure (fire-and-forget)
|
||||
this.telemetryTracker.trackLlmCompletion({
|
||||
model: request.model,
|
||||
providerType,
|
||||
operation: "embed",
|
||||
durationMs: Date.now() - startTime,
|
||||
inputTokens: 0,
|
||||
outputTokens: 0,
|
||||
callingContext,
|
||||
success: false,
|
||||
});
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
this.logger.error(`Embed failed: ${errorMessage}`);
|
||||
throw new ServiceUnavailableException(`Embedding failed: ${errorMessage}`);
|
||||
|
||||
17
apps/api/src/mosaic-telemetry/index.ts
Normal file
17
apps/api/src/mosaic-telemetry/index.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
/**
|
||||
* Mosaic Telemetry module — task completion tracking and crowd-sourced predictions.
|
||||
*
|
||||
* **Not to be confused with the OpenTelemetry (OTEL) TelemetryModule** at
|
||||
* `src/telemetry/`, which handles distributed request tracing.
|
||||
*
|
||||
* @module mosaic-telemetry
|
||||
*/
|
||||
|
||||
export { MosaicTelemetryModule } from "./mosaic-telemetry.module";
|
||||
export { MosaicTelemetryService } from "./mosaic-telemetry.service";
|
||||
export {
|
||||
loadMosaicTelemetryConfig,
|
||||
toSdkConfig,
|
||||
MOSAIC_TELEMETRY_ENV,
|
||||
type MosaicTelemetryModuleConfig,
|
||||
} from "./mosaic-telemetry.config";
|
||||
78
apps/api/src/mosaic-telemetry/mosaic-telemetry.config.ts
Normal file
78
apps/api/src/mosaic-telemetry/mosaic-telemetry.config.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
import type { ConfigService } from "@nestjs/config";
|
||||
import type { TelemetryConfig } from "@mosaicstack/telemetry-client";
|
||||
|
||||
/**
|
||||
* Configuration interface for the Mosaic Telemetry module.
|
||||
* Maps environment variables to SDK configuration.
|
||||
*/
|
||||
export interface MosaicTelemetryModuleConfig {
|
||||
/** Whether telemetry collection is enabled. Default: true */
|
||||
enabled: boolean;
|
||||
/** Base URL of the telemetry server */
|
||||
serverUrl: string;
|
||||
/** API key for authentication (64-char hex string) */
|
||||
apiKey: string;
|
||||
/** Instance UUID for this client */
|
||||
instanceId: string;
|
||||
/** If true, log events instead of sending them. Default: false */
|
||||
dryRun: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Environment variable names used by the Mosaic Telemetry module.
|
||||
*/
|
||||
export const MOSAIC_TELEMETRY_ENV = {
|
||||
ENABLED: "MOSAIC_TELEMETRY_ENABLED",
|
||||
SERVER_URL: "MOSAIC_TELEMETRY_SERVER_URL",
|
||||
API_KEY: "MOSAIC_TELEMETRY_API_KEY",
|
||||
INSTANCE_ID: "MOSAIC_TELEMETRY_INSTANCE_ID",
|
||||
DRY_RUN: "MOSAIC_TELEMETRY_DRY_RUN",
|
||||
} as const;
|
||||
|
||||
/**
|
||||
* Read Mosaic Telemetry configuration from environment variables via NestJS ConfigService.
|
||||
*
|
||||
* @param configService - NestJS ConfigService instance
|
||||
* @returns Parsed module configuration
|
||||
*/
|
||||
export function loadMosaicTelemetryConfig(
|
||||
configService: ConfigService
|
||||
): MosaicTelemetryModuleConfig {
|
||||
const enabledRaw = configService.get<string>(MOSAIC_TELEMETRY_ENV.ENABLED, "true");
|
||||
const dryRunRaw = configService.get<string>(MOSAIC_TELEMETRY_ENV.DRY_RUN, "false");
|
||||
|
||||
return {
|
||||
enabled: enabledRaw.toLowerCase() === "true",
|
||||
serverUrl: configService.get<string>(MOSAIC_TELEMETRY_ENV.SERVER_URL, ""),
|
||||
apiKey: configService.get<string>(MOSAIC_TELEMETRY_ENV.API_KEY, ""),
|
||||
instanceId: configService.get<string>(MOSAIC_TELEMETRY_ENV.INSTANCE_ID, ""),
|
||||
dryRun: dryRunRaw.toLowerCase() === "true",
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert module config to SDK TelemetryConfig format.
|
||||
* Includes the onError callback for NestJS Logger integration.
|
||||
*
|
||||
* @param config - Module configuration
|
||||
* @param onError - Error callback (typically NestJS Logger)
|
||||
* @returns SDK-compatible TelemetryConfig
|
||||
*/
|
||||
export function toSdkConfig(
|
||||
config: MosaicTelemetryModuleConfig,
|
||||
onError?: (error: Error) => void
|
||||
): TelemetryConfig {
|
||||
const sdkConfig: TelemetryConfig = {
|
||||
serverUrl: config.serverUrl,
|
||||
apiKey: config.apiKey,
|
||||
instanceId: config.instanceId,
|
||||
enabled: config.enabled,
|
||||
dryRun: config.dryRun,
|
||||
};
|
||||
|
||||
if (onError) {
|
||||
sdkConfig.onError = onError;
|
||||
}
|
||||
|
||||
return sdkConfig;
|
||||
}
|
||||
92
apps/api/src/mosaic-telemetry/mosaic-telemetry.controller.ts
Normal file
92
apps/api/src/mosaic-telemetry/mosaic-telemetry.controller.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import { Controller, Get, Query, UseGuards, BadRequestException } from "@nestjs/common";
|
||||
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||
import { PredictionService } from "./prediction.service";
|
||||
import {
|
||||
TaskType,
|
||||
Complexity,
|
||||
Provider,
|
||||
type PredictionResponse,
|
||||
} from "@mosaicstack/telemetry-client";
|
||||
|
||||
/**
|
||||
* Valid values for query parameter validation.
|
||||
*/
|
||||
const VALID_TASK_TYPES = new Set<string>(Object.values(TaskType));
|
||||
const VALID_COMPLEXITIES = new Set<string>(Object.values(Complexity));
|
||||
const VALID_PROVIDERS = new Set<string>(Object.values(Provider));
|
||||
|
||||
/**
|
||||
* Response DTO for the estimate endpoint.
|
||||
*/
|
||||
interface EstimateResponseDto {
|
||||
data: PredictionResponse | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mosaic Telemetry Controller
|
||||
*
|
||||
* Provides API endpoints for accessing telemetry prediction data.
|
||||
* All endpoints require authentication via AuthGuard.
|
||||
*
|
||||
* This controller is intentionally lightweight - it delegates to PredictionService
|
||||
* for the actual prediction logic and returns results directly to the frontend.
|
||||
*/
|
||||
@Controller("telemetry")
|
||||
@UseGuards(AuthGuard)
|
||||
export class MosaicTelemetryController {
|
||||
constructor(private readonly predictionService: PredictionService) {}
|
||||
|
||||
/**
|
||||
* GET /api/telemetry/estimate
|
||||
*
|
||||
* Get a cost/token estimate for a given task configuration.
|
||||
* Returns prediction data including confidence level, or null if
|
||||
* no prediction is available.
|
||||
*
|
||||
* @param taskType - Task type enum value (e.g. "implementation", "planning")
|
||||
* @param model - Model name (e.g. "claude-sonnet-4-5")
|
||||
* @param provider - Provider enum value (e.g. "anthropic", "openai")
|
||||
* @param complexity - Complexity level (e.g. "low", "medium", "high")
|
||||
* @returns Prediction response with estimates and confidence
|
||||
*/
|
||||
@Get("estimate")
|
||||
getEstimate(
|
||||
@Query("taskType") taskType: string,
|
||||
@Query("model") model: string,
|
||||
@Query("provider") provider: string,
|
||||
@Query("complexity") complexity: string
|
||||
): EstimateResponseDto {
|
||||
if (!taskType || !model || !provider || !complexity) {
|
||||
throw new BadRequestException(
|
||||
"Missing query parameters. Required: taskType, model, provider, complexity"
|
||||
);
|
||||
}
|
||||
|
||||
if (!VALID_TASK_TYPES.has(taskType)) {
|
||||
throw new BadRequestException(
|
||||
`Invalid taskType "${taskType}". Valid values: ${[...VALID_TASK_TYPES].join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
if (!VALID_PROVIDERS.has(provider)) {
|
||||
throw new BadRequestException(
|
||||
`Invalid provider "${provider}". Valid values: ${[...VALID_PROVIDERS].join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
if (!VALID_COMPLEXITIES.has(complexity)) {
|
||||
throw new BadRequestException(
|
||||
`Invalid complexity "${complexity}". Valid values: ${[...VALID_COMPLEXITIES].join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
const prediction = this.predictionService.getEstimate(
|
||||
taskType as TaskType,
|
||||
model,
|
||||
provider as Provider,
|
||||
complexity as Complexity
|
||||
);
|
||||
|
||||
return { data: prediction };
|
||||
}
|
||||
}
|
||||
212
apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts
Normal file
212
apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts
Normal file
@@ -0,0 +1,212 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { MosaicTelemetryModule } from "./mosaic-telemetry.module";
|
||||
import { MosaicTelemetryService } from "./mosaic-telemetry.service";
|
||||
|
||||
// Mock the telemetry client to avoid real HTTP calls
|
||||
vi.mock("@mosaicstack/telemetry-client", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("@mosaicstack/telemetry-client")>();
|
||||
|
||||
class MockTelemetryClient {
|
||||
private _isRunning = false;
|
||||
|
||||
constructor(_config: unknown) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
get eventBuilder() {
|
||||
return { build: vi.fn().mockReturnValue({ event_id: "test-event-id" }) };
|
||||
}
|
||||
|
||||
start(): void {
|
||||
this._isRunning = true;
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this._isRunning = false;
|
||||
}
|
||||
|
||||
track(_event: unknown): void {
|
||||
// no-op
|
||||
}
|
||||
|
||||
getPrediction(_query: unknown): unknown {
|
||||
return null;
|
||||
}
|
||||
|
||||
async refreshPredictions(_queries: unknown): Promise<void> {
|
||||
// no-op
|
||||
}
|
||||
|
||||
get queueSize(): number {
|
||||
return 0;
|
||||
}
|
||||
|
||||
get isRunning(): boolean {
|
||||
return this._isRunning;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
...actual,
|
||||
TelemetryClient: MockTelemetryClient,
|
||||
};
|
||||
});
|
||||
|
||||
describe("MosaicTelemetryModule", () => {
|
||||
let module: TestingModule;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("module initialization", () => {
|
||||
it("should compile the module successfully", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
expect(module).toBeDefined();
|
||||
await module.close();
|
||||
});
|
||||
|
||||
it("should provide MosaicTelemetryService", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
const service = module.get<MosaicTelemetryService>(MosaicTelemetryService);
|
||||
expect(service).toBeDefined();
|
||||
expect(service).toBeInstanceOf(MosaicTelemetryService);
|
||||
|
||||
await module.close();
|
||||
});
|
||||
|
||||
it("should export MosaicTelemetryService for injection in other modules", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
const service = module.get(MosaicTelemetryService);
|
||||
expect(service).toBeDefined();
|
||||
|
||||
await module.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe("lifecycle integration", () => {
|
||||
it("should initialize service on module init when enabled", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "true",
|
||||
MOSAIC_TELEMETRY_SERVER_URL: "https://tel.test.local",
|
||||
MOSAIC_TELEMETRY_API_KEY: "a".repeat(64),
|
||||
MOSAIC_TELEMETRY_INSTANCE_ID: "550e8400-e29b-41d4-a716-446655440000",
|
||||
MOSAIC_TELEMETRY_DRY_RUN: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
await module.init();
|
||||
|
||||
const service = module.get<MosaicTelemetryService>(MosaicTelemetryService);
|
||||
expect(service.isEnabled).toBe(true);
|
||||
|
||||
await module.close();
|
||||
});
|
||||
|
||||
it("should not start client when disabled via env", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
await module.init();
|
||||
|
||||
const service = module.get<MosaicTelemetryService>(MosaicTelemetryService);
|
||||
expect(service.isEnabled).toBe(false);
|
||||
|
||||
await module.close();
|
||||
});
|
||||
|
||||
it("should cleanly shut down on module destroy", async () => {
|
||||
module = await Test.createTestingModule({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
envFilePath: [],
|
||||
load: [
|
||||
() => ({
|
||||
MOSAIC_TELEMETRY_ENABLED: "true",
|
||||
MOSAIC_TELEMETRY_SERVER_URL: "https://tel.test.local",
|
||||
MOSAIC_TELEMETRY_API_KEY: "a".repeat(64),
|
||||
MOSAIC_TELEMETRY_INSTANCE_ID: "550e8400-e29b-41d4-a716-446655440000",
|
||||
MOSAIC_TELEMETRY_DRY_RUN: "false",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
MosaicTelemetryModule,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
await module.init();
|
||||
|
||||
const service = module.get<MosaicTelemetryService>(MosaicTelemetryService);
|
||||
expect(service.isEnabled).toBe(true);
|
||||
|
||||
await expect(module.close()).resolves.not.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
41
apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts
Normal file
41
apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { Module, Global } from "@nestjs/common";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { AuthModule } from "../auth/auth.module";
|
||||
import { MosaicTelemetryService } from "./mosaic-telemetry.service";
|
||||
import { PredictionService } from "./prediction.service";
|
||||
import { MosaicTelemetryController } from "./mosaic-telemetry.controller";
|
||||
|
||||
/**
|
||||
* Global module providing Mosaic Telemetry integration via @mosaicstack/telemetry-client.
|
||||
*
|
||||
* Tracks task completion events and provides crowd-sourced predictions for
|
||||
* token usage, cost estimation, and quality metrics.
|
||||
*
|
||||
* **This is separate from the OpenTelemetry (OTEL) TelemetryModule** which
|
||||
* handles distributed request tracing. This module is specifically for
|
||||
* Mosaic Stack's own telemetry aggregation service.
|
||||
*
|
||||
* Configuration via environment variables:
|
||||
* - MOSAIC_TELEMETRY_ENABLED (boolean, default: true)
|
||||
* - MOSAIC_TELEMETRY_SERVER_URL (string)
|
||||
* - MOSAIC_TELEMETRY_API_KEY (string, 64-char hex)
|
||||
* - MOSAIC_TELEMETRY_INSTANCE_ID (string, UUID)
|
||||
* - MOSAIC_TELEMETRY_DRY_RUN (boolean, default: false)
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* // In any service (no need to import module — it's global):
|
||||
* @Injectable()
|
||||
* export class MyService {
|
||||
* constructor(private readonly telemetry: MosaicTelemetryService) {}
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Global()
|
||||
@Module({
|
||||
imports: [ConfigModule, AuthModule],
|
||||
controllers: [MosaicTelemetryController],
|
||||
providers: [MosaicTelemetryService, PredictionService],
|
||||
exports: [MosaicTelemetryService, PredictionService],
|
||||
})
|
||||
export class MosaicTelemetryModule {}
|
||||
504
apps/api/src/mosaic-telemetry/mosaic-telemetry.service.spec.ts
Normal file
504
apps/api/src/mosaic-telemetry/mosaic-telemetry.service.spec.ts
Normal file
@@ -0,0 +1,504 @@
|
||||
/**
 * Unit tests for MosaicTelemetryService.
 *
 * The @mosaicstack/telemetry-client package is mocked at module level so no
 * network traffic occurs; the vi.fn() spies below record every interaction
 * with the mocked TelemetryClient.
 */
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { ConfigService } from "@nestjs/config";
import { MOSAIC_TELEMETRY_ENV } from "./mosaic-telemetry.config";
import type {
  TaskCompletionEvent,
  PredictionQuery,
  PredictionResponse,
} from "@mosaicstack/telemetry-client";
import { TaskType, Complexity, Provider, Outcome } from "@mosaicstack/telemetry-client";

// Track mock instances created during tests
const mockStartFn = vi.fn();
const mockStopFn = vi.fn().mockResolvedValue(undefined);
const mockTrackFn = vi.fn();
const mockGetPredictionFn = vi.fn().mockReturnValue(null);
const mockRefreshPredictionsFn = vi.fn().mockResolvedValue(undefined);
const mockBuildFn = vi.fn().mockReturnValue({ event_id: "test-event-id" });

// Replace TelemetryClient with an in-memory double; all other exports
// (enums, types) pass through from the real module via importOriginal.
vi.mock("@mosaicstack/telemetry-client", async (importOriginal) => {
  const actual = await importOriginal<typeof import("@mosaicstack/telemetry-client")>();

  class MockTelemetryClient {
    private _isRunning = false;

    constructor(_config: unknown) {
      // no-op
    }

    get eventBuilder() {
      return { build: mockBuildFn };
    }

    start(): void {
      this._isRunning = true;
      mockStartFn();
    }

    async stop(): Promise<void> {
      this._isRunning = false;
      await mockStopFn();
    }

    track(event: unknown): void {
      mockTrackFn(event);
    }

    getPrediction(query: unknown): unknown {
      return mockGetPredictionFn(query);
    }

    async refreshPredictions(queries: unknown): Promise<void> {
      await mockRefreshPredictionsFn(queries);
    }

    get queueSize(): number {
      return 0;
    }

    get isRunning(): boolean {
      return this._isRunning;
    }
  }

  return {
    ...actual,
    TelemetryClient: MockTelemetryClient,
  };
});

// Lazy-import the service after the mock is in place
const { MosaicTelemetryService } = await import("./mosaic-telemetry.service");

/**
 * Create a ConfigService mock that returns environment values from the provided map.
 */
function createConfigService(envMap: Record<string, string | undefined> = {}): ConfigService {
  const configService = {
    get: vi.fn((key: string, defaultValue?: string): string => {
      const value = envMap[key];
      if (value !== undefined) {
        return value;
      }
      return defaultValue ?? "";
    }),
  } as unknown as ConfigService;
  return configService;
}

/**
 * Default env config for an enabled telemetry service.
 */
const ENABLED_CONFIG: Record<string, string> = {
  [MOSAIC_TELEMETRY_ENV.ENABLED]: "true",
  [MOSAIC_TELEMETRY_ENV.SERVER_URL]: "https://tel.test.local",
  [MOSAIC_TELEMETRY_ENV.API_KEY]: "a".repeat(64),
  [MOSAIC_TELEMETRY_ENV.INSTANCE_ID]: "550e8400-e29b-41d4-a716-446655440000",
  [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "false",
};

/**
 * Create a minimal TaskCompletionEvent for testing.
 */
function createTestEvent(): TaskCompletionEvent {
  return {
    schema_version: "1.0.0",
    event_id: "test-event-123",
    timestamp: new Date().toISOString(),
    instance_id: "550e8400-e29b-41d4-a716-446655440000",
    task_duration_ms: 5000,
    task_type: TaskType.FEATURE,
    complexity: Complexity.MEDIUM,
    harness: "claude-code" as TaskCompletionEvent["harness"],
    model: "claude-sonnet-4-20250514",
    provider: Provider.ANTHROPIC,
    estimated_input_tokens: 1000,
    estimated_output_tokens: 500,
    actual_input_tokens: 1100,
    actual_output_tokens: 450,
    estimated_cost_usd_micros: 5000,
    actual_cost_usd_micros: 4800,
    quality_gate_passed: true,
    quality_gates_run: [],
    quality_gates_failed: [],
    context_compactions: 0,
    context_rotations: 0,
    context_utilization_final: 0.45,
    outcome: Outcome.SUCCESS,
    retry_count: 0,
  };
}

describe("MosaicTelemetryService", () => {
  let service: InstanceType<typeof MosaicTelemetryService>;

  afterEach(async () => {
    // Stop any started client so spies start fresh for the next test.
    if (service) {
      await service.onModuleDestroy();
    }
    vi.clearAllMocks();
  });

  describe("onModuleInit", () => {
    it("should initialize the client when enabled with valid config", () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(mockStartFn).toHaveBeenCalledOnce();
      expect(service.isEnabled).toBe(true);
    });

    it("should not initialize client when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(mockStartFn).not.toHaveBeenCalled();
      expect(service.isEnabled).toBe(false);
    });

    it("should disable when server URL is missing", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.SERVER_URL]: "",
      });
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(service.isEnabled).toBe(false);
    });

    it("should disable when API key is missing", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.API_KEY]: "",
      });
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(service.isEnabled).toBe(false);
    });

    it("should disable when instance ID is missing", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.INSTANCE_ID]: "",
      });
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(service.isEnabled).toBe(false);
    });

    it("should log dry-run mode when configured", () => {
      // NOTE(review): only asserts start() was called; the dry-run log line
      // itself is not captured — confirm whether asserting the log is wanted.
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true",
      });
      service = new MosaicTelemetryService(configService);

      service.onModuleInit();

      expect(mockStartFn).toHaveBeenCalledOnce();
    });
  });

  describe("onModuleDestroy", () => {
    it("should stop the client on shutdown", async () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      await service.onModuleDestroy();

      expect(mockStopFn).toHaveBeenCalledOnce();
    });

    it("should not throw when client is not initialized (disabled)", async () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      await expect(service.onModuleDestroy()).resolves.not.toThrow();
    });

    it("should not throw when called multiple times", async () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      await service.onModuleDestroy();
      await expect(service.onModuleDestroy()).resolves.not.toThrow();
    });
  });

  describe("trackTaskCompletion", () => {
    it("should queue event via client.track() when enabled", () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const event = createTestEvent();
      service.trackTaskCompletion(event);

      expect(mockTrackFn).toHaveBeenCalledWith(event);
    });

    it("should be a no-op when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const event = createTestEvent();
      service.trackTaskCompletion(event);

      expect(mockTrackFn).not.toHaveBeenCalled();
    });
  });

  describe("getPrediction", () => {
    const testQuery: PredictionQuery = {
      task_type: TaskType.FEATURE,
      model: "claude-sonnet-4-20250514",
      provider: Provider.ANTHROPIC,
      complexity: Complexity.MEDIUM,
    };

    it("should return cached prediction when available", () => {
      const mockPrediction: PredictionResponse = {
        prediction: {
          input_tokens: { p10: 100, p25: 200, median: 300, p75: 400, p90: 500 },
          output_tokens: { p10: 50, p25: 100, median: 150, p75: 200, p90: 250 },
          cost_usd_micros: { median: 5000 },
          duration_ms: { median: 10000 },
          correction_factors: { input: 1.0, output: 1.0 },
          quality: { gate_pass_rate: 0.95, success_rate: 0.9 },
        },
        metadata: {
          sample_size: 100,
          fallback_level: 0,
          confidence: "high",
          last_updated: new Date().toISOString(),
          cache_hit: true,
        },
      };

      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      mockGetPredictionFn.mockReturnValueOnce(mockPrediction);

      const result = service.getPrediction(testQuery);

      expect(result).toEqual(mockPrediction);
      expect(mockGetPredictionFn).toHaveBeenCalledWith(testQuery);
    });

    it("should return null when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const result = service.getPrediction(testQuery);

      expect(result).toBeNull();
    });

    it("should return null when no cached prediction exists", () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      mockGetPredictionFn.mockReturnValueOnce(null);

      const result = service.getPrediction(testQuery);

      expect(result).toBeNull();
    });
  });

  describe("refreshPredictions", () => {
    const testQueries: PredictionQuery[] = [
      {
        task_type: TaskType.FEATURE,
        model: "claude-sonnet-4-20250514",
        provider: Provider.ANTHROPIC,
        complexity: Complexity.MEDIUM,
      },
    ];

    it("should call client.refreshPredictions when enabled", async () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      await service.refreshPredictions(testQueries);

      expect(mockRefreshPredictionsFn).toHaveBeenCalledWith(testQueries);
    });

    it("should be a no-op when disabled", async () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      await service.refreshPredictions(testQueries);

      expect(mockRefreshPredictionsFn).not.toHaveBeenCalled();
    });
  });

  describe("eventBuilder", () => {
    it("should return EventBuilder when enabled", () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const builder = service.eventBuilder;

      expect(builder).toBeDefined();
      expect(builder).not.toBeNull();
      expect(typeof builder?.build).toBe("function");
    });

    it("should return null when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const builder = service.eventBuilder;

      expect(builder).toBeNull();
    });
  });

  describe("isEnabled", () => {
    it("should return true when client is running", () => {
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      expect(service.isEnabled).toBe(true);
    });

    it("should return false when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      expect(service.isEnabled).toBe(false);
    });
  });

  describe("queueSize", () => {
    it("should return 0 when disabled", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      expect(service.queueSize).toBe(0);
    });

    it("should delegate to client.queueSize when enabled", () => {
      // The mock client always reports 0, so this only checks the delegation path.
      const configService = createConfigService(ENABLED_CONFIG);
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      expect(service.queueSize).toBe(0);
    });
  });

  describe("disabled mode (comprehensive)", () => {
    beforeEach(() => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.ENABLED]: "false",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();
    });

    it("should not make any HTTP calls when disabled", () => {
      const event = createTestEvent();
      service.trackTaskCompletion(event);

      expect(mockTrackFn).not.toHaveBeenCalled();
      expect(mockStartFn).not.toHaveBeenCalled();
    });

    it("should safely handle all method calls when disabled", async () => {
      expect(() => service.trackTaskCompletion(createTestEvent())).not.toThrow();
      expect(
        service.getPrediction({
          task_type: TaskType.FEATURE,
          model: "test",
          provider: Provider.ANTHROPIC,
          complexity: Complexity.LOW,
        })
      ).toBeNull();
      await expect(service.refreshPredictions([])).resolves.not.toThrow();
      expect(service.eventBuilder).toBeNull();
      expect(service.isEnabled).toBe(false);
      expect(service.queueSize).toBe(0);
    });
  });

  describe("dry-run mode", () => {
    it("should create client in dry-run mode", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      expect(mockStartFn).toHaveBeenCalledOnce();
      expect(service.isEnabled).toBe(true);
    });

    it("should accept events in dry-run mode", () => {
      const configService = createConfigService({
        ...ENABLED_CONFIG,
        [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true",
      });
      service = new MosaicTelemetryService(configService);
      service.onModuleInit();

      const event = createTestEvent();
      service.trackTaskCompletion(event);

      expect(mockTrackFn).toHaveBeenCalledWith(event);
    });
  });
});
164 apps/api/src/mosaic-telemetry/mosaic-telemetry.service.ts Normal file
@@ -0,0 +1,164 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import {
|
||||
TelemetryClient,
|
||||
type TaskCompletionEvent,
|
||||
type PredictionQuery,
|
||||
type PredictionResponse,
|
||||
type EventBuilder,
|
||||
} from "@mosaicstack/telemetry-client";
|
||||
import {
|
||||
loadMosaicTelemetryConfig,
|
||||
toSdkConfig,
|
||||
type MosaicTelemetryModuleConfig,
|
||||
} from "./mosaic-telemetry.config";
|
||||
|
||||
/**
|
||||
* NestJS service wrapping the @mosaicstack/telemetry-client SDK.
|
||||
*
|
||||
* Provides convenience methods for tracking task completions and reading
|
||||
* crowd-sourced predictions. When telemetry is disabled via
|
||||
* MOSAIC_TELEMETRY_ENABLED=false, all methods are safe no-ops.
|
||||
*
|
||||
* This service is provided globally by MosaicTelemetryModule — any service
|
||||
* can inject it without importing the module explicitly.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* @Injectable()
|
||||
* export class TasksService {
|
||||
* constructor(private readonly telemetry: MosaicTelemetryService) {}
|
||||
*
|
||||
* async completeTask(taskId: string): Promise<void> {
|
||||
* // ... complete the task ...
|
||||
* const event = this.telemetry.eventBuilder.build({ ... });
|
||||
* this.telemetry.trackTaskCompletion(event);
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class MosaicTelemetryService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(MosaicTelemetryService.name);
|
||||
private client: TelemetryClient | null = null;
|
||||
private config: MosaicTelemetryModuleConfig | null = null;
|
||||
|
||||
constructor(private readonly configService: ConfigService) {}
|
||||
|
||||
/**
|
||||
* Initialize the telemetry client on module startup.
|
||||
* Reads configuration from environment variables and starts background submission.
|
||||
*/
|
||||
onModuleInit(): void {
|
||||
this.config = loadMosaicTelemetryConfig(this.configService);
|
||||
|
||||
if (!this.config.enabled) {
|
||||
this.logger.log("Mosaic Telemetry is disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.config.serverUrl || !this.config.apiKey || !this.config.instanceId) {
|
||||
this.logger.warn(
|
||||
"Mosaic Telemetry is enabled but missing configuration " +
|
||||
"(MOSAIC_TELEMETRY_SERVER_URL, MOSAIC_TELEMETRY_API_KEY, or MOSAIC_TELEMETRY_INSTANCE_ID). " +
|
||||
"Telemetry will remain disabled."
|
||||
);
|
||||
this.config = { ...this.config, enabled: false };
|
||||
return;
|
||||
}
|
||||
|
||||
const sdkConfig = toSdkConfig(this.config, (error: Error) => {
|
||||
this.logger.error(`Telemetry client error: ${error.message}`, error.stack);
|
||||
});
|
||||
|
||||
this.client = new TelemetryClient(sdkConfig);
|
||||
this.client.start();
|
||||
|
||||
const mode = this.config.dryRun ? "dry-run" : "live";
|
||||
this.logger.log(`Mosaic Telemetry client started (${mode}) -> ${this.config.serverUrl}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the telemetry client on module shutdown.
|
||||
* Flushes any remaining queued events before stopping.
|
||||
*/
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
if (this.client) {
|
||||
this.logger.log("Stopping Mosaic Telemetry client...");
|
||||
await this.client.stop();
|
||||
this.client = null;
|
||||
this.logger.log("Mosaic Telemetry client stopped");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a task completion event for batch submission.
|
||||
* No-op when telemetry is disabled.
|
||||
*
|
||||
* @param event - The task completion event to track
|
||||
*/
|
||||
trackTaskCompletion(event: TaskCompletionEvent): void {
|
||||
if (!this.client) {
|
||||
return;
|
||||
}
|
||||
this.client.track(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a cached prediction for the given query.
|
||||
* Returns null when telemetry is disabled or if not cached/expired.
|
||||
*
|
||||
* @param query - The prediction query parameters
|
||||
* @returns Cached prediction response, or null
|
||||
*/
|
||||
getPrediction(query: PredictionQuery): PredictionResponse | null {
|
||||
if (!this.client) {
|
||||
return null;
|
||||
}
|
||||
return this.client.getPrediction(query);
|
||||
}
|
||||
|
||||
/**
|
||||
* Force-refresh predictions from the telemetry server.
|
||||
* No-op when telemetry is disabled.
|
||||
*
|
||||
* @param queries - Array of prediction queries to refresh
|
||||
*/
|
||||
async refreshPredictions(queries: PredictionQuery[]): Promise<void> {
|
||||
if (!this.client) {
|
||||
return;
|
||||
}
|
||||
await this.client.refreshPredictions(queries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the EventBuilder for constructing TaskCompletionEvent objects.
|
||||
* Returns null when telemetry is disabled.
|
||||
*
|
||||
* @returns EventBuilder instance, or null if disabled
|
||||
*/
|
||||
get eventBuilder(): EventBuilder | null {
|
||||
if (!this.client) {
|
||||
return null;
|
||||
}
|
||||
return this.client.eventBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the telemetry client is currently active and running.
|
||||
*/
|
||||
get isEnabled(): boolean {
|
||||
return this.client?.isRunning ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of events currently queued for submission.
|
||||
* Returns 0 when telemetry is disabled.
|
||||
*/
|
||||
get queueSize(): number {
|
||||
if (!this.client) {
|
||||
return 0;
|
||||
}
|
||||
return this.client.queueSize;
|
||||
}
|
||||
}
|
||||
297 apps/api/src/mosaic-telemetry/prediction.service.spec.ts Normal file
@@ -0,0 +1,297 @@
|
||||
/**
 * Unit tests for PredictionService.
 *
 * MosaicTelemetryService is replaced with a plain object double via the
 * Nest testing module, so these tests exercise only PredictionService's own
 * query-building and error-swallowing behavior.
 */
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { TaskType, Complexity, Provider } from "@mosaicstack/telemetry-client";
import type { PredictionResponse, PredictionQuery } from "@mosaicstack/telemetry-client";
import { MosaicTelemetryService } from "./mosaic-telemetry.service";
import { PredictionService } from "./prediction.service";

describe("PredictionService", () => {
  let service: PredictionService;
  let mockTelemetryService: {
    isEnabled: boolean;
    getPrediction: ReturnType<typeof vi.fn>;
    refreshPredictions: ReturnType<typeof vi.fn>;
  };

  // A fully-populated, high-confidence prediction response.
  const mockPredictionResponse: PredictionResponse = {
    prediction: {
      input_tokens: {
        p10: 50,
        p25: 80,
        median: 120,
        p75: 200,
        p90: 350,
      },
      output_tokens: {
        p10: 100,
        p25: 150,
        median: 250,
        p75: 400,
        p90: 600,
      },
      cost_usd_micros: {
        p10: 500,
        p25: 800,
        median: 1200,
        p75: 2000,
        p90: 3500,
      },
      duration_ms: {
        p10: 200,
        p25: 400,
        median: 800,
        p75: 1500,
        p90: 3000,
      },
      correction_factors: {
        input: 1.0,
        output: 1.0,
      },
      quality: {
        gate_pass_rate: 0.95,
        success_rate: 0.92,
      },
    },
    metadata: {
      sample_size: 150,
      fallback_level: 0,
      confidence: "high",
      last_updated: "2026-02-15T00:00:00Z",
      cache_hit: true,
    },
  };

  // Server response when no data exists for the query (confidence "none").
  const nullPredictionResponse: PredictionResponse = {
    prediction: null,
    metadata: {
      sample_size: 0,
      fallback_level: 3,
      confidence: "none",
      last_updated: null,
      cache_hit: false,
    },
  };

  beforeEach(async () => {
    mockTelemetryService = {
      isEnabled: true,
      getPrediction: vi.fn().mockReturnValue(mockPredictionResponse),
      refreshPredictions: vi.fn().mockResolvedValue(undefined),
    };

    const module: TestingModule = await Test.createTestingModule({
      providers: [
        PredictionService,
        {
          provide: MosaicTelemetryService,
          useValue: mockTelemetryService,
        },
      ],
    }).compile();

    service = module.get<PredictionService>(PredictionService);
  });

  it("should be defined", () => {
    expect(service).toBeDefined();
  });

  // ---------- getEstimate ----------

  describe("getEstimate", () => {
    it("should return prediction response for valid query", () => {
      const result = service.getEstimate(
        TaskType.IMPLEMENTATION,
        "claude-sonnet-4-5",
        Provider.ANTHROPIC,
        Complexity.LOW
      );

      expect(result).toEqual(mockPredictionResponse);
      expect(mockTelemetryService.getPrediction).toHaveBeenCalledWith({
        task_type: TaskType.IMPLEMENTATION,
        model: "claude-sonnet-4-5",
        provider: Provider.ANTHROPIC,
        complexity: Complexity.LOW,
      });
    });

    it("should pass correct query parameters to telemetry service", () => {
      service.getEstimate(TaskType.CODE_REVIEW, "gpt-4o", Provider.OPENAI, Complexity.HIGH);

      expect(mockTelemetryService.getPrediction).toHaveBeenCalledWith({
        task_type: TaskType.CODE_REVIEW,
        model: "gpt-4o",
        provider: Provider.OPENAI,
        complexity: Complexity.HIGH,
      });
    });

    it("should return null when telemetry returns null", () => {
      mockTelemetryService.getPrediction.mockReturnValue(null);

      const result = service.getEstimate(
        TaskType.IMPLEMENTATION,
        "claude-sonnet-4-5",
        Provider.ANTHROPIC,
        Complexity.LOW
      );

      expect(result).toBeNull();
    });

    it("should return null prediction response when confidence is none", () => {
      mockTelemetryService.getPrediction.mockReturnValue(nullPredictionResponse);

      const result = service.getEstimate(
        TaskType.IMPLEMENTATION,
        "unknown-model",
        Provider.UNKNOWN,
        Complexity.LOW
      );

      expect(result).toEqual(nullPredictionResponse);
      expect(result?.metadata.confidence).toBe("none");
    });

    it("should return null and not throw when getPrediction throws", () => {
      mockTelemetryService.getPrediction.mockImplementation(() => {
        throw new Error("Prediction fetch failed");
      });

      const result = service.getEstimate(
        TaskType.IMPLEMENTATION,
        "claude-sonnet-4-5",
        Provider.ANTHROPIC,
        Complexity.LOW
      );

      expect(result).toBeNull();
    });

    it("should handle non-Error thrown objects gracefully", () => {
      mockTelemetryService.getPrediction.mockImplementation(() => {
        throw "string error";
      });

      const result = service.getEstimate(
        TaskType.IMPLEMENTATION,
        "claude-sonnet-4-5",
        Provider.ANTHROPIC,
        Complexity.LOW
      );

      expect(result).toBeNull();
    });
  });

  // ---------- refreshCommonPredictions ----------

  describe("refreshCommonPredictions", () => {
    it("should call refreshPredictions with multiple query combinations", async () => {
      await service.refreshCommonPredictions();

      expect(mockTelemetryService.refreshPredictions).toHaveBeenCalledTimes(1);

      const queries: PredictionQuery[] = mockTelemetryService.refreshPredictions.mock.calls[0][0];

      // Should have queries for cross-product of models, task types, and complexities
      expect(queries.length).toBeGreaterThan(0);

      // Verify all queries have valid structure
      for (const query of queries) {
        expect(query).toHaveProperty("task_type");
        expect(query).toHaveProperty("model");
        expect(query).toHaveProperty("provider");
        expect(query).toHaveProperty("complexity");
      }
    });

    it("should include Anthropic model predictions", async () => {
      await service.refreshCommonPredictions();

      const queries: PredictionQuery[] = mockTelemetryService.refreshPredictions.mock.calls[0][0];

      const anthropicQueries = queries.filter(
        (q: PredictionQuery) => q.provider === Provider.ANTHROPIC
      );
      expect(anthropicQueries.length).toBeGreaterThan(0);
    });

    it("should include OpenAI model predictions", async () => {
      await service.refreshCommonPredictions();

      const queries: PredictionQuery[] = mockTelemetryService.refreshPredictions.mock.calls[0][0];

      const openaiQueries = queries.filter((q: PredictionQuery) => q.provider === Provider.OPENAI);
      expect(openaiQueries.length).toBeGreaterThan(0);
    });

    it("should not call refreshPredictions when telemetry is disabled", async () => {
      mockTelemetryService.isEnabled = false;

      await service.refreshCommonPredictions();

      expect(mockTelemetryService.refreshPredictions).not.toHaveBeenCalled();
    });

    it("should not throw when refreshPredictions rejects", async () => {
      mockTelemetryService.refreshPredictions.mockRejectedValue(new Error("Server unreachable"));

      // Should not throw
      await expect(service.refreshCommonPredictions()).resolves.not.toThrow();
    });

    it("should include common task types in queries", async () => {
      await service.refreshCommonPredictions();

      const queries: PredictionQuery[] = mockTelemetryService.refreshPredictions.mock.calls[0][0];

      const taskTypes = new Set(queries.map((q: PredictionQuery) => q.task_type));

      expect(taskTypes.has(TaskType.IMPLEMENTATION)).toBe(true);
      expect(taskTypes.has(TaskType.PLANNING)).toBe(true);
      expect(taskTypes.has(TaskType.CODE_REVIEW)).toBe(true);
    });

    it("should include common complexity levels in queries", async () => {
      await service.refreshCommonPredictions();

      const queries: PredictionQuery[] = mockTelemetryService.refreshPredictions.mock.calls[0][0];

      const complexities = new Set(queries.map((q: PredictionQuery) => q.complexity));

      expect(complexities.has(Complexity.LOW)).toBe(true);
      expect(complexities.has(Complexity.MEDIUM)).toBe(true);
    });
  });

  // ---------- onModuleInit ----------

  describe("onModuleInit", () => {
    it("should trigger refreshCommonPredictions on init when telemetry is enabled", () => {
      // NOTE(review): this does not actually assert refreshPredictions was
      // invoked (it is fired asynchronously) — consider awaiting a microtask
      // tick and asserting the call. Confirm intent.
      // refreshPredictions is async, but onModuleInit fires it and forgets
      service.onModuleInit();

      // Give the promise microtask a chance to execute
      expect(mockTelemetryService.isEnabled).toBe(true);
      // refreshPredictions will be called asynchronously
    });

    it("should not refresh when telemetry is disabled", () => {
      mockTelemetryService.isEnabled = false;

      service.onModuleInit();

      // refreshPredictions should not be called since we returned early
      expect(mockTelemetryService.refreshPredictions).not.toHaveBeenCalled();
    });

    it("should not throw when refresh fails on init", () => {
      mockTelemetryService.refreshPredictions.mockRejectedValue(new Error("Connection refused"));

      // Should not throw
      expect(() => service.onModuleInit()).not.toThrow();
    });
  });
});
161
apps/api/src/mosaic-telemetry/prediction.service.ts
Normal file
161
apps/api/src/mosaic-telemetry/prediction.service.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
|
||||
import {
|
||||
TaskType,
|
||||
Complexity,
|
||||
Provider,
|
||||
type PredictionQuery,
|
||||
type PredictionResponse,
|
||||
} from "@mosaicstack/telemetry-client";
|
||||
import { MosaicTelemetryService } from "./mosaic-telemetry.service";
|
||||
|
||||
/**
 * Common model-provider combinations used for pre-fetching predictions.
 * These represent the most frequently used LLM configurations.
 *
 * NOTE(review): assumes these model identifiers match the telemetry
 * server's catalog — confirm against @mosaicstack/telemetry-client.
 */
const COMMON_MODELS: { model: string; provider: Provider }[] = [
  { model: "claude-sonnet-4-5", provider: Provider.ANTHROPIC },
  { model: "claude-opus-4", provider: Provider.ANTHROPIC },
  { model: "claude-haiku-4-5", provider: Provider.ANTHROPIC },
  { model: "gpt-4o", provider: Provider.OPENAI },
  { model: "gpt-4o-mini", provider: Provider.OPENAI },
];
|
||||
|
||||
/**
 * Common task types to pre-fetch predictions for.
 * Crossed with COMMON_MODELS and COMMON_COMPLEXITIES when warming the cache.
 */
const COMMON_TASK_TYPES: TaskType[] = [
  TaskType.IMPLEMENTATION,
  TaskType.PLANNING,
  TaskType.CODE_REVIEW,
];
|
||||
|
||||
/**
 * Common complexity levels to pre-fetch predictions for.
 * HIGH is deliberately omitted from the pre-warm set; it is fetched on demand.
 */
const COMMON_COMPLEXITIES: Complexity[] = [Complexity.LOW, Complexity.MEDIUM];
|
||||
|
||||
/**
|
||||
* PredictionService
|
||||
*
|
||||
* Provides pre-task cost and token estimates using crowd-sourced prediction data
|
||||
* from the Mosaic Telemetry server. Predictions are cached by the underlying SDK
|
||||
* with a 6-hour TTL.
|
||||
*
|
||||
* This service is intentionally non-blocking: if predictions are unavailable
|
||||
* (telemetry disabled, server unreachable, no data), all methods return null
|
||||
* without throwing errors. Task execution should never be blocked by prediction
|
||||
* failures.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const estimate = this.predictionService.getEstimate(
|
||||
* TaskType.IMPLEMENTATION,
|
||||
* "claude-sonnet-4-5",
|
||||
* Provider.ANTHROPIC,
|
||||
* Complexity.LOW,
|
||||
* );
|
||||
* if (estimate?.prediction) {
|
||||
* console.log(`Estimated cost: ${estimate.prediction.cost_usd_micros}`);
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class PredictionService implements OnModuleInit {
|
||||
private readonly logger = new Logger(PredictionService.name);
|
||||
|
||||
constructor(private readonly telemetry: MosaicTelemetryService) {}
|
||||
|
||||
/**
|
||||
* Refresh common predictions on startup.
|
||||
* Runs asynchronously and never blocks module initialization.
|
||||
*/
|
||||
onModuleInit(): void {
|
||||
if (!this.telemetry.isEnabled) {
|
||||
this.logger.log("Telemetry disabled - skipping prediction refresh");
|
||||
return;
|
||||
}
|
||||
|
||||
// Fire-and-forget: refresh in the background
|
||||
this.refreshCommonPredictions().catch((error: unknown) => {
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(`Failed to refresh common predictions on startup: ${msg}`);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a cost/token estimate for a given task configuration.
|
||||
*
|
||||
* Returns the cached prediction from the SDK, or null if:
|
||||
* - Telemetry is disabled
|
||||
* - No prediction data exists for this combination
|
||||
* - The prediction has expired
|
||||
*
|
||||
* @param taskType - The type of task to estimate
|
||||
* @param model - The model name (e.g. "claude-sonnet-4-5")
|
||||
* @param provider - The provider enum value
|
||||
* @param complexity - The complexity level
|
||||
* @returns Prediction response with estimates and confidence, or null
|
||||
*/
|
||||
getEstimate(
|
||||
taskType: TaskType,
|
||||
model: string,
|
||||
provider: Provider,
|
||||
complexity: Complexity
|
||||
): PredictionResponse | null {
|
||||
try {
|
||||
const query: PredictionQuery = {
|
||||
task_type: taskType,
|
||||
model,
|
||||
provider,
|
||||
complexity,
|
||||
};
|
||||
|
||||
return this.telemetry.getPrediction(query);
|
||||
} catch (error: unknown) {
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(`Failed to get prediction estimate: ${msg}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh predictions for commonly used (taskType, model, provider, complexity) combinations.
|
||||
*
|
||||
* Generates the cross-product of common models, task types, and complexities,
|
||||
* then batch-refreshes them from the telemetry server. The SDK caches the
|
||||
* results with a 6-hour TTL.
|
||||
*
|
||||
* This method is safe to call at any time. If telemetry is disabled or the
|
||||
* server is unreachable, it completes without error.
|
||||
*/
|
||||
async refreshCommonPredictions(): Promise<void> {
|
||||
if (!this.telemetry.isEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
const queries: PredictionQuery[] = [];
|
||||
|
||||
for (const { model, provider } of COMMON_MODELS) {
|
||||
for (const taskType of COMMON_TASK_TYPES) {
|
||||
for (const complexity of COMMON_COMPLEXITIES) {
|
||||
queries.push({
|
||||
task_type: taskType,
|
||||
model,
|
||||
provider,
|
||||
complexity,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Refreshing ${String(queries.length)} common prediction queries...`);
|
||||
|
||||
try {
|
||||
await this.telemetry.refreshPredictions(queries);
|
||||
this.logger.log(`Successfully refreshed ${String(queries.length)} predictions`);
|
||||
} catch (error: unknown) {
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
this.logger.warn(`Failed to refresh predictions: ${msg}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user