import { Inject, Injectable, Logger, Optional, type OnModuleDestroy } from '@nestjs/common';
import {
  createAgentSession,
  DefaultResourceLoader,
  SessionManager,
  type AgentSession as PiAgentSession,
  type AgentSessionEvent,
  type ToolDefinition,
} from '@mariozechner/pi-coding-agent';
import type { Brain } from '@mosaic/brain';
import type { Memory } from '@mosaic/memory';
import { BRAIN } from '../brain/brain.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import { CoordService } from '../coord/coord.service.js';
import { ProviderService } from './provider.service.js';
import { McpClientService } from '../mcp-client/mcp-client.service.js';
import { SkillLoaderService } from './skill-loader.service.js';
import { createBrainTools } from './tools/brain-tools.js';
import { createCoordTools } from './tools/coord-tools.js';
import { createMemoryTools } from './tools/memory-tools.js';
import { createFileTools } from './tools/file-tools.js';
import { createGitTools } from './tools/git-tools.js';
import { createShellTools } from './tools/shell-tools.js';
import { createWebTools } from './tools/web-tools.js';
import { createSearchTools } from './tools/search-tools.js';
import type { SessionInfoDto, SessionMetrics } from './session.dto.js';
import { SystemOverrideService } from '../preferences/system-override.service.js';
import { PreferencesService } from '../preferences/preferences.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';

/** Rough characters-per-token ratio used by the token estimation heuristic. */
const CHARS_PER_TOKEN = 4;

/** Context window (in tokens) assumed when the resolved model does not report one. */
const DEFAULT_CONTEXT_WINDOW = 8192;

/**
 * Render an unknown thrown value as a string suitable for the logger's
 * stack/trace argument: the stack (or message) for Error instances, the
 * string conversion for anything else.
 */
function describeError(err: unknown): string {
  return err instanceof Error ? (err.stack ?? err.message) : String(err);
}

/** A single message from DB conversation history, used for context injection. */
export interface ConversationHistoryMessage {
  role: 'user' | 'assistant' | 'system';
  content: string;
  createdAt: Date;
}

export interface AgentSessionOptions {
  provider?: string;
  modelId?: string;
  /**
   * Sandbox working directory for the session.
   * File, git, and shell tools will be restricted to this directory.
   * Falls back to AGENT_FILE_SANDBOX_DIR env var or process.cwd().
   */
  sandboxDir?: string;
  /**
   * Platform-level system prompt for this session.
   * Merged with skill prompt additions (platform prompt first, then skills).
   * Falls back to AGENT_SYSTEM_PROMPT env var when omitted.
   */
  systemPrompt?: string;
  /**
   * Explicit allowlist of tool names available in this session.
   * When set, only listed tools are registered with the agent.
   * When omitted for non-admin users, falls back to AGENT_USER_TOOLS env var.
   * Admins (isAdmin=true) always receive the full tool set unless explicitly restricted.
   */
  allowedTools?: string[];
  /** Whether the requesting user has admin privileges. Controls default tool access. */
  isAdmin?: boolean;
  /**
   * DB agent config ID. When provided, loads agent config from DB and merges
   * provider, model, systemPrompt, and allowedTools. Explicit call-site options
   * take precedence over config values.
   */
  agentConfigId?: string;
  /** ID of the user who owns this session. Used for preferences and system override lookups. */
  userId?: string;
  /**
   * Prior conversation messages to inject as context when resuming a session.
   * These messages are formatted and prepended to the system prompt so the
   * agent is aware of what was discussed in previous sessions.
   */
  conversationHistory?: ConversationHistoryMessage[];
}

export interface AgentSession {
  id: string;
  provider: string;
  modelId: string;
  piSession: PiAgentSession;
  listeners: Set<(event: AgentSessionEvent) => void>;
  unsubscribe: () => void;
  createdAt: number;
  promptCount: number;
  channels: Set<string>;
  /** System prompt additions injected from enabled prompt-type skills. */
  skillPromptAdditions: string[];
  /** Resolved sandbox directory for this session. */
  sandboxDir: string;
  /** Tool names available in this session, or null when all tools are available. */
  allowedTools: string[] | null;
  /** User ID that owns this session, used for preference lookups. */
  userId?: string;
  /** Agent config ID applied to this session, if any (M5-001). */
  agentConfigId?: string;
  /** Human-readable agent name applied to this session, if any (M5-001). */
  agentName?: string;
  /** M5-007: per-session metrics. */
  metrics: SessionMetrics;
}

/**
 * Owns the in-memory registry of live agent sessions: creates them (deduplicating
 * concurrent creation requests per session ID), forwards prompts and events,
 * tracks per-session metrics, and tears sessions down on request or module shutdown.
 */
@Injectable()
export class AgentService implements OnModuleDestroy {
  private readonly logger = new Logger(AgentService.name);
  /** Fully created sessions keyed by session ID. */
  private readonly sessions = new Map<string, AgentSession>();
  /** In-flight creation promises keyed by session ID, so concurrent callers share one creation. */
  private readonly creating = new Map<string, Promise<AgentSession>>();

  constructor(
    @Inject(ProviderService) private readonly providerService: ProviderService,
    @Inject(BRAIN) private readonly brain: Brain,
    @Inject(MEMORY) private readonly memory: Memory,
    @Inject(EmbeddingService) private readonly embeddingService: EmbeddingService,
    @Inject(CoordService) private readonly coordService: CoordService,
    @Inject(McpClientService) private readonly mcpClientService: McpClientService,
    @Inject(SkillLoaderService) private readonly skillLoaderService: SkillLoaderService,
    @Optional()
    @Inject(SystemOverrideService)
    private readonly systemOverride: SystemOverrideService | null,
    @Optional()
    @Inject(PreferencesService)
    private readonly preferencesService: PreferencesService | null,
    @Inject(SessionGCService) private readonly gc: SessionGCService,
  ) {}

  /**
   * Build the full set of custom tools scoped to the given sandbox directory and session user.
   * Brain/coord/memory/web tools are stateless with respect to cwd; file/git/shell
   * tools receive the resolved sandboxDir so they operate within the sandbox.
   * Memory tools are bound to sessionUserId so the LLM cannot access another user's data.
   */
  private buildToolsForSandbox(
    sandboxDir: string,
    sessionUserId: string | undefined,
  ): ToolDefinition[] {
    return [
      ...createBrainTools(this.brain),
      ...createCoordTools(this.coordService),
      ...createMemoryTools(
        this.memory,
        // Only hand the embedding service to memory tools when it reports itself available.
        this.embeddingService.available ? this.embeddingService : null,
        sessionUserId,
      ),
      ...createFileTools(sandboxDir),
      ...createGitTools(sandboxDir),
      ...createShellTools(sandboxDir),
      ...createWebTools(),
      ...createSearchTools(),
    ];
  }

  /**
   * Resolve the tool allowlist for a session.
   * - An explicit allowedTools list (even an empty one) always wins.
   * - Admin users: all tools unless an explicit allowedTools list is passed.
   * - Regular users: parse the comma-separated AGENT_USER_TOOLS env var; when it is
   *   unset or empty, no restriction is applied.
   * Returns null when all tools should be available.
   */
  private resolveAllowedTools(isAdmin: boolean, allowedTools?: string[]): string[] | null {
    if (allowedTools !== undefined) {
      return allowedTools;
    }
    if (isAdmin) {
      return null; // admins get everything
    }
    const envTools = process.env['AGENT_USER_TOOLS'];
    if (!envTools) {
      return null; // no restriction configured
    }
    return envTools
      .split(',')
      .map((t) => t.trim())
      .filter((t) => t.length > 0);
  }

  /**
   * Return the session for `sessionId`, creating it if necessary.
   * An existing session is returned as-is (options are ignored); a creation that is
   * already in flight for the same ID is shared rather than started twice.
   */
  async createSession(sessionId: string, options?: AgentSessionOptions): Promise<AgentSession> {
    const existing = this.sessions.get(sessionId);
    if (existing) return existing;

    const inflight = this.creating.get(sessionId);
    if (inflight) return inflight;

    const promise = this.doCreateSession(sessionId, options).finally(() => {
      // Always clear the in-flight marker so a failed creation can be retried.
      this.creating.delete(sessionId);
    });
    this.creating.set(sessionId, promise);
    return promise;
  }

  /**
   * Perform the actual session construction: merge DB agent config, resolve model,
   * sandbox and tool allowlist, assemble tools and system prompt, create the Pi
   * session, wire event fan-out, and register the session.
   *
   * @throws Error when the requested model cannot be found or Pi session creation fails.
   */
  private async doCreateSession(
    sessionId: string,
    options?: AgentSessionOptions,
  ): Promise<AgentSession> {
    // Merge DB agent config when agentConfigId is provided (M5-001)
    let mergedOptions = options;
    let resolvedAgentName: string | undefined;
    if (options?.agentConfigId) {
      const agentConfig = await this.brain.agents.findById(options.agentConfigId);
      if (agentConfig) {
        resolvedAgentName = agentConfig.name;
        // Explicit call-site options take precedence over values from the stored config.
        mergedOptions = {
          ...options,
          provider: options.provider ?? agentConfig.provider,
          modelId: options.modelId ?? agentConfig.model,
          systemPrompt: options.systemPrompt ?? agentConfig.systemPrompt ?? undefined,
          allowedTools: options.allowedTools ?? agentConfig.allowedTools ?? undefined,
        };
        this.logger.log(
          `Merged agent config "${agentConfig.name}" (${agentConfig.id}) into session ${sessionId}`,
        );
      } else {
        // Surface the miss instead of silently falling back to call-site options.
        this.logger.warn(
          `Agent config ${options.agentConfigId} not found for session ${sessionId}; using call-site options only`,
        );
      }
    }

    const model = this.resolveModel(mergedOptions);
    const providerName = model?.provider ?? 'default';
    const modelId = model?.id ?? 'default';

    // Resolve sandbox directory: option > env var > process.cwd()
    const sandboxDir =
      mergedOptions?.sandboxDir ?? process.env['AGENT_FILE_SANDBOX_DIR'] ?? process.cwd();

    // Resolve allowed tool set
    const allowedTools = this.resolveAllowedTools(
      mergedOptions?.isAdmin ?? false,
      mergedOptions?.allowedTools,
    );

    this.logger.log(
      `Creating agent session: ${sessionId} (provider=${providerName}, model=${modelId}, sandbox=${sandboxDir}, tools=${allowedTools === null ? 'all' : allowedTools.join(',') || 'none'})`,
    );

    // Load skill tools from the catalog
    const { metaTools: skillMetaTools, promptAdditions } =
      await this.skillLoaderService.loadForSession();
    if (skillMetaTools.length > 0) {
      this.logger.log(`Attaching ${skillMetaTools.length} skill tool(s) to session ${sessionId}`);
    }
    if (promptAdditions.length > 0) {
      this.logger.log(
        `Injecting ${promptAdditions.length} skill prompt addition(s) into session ${sessionId}`,
      );
    }

    // Build per-session tools scoped to the sandbox directory and authenticated user
    const sandboxTools = this.buildToolsForSandbox(sandboxDir, mergedOptions?.userId);

    // Combine static tools with dynamically discovered MCP client tools and skill tools
    const mcpTools = this.mcpClientService.getToolDefinitions();
    let allCustomTools = [...sandboxTools, ...skillMetaTools, ...mcpTools];
    if (mcpTools.length > 0) {
      this.logger.log(`Attaching ${mcpTools.length} MCP client tool(s) to session ${sessionId}`);
    }

    // Filter tools by allowlist when a restriction is in effect
    if (allowedTools !== null) {
      const allowedSet = new Set<string>(allowedTools);
      const before = allCustomTools.length;
      allCustomTools = allCustomTools.filter((t) => allowedSet.has(t.name));
      this.logger.log(
        `Tool restriction applied: ${allCustomTools.length}/${before} tools allowed for session ${sessionId}`,
      );
    }

    // Build system prompt: platform prompt + skill additions appended
    const platformPrompt =
      mergedOptions?.systemPrompt ?? process.env['AGENT_SYSTEM_PROMPT'] ?? undefined;

    // Format conversation history for context injection (M1-004 / M1-005)
    const historyPromptSection = mergedOptions?.conversationHistory?.length
      ? this.buildHistoryPromptSection(
          mergedOptions.conversationHistory,
          model?.contextWindow ?? DEFAULT_CONTEXT_WINDOW,
          sessionId,
        )
      : undefined;

    const appendParts: string[] = [];
    if (promptAdditions.length > 0) appendParts.push(promptAdditions.join('\n\n'));
    if (historyPromptSection) appendParts.push(historyPromptSection);
    const appendSystemPrompt = appendParts.length > 0 ? appendParts.join('\n\n') : undefined;

    // Construct a resource loader that injects the configured system prompt
    const resourceLoader = new DefaultResourceLoader({
      cwd: sandboxDir,
      noExtensions: true,
      noSkills: true,
      noPromptTemplates: true,
      noThemes: true,
      systemPrompt: platformPrompt,
      appendSystemPrompt: appendSystemPrompt,
    });
    await resourceLoader.reload();

    let piSession: PiAgentSession;
    try {
      const result = await createAgentSession({
        sessionManager: SessionManager.inMemory(),
        modelRegistry: this.providerService.getRegistry(),
        model: model ?? undefined,
        cwd: sandboxDir,
        tools: [],
        customTools: allCustomTools,
        resourceLoader,
      });
      piSession = result.session;
    } catch (err) {
      this.logger.error(`Failed to create agent session for ${sessionId}`, describeError(err));
      throw new Error(`Agent session creation failed for ${sessionId}: ${String(err)}`);
    }

    // Fan each Pi event out to every registered listener; one failing listener
    // must not prevent the others from receiving the event.
    const listeners = new Set<(event: AgentSessionEvent) => void>();
    const unsubscribe = piSession.subscribe((event) => {
      for (const listener of listeners) {
        try {
          listener(event);
        } catch (err) {
          this.logger.error(`Event listener error in session ${sessionId}`, describeError(err));
        }
      }
    });

    const session: AgentSession = {
      id: sessionId,
      provider: providerName,
      modelId,
      piSession,
      listeners,
      unsubscribe,
      createdAt: Date.now(),
      promptCount: 0,
      channels: new Set<string>(),
      skillPromptAdditions: promptAdditions,
      sandboxDir,
      allowedTools,
      userId: mergedOptions?.userId,
      agentConfigId: mergedOptions?.agentConfigId,
      agentName: resolvedAgentName,
      metrics: {
        tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
        modelSwitches: 0,
        messageCount: 0,
        lastActivityAt: new Date().toISOString(),
      },
    };

    this.sessions.set(sessionId, session);
    this.logger.log(`Agent session ${sessionId} ready (${providerName}/${modelId})`);
    if (resolvedAgentName) {
      this.logger.log(
        `Agent session ${sessionId} using agent config "${resolvedAgentName}" (M5-001)`,
      );
    }
    return session;
  }

  /**
   * Estimate token count for a string using a rough 4-chars-per-token heuristic.
   */
  private estimateTokens(text: string): number {
    return Math.ceil(text.length / CHARS_PER_TOKEN);
  }

  /**
   * Build a conversation history section for injection into the system prompt.
   * Implements M1-004 (history loading) and M1-005 (context window management).
   *
   * - Formats messages as a readable conversation transcript.
   * - If the full history exceeds 80% of the model's context window, older messages
   *   are summarized and only the most recent messages are kept verbatim.
   * - Summarization is a simple extractive approach (no LLM required): the first
   *   120 characters of each summarized user message, truncated as a whole so the
   *   summary stays within its 20% share of the budget.
   *
   * @param history Messages to inject, oldest first.
   * @param contextWindow Context window of the target model, in tokens.
   * @param sessionId Session ID, used only for log messages.
   * @returns The formatted history section, starting with the history header.
   */
  private buildHistoryPromptSection(
    history: ConversationHistoryMessage[],
    contextWindow: number,
    sessionId: string,
  ): string {
    const tokenBudget = Math.floor(contextWindow * 0.8);
    const historyHeader = '## Conversation History (resumed session)\n\n';

    const formatMessage = (msg: ConversationHistoryMessage): string => {
      const roleLabel =
        msg.role === 'user' ? 'User' : msg.role === 'assistant' ? 'Assistant' : 'System';
      return `**${roleLabel}:** ${msg.content}`;
    };

    const formatted = history.map((msg) => formatMessage(msg));
    const fullHistory = formatted.join('\n\n');
    const fullTokens = this.estimateTokens(historyHeader + fullHistory);

    if (fullTokens <= tokenBudget) {
      this.logger.debug(
        `Session ${sessionId}: injecting full history (${history.length} msgs, ~${fullTokens} tokens)`,
      );
      return historyHeader + fullHistory;
    }

    // History exceeds budget — summarize oldest messages, keep recent verbatim
    this.logger.log(
      `Session ${sessionId}: history (~${fullTokens} tokens) exceeds ${tokenBudget} token budget; summarizing oldest messages`,
    );

    // Reserve 20% of the budget for the summary prefix, rest for verbatim messages
    const summaryReserve = Math.floor(tokenBudget * 0.2);
    const verbatimBudget = tokenBudget - summaryReserve;

    // Walk backwards from the newest message, keeping messages verbatim until the
    // next one would overflow the verbatim budget.
    let verbatimTokens = 0;
    let verbatimCutIndex = history.length;
    for (let i = history.length - 1; i >= 0; i--) {
      const cost = this.estimateTokens(formatted[i] ?? '');
      if (verbatimTokens + cost > verbatimBudget) break;
      verbatimTokens += cost;
      verbatimCutIndex = i;
    }

    const summarizedMessages = history.slice(0, verbatimCutIndex);
    const verbatimCount = history.length - verbatimCutIndex;
    // Reuse the already formatted strings instead of formatting the tail a second time.
    const verbatimSection = formatted.slice(verbatimCutIndex).join('\n\n');

    let summaryText = '';
    if (summarizedMessages.length > 0) {
      const summaryPrefix =
        `**Previous conversation summary** (${summarizedMessages.length} messages omitted for brevity):\n` +
        `Topics discussed: `;
      let topics = summarizedMessages
        .filter((m) => m.role === 'user')
        .map((m) => m.content.slice(0, 120).replace(/\n/g, ' '))
        .join('; ');
      // Enforce the summary reserve: without this cap a long run of summarized
      // user messages could push the section past the overall token budget.
      const maxTopicChars = Math.max(0, summaryReserve * CHARS_PER_TOKEN - summaryPrefix.length);
      if (topics.length > maxTopicChars) {
        topics = `${topics.slice(0, Math.max(0, maxTopicChars - 1))}…`;
      }
      summaryText = summaryPrefix + (topics || '(no user messages in summarized portion)');
    }

    // The header already ends with a blank line, so it is concatenated directly
    // (matching the full-history path) rather than joined with another separator.
    const result =
      historyHeader +
      [summaryText, verbatimSection].filter((part) => part.length > 0).join('\n\n');

    this.logger.log(
      `Session ${sessionId}: summarized ${summarizedMessages.length} messages, kept ${verbatimCount} verbatim (~${this.estimateTokens(result)} tokens)`,
    );
    return result;
  }

  /**
   * Resolve the model for a session from the requested provider/model pair.
   * - Neither given: provider service default.
   * - Both given: exact lookup; throws when the model does not exist.
   * - Only modelId given: first available model with that ID, else the default.
   * - Only provider given: the default model.
   * Returns null when no model can be resolved.
   *
   * @throws Error when both provider and modelId are given but no such model exists.
   */
  private resolveModel(options?: AgentSessionOptions) {
    if (!options?.provider && !options?.modelId) {
      return this.providerService.getDefaultModel() ?? null;
    }
    if (options.provider && options.modelId) {
      const model = this.providerService.findModel(options.provider, options.modelId);
      if (!model) {
        throw new Error(`Model not found: ${options.provider}/${options.modelId}`);
      }
      return model;
    }
    if (options.modelId) {
      const available = this.providerService.listAvailableModels();
      const match = available.find((m) => m.id === options.modelId);
      if (match) {
        return this.providerService.findModel(match.provider, match.id) ?? null;
      }
    }
    return this.providerService.getDefaultModel() ?? null;
  }

  /** Look up a live session by ID; undefined when no such session is registered. */
  getSession(sessionId: string): AgentSession | undefined {
    return this.sessions.get(sessionId);
  }

  /**
   * Convert a live session into its DTO snapshot.
   * Metrics (including the nested token counters) are copied so later metric
   * updates on the session do not mutate a DTO that was already handed out.
   *
   * @param s Session to describe.
   * @param now Timestamp (ms since epoch) used to compute durationMs.
   */
  private toSessionInfo(s: AgentSession, now: number = Date.now()): SessionInfoDto {
    return {
      id: s.id,
      provider: s.provider,
      modelId: s.modelId,
      // agentName is only included when one was applied to the session.
      ...(s.agentName ? { agentName: s.agentName } : {}),
      createdAt: new Date(s.createdAt).toISOString(),
      promptCount: s.promptCount,
      channels: Array.from(s.channels),
      durationMs: now - s.createdAt,
      metrics: { ...s.metrics, tokens: { ...s.metrics.tokens } },
    };
  }

  /** Snapshot of every live session, all measured against the same "now". */
  listSessions(): SessionInfoDto[] {
    const now = Date.now();
    return Array.from(this.sessions.values(), (s) => this.toSessionInfo(s, now));
  }

  /** Snapshot of a single live session; undefined when the session does not exist. */
  getSessionInfo(sessionId: string): SessionInfoDto | undefined {
    const s = this.sessions.get(sessionId);
    return s ? this.toSessionInfo(s) : undefined;
  }

  /**
   * Record token usage for a session turn (M5-007).
   * Accumulates tokens across the session lifetime. No-op for unknown sessions.
   */
  recordTokenUsage(
    sessionId: string,
    tokens: { input: number; output: number; cacheRead: number; cacheWrite: number; total: number },
  ): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    session.metrics.tokens.input += tokens.input;
    session.metrics.tokens.output += tokens.output;
    session.metrics.tokens.cacheRead += tokens.cacheRead;
    session.metrics.tokens.cacheWrite += tokens.cacheWrite;
    session.metrics.tokens.total += tokens.total;
    session.metrics.lastActivityAt = new Date().toISOString();
  }

  /**
   * Record a model switch event for a session (M5-007). No-op for unknown sessions.
   */
  recordModelSwitch(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    session.metrics.modelSwitches += 1;
    session.metrics.lastActivityAt = new Date().toISOString();
  }

  /**
   * Increment message count for a session (M5-007). No-op for unknown sessions.
   */
  recordMessage(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    session.metrics.messageCount += 1;
    session.metrics.lastActivityAt = new Date().toISOString();
  }

  /**
   * Update the model tracked on a live session (M5-002).
   * This records the model change in the session metadata so subsequent
   * session:info emissions reflect the new model. The Pi session itself is
   * not reconstructed — the model is used on the next createSession call for
   * the same conversationId when the session is torn down or a new one is created.
   */
  updateSessionModel(sessionId: string, modelId: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    const prev = session.modelId;
    session.modelId = modelId;
    this.recordModelSwitch(sessionId);
    this.logger.log(`Session ${sessionId}: model updated ${prev} → ${modelId} (M5-002)`);
  }

  /**
   * Apply a new agent config to a live session mid-conversation (M5-003).
   * Updates agentName, agentConfigId, and modelId on the session object.
   * System prompt and tools take effect when the next session is created for
   * this conversationId (they are baked in at session creation time).
   */
  applyAgentConfig(
    sessionId: string,
    agentConfigId: string,
    agentName: string,
    modelId?: string,
  ): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    session.agentConfigId = agentConfigId;
    session.agentName = agentName;
    if (modelId) {
      this.updateSessionModel(sessionId, modelId);
    }
    this.logger.log(
      `Session ${sessionId}: agent switched to "${agentName}" (${agentConfigId}) (M5-003)`,
    );
  }

  /** Attach a channel name to a session; ignored when the session does not exist. */
  addChannel(sessionId: string, channel: string): void {
    this.sessions.get(sessionId)?.channels.add(channel);
  }

  /** Detach a channel name from a session; ignored when the session does not exist. */
  removeChannel(sessionId: string, channel: string): void {
    this.sessions.get(sessionId)?.channels.delete(channel);
  }

  /**
   * Send a user message to the session's Pi agent.
   * When a system override is stored for the session, it is prepended to the
   * message and its TTL is renewed.
   *
   * @throws Error when the session does not exist; rethrows any prompt failure after logging it.
   */
  async prompt(sessionId: string, message: string): Promise<void> {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`No agent session found: ${sessionId}`);
    }
    session.promptCount += 1;

    // Prepend session-scoped system override if present (renew TTL on each turn)
    let effectiveMessage = message;
    if (this.systemOverride) {
      const override = await this.systemOverride.get(sessionId);
      if (override) {
        effectiveMessage = `[System Override]\n${override}\n\n${message}`;
        await this.systemOverride.renew(sessionId);
        this.logger.debug(`Applied system override for session ${sessionId}`);
      }
    }

    try {
      await session.piSession.prompt(effectiveMessage);
    } catch (err) {
      // Log only the message length, not its content.
      this.logger.error(
        `Prompt failed for session=${sessionId}, messageLength=${message.length}`,
        describeError(err),
      );
      throw err;
    }
  }

  /**
   * Subscribe to events of a session.
   *
   * @returns A function that removes the listener again.
   * @throws Error when the session does not exist.
   */
  onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`No agent session found: ${sessionId}`);
    }
    session.listeners.add(listener);
    return () => {
      session.listeners.delete(listener);
    };
  }

  /**
   * Tear down a session: unsubscribe from Pi events, dispose the Pi session,
   * drop listeners/channels, unregister it, and trigger GC for the session.
   * Each teardown step is best-effort so one failure does not block the rest.
   * No-op when the session does not exist.
   */
  async destroySession(sessionId: string): Promise<void> {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    this.logger.log(`Destroying agent session ${sessionId}`);

    try {
      session.unsubscribe();
    } catch (err) {
      this.logger.error(`Failed to unsubscribe session ${sessionId}`, String(err));
    }
    try {
      session.piSession.dispose();
    } catch (err) {
      this.logger.error(`Failed to dispose piSession for ${sessionId}`, String(err));
    }

    session.listeners.clear();
    session.channels.clear();
    this.sessions.delete(sessionId);

    // Run GC cleanup for this session (fire and forget, errors are logged)
    void this.gc.collect(sessionId).catch((err: unknown) => {
      this.logger.error(`GC collect failed for session ${sessionId}`, describeError(err));
    });
  }

  /** Nest shutdown hook: destroy every live session, logging (not throwing) individual failures. */
  async onModuleDestroy(): Promise<void> {
    this.logger.log('Shutting down all agent sessions');
    const stops = Array.from(this.sessions.keys(), (id) => this.destroySession(id));
    const results = await Promise.allSettled(stops);
    for (const result of results) {
      if (result.status === 'rejected') {
        this.logger.error('Session shutdown failure', String(result.reason));
      }
    }
  }
}