import { Injectable, Logger, type OnModuleInit, type OnModuleDestroy } from '@nestjs/common';
import cron from 'node-cron';
import { SummarizationService } from './summarization.service.js';

/**
 * Registers the recurring log-maintenance jobs with node-cron when the module
 * starts, and stops them again when the module is torn down.
 *
 * Schedules are read from the environment (`SUMMARIZATION_CRON`,
 * `TIER_MANAGEMENT_CRON`) and fall back to built-in defaults.
 */
@Injectable()
export class CronService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(CronService.name);
  // Every task handed out by cron.schedule(), kept so it can be stopped on shutdown.
  private readonly tasks: cron.ScheduledTask[] = [];

  constructor(private readonly summarization: SummarizationService) {}

  /** Schedules the summarization job first, then the tier-management job. */
  onModuleInit(): void {
    const env = process.env;
    // Default: every 6 hours.
    const summarizationSchedule = env['SUMMARIZATION_CRON'] ?? '0 */6 * * *';
    // Default: daily at 3am.
    const tierManagementSchedule = env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *';

    this.register(summarizationSchedule, () =>
      this.summarization.runSummarization().catch((err) => {
        this.logger.error(`Scheduled summarization failed: ${err}`);
      }),
    );

    this.register(tierManagementSchedule, () =>
      this.summarization.runTierManagement().catch((err) => {
        this.logger.error(`Scheduled tier management failed: ${err}`);
      }),
    );

    this.logger.log(
      `Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}"`,
    );
  }

  /** Stops every registered task and forgets them. */
  onModuleDestroy(): void {
    this.tasks.forEach((scheduled) => {
      scheduled.stop();
    });
    this.tasks.splice(0);
    this.logger.log('Cron tasks stopped');
  }

  /**
   * Schedules `job` on `expression` and tracks the resulting task.
   * The job is fire-and-forget: it is expected to handle its own rejection.
   */
  private register(expression: string, job: () => Promise<unknown>): void {
    const task = cron.schedule(expression, () => {
      void job();
    });
    this.tasks.push(task);
  }
}
import { Inject, Injectable, Logger } from '@nestjs/common';
import type { LogService } from '@mosaic/log';
import type { Memory } from '@mosaic/memory';
import { LOG_SERVICE } from './log.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import type { Db } from '@mosaic/db';
import { sql, summarizationJobs } from '@mosaic/db';
import { DB } from '../database/database.module.js';

/** Prompt template sent to the LLM; the `{logs}` placeholder is filled with one session's log lines. */
const SUMMARIZATION_PROMPT = `You are a knowledge extraction assistant. Given the following agent interaction logs, extract the key decisions, learnings, and patterns. Output a concise summary (2-4 sentences) that captures the most important information for future reference. Focus on actionable insights, not raw events.

Logs:
{logs}

Summary:`;

const DAY_MS = 24 * 60 * 60 * 1000;

/** Upper bound for a single chat-completion request so a hung endpoint cannot stall a cycle. */
const SUMMARIZATION_TIMEOUT_MS = 60 * 1000;

/**
 * The subset of a chat-completions response body that this service reads.
 * Every level is optional because the body comes from an external API.
 */
interface ChatCompletion {
  choices?: Array<{ message?: { content?: string | null } }>;
}

@Injectable()
export class SummarizationService {
  private readonly logger = new Logger(SummarizationService.name);
  private readonly apiKey: string | undefined;
  private readonly baseUrl: string;
  private readonly model: string;

  constructor(
    @Inject(LOG_SERVICE) private readonly logService: LogService,
    @Inject(MEMORY) private readonly memory: Memory,
    private readonly embeddings: EmbeddingService,
    @Inject(DB) private readonly db: Db,
  ) {
    this.apiKey = process.env['OPENAI_API_KEY'];
    // Strip trailing slashes so a configured ".../v1/" does not yield "//chat/completions".
    this.baseUrl = (process.env['SUMMARIZATION_API_URL'] ?? 'https://api.openai.com/v1').replace(
      /\/+$/,
      '',
    );
    this.model = process.env['SUMMARIZATION_MODEL'] ?? 'gpt-4o-mini';
  }

  /**
   * Run one summarization cycle:
   *  1. Record a `running` job row
   *  2. Fetch logs eligible for summarization using a 24h-ago cutoff
   *  3. Group them by session
   *  4. Summarize each group via the configured LLM
   *  5. Store each summary as an insight (with an embedding when available)
   *  6. Ask the log service to promote logs to the warm tier for the same cutoff
   *  7. Mark the job `completed`, or `failed` with the error message
   *
   * Sessions whose first log has no `userId`, or whose summarization yields
   * nothing, are skipped without failing the cycle.
   *
   * @returns the number of logs fetched and the number of insights created
   * @throws rethrows any error from the cycle after recording the job as failed
   */
  async runSummarization(): Promise<{ logsProcessed: number; insightsCreated: number }> {
    const cutoff = new Date(Date.now() - DAY_MS); // 24h ago

    // Create job record
    const [job] = await this.db
      .insert(summarizationJobs)
      .values({ status: 'running', startedAt: new Date() })
      .returning();
    if (!job) {
      throw new Error('Failed to create summarization job record');
    }
    const jobId = job.id;

    try {
      // NOTE(review): the second argument looks like a batch limit — confirm against LogService.
      const logs = await this.logService.logs.getLogsForSummarization(cutoff, 200);
      if (logs.length === 0) {
        await this.db
          .update(summarizationJobs)
          .set({ status: 'completed', completedAt: new Date() })
          .where(sql`id = ${jobId}`);
        return { logsProcessed: 0, insightsCreated: 0 };
      }

      // Group logs by session
      type LogEntry = (typeof logs)[number];
      const bySession = new Map<LogEntry['sessionId'], LogEntry[]>();
      for (const log of logs) {
        const group = bySession.get(log.sessionId) ?? [];
        group.push(log);
        bySession.set(log.sessionId, group);
      }

      let insightsCreated = 0;

      for (const [sessionId, sessionLogs] of bySession) {
        const userId = sessionLogs[0]?.userId;
        if (!userId) continue;

        const logsText = sessionLogs.map((l) => `[${l.category}] ${l.content}`).join('\n');

        const summary = await this.summarize(logsText);
        if (!summary) continue;

        const embedding = this.embeddings.available
          ? await this.embeddings.embed(summary)
          : undefined;

        await this.memory.insights.create({
          userId,
          content: summary,
          embedding: embedding ?? null,
          source: 'summarization',
          category: 'learning',
          metadata: { sessionId, logCount: sessionLogs.length },
        });
        insightsCreated++;
      }

      // Transition processed logs to warm.
      // NOTE(review): promotion is keyed on the cutoff, not on the fetched batch, so it
      // may also move logs that were not in `logs` or whose session was skipped above —
      // confirm against LogService.promoteToWarm.
      await this.logService.logs.promoteToWarm(cutoff);

      await this.db
        .update(summarizationJobs)
        .set({
          status: 'completed',
          logsProcessed: logs.length,
          insightsCreated,
          completedAt: new Date(),
        })
        .where(sql`id = ${jobId}`);

      this.logger.log(`Summarization complete: ${logs.length} logs → ${insightsCreated} insights`);
      return { logsProcessed: logs.length, insightsCreated };
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      try {
        await this.db
          .update(summarizationJobs)
          .set({ status: 'failed', errorMessage: message, completedAt: new Date() })
          .where(sql`id = ${jobId}`);
      } catch (recordError: unknown) {
        // Recording the failure must never replace the error that caused it.
        const recordMessage =
          recordError instanceof Error ? recordError.message : String(recordError);
        this.logger.error(`Failed to record summarization failure: ${recordMessage}`);
      }
      this.logger.error(`Summarization failed: ${message}`);
      throw error;
    }
  }

  /**
   * Run tier management, in order:
   *  - `promoteToCold` with a 30-day cutoff
   *  - `purge` with a 90-day cutoff
   *  - `decayOldInsights` with a 14-day cutoff
   *
   * The three results are only interpolated into the log line; they are
   * presumably affected-row counts — verify against the store implementations.
   */
  async runTierManagement(): Promise<void> {
    const now = Date.now();
    const warmCutoff = new Date(now - 30 * DAY_MS);
    const coldCutoff = new Date(now - 90 * DAY_MS);
    const decayCutoff = new Date(now - 14 * DAY_MS);

    const promoted = await this.logService.logs.promoteToCold(warmCutoff);
    const purged = await this.logService.logs.purge(coldCutoff);
    const decayed = await this.memory.insights.decayOldInsights(decayCutoff);

    this.logger.log(
      `Tier management: ${promoted} logs→cold, ${purged} purged, ${decayed} insights decayed`,
    );
  }

  /**
   * Ask the chat-completions endpoint for a summary of `logsText`.
   *
   * @param logsText newline-joined log lines for one session
   * @returns the trimmed summary, or `null` when no API key is configured, the
   *          API answers with a non-2xx status, or the response has no usable content
   * @throws when the request itself fails (network error or timeout) or the body is not JSON
   */
  private async summarize(logsText: string): Promise<string | null> {
    if (!this.apiKey) {
      this.logger.warn('No API key configured — skipping summarization');
      return null;
    }

    // Use a replacer function: with a plain string, `$&`, `$'`, `$$` etc. inside the
    // log text would be interpreted as replacement patterns and corrupt the prompt.
    const prompt = SUMMARIZATION_PROMPT.replace('{logs}', () => logsText);

    const response = await fetch(`${this.baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify({
        model: this.model,
        messages: [{ role: 'user', content: prompt }],
        max_tokens: 300,
        temperature: 0.3,
      }),
      signal: AbortSignal.timeout(SUMMARIZATION_TIMEOUT_MS),
    });

    if (!response.ok) {
      const body = await response.text();
      this.logger.error(`Summarization API error: ${response.status} ${body}`);
      return null;
    }

    // The body is external input: tolerate a missing `choices`/`message` instead of throwing.
    const json = (await response.json()) as ChatCompletion | null;
    const content = json?.choices?.[0]?.message?.content?.trim();
    return content ? content : null;
  }
}
sha512-YfWiV16IY0OeBfBCk8+hXKmdTKrKlwKN1MNKAPBu5JYxLwBEZl7QzeEpGnlZb3VMGJrrGmB84gXiH+ofs/TezA==} + '@types/node-cron@3.0.11': + resolution: {integrity: sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==} + '@types/node@22.19.15': resolution: {integrity: sha512-F0R/h2+dsy5wJAUe3tAU6oqa2qbWY5TpNfL/RGmo1y38hiyO1w3x2jPtt76wmuaJI4DQnOBu21cNXQ2STIUUWg==} @@ -4174,6 +4183,10 @@ packages: sass: optional: true + node-cron@4.2.1: + resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==} + engines: {node: '>=6.0.0'} + node-domexception@1.0.0: resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} engines: {node: '>=10.5.0'} @@ -7548,6 +7561,8 @@ snapshots: dependencies: '@types/node': 22.19.15 + '@types/node-cron@3.0.11': {} + '@types/node@22.19.15': dependencies: undici-types: 6.21.0 @@ -9001,6 +9016,8 @@ snapshots: - '@babel/core' - babel-plugin-macros + node-cron@4.2.1: {} + node-domexception@1.0.0: {} node-fetch@3.3.2: