Implement AgentLogsRepo with structured log ingest (single + batch), flexible query builder (filter by session, level, category, tier, date range), and tiered storage management (hot→warm→cold→purge). Add getLogsForSummarization() for the summarization pipeline. Wire LogModule into gateway with REST endpoints at /api/logs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
118 lines
3.8 KiB
TypeScript
118 lines
3.8 KiB
TypeScript
import { eq, and, desc, lt, sql, type Db, agentLogs } from '@mosaic/db';
|
|
|
|
/** Row type obtained from the `agentLogs` table's `$inferSelect` helper (what selects return). */
export type AgentLog = (typeof agentLogs)['$inferSelect'];
|
|
/** Insert payload type obtained from the `agentLogs` table's `$inferInsert` helper. */
export type NewAgentLog = (typeof agentLogs)['$inferInsert'];
|
|
|
|
/** Severity values accepted by the `level` filter. */
export type LogLevel =
  | 'debug'
  | 'info'
  | 'warn'
  | 'error';
|
|
/** Category values accepted by the `category` filter. */
export type LogCategory =
  | 'decision'
  | 'tool_use'
  | 'learning'
  | 'error'
  | 'general';
|
|
/** Storage tier values accepted by the `tier` filter. */
export type LogTier =
  | 'hot'
  | 'warm'
  | 'cold';
|
|
|
|
/**
 * Filters and paging options accepted by the repository's `query` method.
 *
 * Every field is optional. `query` combines the supplied filters with AND;
 * when no filter is supplied the result is unfiltered.
 */
export interface LogQuery {
  /** Match only logs whose `userId` equals this value. */
  userId?: string;
  /** Match only logs whose `sessionId` equals this value. */
  sessionId?: string;
  /** Match only logs with exactly this severity level. */
  level?: LogLevel;
  /** Match only logs in exactly this category. */
  category?: LogCategory;
  /** Match only logs currently stored in this tier. */
  tier?: LogTier;
  /** Inclusive lower bound on `createdAt` (`createdAt >= since`). */
  since?: Date;
  /** Inclusive upper bound on `createdAt` (`createdAt <= until`). */
  until?: Date;
  /** Maximum number of rows to return; `query` uses 100 when omitted. */
  limit?: number;
  /** Number of rows to skip before returning results; `query` uses 0 when omitted. */
  offset?: number;
}
|
|
|
|
/**
 * Create the agent-log repository bound to a database handle.
 *
 * The returned object groups ingest, lookup/query, and tier-lifecycle
 * operations (hot → warm → cold → purge) on the `agentLogs` table.
 *
 * @param db - Database handle used for every statement the repository issues.
 * @returns An object exposing the repository methods.
 */
export function createAgentLogsRepo(db: Db) {
  return {
    /**
     * Insert a single log entry.
     *
     * @param entry - Values for the new row.
     * @returns The inserted row as returned by the database.
     * @throws Error if the insert statement returns no row.
     */
    async ingest(entry: NewAgentLog): Promise<AgentLog> {
      const [row] = await db.insert(agentLogs).values(entry).returning();
      // Guard explicitly instead of asserting non-null, so an unexpected empty
      // result fails loudly here rather than surfacing as `undefined` later.
      if (!row) {
        throw new Error('Agent log insert returned no row');
      }
      return row;
    },

    /**
     * Insert several log entries with a single insert call.
     *
     * @param entries - Values for the new rows.
     * @returns The inserted rows; an empty array (without touching the
     *   database) when `entries` is empty.
     */
    async ingestBatch(entries: NewAgentLog[]): Promise<AgentLog[]> {
      if (entries.length === 0) return [];
      return db.insert(agentLogs).values(entries).returning();
    },

    /**
     * Fetch logs matching the given filters, newest first.
     *
     * Only truthy filter fields contribute a condition (so an empty-string
     * filter is ignored); all conditions are combined with AND. `since` and
     * `until` are inclusive bounds on `createdAt`.
     *
     * @param params - Filters plus paging; `limit` defaults to 100 and
     *   `offset` to 0.
     * @returns Matching rows ordered by `createdAt` descending.
     */
    async query(params: LogQuery): Promise<AgentLog[]> {
      const conditions = [];

      if (params.userId) conditions.push(eq(agentLogs.userId, params.userId));
      if (params.sessionId) conditions.push(eq(agentLogs.sessionId, params.sessionId));
      if (params.level) conditions.push(eq(agentLogs.level, params.level));
      if (params.category) conditions.push(eq(agentLogs.category, params.category));
      if (params.tier) conditions.push(eq(agentLogs.tier, params.tier));
      // NOTE(review): the Date values below are bound through raw `sql`
      // fragments rather than a column-aware operator — confirm the driver
      // in use accepts Date parameters in this position.
      if (params.since) conditions.push(sql`${agentLogs.createdAt} >= ${params.since}`);
      if (params.until) conditions.push(sql`${agentLogs.createdAt} <= ${params.until}`);

      // With no filters, pass `undefined` so the select is unfiltered.
      const where = conditions.length > 0 ? and(...conditions) : undefined;

      return db
        .select()
        .from(agentLogs)
        .where(where)
        .orderBy(desc(agentLogs.createdAt))
        .limit(params.limit ?? 100)
        .offset(params.offset ?? 0);
    },

    /**
     * Look up a single log by its id.
     *
     * @param id - Value compared against `agentLogs.id`.
     * @returns The first matching row, or `undefined` when none matches.
     */
    async findById(id: string): Promise<AgentLog | undefined> {
      const rows = await db.select().from(agentLogs).where(eq(agentLogs.id, id));
      return rows[0];
    },

    /**
     * Transition hot logs created before the cutoff to the warm tier and
     * stamp `summarizedAt` with the current time.
     *
     * @param olderThan - Exclusive cutoff; rows with `createdAt < olderThan` move.
     * @returns The number of logs transitioned.
     */
    async promoteToWarm(olderThan: Date): Promise<number> {
      // Only ids are returned: the rows are needed for counting, nothing else.
      const moved = await db
        .update(agentLogs)
        .set({ tier: 'warm', summarizedAt: new Date() })
        .where(and(eq(agentLogs.tier, 'hot'), lt(agentLogs.createdAt, olderThan)))
        .returning({ id: agentLogs.id });
      return moved.length;
    },

    /**
     * Transition warm logs created before the cutoff to the cold tier and
     * stamp `archivedAt` with the current time.
     *
     * @param olderThan - Exclusive cutoff; rows with `createdAt < olderThan` move.
     * @returns The number of logs transitioned.
     */
    async promoteToCold(olderThan: Date): Promise<number> {
      // Only ids are returned: the rows are needed for counting, nothing else.
      const moved = await db
        .update(agentLogs)
        .set({ tier: 'cold', archivedAt: new Date() })
        .where(and(eq(agentLogs.tier, 'warm'), lt(agentLogs.createdAt, olderThan)))
        .returning({ id: agentLogs.id });
      return moved.length;
    },

    /**
     * Delete cold logs created before the cutoff.
     *
     * @param olderThan - Exclusive cutoff; cold rows with
     *   `createdAt < olderThan` are deleted.
     * @returns The number of logs deleted.
     */
    async purge(olderThan: Date): Promise<number> {
      // Only ids are returned: the rows are needed for counting, nothing else.
      const removed = await db
        .delete(agentLogs)
        .where(and(eq(agentLogs.tier, 'cold'), lt(agentLogs.createdAt, olderThan)))
        .returning({ id: agentLogs.id });
      return removed.length;
    },

    /**
     * Get hot logs ready for summarization: entries in the `decision`,
     * `learning`, or `tool_use` categories created before the cutoff.
     *
     * @param olderThan - Exclusive cutoff on `createdAt`.
     * @param limit - Maximum number of rows to return (default 100).
     * @returns Matching rows ordered by `createdAt` ascending (oldest first).
     */
    async getLogsForSummarization(olderThan: Date, limit = 100): Promise<AgentLog[]> {
      return db
        .select()
        .from(agentLogs)
        .where(
          and(
            eq(agentLogs.tier, 'hot'),
            lt(agentLogs.createdAt, olderThan),
            sql`${agentLogs.category} IN ('decision', 'learning', 'tool_use')`,
          ),
        )
        .orderBy(agentLogs.createdAt)
        .limit(limit);
    },
  };
}
|
|
|
|
/** Type of the repository object returned by `createAgentLogsRepo`. */
export type AgentLogsRepo = ReturnType<typeof createAgentLogsRepo>;
|