import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { randomUUID } from "crypto";
import { ValkeyService } from "../../valkey/valkey.service";
import type { EventHandler, OrchestratorEvent } from "../../valkey/types";

/** Callback returned by `subscribe`; calling it removes the registered handler. */
type UnsubscribeFn = () => void;

/** Upper bound on the number of events kept in the in-memory replay buffer. */
const MAX_RECENT_EVENTS = 500;

/** Number of events `getRecentEvents` returns when no usable limit is supplied. */
const DEFAULT_RECENT_EVENTS_LIMIT = 100;

/**
 * Fans orchestrator events received from Valkey out to in-process subscribers
 * and keeps a bounded buffer of the most recent events.
 */
@Injectable()
export class AgentEventsService implements OnModuleInit {
  private readonly logger = new Logger(AgentEventsService.name);

  /** Registered handlers keyed by the random id generated in `subscribe`. */
  private readonly subscribers = new Map<string, EventHandler>();

  /** Most recent events, oldest first, capped at `MAX_RECENT_EVENTS`. */
  private readonly recentEvents: OrchestratorEvent[] = [];

  /** Set once the Valkey subscription has been established. */
  private connected = false;

  constructor(private readonly valkeyService: ValkeyService) {}

  /**
   * Subscribes to the Valkey event stream. Returns immediately when a previous
   * call has already completed the subscription.
   *
   * Every received event is recorded in the recent-events buffer and then
   * handed to each registered subscriber. Errors reported by the stream's
   * error callback are logged as warnings.
   */
  async onModuleInit(): Promise<void> {
    if (this.connected) return;

    await this.valkeyService.subscribeToEvents(
      (event) => {
        this.appendRecentEvent(event);
        this.subscribers.forEach((handler) => {
          // Fire-and-forget: notifySubscriber never rejects, so `void` is safe.
          void this.notifySubscriber(handler, event);
        });
      },
      (error, _raw, channel) => {
        this.logger.warn(`Event stream parse/validation warning on ${channel}: ${error.message}`);
      }
    );

    this.connected = true;
    this.logger.log("Agent event stream subscription active");
  }

  /**
   * Registers a handler that is invoked for every subsequent event.
   *
   * @param handler - Callback to invoke with each event.
   * @returns A function that removes this handler when called.
   */
  subscribe(handler: EventHandler): UnsubscribeFn {
    const id = randomUUID();
    this.subscribers.set(id, handler);

    return () => {
      this.subscribers.delete(id);
    };
  }

  /**
   * Builds a `stream.snapshot` payload carrying the current number of agents
   * and tasks, as returned by `ValkeyService.listAgents` and
   * `ValkeyService.listTasks`, stamped with the current time.
   */
  async getInitialSnapshot(): Promise<{
    type: "stream.snapshot";
    timestamp: string;
    agents: number;
    tasks: number;
  }> {
    const [agents, tasks] = await Promise.all([
      this.valkeyService.listAgents(),
      this.valkeyService.listTasks(),
    ]);

    return {
      type: "stream.snapshot",
      timestamp: new Date().toISOString(),
      agents: agents.length,
      tasks: tasks.length,
    };
  }

  /**
   * Creates a heartbeat event: a `task.processing` event whose data carries
   * `heartbeat: true`, stamped with the current time.
   */
  createHeartbeat(): OrchestratorEvent {
    return {
      type: "task.processing",
      timestamp: new Date().toISOString(),
      data: {
        heartbeat: true,
      },
    };
  }

  /**
   * Returns a copy of the newest buffered events, oldest first.
   *
   * @param limit - Maximum number of events to return. Fractions are floored
   *   and the value is clamped to `[1, MAX_RECENT_EVENTS]`. `NaN` falls back
   *   to the default of 100 instead of slipping past the clamp.
   * @returns A new array; mutating it does not affect the internal buffer.
   */
  getRecentEvents(limit = DEFAULT_RECENT_EVENTS_LIMIT): OrchestratorEvent[] {
    // NaN survives Math.min/Math.max unchanged, and slice(-NaN) would return
    // the entire buffer, so it has to be handled before clamping.
    const requested = Number.isNaN(limit) ? DEFAULT_RECENT_EVENTS_LIMIT : Math.floor(limit);
    const safeLimit = Math.min(Math.max(requested, 1), MAX_RECENT_EVENTS);

    // safeLimit >= 1, so the negative index is never -0; when it exceeds the
    // buffer length, slice returns a full copy.
    return this.recentEvents.slice(-safeLimit);
  }

  /**
   * Invokes one subscriber and contains any failure, so that a handler that
   * throws or returns a rejected promise cannot prevent the remaining
   * subscribers from being notified or surface as an unhandled rejection.
   * The handler is still called synchronously, before the first `await`
   * yields.
   */
  private async notifySubscriber(handler: EventHandler, event: OrchestratorEvent): Promise<void> {
    try {
      await handler(event);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.warn(`Agent event subscriber failed: ${reason}`);
    }
  }

  /** Appends an event to the buffer, evicting the oldest entry once the cap is exceeded. */
  private appendRecentEvent(event: OrchestratorEvent): void {
    this.recentEvents.push(event);
    if (this.recentEvents.length > MAX_RECENT_EVENTS) {
      this.recentEvents.shift();
    }
  }
}