feat(P4-004): summarization pipeline — LLM + cron scheduling
Add SummarizationService that reads hot agent logs (>24h), groups by session, calls a cheap LLM (gpt-4o-mini default, configurable via SUMMARIZATION_MODEL) to extract key insights, stores them with embeddings in the insights table, and transitions processed logs to warm tier. Add CronService with node-cron for scheduled execution (summarization every 6h, tier management daily at 3am). Tier management promotes warm→cold (30d) and purges cold logs (90d). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -36,6 +36,7 @@
|
||||
"@sinclair/typebox": "^0.34.48",
|
||||
"better-auth": "^1.5.5",
|
||||
"fastify": "^5.0.0",
|
||||
"node-cron": "^4.2.1",
|
||||
"reflect-metadata": "^0.2.0",
|
||||
"rxjs": "^7.8.0",
|
||||
"socket.io": "^4.8.0",
|
||||
@@ -43,6 +44,7 @@
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^22.0.0",
|
||||
"@types/node-cron": "^3.0.11",
|
||||
"@types/uuid": "^10.0.0",
|
||||
"tsx": "^4.0.0",
|
||||
"typescript": "^5.8.0",
|
||||
|
||||
44
apps/gateway/src/log/cron.service.ts
Normal file
44
apps/gateway/src/log/cron.service.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { Injectable, Logger, type OnModuleInit, type OnModuleDestroy } from '@nestjs/common';
|
||||
import cron from 'node-cron';
|
||||
import { SummarizationService } from './summarization.service.js';
|
||||
|
||||
@Injectable()
|
||||
export class CronService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(CronService.name);
|
||||
private readonly tasks: cron.ScheduledTask[] = [];
|
||||
|
||||
constructor(private readonly summarization: SummarizationService) {}
|
||||
|
||||
onModuleInit(): void {
|
||||
const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours
|
||||
const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am
|
||||
|
||||
this.tasks.push(
|
||||
cron.schedule(summarizationSchedule, () => {
|
||||
this.summarization.runSummarization().catch((err) => {
|
||||
this.logger.error(`Scheduled summarization failed: ${err}`);
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
this.tasks.push(
|
||||
cron.schedule(tierManagementSchedule, () => {
|
||||
this.summarization.runTierManagement().catch((err) => {
|
||||
this.logger.error(`Scheduled tier management failed: ${err}`);
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}"`,
|
||||
);
|
||||
}
|
||||
|
||||
onModuleDestroy(): void {
|
||||
for (const task of this.tasks) {
|
||||
task.stop();
|
||||
}
|
||||
this.tasks.length = 0;
|
||||
this.logger.log('Cron tasks stopped');
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,8 @@ import type { Db } from '@mosaic/db';
|
||||
import { DB } from '../database/database.module.js';
|
||||
import { LOG_SERVICE } from './log.tokens.js';
|
||||
import { LogController } from './log.controller.js';
|
||||
import { SummarizationService } from './summarization.service.js';
|
||||
import { CronService } from './cron.service.js';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
@@ -13,8 +15,10 @@ import { LogController } from './log.controller.js';
|
||||
useFactory: (db: Db): LogService => createLogService(db),
|
||||
inject: [DB],
|
||||
},
|
||||
SummarizationService,
|
||||
CronService,
|
||||
],
|
||||
controllers: [LogController],
|
||||
exports: [LOG_SERVICE],
|
||||
exports: [LOG_SERVICE, SummarizationService],
|
||||
})
|
||||
export class LogModule {}
|
||||
|
||||
178
apps/gateway/src/log/summarization.service.ts
Normal file
178
apps/gateway/src/log/summarization.service.ts
Normal file
@@ -0,0 +1,178 @@
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import type { LogService } from '@mosaic/log';
|
||||
import type { Memory } from '@mosaic/memory';
|
||||
import { LOG_SERVICE } from './log.tokens.js';
|
||||
import { MEMORY } from '../memory/memory.tokens.js';
|
||||
import { EmbeddingService } from '../memory/embedding.service.js';
|
||||
import type { Db } from '@mosaic/db';
|
||||
import { sql, summarizationJobs } from '@mosaic/db';
|
||||
import { DB } from '../database/database.module.js';
|
||||
|
||||
const SUMMARIZATION_PROMPT = `You are a knowledge extraction assistant. Given the following agent interaction logs, extract the key decisions, learnings, and patterns. Output a concise summary (2-4 sentences) that captures the most important information for future reference. Focus on actionable insights, not raw events.
|
||||
|
||||
Logs:
|
||||
{logs}
|
||||
|
||||
Summary:`;
|
||||
|
||||
interface ChatCompletion {
|
||||
choices: Array<{ message: { content: string } }>;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class SummarizationService {
|
||||
private readonly logger = new Logger(SummarizationService.name);
|
||||
private readonly apiKey: string | undefined;
|
||||
private readonly baseUrl: string;
|
||||
private readonly model: string;
|
||||
|
||||
constructor(
|
||||
@Inject(LOG_SERVICE) private readonly logService: LogService,
|
||||
@Inject(MEMORY) private readonly memory: Memory,
|
||||
private readonly embeddings: EmbeddingService,
|
||||
@Inject(DB) private readonly db: Db,
|
||||
) {
|
||||
this.apiKey = process.env['OPENAI_API_KEY'];
|
||||
this.baseUrl = process.env['SUMMARIZATION_API_URL'] ?? 'https://api.openai.com/v1';
|
||||
this.model = process.env['SUMMARIZATION_MODEL'] ?? 'gpt-4o-mini';
|
||||
}
|
||||
|
||||
/**
|
||||
* Run one summarization cycle:
|
||||
* 1. Find hot logs older than 24h with decision/learning/tool_use categories
|
||||
* 2. Group by session
|
||||
* 3. Summarize each group via cheap LLM
|
||||
* 4. Store as insights with embeddings
|
||||
* 5. Transition processed logs to warm tier
|
||||
*/
|
||||
async runSummarization(): Promise<{ logsProcessed: number; insightsCreated: number }> {
|
||||
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24h ago
|
||||
|
||||
// Create job record
|
||||
const [job] = await this.db
|
||||
.insert(summarizationJobs)
|
||||
.values({ status: 'running', startedAt: new Date() })
|
||||
.returning();
|
||||
|
||||
try {
|
||||
const logs = await this.logService.logs.getLogsForSummarization(cutoff, 200);
|
||||
if (logs.length === 0) {
|
||||
await this.db
|
||||
.update(summarizationJobs)
|
||||
.set({ status: 'completed', completedAt: new Date() })
|
||||
.where(sql`id = ${job!.id}`);
|
||||
return { logsProcessed: 0, insightsCreated: 0 };
|
||||
}
|
||||
|
||||
// Group logs by session
|
||||
const bySession = new Map<string, typeof logs>();
|
||||
for (const log of logs) {
|
||||
const group = bySession.get(log.sessionId) ?? [];
|
||||
group.push(log);
|
||||
bySession.set(log.sessionId, group);
|
||||
}
|
||||
|
||||
let insightsCreated = 0;
|
||||
|
||||
for (const [sessionId, sessionLogs] of bySession) {
|
||||
const userId = sessionLogs[0]?.userId;
|
||||
if (!userId) continue;
|
||||
|
||||
const logsText = sessionLogs.map((l) => `[${l.category}] ${l.content}`).join('\n');
|
||||
|
||||
const summary = await this.summarize(logsText);
|
||||
if (!summary) continue;
|
||||
|
||||
const embedding = this.embeddings.available
|
||||
? await this.embeddings.embed(summary)
|
||||
: undefined;
|
||||
|
||||
await this.memory.insights.create({
|
||||
userId,
|
||||
content: summary,
|
||||
embedding: embedding ?? null,
|
||||
source: 'summarization',
|
||||
category: 'learning',
|
||||
metadata: { sessionId, logCount: sessionLogs.length },
|
||||
});
|
||||
insightsCreated++;
|
||||
}
|
||||
|
||||
// Transition processed logs to warm
|
||||
await this.logService.logs.promoteToWarm(cutoff);
|
||||
|
||||
await this.db
|
||||
.update(summarizationJobs)
|
||||
.set({
|
||||
status: 'completed',
|
||||
logsProcessed: logs.length,
|
||||
insightsCreated,
|
||||
completedAt: new Date(),
|
||||
})
|
||||
.where(sql`id = ${job!.id}`);
|
||||
|
||||
this.logger.log(`Summarization complete: ${logs.length} logs → ${insightsCreated} insights`);
|
||||
return { logsProcessed: logs.length, insightsCreated };
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
await this.db
|
||||
.update(summarizationJobs)
|
||||
.set({ status: 'failed', errorMessage: message, completedAt: new Date() })
|
||||
.where(sql`id = ${job!.id}`);
|
||||
this.logger.error(`Summarization failed: ${message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run tier management:
|
||||
* - Warm logs older than 30 days → cold
|
||||
* - Cold logs older than 90 days → purged
|
||||
* - Decay old insight relevance scores
|
||||
*/
|
||||
async runTierManagement(): Promise<void> {
|
||||
const warmCutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
|
||||
const coldCutoff = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
|
||||
const decayCutoff = new Date(Date.now() - 14 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const promoted = await this.logService.logs.promoteToCold(warmCutoff);
|
||||
const purged = await this.logService.logs.purge(coldCutoff);
|
||||
const decayed = await this.memory.insights.decayOldInsights(decayCutoff);
|
||||
|
||||
this.logger.log(
|
||||
`Tier management: ${promoted} logs→cold, ${purged} purged, ${decayed} insights decayed`,
|
||||
);
|
||||
}
|
||||
|
||||
private async summarize(logsText: string): Promise<string | null> {
|
||||
if (!this.apiKey) {
|
||||
this.logger.warn('No API key configured — skipping summarization');
|
||||
return null;
|
||||
}
|
||||
|
||||
const prompt = SUMMARIZATION_PROMPT.replace('{logs}', logsText);
|
||||
|
||||
const response = await fetch(`${this.baseUrl}/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${this.apiKey}`,
|
||||
},
|
||||
body: JSON.stringify({
|
||||
model: this.model,
|
||||
messages: [{ role: 'user', content: prompt }],
|
||||
max_tokens: 300,
|
||||
temperature: 0.3,
|
||||
}),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
const body = await response.text();
|
||||
this.logger.error(`Summarization API error: ${response.status} ${body}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const json = (await response.json()) as ChatCompletion;
|
||||
return json.choices[0]?.message.content ?? null;
|
||||
}
|
||||
}
|
||||
@@ -40,7 +40,7 @@
|
||||
| P4-001 | in-progress | Phase 4 | @mosaic/memory — preference + insight stores | — | #34 |
|
||||
| P4-002 | in-progress | Phase 4 | Semantic search — pgvector embeddings + search API | — | #35 |
|
||||
| P4-003 | in-progress | Phase 4 | @mosaic/log — log ingest, parsing, tiered storage | — | #36 |
|
||||
| P4-004 | not-started | Phase 4 | Summarization pipeline — Haiku-tier LLM + cron | — | #37 |
|
||||
| P4-004 | in-progress | Phase 4 | Summarization pipeline — Haiku-tier LLM + cron | — | #37 |
|
||||
| P4-005 | not-started | Phase 4 | Memory integration — inject into agent sessions | — | #38 |
|
||||
| P4-006 | not-started | Phase 4 | Skill management — catalog, install, config | — | #39 |
|
||||
| P4-007 | not-started | Phase 4 | Verify Phase 4 — memory + log pipeline working | — | #40 |
|
||||
|
||||
17
pnpm-lock.yaml
generated
17
pnpm-lock.yaml
generated
@@ -113,6 +113,9 @@ importers:
|
||||
fastify:
|
||||
specifier: ^5.0.0
|
||||
version: 5.8.2
|
||||
node-cron:
|
||||
specifier: ^4.2.1
|
||||
version: 4.2.1
|
||||
reflect-metadata:
|
||||
specifier: ^0.2.0
|
||||
version: 0.2.2
|
||||
@@ -129,6 +132,9 @@ importers:
|
||||
'@types/node':
|
||||
specifier: ^22.0.0
|
||||
version: 22.19.15
|
||||
'@types/node-cron':
|
||||
specifier: ^3.0.11
|
||||
version: 3.0.11
|
||||
'@types/uuid':
|
||||
specifier: ^10.0.0
|
||||
version: 10.0.0
|
||||
@@ -2747,6 +2753,9 @@ packages:
|
||||
'@types/mysql@2.15.27':
|
||||
resolution: {integrity: sha512-YfWiV16IY0OeBfBCk8+hXKmdTKrKlwKN1MNKAPBu5JYxLwBEZl7QzeEpGnlZb3VMGJrrGmB84gXiH+ofs/TezA==}
|
||||
|
||||
'@types/node-cron@3.0.11':
|
||||
resolution: {integrity: sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==}
|
||||
|
||||
'@types/node@22.19.15':
|
||||
resolution: {integrity: sha512-F0R/h2+dsy5wJAUe3tAU6oqa2qbWY5TpNfL/RGmo1y38hiyO1w3x2jPtt76wmuaJI4DQnOBu21cNXQ2STIUUWg==}
|
||||
|
||||
@@ -4174,6 +4183,10 @@ packages:
|
||||
sass:
|
||||
optional: true
|
||||
|
||||
node-cron@4.2.1:
|
||||
resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==}
|
||||
engines: {node: '>=6.0.0'}
|
||||
|
||||
node-domexception@1.0.0:
|
||||
resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==}
|
||||
engines: {node: '>=10.5.0'}
|
||||
@@ -7548,6 +7561,8 @@ snapshots:
|
||||
dependencies:
|
||||
'@types/node': 22.19.15
|
||||
|
||||
'@types/node-cron@3.0.11': {}
|
||||
|
||||
'@types/node@22.19.15':
|
||||
dependencies:
|
||||
undici-types: 6.21.0
|
||||
@@ -9001,6 +9016,8 @@ snapshots:
|
||||
- '@babel/core'
|
||||
- babel-plugin-macros
|
||||
|
||||
node-cron@4.2.1: {}
|
||||
|
||||
node-domexception@1.0.0: {}
|
||||
|
||||
node-fetch@3.3.2:
|
||||
|
||||
Reference in New Issue
Block a user