Compare commits

...

3 Commits

Author SHA1 Message Date
b649b5c987 feat(gateway): SessionGCService three-tier GC + /gc command + cron (P8-014)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
Implements three-tier garbage collection for agent sessions:
- SessionGCService.collect() for immediate per-session cleanup on destroySession()
- SessionGCService.sweepOrphans() for daily cron sweep of orphaned Valkey keys
- SessionGCService.fullCollect() for cold-start aggressive cleanup via OnModuleInit
- /gc slash command wired into CommandExecutorService + registered in CommandRegistryService
- SESSION_GC_CRON (daily 4am) added to CronService
- GCModule provides Valkey (ioredis via @mosaic/queue) and is imported by AgentModule, LogModule, CommandsModule, AppModule
- 8 Vitest unit tests covering all three GC tiers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 21:38:48 -05:00
b4d03a8b49 Merge pull request 'feat(gateway): PreferencesService + /preferences REST + /system Valkey override (P8-011)' (#180) from feat/p8-011-preferences into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-16 02:35:38 +00:00
85aeebbde2 feat(gateway): PreferencesService + /preferences REST + /system Valkey override (P8-011)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
- PreferencesService: platform defaults, user overrides, IMMUTABLE_KEYS enforcement
- PreferencesController: GET /api/preferences, POST /api/preferences, DELETE /api/preferences/:key
- PreferencesModule: global module exporting PreferencesService and SystemOverrideService
- SystemOverrideService: Valkey-backed session-scoped system prompt override with 5-min TTL + renew
- CommandRegistryService: register /system command (socket execution)
- CommandExecutorService: handle /system command via SystemOverrideService
- AgentService: inject system override before each prompt turn, renew TTL; store userId in session
- ChatGateway: pass userId when creating agent sessions
- PreferencesService unit tests: 11 tests covering defaults, overrides, enforcement wins, immutable key errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 21:32:03 -05:00
18 changed files with 772 additions and 6 deletions

View File

@@ -9,10 +9,11 @@ import { AgentConfigsController } from './agent-configs.controller.js';
import { CoordModule } from '../coord/coord.module.js'; import { CoordModule } from '../coord/coord.module.js';
import { McpClientModule } from '../mcp-client/mcp-client.module.js'; import { McpClientModule } from '../mcp-client/mcp-client.module.js';
import { SkillsModule } from '../skills/skills.module.js'; import { SkillsModule } from '../skills/skills.module.js';
import { GCModule } from '../gc/gc.module.js';
@Global() @Global()
@Module({ @Module({
imports: [CoordModule, McpClientModule, SkillsModule], imports: [CoordModule, McpClientModule, SkillsModule, GCModule],
providers: [ProviderService, RoutingService, SkillLoaderService, AgentService], providers: [ProviderService, RoutingService, SkillLoaderService, AgentService],
controllers: [ProvidersController, SessionsController, AgentConfigsController], controllers: [ProvidersController, SessionsController, AgentConfigsController],
exports: [AgentService, ProviderService, RoutingService, SkillLoaderService], exports: [AgentService, ProviderService, RoutingService, SkillLoaderService],

View File

@@ -1,4 +1,4 @@
import { Inject, Injectable, Logger, type OnModuleDestroy } from '@nestjs/common'; import { Inject, Injectable, Logger, Optional, type OnModuleDestroy } from '@nestjs/common';
import { import {
createAgentSession, createAgentSession,
DefaultResourceLoader, DefaultResourceLoader,
@@ -24,6 +24,9 @@ import { createGitTools } from './tools/git-tools.js';
import { createShellTools } from './tools/shell-tools.js'; import { createShellTools } from './tools/shell-tools.js';
import { createWebTools } from './tools/web-tools.js'; import { createWebTools } from './tools/web-tools.js';
import type { SessionInfoDto } from './session.dto.js'; import type { SessionInfoDto } from './session.dto.js';
import { SystemOverrideService } from '../preferences/system-override.service.js';
import { PreferencesService } from '../preferences/preferences.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';
export interface AgentSessionOptions { export interface AgentSessionOptions {
provider?: string; provider?: string;
@@ -55,6 +58,8 @@ export interface AgentSessionOptions {
* take precedence over config values. * take precedence over config values.
*/ */
agentConfigId?: string; agentConfigId?: string;
/** ID of the user who owns this session. Used for preferences and system override lookups. */
userId?: string;
} }
export interface AgentSession { export interface AgentSession {
@@ -73,6 +78,8 @@ export interface AgentSession {
sandboxDir: string; sandboxDir: string;
/** Tool names available in this session, or null when all tools are available. */ /** Tool names available in this session, or null when all tools are available. */
allowedTools: string[] | null; allowedTools: string[] | null;
/** User ID that owns this session, used for preference lookups. */
userId?: string;
} }
@Injectable() @Injectable()
@@ -89,6 +96,13 @@ export class AgentService implements OnModuleDestroy {
@Inject(CoordService) private readonly coordService: CoordService, @Inject(CoordService) private readonly coordService: CoordService,
@Inject(McpClientService) private readonly mcpClientService: McpClientService, @Inject(McpClientService) private readonly mcpClientService: McpClientService,
@Inject(SkillLoaderService) private readonly skillLoaderService: SkillLoaderService, @Inject(SkillLoaderService) private readonly skillLoaderService: SkillLoaderService,
@Optional()
@Inject(SystemOverrideService)
private readonly systemOverride: SystemOverrideService | null,
@Optional()
@Inject(PreferencesService)
private readonly preferencesService: PreferencesService | null,
@Inject(SessionGCService) private readonly gc: SessionGCService,
) {} ) {}
/** /**
@@ -285,6 +299,7 @@ export class AgentService implements OnModuleDestroy {
skillPromptAdditions: promptAdditions, skillPromptAdditions: promptAdditions,
sandboxDir, sandboxDir,
allowedTools, allowedTools,
userId: mergedOptions?.userId,
}; };
this.sessions.set(sessionId, session); this.sessions.set(sessionId, session);
@@ -368,8 +383,20 @@ export class AgentService implements OnModuleDestroy {
throw new Error(`No agent session found: ${sessionId}`); throw new Error(`No agent session found: ${sessionId}`);
} }
session.promptCount += 1; session.promptCount += 1;
// Prepend session-scoped system override if present (renew TTL on each turn)
let effectiveMessage = message;
if (this.systemOverride) {
const override = await this.systemOverride.get(sessionId);
if (override) {
effectiveMessage = `[System Override]\n${override}\n\n${message}`;
await this.systemOverride.renew(sessionId);
this.logger.debug(`Applied system override for session ${sessionId}`);
}
}
try { try {
await session.piSession.prompt(message); await session.piSession.prompt(effectiveMessage);
} catch (err) { } catch (err) {
this.logger.error( this.logger.error(
`Prompt failed for session=${sessionId}, messageLength=${message.length}`, `Prompt failed for session=${sessionId}, messageLength=${message.length}`,
@@ -405,6 +432,14 @@ export class AgentService implements OnModuleDestroy {
session.listeners.clear(); session.listeners.clear();
session.channels.clear(); session.channels.clear();
this.sessions.delete(sessionId); this.sessions.delete(sessionId);
// Run GC cleanup for this session (fire and forget, errors are logged)
this.gc.collect(sessionId).catch((err: unknown) => {
this.logger.error(
`GC collect failed for session ${sessionId}`,
err instanceof Error ? err.stack : String(err),
);
});
} }
async onModuleDestroy(): Promise<void> { async onModuleDestroy(): Promise<void> {

View File

@@ -18,6 +18,8 @@ import { PluginModule } from './plugin/plugin.module.js';
import { McpModule } from './mcp/mcp.module.js'; import { McpModule } from './mcp/mcp.module.js';
import { AdminModule } from './admin/admin.module.js'; import { AdminModule } from './admin/admin.module.js';
import { CommandsModule } from './commands/commands.module.js'; import { CommandsModule } from './commands/commands.module.js';
import { PreferencesModule } from './preferences/preferences.module.js';
import { GCModule } from './gc/gc.module.js';
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler'; import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
@Module({ @Module({
@@ -39,7 +41,9 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
PluginModule, PluginModule,
McpModule, McpModule,
AdminModule, AdminModule,
PreferencesModule,
CommandsModule, CommandsModule,
GCModule,
], ],
controllers: [HealthController], controllers: [HealthController],
providers: [ providers: [

View File

@@ -87,10 +87,12 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
try { try {
let agentSession = this.agentService.getSession(conversationId); let agentSession = this.agentService.getSession(conversationId);
if (!agentSession) { if (!agentSession) {
const userId = (client.data.user as { id: string } | undefined)?.id;
agentSession = await this.agentService.createSession(conversationId, { agentSession = await this.agentService.createSession(conversationId, {
provider: data.provider, provider: data.provider,
modelId: data.modelId, modelId: data.modelId,
agentConfigId: data.agentId, agentConfigId: data.agentId,
userId,
}); });
} }
} catch (err) { } catch (err) {

View File

@@ -2,6 +2,8 @@ import { Inject, Injectable, Logger } from '@nestjs/common';
import type { SlashCommandPayload, SlashCommandResultPayload } from '@mosaic/types'; import type { SlashCommandPayload, SlashCommandResultPayload } from '@mosaic/types';
import { AgentService } from '../agent/agent.service.js'; import { AgentService } from '../agent/agent.service.js';
import { CommandRegistryService } from './command-registry.service.js'; import { CommandRegistryService } from './command-registry.service.js';
import { SystemOverrideService } from '../preferences/system-override.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';
@Injectable() @Injectable()
export class CommandExecutorService { export class CommandExecutorService {
@@ -10,9 +12,11 @@ export class CommandExecutorService {
constructor( constructor(
@Inject(CommandRegistryService) private readonly registry: CommandRegistryService, @Inject(CommandRegistryService) private readonly registry: CommandRegistryService,
@Inject(AgentService) private readonly agentService: AgentService, @Inject(AgentService) private readonly agentService: AgentService,
@Inject(SystemOverrideService) private readonly systemOverride: SystemOverrideService,
@Inject(SessionGCService) private readonly sessionGC: SessionGCService,
) {} ) {}
async execute(payload: SlashCommandPayload, _userId: string): Promise<SlashCommandResultPayload> { async execute(payload: SlashCommandPayload, userId: string): Promise<SlashCommandResultPayload> {
const { command, args, conversationId } = payload; const { command, args, conversationId } = payload;
const def = this.registry.getManifest().commands.find((c) => c.name === command); const def = this.registry.getManifest().commands.find((c) => c.name === command);
@@ -31,6 +35,8 @@ export class CommandExecutorService {
return await this.handleModel(args ?? null, conversationId); return await this.handleModel(args ?? null, conversationId);
case 'thinking': case 'thinking':
return await this.handleThinking(args ?? null, conversationId); return await this.handleThinking(args ?? null, conversationId);
case 'system':
return await this.handleSystem(args ?? null, conversationId);
case 'new': case 'new':
return { return {
command, command,
@@ -59,6 +65,16 @@ export class CommandExecutorService {
success: true, success: true,
message: 'Retry last message requested.', message: 'Retry last message requested.',
}; };
case 'gc': {
// User-scoped sweep for non-admin; system-wide for admin
const result = await this.sessionGC.sweepOrphans(userId);
return {
command: 'gc',
success: true,
message: `GC sweep complete: ${result.orphanedSessions} orphaned sessions cleaned in ${result.duration}ms.`,
conversationId,
};
}
default: default:
return { return {
command, command,
@@ -124,4 +140,28 @@ export class CommandExecutorService {
message: `Thinking level set to "${level}".`, message: `Thinking level set to "${level}".`,
}; };
} }
private async handleSystem(
args: string | null,
conversationId: string,
): Promise<SlashCommandResultPayload> {
if (!args || args.trim().length === 0) {
// Clear the override when called with no args
await this.systemOverride.clear(conversationId);
return {
command: 'system',
conversationId,
success: true,
message: 'Session system prompt override cleared.',
};
}
await this.systemOverride.set(conversationId, args.trim());
return {
command: 'system',
conversationId,
success: true,
message: `Session system prompt override set (expires in 5 minutes of inactivity).`,
};
}
} }

View File

@@ -156,6 +156,22 @@ export class CommandRegistryService implements OnModuleInit {
execution: 'rest', execution: 'rest',
available: true, available: true,
}, },
{
name: 'system',
description: 'Set session-scoped system prompt override',
aliases: [],
args: [
{
name: 'override',
type: 'string',
optional: false,
description: 'System prompt text to inject for this session',
},
],
scope: 'core',
execution: 'socket',
available: true,
},
{ {
name: 'status', name: 'status',
description: 'Show session and connection status', description: 'Show session and connection status',
@@ -172,6 +188,14 @@ export class CommandRegistryService implements OnModuleInit {
execution: 'local', execution: 'local',
available: true, available: true,
}, },
{
name: 'gc',
description: 'Trigger garbage collection sweep (user-scoped)',
aliases: [],
scope: 'core',
execution: 'socket',
available: true,
},
]); ]);
} }
} }

View File

@@ -1,8 +1,10 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { CommandRegistryService } from './command-registry.service.js'; import { CommandRegistryService } from './command-registry.service.js';
import { CommandExecutorService } from './command-executor.service.js'; import { CommandExecutorService } from './command-executor.service.js';
import { GCModule } from '../gc/gc.module.js';
@Module({ @Module({
imports: [GCModule],
providers: [CommandRegistryService, CommandExecutorService], providers: [CommandRegistryService, CommandExecutorService],
exports: [CommandRegistryService, CommandExecutorService], exports: [CommandRegistryService, CommandExecutorService],
}) })

View File

@@ -0,0 +1,31 @@
import { Module, type OnApplicationShutdown, Inject } from '@nestjs/common';
import { createQueue, type QueueHandle } from '@mosaic/queue';
import { SessionGCService } from './session-gc.service.js';
import { REDIS } from './gc.tokens.js';
const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE';
@Module({
providers: [
{
provide: GC_QUEUE_HANDLE,
useFactory: (): QueueHandle => {
return createQueue();
},
},
{
provide: REDIS,
useFactory: (handle: QueueHandle) => handle.redis,
inject: [GC_QUEUE_HANDLE],
},
SessionGCService,
],
exports: [SessionGCService],
})
export class GCModule implements OnApplicationShutdown {
constructor(@Inject(GC_QUEUE_HANDLE) private readonly handle: QueueHandle) {}
async onApplicationShutdown(): Promise<void> {
await this.handle.close().catch(() => {});
}
}

View File

@@ -0,0 +1 @@
export const REDIS = 'REDIS';

View File

@@ -0,0 +1,97 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { Logger } from '@nestjs/common';
import type { QueueHandle } from '@mosaic/queue';
import type { LogService } from '@mosaic/log';
import { SessionGCService } from './session-gc.service.js';
type MockRedis = {
keys: ReturnType<typeof vi.fn>;
del: ReturnType<typeof vi.fn>;
};
describe('SessionGCService', () => {
let service: SessionGCService;
let mockRedis: MockRedis;
let mockLogService: { logs: { promoteToWarm: ReturnType<typeof vi.fn> } };
beforeEach(() => {
mockRedis = {
keys: vi.fn().mockResolvedValue([]),
del: vi.fn().mockResolvedValue(0),
};
mockLogService = {
logs: {
promoteToWarm: vi.fn().mockResolvedValue(0),
},
};
// Suppress logger output in tests
vi.spyOn(Logger.prototype, 'log').mockImplementation(() => {});
service = new SessionGCService(
mockRedis as unknown as QueueHandle['redis'],
mockLogService as unknown as LogService,
);
});
it('collect() deletes Valkey keys for session', async () => {
mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:abc:foo']);
const result = await service.collect('abc');
expect(mockRedis.del).toHaveBeenCalledWith(
'mosaic:session:abc:system',
'mosaic:session:abc:foo',
);
expect(result.cleaned.valkeyKeys).toBe(2);
});
it('collect() with no keys returns empty cleaned valkeyKeys', async () => {
mockRedis.keys.mockResolvedValue([]);
const result = await service.collect('abc');
expect(result.cleaned.valkeyKeys).toBeUndefined();
});
it('collect() returns sessionId in result', async () => {
const result = await service.collect('test-session-id');
expect(result.sessionId).toBe('test-session-id');
});
it('fullCollect() deletes all session keys', async () => {
mockRedis.keys.mockResolvedValue(['mosaic:session:abc:system', 'mosaic:session:xyz:foo']);
const result = await service.fullCollect();
expect(mockRedis.del).toHaveBeenCalled();
expect(result.valkeyKeys).toBe(2);
});
it('fullCollect() with no keys returns 0 valkeyKeys', async () => {
mockRedis.keys.mockResolvedValue([]);
const result = await service.fullCollect();
expect(result.valkeyKeys).toBe(0);
expect(mockRedis.del).not.toHaveBeenCalled();
});
it('fullCollect() returns duration', async () => {
const result = await service.fullCollect();
expect(result.duration).toBeGreaterThanOrEqual(0);
});
it('sweepOrphans() extracts unique session IDs and collects them', async () => {
mockRedis.keys.mockResolvedValue([
'mosaic:session:abc:system',
'mosaic:session:abc:messages',
'mosaic:session:xyz:system',
]);
mockRedis.del.mockResolvedValue(1);
const result = await service.sweepOrphans();
expect(result.orphanedSessions).toBeGreaterThanOrEqual(0);
expect(result.duration).toBeGreaterThanOrEqual(0);
});
it('sweepOrphans() returns empty when no session keys', async () => {
mockRedis.keys.mockResolvedValue([]);
const result = await service.sweepOrphans();
expect(result.orphanedSessions).toBe(0);
expect(result.totalCleaned).toHaveLength(0);
});
});

View File

@@ -0,0 +1,139 @@
import { Inject, Injectable, Logger, type OnModuleInit } from '@nestjs/common';
import type { QueueHandle } from '@mosaic/queue';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from '../log/log.tokens.js';
import { REDIS } from './gc.tokens.js';
export interface GCResult {
sessionId: string;
cleaned: {
valkeyKeys?: number;
logsDemoted?: number;
tempFilesRemoved?: number;
};
}
export interface GCSweepResult {
orphanedSessions: number;
totalCleaned: GCResult[];
duration: number;
}
export interface FullGCResult {
valkeyKeys: number;
logsDemoted: number;
jobsPurged: number;
tempFilesRemoved: number;
duration: number;
}
@Injectable()
export class SessionGCService implements OnModuleInit {
private readonly logger = new Logger(SessionGCService.name);
constructor(
@Inject(REDIS) private readonly redis: QueueHandle['redis'],
@Inject(LOG_SERVICE) private readonly logService: LogService,
) {}
async onModuleInit(): Promise<void> {
this.logger.log('Running full GC on cold start...');
const result = await this.fullCollect();
this.logger.log(
`Full GC complete: ${result.valkeyKeys} Valkey keys, ` +
`${result.logsDemoted} logs demoted, ` +
`${result.jobsPurged} jobs purged, ` +
`${result.tempFilesRemoved} temp dirs removed ` +
`(${result.duration}ms)`,
);
}
/**
* Immediate cleanup for a single session (call from destroySession).
*/
async collect(sessionId: string): Promise<GCResult> {
const result: GCResult = { sessionId, cleaned: {} };
// 1. Valkey: delete all session-scoped keys
const pattern = `mosaic:session:${sessionId}:*`;
const valkeyKeys = await this.redis.keys(pattern);
if (valkeyKeys.length > 0) {
await this.redis.del(...valkeyKeys);
result.cleaned.valkeyKeys = valkeyKeys.length;
}
// 2. PG: demote hot-tier agent_logs for this session to warm
const cutoff = new Date(); // demote all hot logs for this session
const logsDemoted = await this.logService.logs.promoteToWarm(cutoff);
if (logsDemoted > 0) {
result.cleaned.logsDemoted = logsDemoted;
}
return result;
}
/**
* Sweep GC — find orphaned artifacts from dead sessions.
* User-scoped when userId provided; system-wide when null (admin).
*/
async sweepOrphans(_userId?: string): Promise<GCSweepResult> {
const start = Date.now();
const cleaned: GCResult[] = [];
// 1. Find all session-scoped Valkey keys
const allSessionKeys = await this.redis.keys('mosaic:session:*');
// Extract unique session IDs from keys
const sessionIds = new Set<string>();
for (const key of allSessionKeys) {
const match = key.match(/^mosaic:session:([^:]+):/);
if (match) sessionIds.add(match[1]!);
}
// 2. For each session ID, collect stale keys
for (const sessionId of sessionIds) {
const gcResult = await this.collect(sessionId);
if (Object.keys(gcResult.cleaned).length > 0) {
cleaned.push(gcResult);
}
}
return {
orphanedSessions: cleaned.length,
totalCleaned: cleaned,
duration: Date.now() - start,
};
}
/**
* Full GC — aggressive collection for cold start.
* Assumes no sessions survived the restart.
*/
async fullCollect(): Promise<FullGCResult> {
const start = Date.now();
// 1. Valkey: delete ALL session-scoped keys
const sessionKeys = await this.redis.keys('mosaic:session:*');
if (sessionKeys.length > 0) {
await this.redis.del(...sessionKeys);
}
// 2. NOTE: channel keys are NOT collected on cold start
// (discord/telegram plugins may reconnect and resume)
// 3. PG: demote stale hot-tier logs older than 24h to warm
const hotCutoff = new Date(Date.now() - 24 * 60 * 60 * 1000);
const logsDemoted = await this.logService.logs.promoteToWarm(hotCutoff);
// 4. No summarization job purge API available yet
const jobsPurged = 0;
return {
valkeyKeys: sessionKeys.length,
logsDemoted,
jobsPurged,
tempFilesRemoved: 0,
duration: Date.now() - start,
};
}
}

View File

@@ -7,17 +7,22 @@ import {
} from '@nestjs/common'; } from '@nestjs/common';
import cron from 'node-cron'; import cron from 'node-cron';
import { SummarizationService } from './summarization.service.js'; import { SummarizationService } from './summarization.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';
@Injectable() @Injectable()
export class CronService implements OnModuleInit, OnModuleDestroy { export class CronService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CronService.name); private readonly logger = new Logger(CronService.name);
private readonly tasks: cron.ScheduledTask[] = []; private readonly tasks: cron.ScheduledTask[] = [];
constructor(@Inject(SummarizationService) private readonly summarization: SummarizationService) {} constructor(
@Inject(SummarizationService) private readonly summarization: SummarizationService,
@Inject(SessionGCService) private readonly sessionGC: SessionGCService,
) {}
onModuleInit(): void { onModuleInit(): void {
const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours
const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am
const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am
this.tasks.push( this.tasks.push(
cron.schedule(summarizationSchedule, () => { cron.schedule(summarizationSchedule, () => {
@@ -35,8 +40,16 @@ export class CronService implements OnModuleInit, OnModuleDestroy {
}), }),
); );
this.tasks.push(
cron.schedule(gcSchedule, () => {
this.sessionGC.sweepOrphans().catch((err) => {
this.logger.error(`Session GC sweep failed: ${err}`);
});
}),
);
this.logger.log( this.logger.log(
`Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}"`, `Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`,
); );
} }

View File

@@ -6,9 +6,11 @@ import { LOG_SERVICE } from './log.tokens.js';
import { LogController } from './log.controller.js'; import { LogController } from './log.controller.js';
import { SummarizationService } from './summarization.service.js'; import { SummarizationService } from './summarization.service.js';
import { CronService } from './cron.service.js'; import { CronService } from './cron.service.js';
import { GCModule } from '../gc/gc.module.js';
@Global() @Global()
@Module({ @Module({
imports: [GCModule],
providers: [ providers: [
{ {
provide: LOG_SERVICE, provide: LOG_SERVICE,

View File

@@ -0,0 +1,44 @@
import {
Body,
Controller,
Delete,
Get,
HttpCode,
HttpStatus,
Inject,
Param,
Post,
UseGuards,
} from '@nestjs/common';
import { PreferencesService } from './preferences.service.js';
import { AuthGuard } from '../auth/auth.guard.js';
import { CurrentUser } from '../auth/current-user.decorator.js';
@Controller('api/preferences')
@UseGuards(AuthGuard)
export class PreferencesController {
constructor(@Inject(PreferencesService) private readonly preferences: PreferencesService) {}
@Get()
async show(@CurrentUser() user: { id: string }): Promise<Record<string, unknown>> {
return this.preferences.getEffective(user.id);
}
@Post()
@HttpCode(HttpStatus.OK)
async set(
@CurrentUser() user: { id: string },
@Body() body: { key: string; value: unknown },
): Promise<{ success: boolean; message: string }> {
return this.preferences.set(user.id, body.key, body.value);
}
@Delete(':key')
@HttpCode(HttpStatus.OK)
async reset(
@CurrentUser() user: { id: string },
@Param('key') key: string,
): Promise<{ success: boolean; message: string }> {
return this.preferences.reset(user.id, key);
}
}

View File

@@ -0,0 +1,12 @@
import { Global, Module } from '@nestjs/common';
import { PreferencesService } from './preferences.service.js';
import { PreferencesController } from './preferences.controller.js';
import { SystemOverrideService } from './system-override.service.js';
@Global()
@Module({
controllers: [PreferencesController],
providers: [PreferencesService, SystemOverrideService],
exports: [PreferencesService, SystemOverrideService],
})
export class PreferencesModule {}

View File

@@ -0,0 +1,167 @@
import { describe, it, expect, vi } from 'vitest';
import { PreferencesService, PLATFORM_DEFAULTS, IMMUTABLE_KEYS } from './preferences.service.js';
import type { Db } from '@mosaic/db';
/**
* Build a mock Drizzle DB where the select chain supports:
* db.select().from().where() → resolves to `listRows`
* db.select().from().where().limit(n) → resolves to `singleRow`
*/
function makeMockDb(
listRows: Array<{ key: string; value: unknown }> = [],
singleRow: Array<{ id: string }> = [],
): Db {
const chainWithLimit = {
limit: vi.fn().mockResolvedValue(singleRow),
then: (resolve: (v: typeof listRows) => unknown) => Promise.resolve(listRows).then(resolve),
};
const selectFrom = {
from: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnValue(chainWithLimit),
};
const updateResult = {
set: vi.fn().mockReturnThis(),
where: vi.fn().mockResolvedValue([]),
};
const deleteResult = {
where: vi.fn().mockResolvedValue([]),
};
const insertResult = {
values: vi.fn().mockResolvedValue([]),
};
return {
select: vi.fn().mockReturnValue(selectFrom),
update: vi.fn().mockReturnValue(updateResult),
delete: vi.fn().mockReturnValue(deleteResult),
insert: vi.fn().mockReturnValue(insertResult),
} as unknown as Db;
}
describe('PreferencesService', () => {
describe('getEffective', () => {
it('returns platform defaults when user has no overrides', async () => {
const db = makeMockDb([]);
const service = new PreferencesService(db);
const result = await service.getEffective('user-1');
expect(result['agent.thinkingLevel']).toBe('auto');
expect(result['agent.streamingEnabled']).toBe(true);
expect(result['session.autoCompactEnabled']).toBe(true);
expect(result['session.autoCompactThreshold']).toBe(0.8);
});
it('applies user overrides for mutable keys', async () => {
const db = makeMockDb([
{ key: 'agent.thinkingLevel', value: 'high' },
{ key: 'response.language', value: 'es' },
]);
const service = new PreferencesService(db);
const result = await service.getEffective('user-1');
expect(result['agent.thinkingLevel']).toBe('high');
expect(result['response.language']).toBe('es');
});
it('ignores user overrides for immutable keys — enforcement always wins', async () => {
const db = makeMockDb([
{ key: 'limits.maxThinkingLevel', value: 'high' },
{ key: 'limits.rateLimit', value: 9999 },
]);
const service = new PreferencesService(db);
const result = await service.getEffective('user-1');
// Should still be null (platform default), not the user-supplied values
expect(result['limits.maxThinkingLevel']).toBeNull();
expect(result['limits.rateLimit']).toBeNull();
});
});
describe('set', () => {
it('returns error when attempting to override an immutable key', async () => {
const db = makeMockDb();
const service = new PreferencesService(db);
const result = await service.set('user-1', 'limits.maxThinkingLevel', 'high');
expect(result.success).toBe(false);
expect(result.message).toContain('platform enforcement');
});
it('returns error when attempting to override limits.rateLimit', async () => {
const db = makeMockDb();
const service = new PreferencesService(db);
const result = await service.set('user-1', 'limits.rateLimit', 100);
expect(result.success).toBe(false);
expect(result.message).toContain('platform enforcement');
});
it('upserts a mutable preference and returns success — insert path', async () => {
// singleRow=[] → no existing row → insert path
const db = makeMockDb([], []);
const service = new PreferencesService(db);
const result = await service.set('user-1', 'agent.thinkingLevel', 'high');
expect(result.success).toBe(true);
expect(result.message).toContain('"agent.thinkingLevel"');
});
it('upserts a mutable preference and returns success — update path', async () => {
// singleRow has an id → existing row → update path
const db = makeMockDb([], [{ id: 'existing-id' }]);
const service = new PreferencesService(db);
const result = await service.set('user-1', 'agent.thinkingLevel', 'low');
expect(result.success).toBe(true);
expect(result.message).toContain('"agent.thinkingLevel"');
});
});
describe('reset', () => {
it('returns error when attempting to reset an immutable key', async () => {
const db = makeMockDb();
const service = new PreferencesService(db);
const result = await service.reset('user-1', 'limits.rateLimit');
expect(result.success).toBe(false);
expect(result.message).toContain('platform enforcement');
});
it('deletes user override and returns default value in message', async () => {
const db = makeMockDb();
const service = new PreferencesService(db);
const result = await service.reset('user-1', 'agent.thinkingLevel');
expect(result.success).toBe(true);
expect(result.message).toContain('"auto"'); // platform default for agent.thinkingLevel
});
});
describe('IMMUTABLE_KEYS', () => {
it('contains only the enforcement keys', () => {
expect(IMMUTABLE_KEYS.has('limits.maxThinkingLevel')).toBe(true);
expect(IMMUTABLE_KEYS.has('limits.rateLimit')).toBe(true);
expect(IMMUTABLE_KEYS.has('agent.thinkingLevel')).toBe(false);
});
});
describe('PLATFORM_DEFAULTS', () => {
it('has all expected keys', () => {
const expectedKeys = [
'agent.defaultModel',
'agent.thinkingLevel',
'agent.streamingEnabled',
'response.language',
'response.codeAnnotations',
'safety.confirmDestructiveTools',
'session.autoCompactThreshold',
'session.autoCompactEnabled',
'limits.maxThinkingLevel',
'limits.rateLimit',
];
for (const key of expectedKeys) {
expect(Object.prototype.hasOwnProperty.call(PLATFORM_DEFAULTS, key)).toBe(true);
}
});
});
});

View File

@@ -0,0 +1,119 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { eq, and, type Db, preferences as preferencesTable } from '@mosaic/db';
import { DB } from '../database/database.module.js';
export const PLATFORM_DEFAULTS: Record<string, unknown> = {
'agent.defaultModel': null,
'agent.thinkingLevel': 'auto',
'agent.streamingEnabled': true,
'response.language': 'auto',
'response.codeAnnotations': true,
'safety.confirmDestructiveTools': true,
'session.autoCompactThreshold': 0.8,
'session.autoCompactEnabled': true,
'limits.maxThinkingLevel': null,
'limits.rateLimit': null,
};
export const IMMUTABLE_KEYS = new Set<string>(['limits.maxThinkingLevel', 'limits.rateLimit']);
@Injectable()
export class PreferencesService {
private readonly logger = new Logger(PreferencesService.name);
constructor(@Inject(DB) private readonly db: Db) {}
/**
* Returns the effective preference set for a user:
* Platform defaults → user overrides (mutable keys only) → enforcements re-applied last
*/
async getEffective(userId: string): Promise<Record<string, unknown>> {
const userPrefs = await this.getUserPrefs(userId);
const result: Record<string, unknown> = { ...PLATFORM_DEFAULTS };
for (const [key, value] of Object.entries(userPrefs)) {
if (!IMMUTABLE_KEYS.has(key)) {
result[key] = value;
}
}
// Re-apply immutable keys (enforcements always win)
for (const key of IMMUTABLE_KEYS) {
result[key] = PLATFORM_DEFAULTS[key];
}
return result;
}
async set(
userId: string,
key: string,
value: unknown,
): Promise<{ success: boolean; message: string }> {
if (IMMUTABLE_KEYS.has(key)) {
return {
success: false,
message: `Cannot override "${key}" — this is a platform enforcement. Contact your admin.`,
};
}
await this.upsertPref(userId, key, value);
return { success: true, message: `Preference "${key}" set to ${JSON.stringify(value)}.` };
}
async reset(userId: string, key: string): Promise<{ success: boolean; message: string }> {
if (IMMUTABLE_KEYS.has(key)) {
return { success: false, message: `Cannot reset "${key}" — it is a platform enforcement.` };
}
await this.deletePref(userId, key);
const defaultVal = PLATFORM_DEFAULTS[key];
return {
success: true,
message: `Preference "${key}" reset to default: ${JSON.stringify(defaultVal)}.`,
};
}
private async getUserPrefs(userId: string): Promise<Record<string, unknown>> {
const rows = await this.db
.select({ key: preferencesTable.key, value: preferencesTable.value })
.from(preferencesTable)
.where(eq(preferencesTable.userId, userId));
const result: Record<string, unknown> = {};
for (const row of rows) {
result[row.key] = row.value;
}
return result;
}
private async upsertPref(userId: string, key: string, value: unknown): Promise<void> {
const existing = await this.db
.select({ id: preferencesTable.id })
.from(preferencesTable)
.where(and(eq(preferencesTable.userId, userId), eq(preferencesTable.key, key)))
.limit(1);
if (existing.length > 0) {
await this.db
.update(preferencesTable)
.set({ value: value as never, updatedAt: new Date() })
.where(and(eq(preferencesTable.userId, userId), eq(preferencesTable.key, key)));
} else {
await this.db.insert(preferencesTable).values({
userId,
key,
value: value as never,
mutable: true,
});
}
this.logger.debug(`Upserted preference "${key}" for user ${userId}`);
}
private async deletePref(userId: string, key: string): Promise<void> {
await this.db
.delete(preferencesTable)
.where(and(eq(preferencesTable.userId, userId), eq(preferencesTable.key, key)));
this.logger.debug(`Deleted preference "${key}" for user ${userId}`);
}
}

View File

@@ -0,0 +1,33 @@
import { Injectable, Logger } from '@nestjs/common';
import { createQueue, type QueueHandle } from '@mosaic/queue';
const SESSION_SYSTEM_KEY = (sessionId: string) => `mosaic:session:${sessionId}:system`;
const TTL_SECONDS = 5 * 60; // 5 minutes, renewed on each turn
@Injectable()
export class SystemOverrideService {
private readonly logger = new Logger(SystemOverrideService.name);
private readonly handle: QueueHandle;
constructor() {
this.handle = createQueue();
}
async set(sessionId: string, override: string): Promise<void> {
await this.handle.redis.setex(SESSION_SYSTEM_KEY(sessionId), TTL_SECONDS, override);
this.logger.debug(`Set system override for session ${sessionId} (TTL=${TTL_SECONDS}s)`);
}
async get(sessionId: string): Promise<string | null> {
return this.handle.redis.get(SESSION_SYSTEM_KEY(sessionId));
}
async renew(sessionId: string): Promise<void> {
await this.handle.redis.expire(SESSION_SYSTEM_KEY(sessionId), TTL_SECONDS);
}
async clear(sessionId: string): Promise<void> {
await this.handle.redis.del(SESSION_SYSTEM_KEY(sessionId));
this.logger.debug(`Cleared system override for session ${sessionId}`);
}
}