diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..db95609 --- /dev/null +++ b/.npmrc @@ -0,0 +1 @@ +@mosaicstack:registry=https://git.mosaicstack.dev/api/packages/mosaic/npm/ diff --git a/apps/api/package.json b/apps/api/package.json index ce11f92..0f40211 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -27,6 +27,7 @@ "dependencies": { "@anthropic-ai/sdk": "^0.72.1", "@mosaic/shared": "workspace:*", + "@mosaicstack/telemetry-client": "^0.1.0", "@nestjs/axios": "^4.0.1", "@nestjs/bullmq": "^11.0.4", "@nestjs/common": "^11.1.12", diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 43733e3..704067b 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -37,6 +37,7 @@ import { JobStepsModule } from "./job-steps/job-steps.module"; import { CoordinatorIntegrationModule } from "./coordinator-integration/coordinator-integration.module"; import { FederationModule } from "./federation/federation.module"; import { CredentialsModule } from "./credentials/credentials.module"; +import { MosaicTelemetryModule } from "./mosaic-telemetry"; import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor"; @Module({ @@ -97,6 +98,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce CoordinatorIntegrationModule, FederationModule, CredentialsModule, + MosaicTelemetryModule, ], controllers: [AppController, CsrfController], providers: [ diff --git a/apps/api/src/mosaic-telemetry/index.ts b/apps/api/src/mosaic-telemetry/index.ts new file mode 100644 index 0000000..6f1c402 --- /dev/null +++ b/apps/api/src/mosaic-telemetry/index.ts @@ -0,0 +1,17 @@ +/** + * Mosaic Telemetry module — task completion tracking and crowd-sourced predictions. + * + * **Not to be confused with the OpenTelemetry (OTEL) TelemetryModule** at + * `src/telemetry/`, which handles distributed request tracing. + * + * @module mosaic-telemetry + */ + +export { MosaicTelemetryModule } from "./mosaic-telemetry.module"; +export { MosaicTelemetryService } from "./mosaic-telemetry.service"; +export { + loadMosaicTelemetryConfig, + toSdkConfig, + MOSAIC_TELEMETRY_ENV, + type MosaicTelemetryModuleConfig, +} from "./mosaic-telemetry.config"; diff --git a/apps/api/src/mosaic-telemetry/mosaic-telemetry.config.ts b/apps/api/src/mosaic-telemetry/mosaic-telemetry.config.ts new file mode 100644 index 0000000..f5fa6cf --- /dev/null +++ b/apps/api/src/mosaic-telemetry/mosaic-telemetry.config.ts @@ -0,0 +1,78 @@ +import type { ConfigService } from "@nestjs/config"; +import type { TelemetryConfig } from "@mosaicstack/telemetry-client"; + +/** + * Configuration interface for the Mosaic Telemetry module. + * Maps environment variables to SDK configuration. + */ +export interface MosaicTelemetryModuleConfig { + /** Whether telemetry collection is enabled. Default: true */ + enabled: boolean; + /** Base URL of the telemetry server */ + serverUrl: string; + /** API key for authentication (64-char hex string) */ + apiKey: string; + /** Instance UUID for this client */ + instanceId: string; + /** If true, log events instead of sending them. Default: false */ + dryRun: boolean; +} + +/** + * Environment variable names used by the Mosaic Telemetry module. + */ +export const MOSAIC_TELEMETRY_ENV = { + ENABLED: "MOSAIC_TELEMETRY_ENABLED", + SERVER_URL: "MOSAIC_TELEMETRY_SERVER_URL", + API_KEY: "MOSAIC_TELEMETRY_API_KEY", + INSTANCE_ID: "MOSAIC_TELEMETRY_INSTANCE_ID", + DRY_RUN: "MOSAIC_TELEMETRY_DRY_RUN", +} as const; + +/** + * Read Mosaic Telemetry configuration from environment variables via NestJS ConfigService. + * + * @param configService - NestJS ConfigService instance + * @returns Parsed module configuration + */ +export function loadMosaicTelemetryConfig( + configService: ConfigService +): MosaicTelemetryModuleConfig { + const enabledRaw = configService.get(MOSAIC_TELEMETRY_ENV.ENABLED, "true"); + const dryRunRaw = configService.get(MOSAIC_TELEMETRY_ENV.DRY_RUN, "false"); + + return { + enabled: enabledRaw.toLowerCase() === "true", + serverUrl: configService.get(MOSAIC_TELEMETRY_ENV.SERVER_URL, ""), + apiKey: configService.get(MOSAIC_TELEMETRY_ENV.API_KEY, ""), + instanceId: configService.get(MOSAIC_TELEMETRY_ENV.INSTANCE_ID, ""), + dryRun: dryRunRaw.toLowerCase() === "true", + }; +} + +/** + * Convert module config to SDK TelemetryConfig format. + * Includes the onError callback for NestJS Logger integration. + * + * @param config - Module configuration + * @param onError - Error callback (typically NestJS Logger) + * @returns SDK-compatible TelemetryConfig + */ +export function toSdkConfig( + config: MosaicTelemetryModuleConfig, + onError?: (error: Error) => void +): TelemetryConfig { + const sdkConfig: TelemetryConfig = { + serverUrl: config.serverUrl, + apiKey: config.apiKey, + instanceId: config.instanceId, + enabled: config.enabled, + dryRun: config.dryRun, + }; + + if (onError) { + sdkConfig.onError = onError; + } + + return sdkConfig; +} diff --git a/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts b/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts new file mode 100644 index 0000000..37420ec --- /dev/null +++ b/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts @@ -0,0 +1,212 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { ConfigModule } from "@nestjs/config"; +import { MosaicTelemetryModule } from "./mosaic-telemetry.module"; +import { MosaicTelemetryService } from "./mosaic-telemetry.service"; + +// Mock the telemetry client to avoid real HTTP calls +vi.mock("@mosaicstack/telemetry-client", async (importOriginal) => { + const actual = await importOriginal(); + + class MockTelemetryClient { + private _isRunning = false; + + constructor(_config: unknown) { + // no-op + } + + get eventBuilder() { + return { build: vi.fn().mockReturnValue({ event_id: "test-event-id" }) }; + } + + start(): void { + this._isRunning = true; + } + + async stop(): Promise { + this._isRunning = false; + } + + track(_event: unknown): void { + // no-op + } + + getPrediction(_query: unknown): unknown { + return null; + } + + async refreshPredictions(_queries: unknown): Promise { + // no-op + } + + get queueSize(): number { + return 0; + } + + get isRunning(): boolean { + return this._isRunning; + } + } + + return { + ...actual, + TelemetryClient: MockTelemetryClient, + }; +}); + +describe("MosaicTelemetryModule", () => { + let module: TestingModule; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe("module initialization", () => { + it("should compile the module successfully", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + expect(module).toBeDefined(); + await module.close(); + }); + + it("should provide MosaicTelemetryService", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + const service = module.get(MosaicTelemetryService); + expect(service).toBeDefined(); + expect(service).toBeInstanceOf(MosaicTelemetryService); + + await module.close(); + }); + + it("should export MosaicTelemetryService for injection in other modules", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + const service = module.get(MosaicTelemetryService); + expect(service).toBeDefined(); + + await module.close(); + }); + }); + + describe("lifecycle integration", () => { + it("should initialize service on module init when enabled", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "true", + MOSAIC_TELEMETRY_SERVER_URL: "https://tel.test.local", + MOSAIC_TELEMETRY_API_KEY: "a".repeat(64), + MOSAIC_TELEMETRY_INSTANCE_ID: "550e8400-e29b-41d4-a716-446655440000", + MOSAIC_TELEMETRY_DRY_RUN: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + await module.init(); + + const service = module.get(MosaicTelemetryService); + expect(service.isEnabled).toBe(true); + + await module.close(); + }); + + it("should not start client when disabled via env", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + await module.init(); + + const service = module.get(MosaicTelemetryService); + expect(service.isEnabled).toBe(false); + + await module.close(); + }); + + it("should cleanly shut down on module destroy", async () => { + module = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: [], + load: [ + () => ({ + MOSAIC_TELEMETRY_ENABLED: "true", + MOSAIC_TELEMETRY_SERVER_URL: "https://tel.test.local", + MOSAIC_TELEMETRY_API_KEY: "a".repeat(64), + MOSAIC_TELEMETRY_INSTANCE_ID: "550e8400-e29b-41d4-a716-446655440000", + MOSAIC_TELEMETRY_DRY_RUN: "false", + }), + ], + }), + MosaicTelemetryModule, + ], + }).compile(); + + await module.init(); + + const service = module.get(MosaicTelemetryService); + expect(service.isEnabled).toBe(true); + + await expect(module.close()).resolves.not.toThrow(); + }); + }); +}); diff --git a/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts b/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts new file mode 100644 index 0000000..a321dda --- /dev/null +++ b/apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts @@ -0,0 +1,37 @@ +import { Module, Global } from "@nestjs/common"; +import { ConfigModule } from "@nestjs/config"; +import { MosaicTelemetryService } from "./mosaic-telemetry.service"; + +/** + * Global module providing Mosaic Telemetry integration via @mosaicstack/telemetry-client. + * + * Tracks task completion events and provides crowd-sourced predictions for + * token usage, cost estimation, and quality metrics. + * + * **This is separate from the OpenTelemetry (OTEL) TelemetryModule** which + * handles distributed request tracing. This module is specifically for + * Mosaic Stack's own telemetry aggregation service. + * + * Configuration via environment variables: + * - MOSAIC_TELEMETRY_ENABLED (boolean, default: true) + * - MOSAIC_TELEMETRY_SERVER_URL (string) + * - MOSAIC_TELEMETRY_API_KEY (string, 64-char hex) + * - MOSAIC_TELEMETRY_INSTANCE_ID (string, UUID) + * - MOSAIC_TELEMETRY_DRY_RUN (boolean, default: false) + * + * @example + * ```typescript + * // In any service (no need to import module — it's global): + * @Injectable() + * export class MyService { + * constructor(private readonly telemetry: MosaicTelemetryService) {} + * } + * ``` + */ +@Global() +@Module({ + imports: [ConfigModule], + providers: [MosaicTelemetryService], + exports: [MosaicTelemetryService], +}) +export class MosaicTelemetryModule {} diff --git a/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.spec.ts b/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.spec.ts new file mode 100644 index 0000000..a84e84a --- /dev/null +++ b/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.spec.ts @@ -0,0 +1,506 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { ConfigService } from "@nestjs/config"; +import { MOSAIC_TELEMETRY_ENV } from "./mosaic-telemetry.config"; +import type { + TaskCompletionEvent, + PredictionQuery, + PredictionResponse, +} from "@mosaicstack/telemetry-client"; +import { TaskType, Complexity, Provider, Outcome } from "@mosaicstack/telemetry-client"; + +// Track mock instances created during tests +const mockStartFn = vi.fn(); +const mockStopFn = vi.fn().mockResolvedValue(undefined); +const mockTrackFn = vi.fn(); +const mockGetPredictionFn = vi.fn().mockReturnValue(null); +const mockRefreshPredictionsFn = vi.fn().mockResolvedValue(undefined); +const mockBuildFn = vi.fn().mockReturnValue({ event_id: "test-event-id" }); + +vi.mock("@mosaicstack/telemetry-client", async (importOriginal) => { + const actual = await importOriginal(); + + class MockTelemetryClient { + private _isRunning = false; + + constructor(_config: unknown) { + // no-op + } + + get eventBuilder() { + return { build: mockBuildFn }; + } + + start(): void { + this._isRunning = true; + mockStartFn(); + } + + async stop(): Promise { + this._isRunning = false; + await mockStopFn(); + } + + track(event: unknown): void { + mockTrackFn(event); + } + + getPrediction(query: unknown): unknown { + return mockGetPredictionFn(query); + } + + async refreshPredictions(queries: unknown): Promise { + await mockRefreshPredictionsFn(queries); + } + + get queueSize(): number { + return 0; + } + + get isRunning(): boolean { + return this._isRunning; + } + } + + return { + ...actual, + TelemetryClient: MockTelemetryClient, + }; +}); + +// Lazy-import the service after the mock is in place +const { MosaicTelemetryService } = await import("./mosaic-telemetry.service"); + +/** + * Create a ConfigService mock that returns environment values from the provided map. + */ +function createConfigService( + envMap: Record = {}, +): ConfigService { + const configService = { + get: vi.fn((key: string, defaultValue?: string): string => { + const value = envMap[key]; + if (value !== undefined) { + return value; + } + return defaultValue ?? ""; + }), + } as unknown as ConfigService; + return configService; +} + +/** + * Default env config for an enabled telemetry service. + */ +const ENABLED_CONFIG: Record = { + [MOSAIC_TELEMETRY_ENV.ENABLED]: "true", + [MOSAIC_TELEMETRY_ENV.SERVER_URL]: "https://tel.test.local", + [MOSAIC_TELEMETRY_ENV.API_KEY]: "a".repeat(64), + [MOSAIC_TELEMETRY_ENV.INSTANCE_ID]: "550e8400-e29b-41d4-a716-446655440000", + [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "false", +}; + +/** + * Create a minimal TaskCompletionEvent for testing. + */ +function createTestEvent(): TaskCompletionEvent { + return { + schema_version: "1.0.0", + event_id: "test-event-123", + timestamp: new Date().toISOString(), + instance_id: "550e8400-e29b-41d4-a716-446655440000", + task_duration_ms: 5000, + task_type: TaskType.FEATURE, + complexity: Complexity.MEDIUM, + harness: "claude-code" as TaskCompletionEvent["harness"], + model: "claude-sonnet-4-20250514", + provider: Provider.ANTHROPIC, + estimated_input_tokens: 1000, + estimated_output_tokens: 500, + actual_input_tokens: 1100, + actual_output_tokens: 450, + estimated_cost_usd_micros: 5000, + actual_cost_usd_micros: 4800, + quality_gate_passed: true, + quality_gates_run: [], + quality_gates_failed: [], + context_compactions: 0, + context_rotations: 0, + context_utilization_final: 0.45, + outcome: Outcome.SUCCESS, + retry_count: 0, + }; +} + +describe("MosaicTelemetryService", () => { + let service: InstanceType; + + afterEach(async () => { + if (service) { + await service.onModuleDestroy(); + } + vi.clearAllMocks(); + }); + + describe("onModuleInit", () => { + it("should initialize the client when enabled with valid config", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(mockStartFn).toHaveBeenCalledOnce(); + expect(service.isEnabled).toBe(true); + }); + + it("should not initialize client when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(mockStartFn).not.toHaveBeenCalled(); + expect(service.isEnabled).toBe(false); + }); + + it("should disable when server URL is missing", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.SERVER_URL]: "", + }); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(service.isEnabled).toBe(false); + }); + + it("should disable when API key is missing", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.API_KEY]: "", + }); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(service.isEnabled).toBe(false); + }); + + it("should disable when instance ID is missing", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.INSTANCE_ID]: "", + }); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(service.isEnabled).toBe(false); + }); + + it("should log dry-run mode when configured", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true", + }); + service = new MosaicTelemetryService(configService); + + service.onModuleInit(); + + expect(mockStartFn).toHaveBeenCalledOnce(); + }); + }); + + describe("onModuleDestroy", () => { + it("should stop the client on shutdown", async () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + await service.onModuleDestroy(); + + expect(mockStopFn).toHaveBeenCalledOnce(); + }); + + it("should not throw when client is not initialized (disabled)", async () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + await expect(service.onModuleDestroy()).resolves.not.toThrow(); + }); + + it("should not throw when called multiple times", async () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + await service.onModuleDestroy(); + await expect(service.onModuleDestroy()).resolves.not.toThrow(); + }); + }); + + describe("trackTaskCompletion", () => { + it("should queue event via client.track() when enabled", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const event = createTestEvent(); + service.trackTaskCompletion(event); + + expect(mockTrackFn).toHaveBeenCalledWith(event); + }); + + it("should be a no-op when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const event = createTestEvent(); + service.trackTaskCompletion(event); + + expect(mockTrackFn).not.toHaveBeenCalled(); + }); + }); + + describe("getPrediction", () => { + const testQuery: PredictionQuery = { + task_type: TaskType.FEATURE, + model: "claude-sonnet-4-20250514", + provider: Provider.ANTHROPIC, + complexity: Complexity.MEDIUM, + }; + + it("should return cached prediction when available", () => { + const mockPrediction: PredictionResponse = { + prediction: { + input_tokens: { p10: 100, p25: 200, median: 300, p75: 400, p90: 500 }, + output_tokens: { p10: 50, p25: 100, median: 150, p75: 200, p90: 250 }, + cost_usd_micros: { median: 5000 }, + duration_ms: { median: 10000 }, + correction_factors: { input: 1.0, output: 1.0 }, + quality: { gate_pass_rate: 0.95, success_rate: 0.90 }, + }, + metadata: { + sample_size: 100, + fallback_level: 0, + confidence: "high", + last_updated: new Date().toISOString(), + cache_hit: true, + }, + }; + + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + mockGetPredictionFn.mockReturnValueOnce(mockPrediction); + + const result = service.getPrediction(testQuery); + + expect(result).toEqual(mockPrediction); + expect(mockGetPredictionFn).toHaveBeenCalledWith(testQuery); + }); + + it("should return null when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const result = service.getPrediction(testQuery); + + expect(result).toBeNull(); + }); + + it("should return null when no cached prediction exists", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + mockGetPredictionFn.mockReturnValueOnce(null); + + const result = service.getPrediction(testQuery); + + expect(result).toBeNull(); + }); + }); + + describe("refreshPredictions", () => { + const testQueries: PredictionQuery[] = [ + { + task_type: TaskType.FEATURE, + model: "claude-sonnet-4-20250514", + provider: Provider.ANTHROPIC, + complexity: Complexity.MEDIUM, + }, + ]; + + it("should call client.refreshPredictions when enabled", async () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + await service.refreshPredictions(testQueries); + + expect(mockRefreshPredictionsFn).toHaveBeenCalledWith(testQueries); + }); + + it("should be a no-op when disabled", async () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + await service.refreshPredictions(testQueries); + + expect(mockRefreshPredictionsFn).not.toHaveBeenCalled(); + }); + }); + + describe("eventBuilder", () => { + it("should return EventBuilder when enabled", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const builder = service.eventBuilder; + + expect(builder).toBeDefined(); + expect(builder).not.toBeNull(); + expect(typeof builder?.build).toBe("function"); + }); + + it("should return null when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const builder = service.eventBuilder; + + expect(builder).toBeNull(); + }); + }); + + describe("isEnabled", () => { + it("should return true when client is running", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + expect(service.isEnabled).toBe(true); + }); + + it("should return false when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + expect(service.isEnabled).toBe(false); + }); + }); + + describe("queueSize", () => { + it("should return 0 when disabled", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + expect(service.queueSize).toBe(0); + }); + + it("should delegate to client.queueSize when enabled", () => { + const configService = createConfigService(ENABLED_CONFIG); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + expect(service.queueSize).toBe(0); + }); + }); + + describe("disabled mode (comprehensive)", () => { + beforeEach(() => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.ENABLED]: "false", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + }); + + it("should not make any HTTP calls when disabled", () => { + const event = createTestEvent(); + service.trackTaskCompletion(event); + + expect(mockTrackFn).not.toHaveBeenCalled(); + expect(mockStartFn).not.toHaveBeenCalled(); + }); + + it("should safely handle all method calls when disabled", async () => { + expect(() => service.trackTaskCompletion(createTestEvent())).not.toThrow(); + expect( + service.getPrediction({ + task_type: TaskType.FEATURE, + model: "test", + provider: Provider.ANTHROPIC, + complexity: Complexity.LOW, + }), + ).toBeNull(); + await expect(service.refreshPredictions([])).resolves.not.toThrow(); + expect(service.eventBuilder).toBeNull(); + expect(service.isEnabled).toBe(false); + expect(service.queueSize).toBe(0); + }); + }); + + describe("dry-run mode", () => { + it("should create client in dry-run mode", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + expect(mockStartFn).toHaveBeenCalledOnce(); + expect(service.isEnabled).toBe(true); + }); + + it("should accept events in dry-run mode", () => { + const configService = createConfigService({ + ...ENABLED_CONFIG, + [MOSAIC_TELEMETRY_ENV.DRY_RUN]: "true", + }); + service = new MosaicTelemetryService(configService); + service.onModuleInit(); + + const event = createTestEvent(); + service.trackTaskCompletion(event); + + expect(mockTrackFn).toHaveBeenCalledWith(event); + }); + }); +}); diff --git a/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.ts b/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.ts new file mode 100644 index 0000000..a1a737f --- /dev/null +++ b/apps/api/src/mosaic-telemetry/mosaic-telemetry.service.ts @@ -0,0 +1,164 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { + TelemetryClient, + type TaskCompletionEvent, + type PredictionQuery, + type PredictionResponse, + type EventBuilder, +} from "@mosaicstack/telemetry-client"; +import { + loadMosaicTelemetryConfig, + toSdkConfig, + type MosaicTelemetryModuleConfig, +} from "./mosaic-telemetry.config"; + +/** + * NestJS service wrapping the @mosaicstack/telemetry-client SDK. + * + * Provides convenience methods for tracking task completions and reading + * crowd-sourced predictions. When telemetry is disabled via + * MOSAIC_TELEMETRY_ENABLED=false, all methods are safe no-ops. + * + * This service is provided globally by MosaicTelemetryModule — any service + * can inject it without importing the module explicitly. + * + * @example + * ```typescript + * @Injectable() + * export class TasksService { + * constructor(private readonly telemetry: MosaicTelemetryService) {} + * + * async completeTask(taskId: string): Promise { + * // ... complete the task ... + * const event = this.telemetry.eventBuilder.build({ ... }); + * this.telemetry.trackTaskCompletion(event); + * } + * } + * ``` + */ +@Injectable() +export class MosaicTelemetryService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(MosaicTelemetryService.name); + private client: TelemetryClient | null = null; + private config: MosaicTelemetryModuleConfig | null = null; + + constructor(private readonly configService: ConfigService) {} + + /** + * Initialize the telemetry client on module startup. + * Reads configuration from environment variables and starts background submission. + */ + onModuleInit(): void { + this.config = loadMosaicTelemetryConfig(this.configService); + + if (!this.config.enabled) { + this.logger.log("Mosaic Telemetry is disabled"); + return; + } + + if (!this.config.serverUrl || !this.config.apiKey || !this.config.instanceId) { + this.logger.warn( + "Mosaic Telemetry is enabled but missing configuration " + + "(MOSAIC_TELEMETRY_SERVER_URL, MOSAIC_TELEMETRY_API_KEY, or MOSAIC_TELEMETRY_INSTANCE_ID). " + + "Telemetry will remain disabled." + ); + this.config = { ...this.config, enabled: false }; + return; + } + + const sdkConfig = toSdkConfig(this.config, (error: Error) => { + this.logger.error(`Telemetry client error: ${error.message}`, error.stack); + }); + + this.client = new TelemetryClient(sdkConfig); + this.client.start(); + + const mode = this.config.dryRun ? "dry-run" : "live"; + this.logger.log(`Mosaic Telemetry client started (${mode}) -> ${this.config.serverUrl}`); + } + + /** + * Stop the telemetry client on module shutdown. + * Flushes any remaining queued events before stopping. + */ + async onModuleDestroy(): Promise { + if (this.client) { + this.logger.log("Stopping Mosaic Telemetry client..."); + await this.client.stop(); + this.client = null; + this.logger.log("Mosaic Telemetry client stopped"); + } + } + + /** + * Queue a task completion event for batch submission. + * No-op when telemetry is disabled. + * + * @param event - The task completion event to track + */ + trackTaskCompletion(event: TaskCompletionEvent): void { + if (!this.client) { + return; + } + this.client.track(event); + } + + /** + * Get a cached prediction for the given query. + * Returns null when telemetry is disabled or if not cached/expired. + * + * @param query - The prediction query parameters + * @returns Cached prediction response, or null + */ + getPrediction(query: PredictionQuery): PredictionResponse | null { + if (!this.client) { + return null; + } + return this.client.getPrediction(query); + } + + /** + * Force-refresh predictions from the telemetry server. + * No-op when telemetry is disabled. + * + * @param queries - Array of prediction queries to refresh + */ + async refreshPredictions(queries: PredictionQuery[]): Promise { + if (!this.client) { + return; + } + await this.client.refreshPredictions(queries); + } + + /** + * Get the EventBuilder for constructing TaskCompletionEvent objects. + * Returns null when telemetry is disabled. + * + * @returns EventBuilder instance, or null if disabled + */ + get eventBuilder(): EventBuilder | null { + if (!this.client) { + return null; + } + return this.client.eventBuilder; + } + + /** + * Whether the telemetry client is currently active and running. + */ + get isEnabled(): boolean { + return this.client?.isRunning ?? false; + } + + /** + * Number of events currently queued for submission. + * Returns 0 when telemetry is disabled. + */ + get queueSize(): number { + if (!this.client) { + return 0; + } + return this.client.queueSize; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4f24087..8450600 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -66,6 +66,9 @@ importers: '@mosaic/shared': specifier: workspace:* version: link:../../packages/shared + '@mosaicstack/telemetry-client': + specifier: ^0.1.0 + version: 0.1.0 '@nestjs/axios': specifier: ^4.0.1 version: 4.0.1(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(axios@1.13.5)(rxjs@7.8.2) @@ -1500,6 +1503,9 @@ packages: '@mermaid-js/parser@0.6.3': resolution: {integrity: sha512-lnjOhe7zyHjc+If7yT4zoedx2vo4sHaTmtkl1+or8BRTnCtDmcTpAjpzDSfCZrshM5bCoz0GyidzadJAH1xobA==} + '@mosaicstack/telemetry-client@0.1.0': + resolution: {integrity: sha512-j78u9QDIAhTPzGfi9hfiM1wIhw9DUsjshGQ8PrXBdMcp22hfocdmjys4VwwvGiHYTn45um9rY04H4W1NdCaMiA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.0/telemetry-client-0.1.0.tgz} + '@mrleebo/prisma-ast@0.13.1': resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==} engines: {node: '>=16'} @@ -7732,6 +7738,8 @@ snapshots: dependencies: langium: 3.3.1 + '@mosaicstack/telemetry-client@0.1.0': {} + '@mrleebo/prisma-ast@0.13.1': dependencies: chevrotain: 10.5.0