Files
stack/apps/gateway/src/agent/agent.service.ts
Jarvis 774b76447d
Some checks failed
ci/woodpecker/pr/ci Pipeline failed
ci/woodpecker/push/ci Pipeline failed
fix: rename all packages from @mosaic/* to @mosaicstack/*
- Updated all package.json name fields and dependency references
- Updated all TypeScript/JavaScript imports
- Updated .woodpecker/publish.yml filters and registry paths
- Updated tools/install.sh scope default
- Updated .npmrc registry paths (worktree + host)
- Enhanced update-checker.ts with checkForAllUpdates() multi-package support
- Updated CLI update command to show table of all packages
- Added KNOWN_PACKAGES, formatAllPackagesTable, getInstallAllCommand
- Marked checkForUpdate() with @deprecated JSDoc

Closes #391
2026-04-04 21:43:23 -05:00

679 lines
24 KiB
TypeScript

import { Inject, Injectable, Logger, Optional, type OnModuleDestroy } from '@nestjs/common';
import {
createAgentSession,
DefaultResourceLoader,
SessionManager,
type AgentSession as PiAgentSession,
type AgentSessionEvent,
type ToolDefinition,
} from '@mariozechner/pi-coding-agent';
import type { Brain } from '@mosaicstack/brain';
import type { Memory } from '@mosaicstack/memory';
import { BRAIN } from '../brain/brain.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import { CoordService } from '../coord/coord.service.js';
import { ProviderService } from './provider.service.js';
import { McpClientService } from '../mcp-client/mcp-client.service.js';
import { SkillLoaderService } from './skill-loader.service.js';
import { createBrainTools } from './tools/brain-tools.js';
import { createCoordTools } from './tools/coord-tools.js';
import { createMemoryTools } from './tools/memory-tools.js';
import { createFileTools } from './tools/file-tools.js';
import { createGitTools } from './tools/git-tools.js';
import { createShellTools } from './tools/shell-tools.js';
import { createWebTools } from './tools/web-tools.js';
import { createSearchTools } from './tools/search-tools.js';
import type { SessionInfoDto, SessionMetrics } from './session.dto.js';
import { SystemOverrideService } from '../preferences/system-override.service.js';
import { PreferencesService } from '../preferences/preferences.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';
/** A single message from DB conversation history, used for context injection. */
export interface ConversationHistoryMessage {
role: 'user' | 'assistant' | 'system';
content: string;
createdAt: Date;
}
export interface AgentSessionOptions {
provider?: string;
modelId?: string;
/**
* Sandbox working directory for the session.
* File, git, and shell tools will be restricted to this directory.
* Falls back to AGENT_FILE_SANDBOX_DIR env var or process.cwd().
*/
sandboxDir?: string;
/**
* Platform-level system prompt for this session.
* Merged with skill prompt additions (platform prompt first, then skills).
* Falls back to AGENT_SYSTEM_PROMPT env var when omitted.
*/
systemPrompt?: string;
/**
* Explicit allowlist of tool names available in this session.
* When set, only listed tools are registered with the agent.
* When omitted for non-admin users, falls back to AGENT_USER_TOOLS env var.
* Admins (isAdmin=true) always receive the full tool set unless explicitly restricted.
*/
allowedTools?: string[];
/** Whether the requesting user has admin privileges. Controls default tool access. */
isAdmin?: boolean;
/**
* DB agent config ID. When provided, loads agent config from DB and merges
* provider, model, systemPrompt, and allowedTools. Explicit call-site options
* take precedence over config values.
*/
agentConfigId?: string;
/** ID of the user who owns this session. Used for preferences and system override lookups. */
userId?: string;
/**
* Prior conversation messages to inject as context when resuming a session.
* These messages are formatted and prepended to the system prompt so the
* agent is aware of what was discussed in previous sessions.
*/
conversationHistory?: ConversationHistoryMessage[];
}
export interface AgentSession {
id: string;
provider: string;
modelId: string;
piSession: PiAgentSession;
listeners: Set<(event: AgentSessionEvent) => void>;
unsubscribe: () => void;
createdAt: number;
promptCount: number;
channels: Set<string>;
/** System prompt additions injected from enabled prompt-type skills. */
skillPromptAdditions: string[];
/** Resolved sandbox directory for this session. */
sandboxDir: string;
/** Tool names available in this session, or null when all tools are available. */
allowedTools: string[] | null;
/** User ID that owns this session, used for preference lookups. */
userId?: string;
/** Agent config ID applied to this session, if any (M5-001). */
agentConfigId?: string;
/** Human-readable agent name applied to this session, if any (M5-001). */
agentName?: string;
/** M5-007: per-session metrics. */
metrics: SessionMetrics;
}
@Injectable()
export class AgentService implements OnModuleDestroy {
private readonly logger = new Logger(AgentService.name);
private readonly sessions = new Map<string, AgentSession>();
private readonly creating = new Map<string, Promise<AgentSession>>();
constructor(
@Inject(ProviderService) private readonly providerService: ProviderService,
@Inject(BRAIN) private readonly brain: Brain,
@Inject(MEMORY) private readonly memory: Memory,
@Inject(EmbeddingService) private readonly embeddingService: EmbeddingService,
@Inject(CoordService) private readonly coordService: CoordService,
@Inject(McpClientService) private readonly mcpClientService: McpClientService,
@Inject(SkillLoaderService) private readonly skillLoaderService: SkillLoaderService,
@Optional()
@Inject(SystemOverrideService)
private readonly systemOverride: SystemOverrideService | null,
@Optional()
@Inject(PreferencesService)
private readonly preferencesService: PreferencesService | null,
@Inject(SessionGCService) private readonly gc: SessionGCService,
) {}
/**
* Build the full set of custom tools scoped to the given sandbox directory and session user.
* Brain/coord/memory/web tools are stateless with respect to cwd; file/git/shell
* tools receive the resolved sandboxDir so they operate within the sandbox.
* Memory tools are bound to sessionUserId so the LLM cannot access another user's data.
*/
private buildToolsForSandbox(
sandboxDir: string,
sessionUserId: string | undefined,
): ToolDefinition[] {
return [
...createBrainTools(this.brain),
...createCoordTools(this.coordService),
...createMemoryTools(
this.memory,
this.embeddingService.available ? this.embeddingService : null,
sessionUserId,
),
...createFileTools(sandboxDir),
...createGitTools(sandboxDir),
...createShellTools(sandboxDir),
...createWebTools(),
...createSearchTools(),
];
}
/**
* Resolve the tool allowlist for a session.
* - Admin users: all tools unless an explicit allowedTools list is passed.
* - Regular users: use allowedTools if provided, otherwise parse AGENT_USER_TOOLS env var.
* Returns null when all tools should be available.
*/
private resolveAllowedTools(isAdmin: boolean, allowedTools?: string[]): string[] | null {
if (allowedTools !== undefined) {
return allowedTools.length === 0 ? [] : allowedTools;
}
if (isAdmin) {
return null; // admins get everything
}
const envTools = process.env['AGENT_USER_TOOLS'];
if (!envTools) {
return null; // no restriction configured
}
return envTools
.split(',')
.map((t) => t.trim())
.filter((t) => t.length > 0);
}
async createSession(sessionId: string, options?: AgentSessionOptions): Promise<AgentSession> {
const existing = this.sessions.get(sessionId);
if (existing) return existing;
const inflight = this.creating.get(sessionId);
if (inflight) return inflight;
const promise = this.doCreateSession(sessionId, options).finally(() => {
this.creating.delete(sessionId);
});
this.creating.set(sessionId, promise);
return promise;
}
private async doCreateSession(
sessionId: string,
options?: AgentSessionOptions,
): Promise<AgentSession> {
// Merge DB agent config when agentConfigId is provided (M5-001)
let mergedOptions = options;
let resolvedAgentName: string | undefined;
if (options?.agentConfigId) {
const agentConfig = await this.brain.agents.findById(options.agentConfigId);
if (agentConfig) {
resolvedAgentName = agentConfig.name;
mergedOptions = {
provider: options.provider ?? agentConfig.provider,
modelId: options.modelId ?? agentConfig.model,
systemPrompt: options.systemPrompt ?? agentConfig.systemPrompt ?? undefined,
allowedTools: options.allowedTools ?? agentConfig.allowedTools ?? undefined,
sandboxDir: options.sandboxDir,
isAdmin: options.isAdmin,
agentConfigId: options.agentConfigId,
userId: options.userId,
conversationHistory: options.conversationHistory,
};
this.logger.log(
`Merged agent config "${agentConfig.name}" (${agentConfig.id}) into session ${sessionId}`,
);
}
}
const model = this.resolveModel(mergedOptions);
const providerName = model?.provider ?? 'default';
const modelId = model?.id ?? 'default';
// Resolve sandbox directory: option > env var > process.cwd()
const sandboxDir =
mergedOptions?.sandboxDir ?? process.env['AGENT_FILE_SANDBOX_DIR'] ?? process.cwd();
// Resolve allowed tool set
const allowedTools = this.resolveAllowedTools(
mergedOptions?.isAdmin ?? false,
mergedOptions?.allowedTools,
);
this.logger.log(
`Creating agent session: ${sessionId} (provider=${providerName}, model=${modelId}, sandbox=${sandboxDir}, tools=${allowedTools === null ? 'all' : allowedTools.join(',') || 'none'})`,
);
// Load skill tools from the catalog
const { metaTools: skillMetaTools, promptAdditions } =
await this.skillLoaderService.loadForSession();
if (skillMetaTools.length > 0) {
this.logger.log(`Attaching ${skillMetaTools.length} skill tool(s) to session ${sessionId}`);
}
if (promptAdditions.length > 0) {
this.logger.log(
`Injecting ${promptAdditions.length} skill prompt addition(s) into session ${sessionId}`,
);
}
// Build per-session tools scoped to the sandbox directory and authenticated user
const sandboxTools = this.buildToolsForSandbox(sandboxDir, mergedOptions?.userId);
// Combine static tools with dynamically discovered MCP client tools and skill tools
const mcpTools = this.mcpClientService.getToolDefinitions();
let allCustomTools = [...sandboxTools, ...skillMetaTools, ...mcpTools];
if (mcpTools.length > 0) {
this.logger.log(`Attaching ${mcpTools.length} MCP client tool(s) to session ${sessionId}`);
}
// Filter tools by allowlist when a restriction is in effect
if (allowedTools !== null) {
const allowedSet = new Set(allowedTools);
const before = allCustomTools.length;
allCustomTools = allCustomTools.filter((t) => allowedSet.has(t.name));
this.logger.log(
`Tool restriction applied: ${allCustomTools.length}/${before} tools allowed for session ${sessionId}`,
);
}
// Build system prompt: platform prompt + skill additions appended
const platformPrompt =
mergedOptions?.systemPrompt ?? process.env['AGENT_SYSTEM_PROMPT'] ?? undefined;
// Format conversation history for context injection (M1-004 / M1-005)
const historyPromptSection = mergedOptions?.conversationHistory?.length
? this.buildHistoryPromptSection(
mergedOptions.conversationHistory,
model?.contextWindow ?? 8192,
sessionId,
)
: undefined;
const appendParts: string[] = [];
if (promptAdditions.length > 0) appendParts.push(promptAdditions.join('\n\n'));
if (historyPromptSection) appendParts.push(historyPromptSection);
const appendSystemPrompt = appendParts.length > 0 ? appendParts.join('\n\n') : undefined;
// Construct a resource loader that injects the configured system prompt
const resourceLoader = new DefaultResourceLoader({
cwd: sandboxDir,
noExtensions: true,
noSkills: true,
noPromptTemplates: true,
noThemes: true,
systemPrompt: platformPrompt,
appendSystemPrompt: appendSystemPrompt,
});
await resourceLoader.reload();
let piSession: PiAgentSession;
try {
const result = await createAgentSession({
sessionManager: SessionManager.inMemory(),
modelRegistry: this.providerService.getRegistry(),
model: model ?? undefined,
cwd: sandboxDir,
tools: [],
customTools: allCustomTools,
resourceLoader,
});
piSession = result.session;
} catch (err) {
this.logger.error(
`Failed to create agent session for ${sessionId}`,
err instanceof Error ? err.stack : String(err),
);
throw new Error(`Agent session creation failed for ${sessionId}: ${String(err)}`);
}
const listeners = new Set<(event: AgentSessionEvent) => void>();
const unsubscribe = piSession.subscribe((event) => {
for (const listener of listeners) {
try {
listener(event);
} catch (err) {
this.logger.error(`Event listener error in session ${sessionId}`, err);
}
}
});
const session: AgentSession = {
id: sessionId,
provider: providerName,
modelId,
piSession,
listeners,
unsubscribe,
createdAt: Date.now(),
promptCount: 0,
channels: new Set(),
skillPromptAdditions: promptAdditions,
sandboxDir,
allowedTools,
userId: mergedOptions?.userId,
agentConfigId: mergedOptions?.agentConfigId,
agentName: resolvedAgentName,
metrics: {
tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
modelSwitches: 0,
messageCount: 0,
lastActivityAt: new Date().toISOString(),
},
};
this.sessions.set(sessionId, session);
this.logger.log(`Agent session ${sessionId} ready (${providerName}/${modelId})`);
if (resolvedAgentName) {
this.logger.log(
`Agent session ${sessionId} using agent config "${resolvedAgentName}" (M5-001)`,
);
}
return session;
}
/**
* Estimate token count for a string using a rough 4-chars-per-token heuristic.
*/
private estimateTokens(text: string): number {
return Math.ceil(text.length / 4);
}
/**
* Build a conversation history section for injection into the system prompt.
* Implements M1-004 (history loading) and M1-005 (context window management).
*
* - Formats messages as a readable conversation transcript.
* - If the full history exceeds 80% of the model's context window, older messages
* are summarized and only the most recent messages are kept verbatim.
* - Summarization is a simple extractive approach (no LLM required).
*/
private buildHistoryPromptSection(
history: ConversationHistoryMessage[],
contextWindow: number,
sessionId: string,
): string {
const TOKEN_BUDGET = Math.floor(contextWindow * 0.8);
const HISTORY_HEADER = '## Conversation History (resumed session)\n\n';
const formatMessage = (msg: ConversationHistoryMessage): string => {
const roleLabel =
msg.role === 'user' ? 'User' : msg.role === 'assistant' ? 'Assistant' : 'System';
return `**${roleLabel}:** ${msg.content}`;
};
const formatted = history.map((msg) => formatMessage(msg));
const fullHistory = formatted.join('\n\n');
const fullTokens = this.estimateTokens(HISTORY_HEADER + fullHistory);
if (fullTokens <= TOKEN_BUDGET) {
this.logger.debug(
`Session ${sessionId}: injecting full history (${history.length} msgs, ~${fullTokens} tokens)`,
);
return HISTORY_HEADER + fullHistory;
}
// History exceeds budget — summarize oldest messages, keep recent verbatim
this.logger.log(
`Session ${sessionId}: history (~${fullTokens} tokens) exceeds ${TOKEN_BUDGET} token budget; summarizing oldest messages`,
);
// Reserve 20% of the budget for the summary prefix, rest for verbatim messages
const SUMMARY_RESERVE = Math.floor(TOKEN_BUDGET * 0.2);
const verbatimBudget = TOKEN_BUDGET - SUMMARY_RESERVE;
let verbatimTokens = 0;
let verbatimCutIndex = history.length;
for (let i = history.length - 1; i >= 0; i--) {
const t = this.estimateTokens(formatted[i]!);
if (verbatimTokens + t > verbatimBudget) break;
verbatimTokens += t;
verbatimCutIndex = i;
}
const summarizedMessages = history.slice(0, verbatimCutIndex);
const verbatimMessages = history.slice(verbatimCutIndex);
let summaryText = '';
if (summarizedMessages.length > 0) {
const topics = summarizedMessages
.filter((m) => m.role === 'user')
.map((m) => m.content.slice(0, 120).replace(/\n/g, ' '))
.join('; ');
summaryText =
`**Previous conversation summary** (${summarizedMessages.length} messages omitted for brevity):\n` +
`Topics discussed: ${topics || '(no user messages in summarized portion)'}`;
}
const verbatimSection = verbatimMessages.map((m) => formatMessage(m)).join('\n\n');
const parts: string[] = [HISTORY_HEADER];
if (summaryText) parts.push(summaryText);
if (verbatimSection) parts.push(verbatimSection);
const result = parts.join('\n\n');
this.logger.log(
`Session ${sessionId}: summarized ${summarizedMessages.length} messages, kept ${verbatimMessages.length} verbatim (~${this.estimateTokens(result)} tokens)`,
);
return result;
}
private resolveModel(options?: AgentSessionOptions) {
if (!options?.provider && !options?.modelId) {
return this.providerService.getDefaultModel() ?? null;
}
if (options.provider && options.modelId) {
const model = this.providerService.findModel(options.provider, options.modelId);
if (!model) {
throw new Error(`Model not found: ${options.provider}/${options.modelId}`);
}
return model;
}
if (options.modelId) {
const available = this.providerService.listAvailableModels();
const match = available.find((m) => m.id === options.modelId);
if (match) {
return this.providerService.findModel(match.provider, match.id) ?? null;
}
}
return this.providerService.getDefaultModel() ?? null;
}
getSession(sessionId: string): AgentSession | undefined {
return this.sessions.get(sessionId);
}
listSessions(): SessionInfoDto[] {
const now = Date.now();
return Array.from(this.sessions.values()).map((s) => ({
id: s.id,
provider: s.provider,
modelId: s.modelId,
...(s.agentName ? { agentName: s.agentName } : {}),
createdAt: new Date(s.createdAt).toISOString(),
promptCount: s.promptCount,
channels: Array.from(s.channels),
durationMs: now - s.createdAt,
metrics: { ...s.metrics },
}));
}
getSessionInfo(sessionId: string): SessionInfoDto | undefined {
const s = this.sessions.get(sessionId);
if (!s) return undefined;
return {
id: s.id,
provider: s.provider,
modelId: s.modelId,
...(s.agentName ? { agentName: s.agentName } : {}),
createdAt: new Date(s.createdAt).toISOString(),
promptCount: s.promptCount,
channels: Array.from(s.channels),
durationMs: Date.now() - s.createdAt,
metrics: { ...s.metrics },
};
}
/**
* Record token usage for a session turn (M5-007).
* Accumulates tokens across the session lifetime.
*/
recordTokenUsage(
sessionId: string,
tokens: { input: number; output: number; cacheRead: number; cacheWrite: number; total: number },
): void {
const session = this.sessions.get(sessionId);
if (!session) return;
session.metrics.tokens.input += tokens.input;
session.metrics.tokens.output += tokens.output;
session.metrics.tokens.cacheRead += tokens.cacheRead;
session.metrics.tokens.cacheWrite += tokens.cacheWrite;
session.metrics.tokens.total += tokens.total;
session.metrics.lastActivityAt = new Date().toISOString();
}
/**
* Record a model switch event for a session (M5-007).
*/
recordModelSwitch(sessionId: string): void {
const session = this.sessions.get(sessionId);
if (!session) return;
session.metrics.modelSwitches += 1;
session.metrics.lastActivityAt = new Date().toISOString();
}
/**
* Increment message count for a session (M5-007).
*/
recordMessage(sessionId: string): void {
const session = this.sessions.get(sessionId);
if (!session) return;
session.metrics.messageCount += 1;
session.metrics.lastActivityAt = new Date().toISOString();
}
/**
* Update the model tracked on a live session (M5-002).
* This records the model change in the session metadata so subsequent
* session:info emissions reflect the new model. The Pi session itself is
* not reconstructed — the model is used on the next createSession call for
* the same conversationId when the session is torn down or a new one is created.
*/
updateSessionModel(sessionId: string, modelId: string): void {
const session = this.sessions.get(sessionId);
if (!session) return;
const prev = session.modelId;
session.modelId = modelId;
this.recordModelSwitch(sessionId);
this.logger.log(`Session ${sessionId}: model updated ${prev}${modelId} (M5-002)`);
}
/**
* Apply a new agent config to a live session mid-conversation (M5-003).
* Updates agentName, agentConfigId, and modelId on the session object.
* System prompt and tools take effect when the next session is created for
* this conversationId (they are baked in at session creation time).
*/
applyAgentConfig(
sessionId: string,
agentConfigId: string,
agentName: string,
modelId?: string,
): void {
const session = this.sessions.get(sessionId);
if (!session) return;
session.agentConfigId = agentConfigId;
session.agentName = agentName;
if (modelId) {
this.updateSessionModel(sessionId, modelId);
}
this.logger.log(
`Session ${sessionId}: agent switched to "${agentName}" (${agentConfigId}) (M5-003)`,
);
}
addChannel(sessionId: string, channel: string): void {
const session = this.sessions.get(sessionId);
if (session) {
session.channels.add(channel);
}
}
removeChannel(sessionId: string, channel: string): void {
const session = this.sessions.get(sessionId);
if (session) {
session.channels.delete(channel);
}
}
async prompt(sessionId: string, message: string): Promise<void> {
const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`No agent session found: ${sessionId}`);
}
session.promptCount += 1;
// Prepend session-scoped system override if present (renew TTL on each turn)
let effectiveMessage = message;
if (this.systemOverride) {
const override = await this.systemOverride.get(sessionId);
if (override) {
effectiveMessage = `[System Override]\n${override}\n\n${message}`;
await this.systemOverride.renew(sessionId);
this.logger.debug(`Applied system override for session ${sessionId}`);
}
}
try {
await session.piSession.prompt(effectiveMessage);
} catch (err) {
this.logger.error(
`Prompt failed for session=${sessionId}, messageLength=${message.length}`,
err instanceof Error ? err.stack : String(err),
);
throw err;
}
}
onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void {
const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`No agent session found: ${sessionId}`);
}
session.listeners.add(listener);
return () => session.listeners.delete(listener);
}
async destroySession(sessionId: string): Promise<void> {
const session = this.sessions.get(sessionId);
if (!session) return;
this.logger.log(`Destroying agent session ${sessionId}`);
try {
session.unsubscribe();
} catch (err) {
this.logger.error(`Failed to unsubscribe session ${sessionId}`, String(err));
}
try {
session.piSession.dispose();
} catch (err) {
this.logger.error(`Failed to dispose piSession for ${sessionId}`, String(err));
}
session.listeners.clear();
session.channels.clear();
this.sessions.delete(sessionId);
// Run GC cleanup for this session (fire and forget, errors are logged)
this.gc.collect(sessionId).catch((err: unknown) => {
this.logger.error(
`GC collect failed for session ${sessionId}`,
err instanceof Error ? err.stack : String(err),
);
});
}
async onModuleDestroy(): Promise<void> {
this.logger.log('Shutting down all agent sessions');
const stops = Array.from(this.sessions.keys()).map((id) => this.destroySession(id));
const results = await Promise.allSettled(stops);
for (const result of results) {
if (result.status === 'rejected') {
this.logger.error('Session shutdown failure', String(result.reason));
}
}
}
}