Standalone npm package (@mosaicstack/telemetry-client) for reporting
task-completion telemetry and querying predictions from the Mosaic Stack
Telemetry server.

- TelemetryClient with setInterval-based background flush
- EventQueue (bounded FIFO array)
- BatchSubmitter with native fetch, exponential backoff, Retry-After
- PredictionCache (Map + TTL)
- EventBuilder with auto-generated event_id/timestamp
- Zero runtime dependencies (Node 18+ native APIs)
- 43 tests, 86% branch coverage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

319 lines · 9.2 KiB · TypeScript
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
|
import { TelemetryClient } from '../src/client.js';
|
|
import { TelemetryConfig } from '../src/config.js';
|
|
import {
|
|
TaskCompletionEvent,
|
|
TaskType,
|
|
Complexity,
|
|
Harness,
|
|
Provider,
|
|
Outcome,
|
|
} from '../src/types/events.js';
|
|
import { PredictionQuery, PredictionResponse } from '../src/types/predictions.js';
|
|
|
|
function makeConfig(overrides: Partial<TelemetryConfig> = {}): TelemetryConfig {
|
|
return {
|
|
serverUrl: 'https://tel.example.com',
|
|
apiKey: 'a'.repeat(64),
|
|
instanceId: 'test-instance',
|
|
submitIntervalMs: 60_000,
|
|
maxQueueSize: 100,
|
|
batchSize: 10,
|
|
requestTimeoutMs: 5000,
|
|
dryRun: true, // Use dryRun by default in tests
|
|
...overrides,
|
|
};
|
|
}
|
|
|
|
function makeEvent(id = 'evt-1'): TaskCompletionEvent {
|
|
return {
|
|
instance_id: 'test-instance',
|
|
event_id: id,
|
|
schema_version: '1.0',
|
|
timestamp: new Date().toISOString(),
|
|
task_duration_ms: 5000,
|
|
task_type: TaskType.IMPLEMENTATION,
|
|
complexity: Complexity.MEDIUM,
|
|
harness: Harness.CLAUDE_CODE,
|
|
model: 'claude-3-opus',
|
|
provider: Provider.ANTHROPIC,
|
|
estimated_input_tokens: 1000,
|
|
estimated_output_tokens: 500,
|
|
actual_input_tokens: 1100,
|
|
actual_output_tokens: 550,
|
|
estimated_cost_usd_micros: 50000,
|
|
actual_cost_usd_micros: 55000,
|
|
quality_gate_passed: true,
|
|
quality_gates_run: [],
|
|
quality_gates_failed: [],
|
|
context_compactions: 0,
|
|
context_rotations: 0,
|
|
context_utilization_final: 0.5,
|
|
outcome: Outcome.SUCCESS,
|
|
retry_count: 0,
|
|
};
|
|
}
|
|
|
|
function makeQuery(): PredictionQuery {
|
|
return {
|
|
task_type: TaskType.IMPLEMENTATION,
|
|
model: 'claude-3-opus',
|
|
provider: Provider.ANTHROPIC,
|
|
complexity: Complexity.MEDIUM,
|
|
};
|
|
}
|
|
|
|
function makePredictionResponse(): PredictionResponse {
|
|
return {
|
|
prediction: {
|
|
input_tokens: { p10: 500, p25: 750, median: 1000, p75: 1500, p90: 2000 },
|
|
output_tokens: { p10: 200, p25: 350, median: 500, p75: 750, p90: 1000 },
|
|
cost_usd_micros: { median: 50000 },
|
|
duration_ms: { median: 30000 },
|
|
correction_factors: { input: 1.1, output: 1.05 },
|
|
quality: { gate_pass_rate: 0.85, success_rate: 0.9 },
|
|
},
|
|
metadata: {
|
|
sample_size: 100,
|
|
fallback_level: 0,
|
|
confidence: 'high',
|
|
last_updated: new Date().toISOString(),
|
|
cache_hit: false,
|
|
},
|
|
};
|
|
}
|
|
|
|
// Test suite for TelemetryClient covering lifecycle, event queueing,
// prediction refresh/lookup, interval-driven flushing, and error paths.
// All network I/O goes through a stubbed global fetch, and timers are
// faked so the background flush interval can be advanced deterministically.
describe('TelemetryClient', () => {
  // Stub installed in place of the global fetch; individual tests program
  // its resolved/rejected values to simulate server behavior.
  let fetchSpy: ReturnType<typeof vi.fn>;

  beforeEach(() => {
    vi.useFakeTimers();
    fetchSpy = vi.fn();
    vi.stubGlobal('fetch', fetchSpy);
  });

  afterEach(() => {
    // Restore real timers and the real fetch so state never leaks between tests.
    vi.useRealTimers();
    vi.unstubAllGlobals();
  });

  describe('start/stop lifecycle', () => {
    it('should start and stop cleanly', async () => {
      const client = new TelemetryClient(makeConfig());

      expect(client.isRunning).toBe(false);
      client.start();
      expect(client.isRunning).toBe(true);

      await client.stop();
      expect(client.isRunning).toBe(false);
    });

    it('should be idempotent on start', () => {
      const client = new TelemetryClient(makeConfig());
      client.start();
      client.start(); // Should not throw or create double intervals
      expect(client.isRunning).toBe(true);
    });

    it('should be idempotent on stop', async () => {
      // stop() before start() and repeated stop() must both be no-ops.
      const client = new TelemetryClient(makeConfig());
      await client.stop();
      await client.stop(); // Should not throw
      expect(client.isRunning).toBe(false);
    });

    it('should flush events on stop', async () => {
      const client = new TelemetryClient(makeConfig());
      client.start();

      client.track(makeEvent('e1'));
      client.track(makeEvent('e2'));
      expect(client.queueSize).toBe(2);

      await client.stop();
      // In dryRun mode, flush succeeds and queue should be empty
      expect(client.queueSize).toBe(0);
    });
  });

  describe('track()', () => {
    it('should queue events', () => {
      const client = new TelemetryClient(makeConfig());
      client.track(makeEvent('e1'));
      client.track(makeEvent('e2'));
      expect(client.queueSize).toBe(2);
    });

    it('should silently drop events when disabled', () => {
      const client = new TelemetryClient(makeConfig({ enabled: false }));
      client.track(makeEvent());
      expect(client.queueSize).toBe(0);
    });

    it('should never throw even on internal error', () => {
      const errorFn = vi.fn();
      const client = new TelemetryClient(
        makeConfig({ onError: errorFn, maxQueueSize: 0 }),
      );

      // This should not throw. maxQueueSize of 0 could cause issues
      // but track() is designed to catch everything.
      expect(() => client.track(makeEvent())).not.toThrow();
    });
  });

  describe('predictions', () => {
    it('should return null for uncached prediction', () => {
      // No refresh has happened, so the prediction cache is empty.
      const client = new TelemetryClient(makeConfig());
      const result = client.getPrediction(makeQuery());
      expect(result).toBeNull();
    });

    it('should return cached prediction after refresh', async () => {
      // Server replies with one prediction wrapped in a { results: [...] } envelope.
      const predictionResponse = makePredictionResponse();
      fetchSpy.mockResolvedValueOnce({
        ok: true,
        status: 200,
        json: () =>
          Promise.resolve({
            results: [predictionResponse],
          }),
      });

      const client = new TelemetryClient(makeConfig({ dryRun: false }));
      const query = makeQuery();

      await client.refreshPredictions([query]);

      const result = client.getPrediction(query);
      expect(result).toEqual(predictionResponse);
    });

    it('should handle refresh error gracefully', async () => {
      // Network-level failure: fetch itself rejects.
      fetchSpy.mockRejectedValueOnce(new Error('Network error'));

      const errorFn = vi.fn();
      const client = new TelemetryClient(
        makeConfig({ dryRun: false, onError: errorFn }),
      );

      // Should not throw
      await client.refreshPredictions([makeQuery()]);
      expect(errorFn).toHaveBeenCalledWith(expect.any(Error));
    });

    it('should handle non-ok HTTP response on refresh', async () => {
      // HTTP-level failure: fetch resolves but with a 500 status.
      fetchSpy.mockResolvedValueOnce({
        ok: false,
        status: 500,
        statusText: 'Internal Server Error',
      });

      const errorFn = vi.fn();
      const client = new TelemetryClient(
        makeConfig({ dryRun: false, onError: errorFn }),
      );

      await client.refreshPredictions([makeQuery()]);
      expect(errorFn).toHaveBeenCalledWith(expect.any(Error));
    });
  });

  describe('background flush', () => {
    it('should trigger flush on interval', async () => {
      const client = new TelemetryClient(
        makeConfig({ submitIntervalMs: 10_000 }),
      );
      client.start();

      client.track(makeEvent('e1'));
      expect(client.queueSize).toBe(1);

      // Advance past submit interval
      await vi.advanceTimersByTimeAsync(11_000);

      // In dryRun mode, events should be flushed
      expect(client.queueSize).toBe(0);

      await client.stop();
    });
  });

  describe('flush error handling', () => {
    it('should re-enqueue events on submit failure', async () => {
      // Use non-dryRun mode to actually hit the submitter
      fetchSpy.mockResolvedValueOnce({
        ok: false,
        status: 500,
        statusText: 'Internal Server Error',
      });

      // maxRetries: 0 so the submitter fails fast instead of backing off.
      const errorFn = vi.fn();
      const client = new TelemetryClient(
        makeConfig({ dryRun: false, maxRetries: 0, onError: errorFn }),
      );

      client.track(makeEvent('e1'));
      expect(client.queueSize).toBe(1);

      // Start and trigger flush
      client.start();
      await vi.advanceTimersByTimeAsync(70_000);

      // Events should be re-enqueued after failure
      expect(client.queueSize).toBeGreaterThan(0);

      await client.stop();
    });

    it('should handle onError callback that throws', async () => {
      const throwingErrorFn = () => {
        throw new Error('Error handler broke');
      };
      const client = new TelemetryClient(
        makeConfig({ onError: throwingErrorFn, enabled: false }),
      );

      // This should not throw even though onError throws
      // Force an error path by calling track when disabled (no error),
      // but we can test via refreshPredictions
      fetchSpy.mockRejectedValueOnce(new Error('fail'));
      await expect(client.refreshPredictions([makeQuery()])).resolves.not.toThrow();
    });
  });

  describe('event builder', () => {
    it('should expose an event builder', () => {
      const client = new TelemetryClient(makeConfig());
      expect(client.eventBuilder).toBeDefined();

      // build() receives everything except envelope fields (instance_id,
      // event_id, schema_version, timestamp), which the builder supplies.
      const event = client.eventBuilder.build({
        task_duration_ms: 1000,
        task_type: TaskType.TESTING,
        complexity: Complexity.LOW,
        harness: Harness.AIDER,
        model: 'gpt-4',
        provider: Provider.OPENAI,
        estimated_input_tokens: 100,
        estimated_output_tokens: 50,
        actual_input_tokens: 100,
        actual_output_tokens: 50,
        estimated_cost_usd_micros: 1000,
        actual_cost_usd_micros: 1000,
        quality_gate_passed: true,
        quality_gates_run: [],
        quality_gates_failed: [],
        context_compactions: 0,
        context_rotations: 0,
        context_utilization_final: 0.3,
        outcome: Outcome.SUCCESS,
        retry_count: 0,
      });

      // Builder stamps the instance id from config and the schema version.
      expect(event.instance_id).toBe('test-instance');
      expect(event.schema_version).toBe('1.0');
    });
  });
});
|