import { promises as fs } from 'node:fs'; import path from 'node:path'; import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; import { runAgentLoop, type AgentContext, type AgentEvent, type AgentLoopConfig, type AgentMessage, type AgentTool, } from '@mariozechner/pi-agent-core'; import { getModel, Type, type AssistantMessage, type AssistantMessageEvent, type Model, type Static, } from '@mariozechner/pi-ai'; const execFileAsync = promisify(execFile); type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue }; export interface PiBridgeOptions { model: string; systemPrompt: string; prompt: string; workDir: string; timeoutMs: number; logPath: string; signal?: AbortSignal; onEvent?: (event: AgentEvent) => void | Promise; } export interface PiBridgeResult { exitCode: number; output: string; messages: AgentMessage[]; tokenUsage: { input: number; output: number }; stopReason: string; } type TranscriptEvent = { timestamp: string; type: string; data?: JsonValue; }; function nowIso(): string { return new Date().toISOString(); } function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null; } function asJsonValue(value: unknown): JsonValue { if ( value === null || typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean' ) { return value; } if (Array.isArray(value)) { return value.map((item) => asJsonValue(item)); } if (isRecord(value)) { return Object.fromEntries(Object.entries(value).map(([key, item]) => [key, asJsonValue(item)])); } return String(value); } function resolvePath(workDir: string, targetPath: string): string { if (path.isAbsolute(targetPath)) { return path.normalize(targetPath); } return path.resolve(workDir, targetPath); } async function runCommand( command: string, workDir: string, timeoutMs: number, ): Promise<{ stdout: string; stderr: string }> { const result = await execFileAsync('bash', ['-lc', command], { cwd: workDir, encoding: 'utf-8', maxBuffer: 10 * 1024 * 1024, timeout: Math.max(1, timeoutMs), }); return { stdout: result.stdout ?? '', stderr: result.stderr ?? '' }; } function extractText(message: AgentMessage | undefined): string { if ( !message || !('role' in message) || message.role !== 'assistant' || !Array.isArray(message.content) ) { return ''; } return message.content .filter( (part): part is { type: 'text'; text: string } => isRecord(part) && part.type === 'text' && typeof part.text === 'string', ) .map((part) => part.text) .join('\n') .trim(); } function getFinalAssistantMessage(messages: AgentMessage[]): AssistantMessage | undefined { return [...messages] .reverse() .find( (message): message is AssistantMessage => 'role' in message && message.role === 'assistant', ); } function resolveModel(modelRef: string): Model { const slashIndex = modelRef.indexOf('/'); if (slashIndex < 1) { throw new Error(`Invalid Pi model "${modelRef}". Expected provider/model.`); } const provider = modelRef.slice(0, slashIndex); const modelId = modelRef.slice(slashIndex + 1); if (!modelId) { throw new Error(`Invalid Pi model "${modelRef}". Expected provider/model.`); } const isOpenAiOAuth = provider === 'openai' && (process.env.OPENAI_API_KEY?.startsWith('eyJ') ?? false); try { const model = getModel(provider as never, modelId as never); if (isOpenAiOAuth && model.api === 'openai-responses') { return { ...model, api: 'openai-completions' }; } return model; } catch { const fallbackApi = provider === 'anthropic' ? 'anthropic-messages' : provider === 'openai' ? isOpenAiOAuth ? 'openai-completions' : 'openai-responses' : 'openai-completions'; return { id: modelId, name: modelId, api: fallbackApi, provider, baseUrl: '', reasoning: true, input: ['text'], cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, contextWindow: 131072, maxTokens: 16384, }; } } function createDefaultTools(workDir: string): AgentTool[] { const readFileSchema = Type.Object({ path: Type.String({ description: 'Relative or absolute path to read.' }), }); const writeFileSchema = Type.Object({ path: Type.String({ description: 'Relative or absolute path to write.' }), content: Type.String({ description: 'UTF-8 file content.' }), append: Type.Optional(Type.Boolean({ description: 'Append instead of overwrite.' })), }); const editFileSchema = Type.Object({ path: Type.String({ description: 'Relative or absolute path to edit.' }), search: Type.String({ description: 'The exact text to replace.' }), replace: Type.String({ description: 'Replacement text.' }), replaceAll: Type.Optional(Type.Boolean({ description: 'Replace every occurrence.' })), }); const execShellSchema = Type.Object({ command: Type.String({ description: 'Shell command to execute in the worktree.' }), timeoutMs: Type.Optional( Type.Number({ description: 'Optional timeout override in milliseconds.' }), ), }); const listDirSchema = Type.Object({ path: Type.Optional(Type.String({ description: 'Directory path relative to the worktree.' })), }); const gitSchema = Type.Object({ args: Type.Array(Type.String({ description: 'Git CLI argument.' }), { description: 'Arguments passed to git.', minItems: 1, }), }); const readFileTool: AgentTool = { name: 'read_file', label: 'Read File', description: 'Read a UTF-8 text file from disk.', parameters: readFileSchema, async execute(_toolCallId, params: Static) { const filePath = resolvePath(workDir, params.path); const content = await fs.readFile(filePath, 'utf-8'); return { content: [{ type: 'text', text: content }], details: { path: filePath }, }; }, }; const writeFileTool: AgentTool = { name: 'write_file', label: 'Write File', description: 'Write or append a UTF-8 text file.', parameters: writeFileSchema, async execute(_toolCallId, params) { const filePath = resolvePath(workDir, params.path); await fs.mkdir(path.dirname(filePath), { recursive: true }); if (params.append) { await fs.appendFile(filePath, params.content, 'utf-8'); } else { await fs.writeFile(filePath, params.content, 'utf-8'); } return { content: [{ type: 'text', text: `Wrote ${filePath}` }], details: { path: filePath, append: Boolean(params.append) }, }; }, }; const editFileTool: AgentTool = { name: 'edit_file', label: 'Edit File', description: 'Apply an exact-match text replacement to a UTF-8 text file.', parameters: editFileSchema, async execute(_toolCallId, params) { const filePath = resolvePath(workDir, params.path); const original = await fs.readFile(filePath, 'utf-8'); if (!original.includes(params.search)) { throw new Error(`Search text not found in ${filePath}`); } const updated = params.replaceAll ? original.split(params.search).join(params.replace) : original.replace(params.search, params.replace); await fs.writeFile(filePath, updated, 'utf-8'); return { content: [{ type: 'text', text: `Updated ${filePath}` }], details: { path: filePath, replaceAll: Boolean(params.replaceAll) }, }; }, }; const execShellTool: AgentTool = { name: 'exec_shell', label: 'Exec Shell', description: 'Execute an unrestricted shell command inside the worktree.', parameters: execShellSchema, async execute(_toolCallId, params) { const result = await runCommand(params.command, workDir, params.timeoutMs ?? 300_000); return { content: [ { type: 'text', text: [result.stdout.trim(), result.stderr.trim()].filter(Boolean).join('\n') || '(no output)', }, ], details: { command: params.command, stdout: result.stdout, stderr: result.stderr }, }; }, }; const listDirTool: AgentTool = { name: 'list_dir', label: 'List Dir', description: 'List directory entries for a relative or absolute path.', parameters: listDirSchema, async execute(_toolCallId, params) { const dirPath = resolvePath(workDir, params.path ?? '.'); const entries = await fs.readdir(dirPath, { withFileTypes: true }); const lines = entries .sort((left, right) => left.name.localeCompare(right.name)) .map((entry) => `${entry.isDirectory() ? 'dir ' : 'file'} ${entry.name}`); return { content: [{ type: 'text', text: lines.join('\n') }], details: { path: dirPath, entries: lines }, }; }, }; const gitTool: AgentTool = { name: 'git', label: 'Git', description: 'Run git commands such as status, diff, add, or commit in the worktree.', parameters: gitSchema, async execute(_toolCallId, params) { const result = await execFileAsync('git', params.args, { cwd: workDir, encoding: 'utf-8', maxBuffer: 10 * 1024 * 1024, timeout: 300_000, }); const text = [result.stdout?.trim(), result.stderr?.trim()].filter(Boolean).join('\n') || '(no output)'; return { content: [{ type: 'text', text }], details: { args: params.args, stdout: result.stdout ?? '', stderr: result.stderr ?? '' }, }; }, }; return [readFileTool, writeFileTool, editFileTool, execShellTool, listDirTool, gitTool]; } function buildLogEntry(event: unknown): TranscriptEvent { if (!isRecord(event) || typeof event.type !== 'string') { return { timestamp: nowIso(), type: 'unknown', data: asJsonValue(event) }; } const summary: Record = {}; for (const [key, value] of Object.entries(event)) { if (key === 'message' || key === 'toolResults' || key === 'messages') { summary[key] = asJsonValue(value); continue; } if (key !== 'type') { summary[key] = asJsonValue(value); } } return { timestamp: nowIso(), type: event.type, data: summary }; } function inferExitCode(finalMessage: AssistantMessage | undefined, output: string): number { if (!finalMessage) { return 1; } if (finalMessage.stopReason === 'error' || finalMessage.stopReason === 'aborted') { return 1; } if (/^(failed|failure|blocked)\b/i.test(output)) { return 1; } return 0; } export function formatAssistantEvent(event: AssistantMessageEvent): { text?: string; stream?: 'output' | 'thought'; tag?: string; } | null { switch (event.type) { case 'text_delta': return { text: event.delta, stream: 'output', tag: 'agent_message_chunk' }; case 'thinking_delta': return { text: event.delta, stream: 'thought', tag: 'agent_thought_chunk' }; case 'toolcall_start': return { text: JSON.stringify(event.partial.content[event.contentIndex] ?? {}), tag: 'tool_call', }; case 'toolcall_delta': return { text: event.delta, tag: 'tool_call_update' }; case 'done': return null; case 'error': return null; default: return null; } } export async function runPiTurn(options: PiBridgeOptions): Promise { const transcript: TranscriptEvent[] = []; const workDir = path.resolve(options.workDir); const logPath = path.resolve(options.logPath); const timeoutController = new AbortController(); const combinedSignal = options.signal ? AbortSignal.any([options.signal, timeoutController.signal]) : timeoutController.signal; const timeoutHandle = setTimeout(() => timeoutController.abort(), Math.max(1, options.timeoutMs)); const context: AgentContext = { systemPrompt: options.systemPrompt, messages: [], tools: createDefaultTools(workDir), }; const config: AgentLoopConfig = { model: resolveModel(options.model), reasoning: 'medium', convertToLlm: async (messages) => messages.filter( (message): message is AgentMessage => isRecord(message) && typeof message.role === 'string' && ['user', 'assistant', 'toolResult'].includes(message.role), ), }; const prompts: AgentMessage[] = [ { role: 'user', content: options.prompt, timestamp: Date.now(), }, ]; try { transcript.push({ timestamp: nowIso(), type: 'runner_start', data: { model: options.model, workDir, timeoutMs: options.timeoutMs, }, }); const messages = await runAgentLoop( prompts, context, config, async (event) => { transcript.push(buildLogEntry(event)); await options.onEvent?.(event); }, combinedSignal, ); const finalMessage = getFinalAssistantMessage(messages); const output = extractText(finalMessage); const tokenUsage = finalMessage ? { input: finalMessage.usage?.input ?? 0, output: finalMessage.usage?.output ?? 0, } : { input: 0, output: 0 }; const result: PiBridgeResult = { exitCode: inferExitCode(finalMessage, output), output, messages, tokenUsage, stopReason: finalMessage?.stopReason ?? 'stop', }; transcript.push({ timestamp: nowIso(), type: 'runner_end', data: { exitCode: result.exitCode, output: result.output, tokenUsage: result.tokenUsage, stopReason: result.stopReason, }, }); await fs.mkdir(path.dirname(logPath), { recursive: true }); await fs.writeFile(logPath, `${JSON.stringify({ transcript, result }, null, 2)}\n`, 'utf-8'); return result; } catch (error) { const message = error instanceof Error ? error.message : String(error); transcript.push({ timestamp: nowIso(), type: 'runner_error', data: { error: message }, }); await fs.mkdir(path.dirname(logPath), { recursive: true }); await fs.writeFile( logPath, `${JSON.stringify({ transcript, result: { exitCode: 1, output: message, tokenUsage: { input: 0, output: 0 }, stopReason: 'error' } }, null, 2)}\n`, 'utf-8', ); return { exitCode: 1, output: message, messages: [], tokenUsage: { input: 0, output: 0 }, stopReason: combinedSignal.aborted ? 'aborted' : 'error', }; } finally { clearTimeout(timeoutHandle); } }