From 666d2bc36d074325ded46e24315103e150458e52 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 13 Mar 2026 08:51:10 -0500 Subject: [PATCH] =?UTF-8?q?feat(P4-003):=20@mosaic/log=20=E2=80=94=20log?= =?UTF-8?q?=20ingest,=20parsing,=20tiered=20storage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement AgentLogsRepo with structured log ingest (single + batch), flexible query builder (filter by session, level, category, tier, date range), and tiered storage management (hot→warm→cold→purge). Add getLogsForSummarization() for the summarization pipeline. Wire LogModule into gateway with REST endpoints at /api/logs. Co-Authored-By: Claude Opus 4.6 --- apps/gateway/package.json | 1 + apps/gateway/src/app.module.ts | 2 + apps/gateway/src/log/log.controller.ts | 62 +++++++++++++ apps/gateway/src/log/log.dto.ts | 18 ++++ apps/gateway/src/log/log.module.ts | 20 +++++ apps/gateway/src/log/log.tokens.ts | 1 + docs/TASKS.md | 2 +- packages/log/package.json | 5 ++ packages/log/src/agent-logs.ts | 117 +++++++++++++++++++++++++ packages/log/src/index.ts | 12 ++- packages/log/src/log-service.ts | 12 +++ pnpm-lock.yaml | 10 +++ 12 files changed, 260 insertions(+), 2 deletions(-) create mode 100644 apps/gateway/src/log/log.controller.ts create mode 100644 apps/gateway/src/log/log.dto.ts create mode 100644 apps/gateway/src/log/log.module.ts create mode 100644 apps/gateway/src/log/log.tokens.ts create mode 100644 packages/log/src/agent-logs.ts create mode 100644 packages/log/src/log-service.ts diff --git a/apps/gateway/package.json b/apps/gateway/package.json index 76da96a..9e64384 100644 --- a/apps/gateway/package.json +++ b/apps/gateway/package.json @@ -18,6 +18,7 @@ "@mosaic/brain": "workspace:^", "@mosaic/coord": "workspace:^", "@mosaic/db": "workspace:^", + "@mosaic/log": "workspace:^", "@mosaic/memory": "workspace:^", "@mosaic/types": "workspace:^", "@nestjs/common": "^11.0.0", diff --git a/apps/gateway/src/app.module.ts b/apps/gateway/src/app.module.ts index eb0a9cf..c41fabc 100644 --- a/apps/gateway/src/app.module.ts +++ b/apps/gateway/src/app.module.ts @@ -11,6 +11,7 @@ import { MissionsModule } from './missions/missions.module.js'; import { TasksModule } from './tasks/tasks.module.js'; import { CoordModule } from './coord/coord.module.js'; import { MemoryModule } from './memory/memory.module.js'; +import { LogModule } from './log/log.module.js'; @Module({ imports: [ @@ -25,6 +26,7 @@ import { MemoryModule } from './memory/memory.module.js'; TasksModule, CoordModule, MemoryModule, + LogModule, ], controllers: [HealthController], }) diff --git a/apps/gateway/src/log/log.controller.ts b/apps/gateway/src/log/log.controller.ts new file mode 100644 index 0000000..91ad6da --- /dev/null +++ b/apps/gateway/src/log/log.controller.ts @@ -0,0 +1,62 @@ +import { Body, Controller, Get, Inject, Param, Post, Query, UseGuards } from '@nestjs/common'; +import type { LogService } from '@mosaic/log'; +import { LOG_SERVICE } from './log.tokens.js'; +import { AuthGuard } from '../auth/auth.guard.js'; +import type { IngestLogDto, QueryLogsDto } from './log.dto.js'; + +@Controller('api/logs') +@UseGuards(AuthGuard) +export class LogController { + constructor(@Inject(LOG_SERVICE) private readonly logService: LogService) {} + + @Post() + async ingest(@Query('userId') userId: string, @Body() dto: IngestLogDto) { + return this.logService.logs.ingest({ + sessionId: dto.sessionId, + userId, + level: dto.level, + category: dto.category, + content: dto.content, + metadata: dto.metadata, + }); + } + + @Post('batch') + async ingestBatch(@Query('userId') userId: string, @Body() dtos: IngestLogDto[]) { + const entries = dtos.map((dto) => ({ + sessionId: dto.sessionId, + userId, + level: dto.level as 'debug' | 'info' | 'warn' | 'error' | undefined, + category: dto.category as + | 'decision' + | 'tool_use' + | 'learning' + | 'error' + | 'general' + | undefined, + content: dto.content, + metadata: dto.metadata, + })); + return this.logService.logs.ingestBatch(entries); + } + + @Get() + async query(@Query('userId') userId: string, @Query() params: QueryLogsDto) { + return this.logService.logs.query({ + userId, + sessionId: params.sessionId, + level: params.level, + category: params.category, + tier: params.tier, + since: params.since ? new Date(params.since) : undefined, + until: params.until ? new Date(params.until) : undefined, + limit: params.limit ? Number(params.limit) : undefined, + offset: params.offset ? Number(params.offset) : undefined, + }); + } + + @Get(':id') + async findOne(@Param('id') id: string) { + return this.logService.logs.findById(id); + } +} diff --git a/apps/gateway/src/log/log.dto.ts b/apps/gateway/src/log/log.dto.ts new file mode 100644 index 0000000..b0eac35 --- /dev/null +++ b/apps/gateway/src/log/log.dto.ts @@ -0,0 +1,18 @@ +export interface IngestLogDto { + sessionId: string; + level?: 'debug' | 'info' | 'warn' | 'error'; + category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general'; + content: string; + metadata?: Record; +} + +export interface QueryLogsDto { + sessionId?: string; + level?: 'debug' | 'info' | 'warn' | 'error'; + category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general'; + tier?: 'hot' | 'warm' | 'cold'; + since?: string; + until?: string; + limit?: string; + offset?: string; +} diff --git a/apps/gateway/src/log/log.module.ts b/apps/gateway/src/log/log.module.ts new file mode 100644 index 0000000..5963030 --- /dev/null +++ b/apps/gateway/src/log/log.module.ts @@ -0,0 +1,20 @@ +import { Global, Module } from '@nestjs/common'; +import { createLogService, type LogService } from '@mosaic/log'; +import type { Db } from '@mosaic/db'; +import { DB } from '../database/database.module.js'; +import { LOG_SERVICE } from './log.tokens.js'; +import { LogController } from './log.controller.js'; + +@Global() +@Module({ + providers: [ + { + provide: LOG_SERVICE, + useFactory: (db: Db): LogService => createLogService(db), + inject: [DB], + }, + ], + controllers: [LogController], + exports: [LOG_SERVICE], +}) +export class LogModule {} diff --git a/apps/gateway/src/log/log.tokens.ts b/apps/gateway/src/log/log.tokens.ts new file mode 100644 index 0000000..3117b2a --- /dev/null +++ b/apps/gateway/src/log/log.tokens.ts @@ -0,0 +1 @@ +export const LOG_SERVICE = 'LOG_SERVICE'; diff --git a/docs/TASKS.md b/docs/TASKS.md index 92c8c2c..563a562 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -39,7 +39,7 @@ | P3-008 | done | Phase 3 | Verify Phase 3 — web dashboard functional E2E | — | #33 | | P4-001 | in-progress | Phase 4 | @mosaic/memory — preference + insight stores | — | #34 | | P4-002 | in-progress | Phase 4 | Semantic search — pgvector embeddings + search API | — | #35 | -| P4-003 | not-started | Phase 4 | @mosaic/log — log ingest, parsing, tiered storage | — | #36 | +| P4-003 | in-progress | Phase 4 | @mosaic/log — log ingest, parsing, tiered storage | — | #36 | | P4-004 | not-started | Phase 4 | Summarization pipeline — Haiku-tier LLM + cron | — | #37 | | P4-005 | not-started | Phase 4 | Memory integration — inject into agent sessions | — | #38 | | P4-006 | not-started | Phase 4 | Skill management — catalog, install, config | — | #39 | diff --git a/packages/log/package.json b/packages/log/package.json index b1ae41f..08cf723 100644 --- a/packages/log/package.json +++ b/packages/log/package.json @@ -1,6 +1,7 @@ { "name": "@mosaic/log", "version": "0.0.0", + "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", "exports": { @@ -15,6 +16,10 @@ "typecheck": "tsc --noEmit", "test": "vitest run --passWithNoTests" }, + "dependencies": { + "@mosaic/db": "workspace:*", + "drizzle-orm": "^0.45.1" + }, "devDependencies": { "typescript": "^5.8.0", "vitest": "^2.0.0" diff --git a/packages/log/src/agent-logs.ts b/packages/log/src/agent-logs.ts new file mode 100644 index 0000000..36237f4 --- /dev/null +++ b/packages/log/src/agent-logs.ts @@ -0,0 +1,117 @@ +import { eq, and, desc, lt, sql, type Db, agentLogs } from '@mosaic/db'; + +export type AgentLog = typeof agentLogs.$inferSelect; +export type NewAgentLog = typeof agentLogs.$inferInsert; + +export type LogLevel = 'debug' | 'info' | 'warn' | 'error'; +export type LogCategory = 'decision' | 'tool_use' | 'learning' | 'error' | 'general'; +export type LogTier = 'hot' | 'warm' | 'cold'; + +export interface LogQuery { + userId?: string; + sessionId?: string; + level?: LogLevel; + category?: LogCategory; + tier?: LogTier; + since?: Date; + until?: Date; + limit?: number; + offset?: number; +} + +export function createAgentLogsRepo(db: Db) { + return { + async ingest(entry: NewAgentLog): Promise { + const rows = await db.insert(agentLogs).values(entry).returning(); + return rows[0]!; + }, + + async ingestBatch(entries: NewAgentLog[]): Promise { + if (entries.length === 0) return []; + return db.insert(agentLogs).values(entries).returning(); + }, + + async query(params: LogQuery): Promise { + const conditions = []; + + if (params.userId) conditions.push(eq(agentLogs.userId, params.userId)); + if (params.sessionId) conditions.push(eq(agentLogs.sessionId, params.sessionId)); + if (params.level) conditions.push(eq(agentLogs.level, params.level)); + if (params.category) conditions.push(eq(agentLogs.category, params.category)); + if (params.tier) conditions.push(eq(agentLogs.tier, params.tier)); + if (params.since) conditions.push(sql`${agentLogs.createdAt} >= ${params.since}`); + if (params.until) conditions.push(sql`${agentLogs.createdAt} <= ${params.until}`); + + const where = conditions.length > 0 ? and(...conditions) : undefined; + + return db + .select() + .from(agentLogs) + .where(where) + .orderBy(desc(agentLogs.createdAt)) + .limit(params.limit ?? 100) + .offset(params.offset ?? 0); + }, + + async findById(id: string): Promise { + const rows = await db.select().from(agentLogs).where(eq(agentLogs.id, id)); + return rows[0]; + }, + + /** + * Transition hot logs older than the cutoff to warm tier. + * Returns the number of logs transitioned. + */ + async promoteToWarm(olderThan: Date): Promise { + const result = await db + .update(agentLogs) + .set({ tier: 'warm', summarizedAt: new Date() }) + .where(and(eq(agentLogs.tier, 'hot'), lt(agentLogs.createdAt, olderThan))) + .returning(); + return result.length; + }, + + /** + * Transition warm logs older than the cutoff to cold tier. + */ + async promoteToCold(olderThan: Date): Promise { + const result = await db + .update(agentLogs) + .set({ tier: 'cold', archivedAt: new Date() }) + .where(and(eq(agentLogs.tier, 'warm'), lt(agentLogs.createdAt, olderThan))) + .returning(); + return result.length; + }, + + /** + * Delete cold logs older than the retention period. + */ + async purge(olderThan: Date): Promise { + const result = await db + .delete(agentLogs) + .where(and(eq(agentLogs.tier, 'cold'), lt(agentLogs.createdAt, olderThan))) + .returning(); + return result.length; + }, + + /** + * Get hot logs ready for summarization (decisions + learnings). + */ + async getLogsForSummarization(olderThan: Date, limit = 100): Promise { + return db + .select() + .from(agentLogs) + .where( + and( + eq(agentLogs.tier, 'hot'), + lt(agentLogs.createdAt, olderThan), + sql`${agentLogs.category} IN ('decision', 'learning', 'tool_use')`, + ), + ) + .orderBy(agentLogs.createdAt) + .limit(limit); + }, + }; +} + +export type AgentLogsRepo = ReturnType; diff --git a/packages/log/src/index.ts b/packages/log/src/index.ts index 0c18d5d..f73585f 100644 --- a/packages/log/src/index.ts +++ b/packages/log/src/index.ts @@ -1 +1,11 @@ -export const VERSION = '0.0.0'; +export { createLogService, type LogService } from './log-service.js'; +export { + createAgentLogsRepo, + type AgentLogsRepo, + type AgentLog, + type NewAgentLog, + type LogLevel, + type LogCategory, + type LogTier, + type LogQuery, +} from './agent-logs.js'; diff --git a/packages/log/src/log-service.ts b/packages/log/src/log-service.ts new file mode 100644 index 0000000..aa050a9 --- /dev/null +++ b/packages/log/src/log-service.ts @@ -0,0 +1,12 @@ +import type { Db } from '@mosaic/db'; +import { createAgentLogsRepo, type AgentLogsRepo } from './agent-logs.js'; + +export interface LogService { + logs: AgentLogsRepo; +} + +export function createLogService(db: Db): LogService { + return { + logs: createAgentLogsRepo(db), + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5e36928..4a21ec0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -59,6 +59,9 @@ importers: '@mosaic/db': specifier: workspace:^ version: link:../../packages/db + '@mosaic/log': + specifier: workspace:^ + version: link:../../packages/log '@mosaic/memory': specifier: workspace:^ version: link:../../packages/memory @@ -324,6 +327,13 @@ importers: version: 2.1.9(@types/node@22.19.15)(lightningcss@1.31.1) packages/log: + dependencies: + '@mosaic/db': + specifier: workspace:* + version: link:../db + drizzle-orm: + specifier: ^0.45.1 + version: 0.45.1(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(kysely@0.28.11)(postgres@3.4.8) devDependencies: typescript: specifier: ^5.8.0