diff --git a/apps/gateway/src/commands/command-executor.service.ts b/apps/gateway/src/commands/command-executor.service.ts index 3714cf4..6132df9 100644 --- a/apps/gateway/src/commands/command-executor.service.ts +++ b/apps/gateway/src/commands/command-executor.service.ts @@ -77,8 +77,8 @@ export class CommandExecutorService { message: 'Retry last message requested.', }; case 'gc': { - // User-scoped sweep for non-admin; system-wide for admin - const result = await this.sessionGC.sweepOrphans(userId); + // Admin-only: system-wide GC sweep across all sessions + const result = await this.sessionGC.sweepOrphans(); return { command: 'gc', success: true, diff --git a/apps/gateway/src/commands/command-registry.service.ts b/apps/gateway/src/commands/command-registry.service.ts index 7aba9ba..cb05257 100644 --- a/apps/gateway/src/commands/command-registry.service.ts +++ b/apps/gateway/src/commands/command-registry.service.ts @@ -190,9 +190,9 @@ export class CommandRegistryService implements OnModuleInit { }, { name: 'gc', - description: 'Trigger garbage collection sweep (user-scoped)', + description: 'Trigger garbage collection sweep (admin only — system-wide)', aliases: [], - scope: 'core', + scope: 'admin', execution: 'socket', available: true, }, diff --git a/apps/gateway/src/commands/commands.integration.spec.ts b/apps/gateway/src/commands/commands.integration.spec.ts index eb69fba..9c040b8 100644 --- a/apps/gateway/src/commands/commands.integration.spec.ts +++ b/apps/gateway/src/commands/commands.integration.spec.ts @@ -166,11 +166,11 @@ describe('CommandExecutorService — integration', () => { expect(result.command).toBe('nonexistent'); }); - // /gc handler calls SessionGCService.sweepOrphans - it('/gc calls SessionGCService.sweepOrphans with userId', async () => { + // /gc handler calls SessionGCService.sweepOrphans (admin-only, no userId arg) + it('/gc calls SessionGCService.sweepOrphans without arguments', async () => { const payload: SlashCommandPayload = { command: 'gc', conversationId }; const result = await executor.execute(payload, userId); - expect(mockSessionGC.sweepOrphans).toHaveBeenCalledWith(userId); + expect(mockSessionGC.sweepOrphans).toHaveBeenCalledWith(); expect(result.success).toBe(true); expect(result.message).toContain('GC sweep complete'); expect(result.message).toContain('3 orphaned sessions'); diff --git a/apps/gateway/src/gc/session-gc.service.spec.ts b/apps/gateway/src/gc/session-gc.service.spec.ts index 9bd5754..88e29e4 100644 --- a/apps/gateway/src/gc/session-gc.service.spec.ts +++ b/apps/gateway/src/gc/session-gc.service.spec.ts @@ -5,7 +5,7 @@ import type { LogService } from '@mosaic/log'; import { SessionGCService } from './session-gc.service.js'; type MockRedis = { - keys: ReturnType; + scan: ReturnType; del: ReturnType; }; @@ -14,9 +14,17 @@ describe('SessionGCService', () => { let mockRedis: MockRedis; let mockLogService: { logs: { promoteToWarm: ReturnType } }; + /** + * Helper: build a scan mock that returns all provided keys in a single + * cursor iteration (cursor '0' in → ['0', keys] out). + */ + function makeScanMock(keys: string[]): ReturnType { + return vi.fn().mockResolvedValue(['0', keys]); + } + beforeEach(() => { mockRedis = { - keys: vi.fn().mockResolvedValue([]), + scan: makeScanMock([]), del: vi.fn().mockResolvedValue(0), }; @@ -36,7 +44,7 @@ describe('SessionGCService', () => { }); it('collect() deletes Valkey keys for session', async () => { - mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:abc:foo']); + mockRedis.scan = makeScanMock(['mosaic:session:abc:system', 'mosaic:session:abc:foo']); const result = await service.collect('abc'); expect(mockRedis.del).toHaveBeenCalledWith( 'mosaic:session:abc:system', @@ -46,7 +54,7 @@ describe('SessionGCService', () => { }); it('collect() with no keys returns empty cleaned valkeyKeys', async () => { - mockRedis.keys.mockResolvedValue([]); + mockRedis.scan = makeScanMock([]); const result = await service.collect('abc'); expect(result.cleaned.valkeyKeys).toBeUndefined(); }); @@ -57,14 +65,14 @@ describe('SessionGCService', () => { }); it('fullCollect() deletes all session keys', async () => { - mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:xyz:foo']); + mockRedis.scan = makeScanMock(['mosaic:session:abc:system', 'mosaic:session:xyz:foo']); const result = await service.fullCollect(); expect(mockRedis.del).toHaveBeenCalled(); expect(result.valkeyKeys).toBe(2); }); it('fullCollect() with no keys returns 0 valkeyKeys', async () => { - mockRedis.keys.mockResolvedValue([]); + mockRedis.scan = makeScanMock([]); const result = await service.fullCollect(); expect(result.valkeyKeys).toBe(0); expect(mockRedis.del).not.toHaveBeenCalled(); @@ -76,11 +84,18 @@ describe('SessionGCService', () => { }); it('sweepOrphans() extracts unique session IDs and collects them', async () => { - mockRedis.keys.mockResolvedValue([ - 'mosaic:session:abc:system', - 'mosaic:session:abc:messages', - 'mosaic:session:xyz:system', - ]); + // First scan call returns the global session list; subsequent calls return + // per-session keys during collect(). + mockRedis.scan = vi + .fn() + .mockResolvedValueOnce([ + '0', + ['mosaic:session:abc:system', 'mosaic:session:abc:messages', 'mosaic:session:xyz:system'], + ]) + // collect('abc') scan + .mockResolvedValueOnce(['0', ['mosaic:session:abc:system', 'mosaic:session:abc:messages']]) + // collect('xyz') scan + .mockResolvedValueOnce(['0', ['mosaic:session:xyz:system']]); mockRedis.del.mockResolvedValue(1); const result = await service.sweepOrphans(); @@ -89,7 +104,7 @@ describe('SessionGCService', () => { }); it('sweepOrphans() returns empty when no session keys', async () => { - mockRedis.keys.mockResolvedValue([]); + mockRedis.scan = makeScanMock([]); const result = await service.sweepOrphans(); expect(result.orphanedSessions).toBe(0); expect(result.totalCleaned).toHaveLength(0); diff --git a/apps/gateway/src/gc/session-gc.service.ts b/apps/gateway/src/gc/session-gc.service.ts index ee25b7b..67309b8 100644 --- a/apps/gateway/src/gc/session-gc.service.ts +++ b/apps/gateway/src/gc/session-gc.service.ts @@ -56,6 +56,22 @@ export class SessionGCService implements OnModuleInit { }); } + /** + * Scan Valkey for all keys matching a pattern using SCAN (non-blocking). + * KEYS is avoided because it blocks the Valkey event loop for the full scan + * duration, which can cause latency spikes under production key volumes. + */ + private async scanKeys(pattern: string): Promise { + const collected: string[] = []; + let cursor = '0'; + do { + const [nextCursor, keys] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100); + cursor = nextCursor; + collected.push(...keys); + } while (cursor !== '0'); + return collected; + } + /** * Immediate cleanup for a single session (call from destroySession). */ @@ -64,7 +80,7 @@ export class SessionGCService implements OnModuleInit { // 1. Valkey: delete all session-scoped keys const pattern = `mosaic:session:${sessionId}:*`; - const valkeyKeys = await this.redis.keys(pattern); + const valkeyKeys = await this.scanKeys(pattern); if (valkeyKeys.length > 0) { await this.redis.del(...valkeyKeys); result.cleaned.valkeyKeys = valkeyKeys.length; @@ -82,14 +98,15 @@ export class SessionGCService implements OnModuleInit { /** * Sweep GC — find orphaned artifacts from dead sessions. - * User-scoped when userId provided; system-wide when null (admin). + * System-wide operation: only call from admin-authorized paths or internal + * scheduled jobs. Individual session cleanup is handled by collect(). */ - async sweepOrphans(_userId?: string): Promise { + async sweepOrphans(): Promise { const start = Date.now(); const cleaned: GCResult[] = []; - // 1. Find all session-scoped Valkey keys - const allSessionKeys = await this.redis.keys('mosaic:session:*'); + // 1. Find all session-scoped Valkey keys (non-blocking SCAN) + const allSessionKeys = await this.scanKeys('mosaic:session:*'); // Extract unique session IDs from keys const sessionIds = new Set(); @@ -120,8 +137,8 @@ export class SessionGCService implements OnModuleInit { async fullCollect(): Promise { const start = Date.now(); - // 1. Valkey: delete ALL session-scoped keys - const sessionKeys = await this.redis.keys('mosaic:session:*'); + // 1. Valkey: delete ALL session-scoped keys (non-blocking SCAN) + const sessionKeys = await this.scanKeys('mosaic:session:*'); if (sessionKeys.length > 0) { await this.redis.del(...sessionKeys); }