feat(P4-003): @mosaic/log — log ingest, parsing, tiered storage

Implement AgentLogsRepo with structured log ingest (single + batch),
flexible query builder (filter by session, level, category, tier,
date range), and tiered storage management (hot→warm→cold→purge).
Add getLogsForSummarization() for the summarization pipeline.
Wire LogModule into gateway with REST endpoints at /api/logs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-13 08:51:10 -05:00
parent fb3c308efd
commit 666d2bc36d
12 changed files with 260 additions and 2 deletions

View File

@@ -18,6 +18,7 @@
"@mosaic/brain": "workspace:^",
"@mosaic/coord": "workspace:^",
"@mosaic/db": "workspace:^",
"@mosaic/log": "workspace:^",
"@mosaic/memory": "workspace:^",
"@mosaic/types": "workspace:^",
"@nestjs/common": "^11.0.0",

View File

@@ -11,6 +11,7 @@ import { MissionsModule } from './missions/missions.module.js';
import { TasksModule } from './tasks/tasks.module.js';
import { CoordModule } from './coord/coord.module.js';
import { MemoryModule } from './memory/memory.module.js';
import { LogModule } from './log/log.module.js';
@Module({
imports: [
@@ -25,6 +26,7 @@ import { MemoryModule } from './memory/memory.module.js';
TasksModule,
CoordModule,
MemoryModule,
LogModule,
],
controllers: [HealthController],
})

View File

@@ -0,0 +1,62 @@
import { Body, Controller, Get, Inject, Param, Post, Query, UseGuards } from '@nestjs/common';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from './log.tokens.js';
import { AuthGuard } from '../auth/auth.guard.js';
import type { IngestLogDto, QueryLogsDto } from './log.dto.js';
@Controller('api/logs')
@UseGuards(AuthGuard)
export class LogController {
constructor(@Inject(LOG_SERVICE) private readonly logService: LogService) {}
@Post()
async ingest(@Query('userId') userId: string, @Body() dto: IngestLogDto) {
return this.logService.logs.ingest({
sessionId: dto.sessionId,
userId,
level: dto.level,
category: dto.category,
content: dto.content,
metadata: dto.metadata,
});
}
@Post('batch')
async ingestBatch(@Query('userId') userId: string, @Body() dtos: IngestLogDto[]) {
const entries = dtos.map((dto) => ({
sessionId: dto.sessionId,
userId,
level: dto.level as 'debug' | 'info' | 'warn' | 'error' | undefined,
category: dto.category as
| 'decision'
| 'tool_use'
| 'learning'
| 'error'
| 'general'
| undefined,
content: dto.content,
metadata: dto.metadata,
}));
return this.logService.logs.ingestBatch(entries);
}
@Get()
async query(@Query('userId') userId: string, @Query() params: QueryLogsDto) {
return this.logService.logs.query({
userId,
sessionId: params.sessionId,
level: params.level,
category: params.category,
tier: params.tier,
since: params.since ? new Date(params.since) : undefined,
until: params.until ? new Date(params.until) : undefined,
limit: params.limit ? Number(params.limit) : undefined,
offset: params.offset ? Number(params.offset) : undefined,
});
}
@Get(':id')
async findOne(@Param('id') id: string) {
return this.logService.logs.findById(id);
}
}

View File

@@ -0,0 +1,18 @@
export interface IngestLogDto {
sessionId: string;
level?: 'debug' | 'info' | 'warn' | 'error';
category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general';
content: string;
metadata?: Record<string, unknown>;
}
export interface QueryLogsDto {
sessionId?: string;
level?: 'debug' | 'info' | 'warn' | 'error';
category?: 'decision' | 'tool_use' | 'learning' | 'error' | 'general';
tier?: 'hot' | 'warm' | 'cold';
since?: string;
until?: string;
limit?: string;
offset?: string;
}

View File

@@ -0,0 +1,20 @@
import { Global, Module } from '@nestjs/common';
import { createLogService, type LogService } from '@mosaic/log';
import type { Db } from '@mosaic/db';
import { DB } from '../database/database.module.js';
import { LOG_SERVICE } from './log.tokens.js';
import { LogController } from './log.controller.js';
@Global()
@Module({
providers: [
{
provide: LOG_SERVICE,
useFactory: (db: Db): LogService => createLogService(db),
inject: [DB],
},
],
controllers: [LogController],
exports: [LOG_SERVICE],
})
export class LogModule {}

View File

@@ -0,0 +1 @@
export const LOG_SERVICE = 'LOG_SERVICE';

View File

@@ -39,7 +39,7 @@
| P3-008 | done | Phase 3 | Verify Phase 3 — web dashboard functional E2E | — | #33 |
| P4-001 | in-progress | Phase 4 | @mosaic/memory — preference + insight stores | — | #34 |
| P4-002 | in-progress | Phase 4 | Semantic search — pgvector embeddings + search API | — | #35 |
| P4-003 | not-started | Phase 4 | @mosaic/log — log ingest, parsing, tiered storage | — | #36 |
| P4-003 | in-progress | Phase 4 | @mosaic/log — log ingest, parsing, tiered storage | — | #36 |
| P4-004 | not-started | Phase 4 | Summarization pipeline — Haiku-tier LLM + cron | — | #37 |
| P4-005 | not-started | Phase 4 | Memory integration — inject into agent sessions | — | #38 |
| P4-006 | not-started | Phase 4 | Skill management — catalog, install, config | — | #39 |

View File

@@ -1,6 +1,7 @@
{
"name": "@mosaic/log",
"version": "0.0.0",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
@@ -15,6 +16,10 @@
"typecheck": "tsc --noEmit",
"test": "vitest run --passWithNoTests"
},
"dependencies": {
"@mosaic/db": "workspace:*",
"drizzle-orm": "^0.45.1"
},
"devDependencies": {
"typescript": "^5.8.0",
"vitest": "^2.0.0"

View File

@@ -0,0 +1,117 @@
import { eq, and, desc, lt, sql, type Db, agentLogs } from '@mosaic/db';
export type AgentLog = typeof agentLogs.$inferSelect;
export type NewAgentLog = typeof agentLogs.$inferInsert;
export type LogLevel = 'debug' | 'info' | 'warn' | 'error';
export type LogCategory = 'decision' | 'tool_use' | 'learning' | 'error' | 'general';
export type LogTier = 'hot' | 'warm' | 'cold';
export interface LogQuery {
userId?: string;
sessionId?: string;
level?: LogLevel;
category?: LogCategory;
tier?: LogTier;
since?: Date;
until?: Date;
limit?: number;
offset?: number;
}
export function createAgentLogsRepo(db: Db) {
return {
async ingest(entry: NewAgentLog): Promise<AgentLog> {
const rows = await db.insert(agentLogs).values(entry).returning();
return rows[0]!;
},
async ingestBatch(entries: NewAgentLog[]): Promise<AgentLog[]> {
if (entries.length === 0) return [];
return db.insert(agentLogs).values(entries).returning();
},
async query(params: LogQuery): Promise<AgentLog[]> {
const conditions = [];
if (params.userId) conditions.push(eq(agentLogs.userId, params.userId));
if (params.sessionId) conditions.push(eq(agentLogs.sessionId, params.sessionId));
if (params.level) conditions.push(eq(agentLogs.level, params.level));
if (params.category) conditions.push(eq(agentLogs.category, params.category));
if (params.tier) conditions.push(eq(agentLogs.tier, params.tier));
if (params.since) conditions.push(sql`${agentLogs.createdAt} >= ${params.since}`);
if (params.until) conditions.push(sql`${agentLogs.createdAt} <= ${params.until}`);
const where = conditions.length > 0 ? and(...conditions) : undefined;
return db
.select()
.from(agentLogs)
.where(where)
.orderBy(desc(agentLogs.createdAt))
.limit(params.limit ?? 100)
.offset(params.offset ?? 0);
},
async findById(id: string): Promise<AgentLog | undefined> {
const rows = await db.select().from(agentLogs).where(eq(agentLogs.id, id));
return rows[0];
},
/**
* Transition hot logs older than the cutoff to warm tier.
* Returns the number of logs transitioned.
*/
async promoteToWarm(olderThan: Date): Promise<number> {
const result = await db
.update(agentLogs)
.set({ tier: 'warm', summarizedAt: new Date() })
.where(and(eq(agentLogs.tier, 'hot'), lt(agentLogs.createdAt, olderThan)))
.returning();
return result.length;
},
/**
* Transition warm logs older than the cutoff to cold tier.
*/
async promoteToCold(olderThan: Date): Promise<number> {
const result = await db
.update(agentLogs)
.set({ tier: 'cold', archivedAt: new Date() })
.where(and(eq(agentLogs.tier, 'warm'), lt(agentLogs.createdAt, olderThan)))
.returning();
return result.length;
},
/**
* Delete cold logs older than the retention period.
*/
async purge(olderThan: Date): Promise<number> {
const result = await db
.delete(agentLogs)
.where(and(eq(agentLogs.tier, 'cold'), lt(agentLogs.createdAt, olderThan)))
.returning();
return result.length;
},
/**
* Get hot logs ready for summarization (decisions + learnings).
*/
async getLogsForSummarization(olderThan: Date, limit = 100): Promise<AgentLog[]> {
return db
.select()
.from(agentLogs)
.where(
and(
eq(agentLogs.tier, 'hot'),
lt(agentLogs.createdAt, olderThan),
sql`${agentLogs.category} IN ('decision', 'learning', 'tool_use')`,
),
)
.orderBy(agentLogs.createdAt)
.limit(limit);
},
};
}
export type AgentLogsRepo = ReturnType<typeof createAgentLogsRepo>;

View File

@@ -1 +1,11 @@
export const VERSION = '0.0.0';
export { createLogService, type LogService } from './log-service.js';
export {
createAgentLogsRepo,
type AgentLogsRepo,
type AgentLog,
type NewAgentLog,
type LogLevel,
type LogCategory,
type LogTier,
type LogQuery,
} from './agent-logs.js';

View File

@@ -0,0 +1,12 @@
import type { Db } from '@mosaic/db';
import { createAgentLogsRepo, type AgentLogsRepo } from './agent-logs.js';
export interface LogService {
logs: AgentLogsRepo;
}
export function createLogService(db: Db): LogService {
return {
logs: createAgentLogsRepo(db),
};
}

10
pnpm-lock.yaml generated
View File

@@ -59,6 +59,9 @@ importers:
'@mosaic/db':
specifier: workspace:^
version: link:../../packages/db
'@mosaic/log':
specifier: workspace:^
version: link:../../packages/log
'@mosaic/memory':
specifier: workspace:^
version: link:../../packages/memory
@@ -324,6 +327,13 @@ importers:
version: 2.1.9(@types/node@22.19.15)(lightningcss@1.31.1)
packages/log:
dependencies:
'@mosaic/db':
specifier: workspace:*
version: link:../db
drizzle-orm:
specifier: ^0.45.1
version: 0.45.1(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(kysely@0.28.11)(postgres@3.4.8)
devDependencies:
typescript:
specifier: ^5.8.0