feat(Phase 4): Memory & Intelligence — memory, log, summarization, skills (#91)

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #91.
This commit is contained in:
2026-03-13 13:56:50 +00:00
committed by jason.woltje
parent d83ebe65e9
commit 9eb48e1d9b
35 changed files with 1481 additions and 16 deletions

View File

@@ -0,0 +1,44 @@
import { Injectable, Logger, type OnModuleInit, type OnModuleDestroy } from '@nestjs/common';
import cron from 'node-cron';
import { SummarizationService } from './summarization.service.js';
@Injectable()
export class CronService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CronService.name);
private readonly tasks: cron.ScheduledTask[] = [];
constructor(private readonly summarization: SummarizationService) {}
onModuleInit(): void {
const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours
const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am
this.tasks.push(
cron.schedule(summarizationSchedule, () => {
this.summarization.runSummarization().catch((err) => {
this.logger.error(`Scheduled summarization failed: ${err}`);
});
}),
);
this.tasks.push(
cron.schedule(tierManagementSchedule, () => {
this.summarization.runTierManagement().catch((err) => {
this.logger.error(`Scheduled tier management failed: ${err}`);
});
}),
);
this.logger.log(
`Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}"`,
);
}
onModuleDestroy(): void {
for (const task of this.tasks) {
task.stop();
}
this.tasks.length = 0;
this.logger.log('Cron tasks stopped');
}
}

View File

@@ -0,0 +1,62 @@
import { Body, Controller, Get, Inject, Param, Post, Query, UseGuards } from '@nestjs/common';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from './log.tokens.js';
import { AuthGuard } from '../auth/auth.guard.js';
import type { IngestLogDto, QueryLogsDto } from './log.dto.js';
@Controller('api/logs')
@UseGuards(AuthGuard)
export class LogController {
constructor(@Inject(LOG_SERVICE) private readonly logService: LogService) {}
@Post()
async ingest(@Query('userId') userId: string, @Body() dto: IngestLogDto) {
return this.logService.logs.ingest({
sessionId: dto.sessionId,
userId,
level: dto.level,
category: dto.category,
content: dto.content,
metadata: dto.metadata,
});
}
@Post('batch')
async ingestBatch(@Query('userId') userId: string, @Body() dtos: IngestLogDto[]) {
const entries = dtos.map((dto) => ({
sessionId: dto.sessionId,
userId,
level: dto.level as 'debug' | 'info' | 'warn' | 'error' | undefined,
category: dto.category as
| 'decision'
| 'tool_use'
| 'learning'
| 'error'
| 'general'
| undefined,
content: dto.content,
metadata: dto.metadata,
}));
return this.logService.logs.ingestBatch(entries);
}
@Get()
async query(@Query('userId') userId: string, @Query() params: QueryLogsDto) {
return this.logService.logs.query({
userId,
sessionId: params.sessionId,
level: params.level,
category: params.category,
tier: params.tier,
since: params.since ? new Date(params.since) : undefined,
until: params.until ? new Date(params.until) : undefined,
limit: params.limit ? Number(params.limit) : undefined,
offset: params.offset ? Number(params.offset) : undefined,
});
}
@Get(':id')
async findOne(@Param('id') id: string) {
return this.logService.logs.findById(id);
}
}

View File

@@ -0,0 +1,18 @@
export interface IngestLogDto {
sessionId: string;
level?: 'debug' | 'info' | 'warn' | 'error';
category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general';
content: string;
metadata?: Record<string, unknown>;
}
export interface QueryLogsDto {
sessionId?: string;
level?: 'debug' | 'info' | 'warn' | 'error';
category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general';
tier?: 'hot' | 'warm' | 'cold';
since?: string;
until?: string;
limit?: string;
offset?: string;
}

View File

@@ -0,0 +1,24 @@
import { Global, Module } from '@nestjs/common';
import { createLogService, type LogService } from '@mosaic/log';
import type { Db } from '@mosaic/db';
import { DB } from '../database/database.module.js';
import { LOG_SERVICE } from './log.tokens.js';
import { LogController } from './log.controller.js';
import { SummarizationService } from './summarization.service.js';
import { CronService } from './cron.service.js';
@Global()
@Module({
providers: [
{
provide: LOG_SERVICE,
useFactory: (db: Db): LogService => createLogService(db),
inject: [DB],
},
SummarizationService,
CronService,
],
controllers: [LogController],
exports: [LOG_SERVICE, SummarizationService],
})
export class LogModule {}

View File

@@ -0,0 +1 @@
export const LOG_SERVICE = 'LOG_SERVICE';

View File

@@ -0,0 +1,178 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import type { LogService } from '@mosaic/log';
import type { Memory } from '@mosaic/memory';
import { LOG_SERVICE } from './log.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import type { Db } from '@mosaic/db';
import { sql, summarizationJobs } from '@mosaic/db';
import { DB } from '../database/database.module.js';
const SUMMARIZATION_PROMPT = `You are a knowledge extraction assistant. Given the following agent interaction logs, extract the key decisions, learnings, and patterns. Output a concise summary (2-4 sentences) that captures the most important information for future reference. Focus on actionable insights, not raw events.
Logs:
{logs}
Summary:`;
interface ChatCompletion {
choices: Array<{ message: { content: string } }>;
}
@Injectable()
export class SummarizationService {
private readonly logger = new Logger(SummarizationService.name);
private readonly apiKey: string | undefined;
private readonly baseUrl: string;
private readonly model: string;
constructor(
@Inject(LOG_SERVICE) private readonly logService: LogService,
@Inject(MEMORY) private readonly memory: Memory,
private readonly embeddings: EmbeddingService,
@Inject(DB) private readonly db: Db,
) {
this.apiKey = process.env['OPENAI_API_KEY'];
this.baseUrl = process.env['SUMMARIZATION_API_URL'] ?? 'https://api.openai.com/v1';
this.model = process.env['SUMMARIZATION_MODEL'] ?? 'gpt-4o-mini';
}
/**
* Run one summarization cycle:
* 1. Find hot logs older than 24h with decision/learning/tool_use categories
* 2. Group by session
* 3. Summarize each group via cheap LLM
* 4. Store as insights with embeddings
* 5. Transition processed logs to warm tier
*/
async runSummarization(): Promise<{ logsProcessed: number; insightsCreated: number }> {
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24h ago
// Create job record
const [job] = await this.db
.insert(summarizationJobs)
.values({ status: 'running', startedAt: new Date() })
.returning();
try {
const logs = await this.logService.logs.getLogsForSummarization(cutoff, 200);
if (logs.length === 0) {
await this.db
.update(summarizationJobs)
.set({ status: 'completed', completedAt: new Date() })
.where(sql`id = ${job!.id}`);
return { logsProcessed: 0, insightsCreated: 0 };
}
// Group logs by session
const bySession = new Map<string, typeof logs>();
for (const log of logs) {
const group = bySession.get(log.sessionId) ?? [];
group.push(log);
bySession.set(log.sessionId, group);
}
let insightsCreated = 0;
for (const [sessionId, sessionLogs] of bySession) {
const userId = sessionLogs[0]?.userId;
if (!userId) continue;
const logsText = sessionLogs.map((l) => `[${l.category}] ${l.content}`).join('\n');
const summary = await this.summarize(logsText);
if (!summary) continue;
const embedding = this.embeddings.available
? await this.embeddings.embed(summary)
: undefined;
await this.memory.insights.create({
userId,
content: summary,
embedding: embedding ?? null,
source: 'summarization',
category: 'learning',
metadata: { sessionId, logCount: sessionLogs.length },
});
insightsCreated++;
}
// Transition processed logs to warm
await this.logService.logs.promoteToWarm(cutoff);
await this.db
.update(summarizationJobs)
.set({
status: 'completed',
logsProcessed: logs.length,
insightsCreated,
completedAt: new Date(),
})
.where(sql`id = ${job!.id}`);
this.logger.log(`Summarization complete: ${logs.length} logs → ${insightsCreated} insights`);
return { logsProcessed: logs.length, insightsCreated };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await this.db
.update(summarizationJobs)
.set({ status: 'failed', errorMessage: message, completedAt: new Date() })
.where(sql`id = ${job!.id}`);
this.logger.error(`Summarization failed: ${message}`);
throw error;
}
}
/**
* Run tier management:
* - Warm logs older than 30 days → cold
* - Cold logs older than 90 days → purged
* - Decay old insight relevance scores
*/
async runTierManagement(): Promise<void> {
const warmCutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const coldCutoff = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000);
const decayCutoff = new Date(Date.now() - 14 * 24 * 60 * 60 * 1000);
const promoted = await this.logService.logs.promoteToCold(warmCutoff);
const purged = await this.logService.logs.purge(coldCutoff);
const decayed = await this.memory.insights.decayOldInsights(decayCutoff);
this.logger.log(
`Tier management: ${promoted} logs→cold, ${purged} purged, ${decayed} insights decayed`,
);
}
private async summarize(logsText: string): Promise<string | null> {
if (!this.apiKey) {
this.logger.warn('No API key configured — skipping summarization');
return null;
}
const prompt = SUMMARIZATION_PROMPT.replace('{logs}', logsText);
const response = await fetch(`${this.baseUrl}/chat/completions`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.apiKey}`,
},
body: JSON.stringify({
model: this.model,
messages: [{ role: 'user', content: prompt }],
max_tokens: 300,
temperature: 0.3,
}),
});
if (!response.ok) {
const body = await response.text();
this.logger.error(`Summarization API error: ${response.status} ${body}`);
return null;
}
const json = (await response.json()) as ChatCompletion;
return json.choices[0]?.message.content ?? null;
}
}