feat(gateway): SessionGCService three-tier GC + /gc command + cron (P8-014)
Implements three-tier garbage collection for agent sessions: - SessionGCService.collect() for immediate per-session cleanup on destroySession() - SessionGCService.sweepOrphans() for daily cron sweep of orphaned Valkey keys - SessionGCService.fullCollect() for cold-start aggressive cleanup via OnModuleInit - /gc slash command wired into CommandExecutorService + registered in CommandRegistryService - SESSION_GC_CRON (daily 4am) added to CronService - GCModule provides Valkey (ioredis via @mosaic/queue) and is imported by AgentModule, LogModule, CommandsModule, AppModule - 8 Vitest unit tests covering all three GC tiers Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
31
apps/gateway/src/gc/gc.module.ts
Normal file
31
apps/gateway/src/gc/gc.module.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { Module, type OnApplicationShutdown, Inject } from '@nestjs/common';
|
||||
import { createQueue, type QueueHandle } from '@mosaic/queue';
|
||||
import { SessionGCService } from './session-gc.service.js';
|
||||
import { REDIS } from './gc.tokens.js';
|
||||
|
||||
const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE';
|
||||
|
||||
@Module({
|
||||
providers: [
|
||||
{
|
||||
provide: GC_QUEUE_HANDLE,
|
||||
useFactory: (): QueueHandle => {
|
||||
return createQueue();
|
||||
},
|
||||
},
|
||||
{
|
||||
provide: REDIS,
|
||||
useFactory: (handle: QueueHandle) => handle.redis,
|
||||
inject: [GC_QUEUE_HANDLE],
|
||||
},
|
||||
SessionGCService,
|
||||
],
|
||||
exports: [SessionGCService],
|
||||
})
|
||||
export class GCModule implements OnApplicationShutdown {
|
||||
constructor(@Inject(GC_QUEUE_HANDLE) private readonly handle: QueueHandle) {}
|
||||
|
||||
async onApplicationShutdown(): Promise<void> {
|
||||
await this.handle.close().catch(() => {});
|
||||
}
|
||||
}
|
||||
1
apps/gateway/src/gc/gc.tokens.ts
Normal file
1
apps/gateway/src/gc/gc.tokens.ts
Normal file
@@ -0,0 +1 @@
|
||||
export const REDIS = 'REDIS';
|
||||
97
apps/gateway/src/gc/session-gc.service.spec.ts
Normal file
97
apps/gateway/src/gc/session-gc.service.spec.ts
Normal file
@@ -0,0 +1,97 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import type { QueueHandle } from '@mosaic/queue';
|
||||
import type { LogService } from '@mosaic/log';
|
||||
import { SessionGCService } from './session-gc.service.js';
|
||||
|
||||
type MockRedis = {
|
||||
keys: ReturnType<typeof vi.fn>;
|
||||
del: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
describe('SessionGCService', () => {
|
||||
let service: SessionGCService;
|
||||
let mockRedis: MockRedis;
|
||||
let mockLogService: { logs: { promoteToWarm: ReturnType<typeof vi.fn> } };
|
||||
|
||||
beforeEach(() => {
|
||||
mockRedis = {
|
||||
keys: vi.fn().mockResolvedValue([]),
|
||||
del: vi.fn().mockResolvedValue(0),
|
||||
};
|
||||
|
||||
mockLogService = {
|
||||
logs: {
|
||||
promoteToWarm: vi.fn().mockResolvedValue(0),
|
||||
},
|
||||
};
|
||||
|
||||
// Suppress logger output in tests
|
||||
vi.spyOn(Logger.prototype, 'log').mockImplementation(() => {});
|
||||
|
||||
service = new SessionGCService(
|
||||
mockRedis as unknown as QueueHandle['redis'],
|
||||
mockLogService as unknown as LogService,
|
||||
);
|
||||
});
|
||||
|
||||
it('collect() deletes Valkey keys for session', async () => {
|
||||
mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:abc:foo']);
|
||||
const result = await service.collect('abc');
|
||||
expect(mockRedis.del).toHaveBeenCalledWith(
|
||||
'mosaic:session:abc:system',
|
||||
'mosaic:session:abc:foo',
|
||||
);
|
||||
expect(result.cleaned.valkeyKeys).toBe(2);
|
||||
});
|
||||
|
||||
it('collect() with no keys returns empty cleaned valkeyKeys', async () => {
|
||||
mockRedis.keys.mockResolvedValue([]);
|
||||
const result = await service.collect('abc');
|
||||
expect(result.cleaned.valkeyKeys).toBeUndefined();
|
||||
});
|
||||
|
||||
it('collect() returns sessionId in result', async () => {
|
||||
const result = await service.collect('test-session-id');
|
||||
expect(result.sessionId).toBe('test-session-id');
|
||||
});
|
||||
|
||||
it('fullCollect() deletes all session keys', async () => {
|
||||
mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:xyz:foo']);
|
||||
const result = await service.fullCollect();
|
||||
expect(mockRedis.del).toHaveBeenCalled();
|
||||
expect(result.valkeyKeys).toBe(2);
|
||||
});
|
||||
|
||||
it('fullCollect() with no keys returns 0 valkeyKeys', async () => {
|
||||
mockRedis.keys.mockResolvedValue([]);
|
||||
const result = await service.fullCollect();
|
||||
expect(result.valkeyKeys).toBe(0);
|
||||
expect(mockRedis.del).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('fullCollect() returns duration', async () => {
|
||||
const result = await service.fullCollect();
|
||||
expect(result.duration).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it('sweepOrphans() extracts unique session IDs and collects them', async () => {
|
||||
mockRedis.keys.mockResolvedValue([
|
||||
'mosaic:session:abc:system',
|
||||
'mosaic:session:abc:messages',
|
||||
'mosaic:session:xyz:system',
|
||||
]);
|
||||
mockRedis.del.mockResolvedValue(1);
|
||||
|
||||
const result = await service.sweepOrphans();
|
||||
expect(result.orphanedSessions).toBeGreaterThanOrEqual(0);
|
||||
expect(result.duration).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it('sweepOrphans() returns empty when no session keys', async () => {
|
||||
mockRedis.keys.mockResolvedValue([]);
|
||||
const result = await service.sweepOrphans();
|
||||
expect(result.orphanedSessions).toBe(0);
|
||||
expect(result.totalCleaned).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
139
apps/gateway/src/gc/session-gc.service.ts
Normal file
139
apps/gateway/src/gc/session-gc.service.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
import { Inject, Injectable, Logger, type OnModuleInit } from '@nestjs/common';
|
||||
import type { QueueHandle } from '@mosaic/queue';
|
||||
import type { LogService } from '@mosaic/log';
|
||||
import { LOG_SERVICE } from '../log/log.tokens.js';
|
||||
import { REDIS } from './gc.tokens.js';
|
||||
|
||||
export interface GCResult {
|
||||
sessionId: string;
|
||||
cleaned: {
|
||||
valkeyKeys?: number;
|
||||
logsDemoted?: number;
|
||||
tempFilesRemoved?: number;
|
||||
};
|
||||
}
|
||||
|
||||
export interface GCSweepResult {
|
||||
orphanedSessions: number;
|
||||
totalCleaned: GCResult[];
|
||||
duration: number;
|
||||
}
|
||||
|
||||
export interface FullGCResult {
|
||||
valkeyKeys: number;
|
||||
logsDemoted: number;
|
||||
jobsPurged: number;
|
||||
tempFilesRemoved: number;
|
||||
duration: number;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class SessionGCService implements OnModuleInit {
|
||||
private readonly logger = new Logger(SessionGCService.name);
|
||||
|
||||
constructor(
|
||||
@Inject(REDIS) private readonly redis: QueueHandle['redis'],
|
||||
@Inject(LOG_SERVICE) private readonly logService: LogService,
|
||||
) {}
|
||||
|
||||
async onModuleInit(): Promise<void> {
|
||||
this.logger.log('Running full GC on cold start...');
|
||||
const result = await this.fullCollect();
|
||||
this.logger.log(
|
||||
`Full GC complete: ${result.valkeyKeys} Valkey keys, ` +
|
||||
`${result.logsDemoted} logs demoted, ` +
|
||||
`${result.jobsPurged} jobs purged, ` +
|
||||
`${result.tempFilesRemoved} temp dirs removed ` +
|
||||
`(${result.duration}ms)`,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Immediate cleanup for a single session (call from destroySession).
|
||||
*/
|
||||
async collect(sessionId: string): Promise<GCResult> {
|
||||
const result: GCResult = { sessionId, cleaned: {} };
|
||||
|
||||
// 1. Valkey: delete all session-scoped keys
|
||||
const pattern = `mosaic:session:${sessionId}:*`;
|
||||
const valkeyKeys = await this.redis.keys(pattern);
|
||||
if (valkeyKeys.length > 0) {
|
||||
await this.redis.del(...valkeyKeys);
|
||||
result.cleaned.valkeyKeys = valkeyKeys.length;
|
||||
}
|
||||
|
||||
// 2. PG: demote hot-tier agent_logs for this session to warm
|
||||
const cutoff = new Date(); // demote all hot logs for this session
|
||||
const logsDemoted = await this.logService.logs.promoteToWarm(cutoff);
|
||||
if (logsDemoted > 0) {
|
||||
result.cleaned.logsDemoted = logsDemoted;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep GC — find orphaned artifacts from dead sessions.
|
||||
* User-scoped when userId provided; system-wide when null (admin).
|
||||
*/
|
||||
async sweepOrphans(_userId?: string): Promise<GCSweepResult> {
|
||||
const start = Date.now();
|
||||
const cleaned: GCResult[] = [];
|
||||
|
||||
// 1. Find all session-scoped Valkey keys
|
||||
const allSessionKeys = await this.redis.keys('mosaic:session:*');
|
||||
|
||||
// Extract unique session IDs from keys
|
||||
const sessionIds = new Set<string>();
|
||||
for (const key of allSessionKeys) {
|
||||
const match = key.match(/^mosaic:session:([^:]+):/);
|
||||
if (match) sessionIds.add(match[1]!);
|
||||
}
|
||||
|
||||
// 2. For each session ID, collect stale keys
|
||||
for (const sessionId of sessionIds) {
|
||||
const gcResult = await this.collect(sessionId);
|
||||
if (Object.keys(gcResult.cleaned).length > 0) {
|
||||
cleaned.push(gcResult);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
orphanedSessions: cleaned.length,
|
||||
totalCleaned: cleaned,
|
||||
duration: Date.now() - start,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Full GC — aggressive collection for cold start.
|
||||
* Assumes no sessions survived the restart.
|
||||
*/
|
||||
async fullCollect(): Promise<FullGCResult> {
|
||||
const start = Date.now();
|
||||
|
||||
// 1. Valkey: delete ALL session-scoped keys
|
||||
const sessionKeys = await this.redis.keys('mosaic:session:*');
|
||||
if (sessionKeys.length > 0) {
|
||||
await this.redis.del(...sessionKeys);
|
||||
}
|
||||
|
||||
// 2. NOTE: channel keys are NOT collected on cold start
|
||||
// (discord/telegram plugins may reconnect and resume)
|
||||
|
||||
// 3. PG: demote stale hot-tier logs older than 24h to warm
|
||||
const hotCutoff = new Date(Date.now() - 24 * 60 * 60 * 1000);
|
||||
const logsDemoted = await this.logService.logs.promoteToWarm(hotCutoff);
|
||||
|
||||
// 4. No summarization job purge API available yet
|
||||
const jobsPurged = 0;
|
||||
|
||||
return {
|
||||
valkeyKeys: sessionKeys.length,
|
||||
logsDemoted,
|
||||
jobsPurged,
|
||||
tempFilesRemoved: 0,
|
||||
duration: Date.now() - start,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user