import { TelemetryConfig, ResolvedConfig, resolveConfig } from "./config.js";
import { EventQueue } from "./queue.js";
import { BatchSubmitter } from "./submitter.js";
import { PredictionCache } from "./prediction-cache.js";
import { EventBuilder } from "./event-builder.js";
import { TaskCompletionEvent } from "./types/events.js";
import { PredictionQuery, PredictionResponse } from "./types/predictions.js";
import { BatchPredictionResponse } from "./types/common.js";

/**
|
|
* Main telemetry client. Queues task-completion events for background
|
|
* batch submission and provides access to crowd-sourced predictions.
|
|
*/
|
|
export class TelemetryClient {
|
|
private readonly config: ResolvedConfig;
|
|
private readonly queue: EventQueue;
|
|
private readonly submitter: BatchSubmitter;
|
|
private readonly predictionCache: PredictionCache;
|
|
private readonly _eventBuilder: EventBuilder;
|
|
private intervalId: ReturnType<typeof setInterval> | null = null;
|
|
private _isRunning = false;
|
|
|
|
constructor(config: TelemetryConfig) {
|
|
this.config = resolveConfig(config);
|
|
this.queue = new EventQueue(this.config.maxQueueSize);
|
|
this.submitter = new BatchSubmitter(this.config);
|
|
this.predictionCache = new PredictionCache(
|
|
this.config.predictionCacheTtlMs,
|
|
);
|
|
this._eventBuilder = new EventBuilder(this.config);
|
|
}
|
|
|
|
/** Get the event builder for constructing events. */
|
|
get eventBuilder(): EventBuilder {
|
|
return this._eventBuilder;
|
|
}
|
|
|
|
/** Start background submission via setInterval. Idempotent. */
|
|
start(): void {
|
|
if (this._isRunning) {
|
|
return;
|
|
}
|
|
this._isRunning = true;
|
|
this.intervalId = setInterval(() => {
|
|
void this.flush();
|
|
}, this.config.submitIntervalMs);
|
|
}
|
|
|
|
/** Stop background submission, flush remaining events. */
|
|
async stop(): Promise<void> {
|
|
if (!this._isRunning) {
|
|
return;
|
|
}
|
|
this._isRunning = false;
|
|
if (this.intervalId !== null) {
|
|
clearInterval(this.intervalId);
|
|
this.intervalId = null;
|
|
}
|
|
await this.flush();
|
|
}
|
|
|
|
/** Queue an event for batch submission. Never throws. */
|
|
track(event: TaskCompletionEvent): void {
|
|
try {
|
|
if (!this.config.enabled) {
|
|
return;
|
|
}
|
|
this.queue.enqueue(event);
|
|
} catch (error) {
|
|
this.handleError(error);
|
|
}
|
|
}
|
|
|
|
/** Get a cached prediction. Returns null if not cached/expired. */
|
|
getPrediction(query: PredictionQuery): PredictionResponse | null {
|
|
return this.predictionCache.get(query);
|
|
}
|
|
|
|
/** Force-refresh predictions from server. */
|
|
async refreshPredictions(queries: PredictionQuery[]): Promise<void> {
|
|
try {
|
|
const url = `${this.config.serverUrl}/v1/predictions/batch`;
|
|
const controller = new AbortController();
|
|
const timeout = setTimeout(
|
|
() => controller.abort(),
|
|
this.config.requestTimeoutMs,
|
|
);
|
|
|
|
try {
|
|
const response = await fetch(url, {
|
|
method: "POST",
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
},
|
|
body: JSON.stringify({ queries }),
|
|
signal: controller.signal,
|
|
});
|
|
|
|
if (!response.ok) {
|
|
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
|
|
}
|
|
|
|
const body = (await response.json()) as BatchPredictionResponse;
|
|
|
|
for (let i = 0; i < queries.length; i++) {
|
|
if (body.results[i]) {
|
|
this.predictionCache.set(queries[i], body.results[i]);
|
|
}
|
|
}
|
|
} finally {
|
|
clearTimeout(timeout);
|
|
}
|
|
} catch (error) {
|
|
this.handleError(error);
|
|
}
|
|
}
|
|
|
|
/** Number of events currently queued. */
|
|
get queueSize(): number {
|
|
return this.queue.size;
|
|
}
|
|
|
|
/** Whether the client is currently running. */
|
|
get isRunning(): boolean {
|
|
return this._isRunning;
|
|
}
|
|
|
|
/** Flush the queue by draining and submitting batches. */
|
|
private async flush(): Promise<void> {
|
|
while (!this.queue.isEmpty) {
|
|
const batch = this.queue.drain(this.config.batchSize);
|
|
if (batch.length === 0) break;
|
|
|
|
try {
|
|
const result = await this.submitter.submit(batch);
|
|
if (!result.success) {
|
|
// Re-enqueue events that failed to submit
|
|
this.queue.prepend(batch);
|
|
if (result.error) {
|
|
this.handleError(result.error);
|
|
}
|
|
break; // Stop flushing on failure to avoid loops
|
|
}
|
|
} catch (error) {
|
|
this.queue.prepend(batch);
|
|
this.handleError(error);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
private handleError(error: unknown): void {
|
|
const err = error instanceof Error ? error : new Error(String(error));
|
|
try {
|
|
this.config.onError(err);
|
|
} catch {
|
|
// Prevent error handler from throwing
|
|
}
|
|
}
|
|
}
|