import { Inject, Injectable, Logger } from '@nestjs/common';
import type { LogService } from '@mosaicstack/log';
import type { Memory } from '@mosaicstack/memory';
import { LOG_SERVICE } from './log.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import type { Db } from '@mosaicstack/db';
import { sql, summarizationJobs } from '@mosaicstack/db';
import { DB } from '../database/database.module.js';

/** Prompt template sent to the LLM; the `{logs}` placeholder is substituted in `summarize`. */
const SUMMARIZATION_PROMPT = `You are a knowledge extraction assistant. Given the following agent interaction logs, extract the key decisions, learnings, and patterns. Output a concise summary (2-4 sentences) that captures the most important information for future reference. Focus on actionable insights, not raw events. Logs: {logs} Summary:`;

/**
 * The subset of a chat-completions response that this service reads.
 * Every field is optional because the payload comes from an external API and
 * is only cast, never validated.
 */
interface ChatCompletion {
  choices?: Array<{ message?: { content?: string | null } }>;
}

/**
 * Condenses agent logs into insights via an LLM and drives log tier
 * transitions and insight decay.
 */
@Injectable()
export class SummarizationService {
  private readonly logger = new Logger(SummarizationService.name);
  private readonly apiKey: string | undefined;
  private readonly baseUrl: string;
  private readonly model: string;

  constructor(
    @Inject(LOG_SERVICE) private readonly logService: LogService,
    @Inject(MEMORY) private readonly memory: Memory,
    @Inject(EmbeddingService) private readonly embeddings: EmbeddingService,
    @Inject(DB) private readonly db: Db,
  ) {
    this.apiKey = process.env['OPENAI_API_KEY'];
    // Strip trailing slashes so the request URL never contains `//chat/completions`.
    this.baseUrl = (process.env['SUMMARIZATION_API_URL'] ?? 'https://api.openai.com/v1').replace(
      /\/+$/,
      '',
    );
    this.model = process.env['SUMMARIZATION_MODEL'] ?? 'gpt-4o-mini';
  }

  /**
   * Run one summarization cycle:
   * 1. Fetch up to 200 logs older than 24h (selection is delegated to
   *    `getLogsForSummarization`)
   * 2. Group by session
   * 3. Summarize each group via the configured LLM
   * 4. Store each summary as an insight, with an embedding when available
   * 5. Transition logs older than the cutoff to the warm tier
   *
   * A `summarizationJobs` row tracks the run and ends as `completed` or `failed`.
   *
   * @returns The number of logs fetched and the number of insights created.
   * @throws Rethrows any error raised during the cycle after marking the job failed.
   */
  async runSummarization(): Promise<{ logsProcessed: number; insightsCreated: number }> {
    const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24h ago

    // Create job record
    const [job] = await this.db
      .insert(summarizationJobs)
      .values({ status: 'running', startedAt: new Date() })
      .returning();
    if (!job) {
      throw new Error('Failed to create summarization job record');
    }
    const jobId = job.id;

    try {
      const logs = await this.logService.logs.getLogsForSummarization(cutoff, 200);

      if (logs.length === 0) {
        await this.db
          .update(summarizationJobs)
          .set({ status: 'completed', completedAt: new Date() })
          .where(sql`id = ${jobId}`);
        return { logsProcessed: 0, insightsCreated: 0 };
      }

      // Group logs by session
      type SummarizableLog = (typeof logs)[number];
      const bySession = new Map<SummarizableLog['sessionId'], SummarizableLog[]>();
      for (const log of logs) {
        const group = bySession.get(log.sessionId);
        if (group) {
          group.push(log);
        } else {
          bySession.set(log.sessionId, [log]);
        }
      }

      let insightsCreated = 0;

      for (const [sessionId, sessionLogs] of bySession) {
        // The first log's user is taken as the owner of the whole session.
        const userId = sessionLogs[0]?.userId;
        if (!userId) continue;

        const logsText = sessionLogs.map((l) => `[${l.category}] ${l.content}`).join('\n');

        // A null/empty summary (no API key, API error, empty completion) skips the session.
        const summary = await this.summarize(logsText);
        if (!summary) continue;

        const embedding = this.embeddings.available
          ? await this.embeddings.embed(summary)
          : undefined;

        await this.memory.insights.create({
          userId,
          content: summary,
          embedding: embedding ?? null,
          source: 'summarization',
          category: 'learning',
          metadata: { sessionId, logCount: sessionLogs.length },
        });
        insightsCreated++;
      }

      // Transition processed logs to warm.
      // NOTE(review): this is called with only the cutoff, while at most 200 logs were
      // fetched above and some sessions may have been skipped — confirm promoteToWarm
      // does not promote logs that were never summarized.
      await this.logService.logs.promoteToWarm(cutoff);

      await this.db
        .update(summarizationJobs)
        .set({
          status: 'completed',
          logsProcessed: logs.length,
          insightsCreated,
          completedAt: new Date(),
        })
        .where(sql`id = ${jobId}`);

      this.logger.log(`Summarization complete: ${logs.length} logs → ${insightsCreated} insights`);
      return { logsProcessed: logs.length, insightsCreated };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      try {
        await this.db
          .update(summarizationJobs)
          .set({ status: 'failed', errorMessage: message, completedAt: new Date() })
          .where(sql`id = ${jobId}`);
      } catch (recordError) {
        // Recording the failure is best-effort; it must not mask the original error.
        const recordMessage =
          recordError instanceof Error ? recordError.message : String(recordError);
        this.logger.error(`Failed to record summarization failure: ${recordMessage}`);
      }
      this.logger.error(`Summarization failed: ${message}`);
      throw error;
    }
  }

  /**
   * Run tier management:
   * - Warm logs older than 30 days → cold
   * - Cold logs older than 90 days → purged
   * - Decay old insight relevance scores (14-day cutoff)
   */
  async runTierManagement(): Promise<void> {
    const warmCutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
    const coldCutoff = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
    const decayCutoff = new Date(Date.now() - 14 * 24 * 60 * 60 * 1000);

    const promoted = await this.logService.logs.promoteToCold(warmCutoff);
    const purged = await this.logService.logs.purge(coldCutoff);
    const decayed = await this.memory.insights.decayAllInsights(decayCutoff);

    this.logger.log(
      `Tier management: ${promoted} logs→cold, ${purged} purged, ${decayed} insights decayed`,
    );
  }

  /**
   * Ask the configured chat-completions endpoint to summarize the given log text.
   *
   * @param logsText Log lines joined into a single string.
   * @returns The model's summary, or `null` when no API key is configured, the
   *   API responds with a non-2xx status, or the response carries no content.
   */
  private async summarize(logsText: string): Promise<string | null> {
    if (!this.apiKey) {
      this.logger.warn('No API key configured — skipping summarization');
      return null;
    }

    // Use a replacer function so `$&`, `$1`, `$$`, etc. inside the log text are
    // inserted literally instead of being treated as replacement patterns.
    const prompt = SUMMARIZATION_PROMPT.replace('{logs}', () => logsText);

    const response = await fetch(`${this.baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify({
        model: this.model,
        messages: [{ role: 'user', content: prompt }],
        max_tokens: 300,
        temperature: 0.3,
      }),
    });

    if (!response.ok) {
      const body = await response.text();
      this.logger.error(`Summarization API error: ${response.status} ${body}`);
      return null;
    }

    // The cast is unchecked, so read defensively: a malformed payload yields null
    // rather than throwing a TypeError.
    const json = (await response.json()) as ChatCompletion;
    return json.choices?.[0]?.message?.content ?? null;
  }
}