From 4b135ae1f0d954681ed87087261ca8b95841d01f Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 7 Mar 2026 19:06:36 +0000 Subject: [PATCH] feat(orchestrator): MS23-P1-002 InternalAgentProvider (#719) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- .../api/agents/agent-control.service.spec.ts | 34 ++- .../src/api/agents/agent-control.service.ts | 13 +- .../src/api/agents/agent-tree.service.spec.ts | 139 +++++++++++ .../src/api/agents/agent-tree.service.ts | 118 +++++++++- .../src/api/agents/agents.module.ts | 3 + .../agents/internal-agent.provider.spec.ts | 216 +++++++++++++++++ .../src/api/agents/internal-agent.provider.ts | 218 ++++++++++++++++++ 7 files changed, 737 insertions(+), 4 deletions(-) create mode 100644 apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts create mode 100644 apps/orchestrator/src/api/agents/internal-agent.provider.ts diff --git a/apps/orchestrator/src/api/agents/agent-control.service.spec.ts b/apps/orchestrator/src/api/agents/agent-control.service.spec.ts index 641e881..0742ab7 100644 --- a/apps/orchestrator/src/api/agents/agent-control.service.spec.ts +++ b/apps/orchestrator/src/api/agents/agent-control.service.spec.ts @@ -1,6 +1,7 @@ import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { AgentControlService } from "./agent-control.service"; import { PrismaService } from "../../prisma/prisma.service"; +import { KillswitchService } from "../../killswitch/killswitch.service"; describe("AgentControlService", () => { let service: AgentControlService; @@ -16,6 +17,9 @@ describe("AgentControlService", () => { create: ReturnType; }; }; + let killswitchService: { + killAgent: ReturnType; + }; beforeEach(() => { prisma = { @@ -31,7 +35,14 @@ describe("AgentControlService", () => { }, }; - service = new AgentControlService(prisma as unknown as PrismaService); + killswitchService = { + killAgent: vi.fn().mockResolvedValue(undefined), + }; + + service = new AgentControlService( + prisma as unknown as PrismaService, + killswitchService as unknown as KillswitchService + ); }); afterEach(() => { @@ -137,4 +148,25 @@ describe("AgentControlService", () => { }); }); }); + + describe("killAgent", () => { + it("delegates kill to killswitch and logs audit", async () => { + await service.killAgent("agent-654", "operator-kill", false); + + expect(killswitchService.killAgent).toHaveBeenCalledWith("agent-654"); + expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({ + data: { + sessionId: "agent-654", + userId: "operator-kill", + provider: "internal", + action: "kill", + metadata: { + payload: { + force: false, + }, + }, + }, + }); + }); + }); }); diff --git a/apps/orchestrator/src/api/agents/agent-control.service.ts b/apps/orchestrator/src/api/agents/agent-control.service.ts index e45ae0f..5030e40 100644 --- a/apps/orchestrator/src/api/agents/agent-control.service.ts +++ b/apps/orchestrator/src/api/agents/agent-control.service.ts @@ -1,10 +1,14 @@ import { Injectable } from "@nestjs/common"; import type { Prisma } from "@prisma/client"; +import { KillswitchService } from "../../killswitch/killswitch.service"; import { PrismaService } from "../../prisma/prisma.service"; @Injectable() export class AgentControlService { - constructor(private readonly prisma: PrismaService) {} + constructor( + private readonly prisma: PrismaService, + private readonly killswitchService: KillswitchService + ) {} private toJsonValue(value: Record): Prisma.InputJsonValue { return value as Prisma.InputJsonValue; @@ -13,7 +17,7 @@ export class AgentControlService { private async createOperatorAuditLog( agentId: string, operatorId: string, - action: "inject" | "pause" | "resume", + action: "inject" | "pause" | "resume" | "kill", payload: Record ): Promise { await this.prisma.operatorAuditLog.create({ @@ -65,4 +69,9 @@ export class AgentControlService { await this.createOperatorAuditLog(agentId, operatorId, "resume", {}); } + + async killAgent(agentId: string, operatorId: string, force = true): Promise { + await this.killswitchService.killAgent(agentId); + await this.createOperatorAuditLog(agentId, operatorId, "kill", { force }); + } } diff --git a/apps/orchestrator/src/api/agents/agent-tree.service.spec.ts b/apps/orchestrator/src/api/agents/agent-tree.service.spec.ts index be64b04..519f19a 100644 --- a/apps/orchestrator/src/api/agents/agent-tree.service.spec.ts +++ b/apps/orchestrator/src/api/agents/agent-tree.service.spec.ts @@ -7,6 +7,8 @@ describe("AgentTreeService", () => { let prisma: { agentSessionTree: { findMany: ReturnType; + count: ReturnType; + findUnique: ReturnType; }; }; @@ -14,6 +16,8 @@ describe("AgentTreeService", () => { prisma = { agentSessionTree: { findMany: vi.fn(), + count: vi.fn(), + findUnique: vi.fn(), }, }; @@ -24,6 +28,141 @@ describe("AgentTreeService", () => { vi.clearAllMocks(); }); + describe("listSessions", () => { + it("returns paginated sessions and cursor", async () => { + const sessions = [ + { + id: "tree-2", + sessionId: "agent-2", + parentSessionId: null, + provider: "internal", + missionId: null, + taskId: "task-2", + taskSource: "queue", + agentType: "worker", + status: "running", + spawnedAt: new Date("2026-03-07T11:00:00.000Z"), + completedAt: null, + metadata: {}, + }, + { + id: "tree-1", + sessionId: "agent-1", + parentSessionId: null, + provider: "internal", + missionId: null, + taskId: "task-1", + taskSource: "queue", + agentType: "worker", + status: "running", + spawnedAt: new Date("2026-03-07T10:00:00.000Z"), + completedAt: null, + metadata: {}, + }, + ]; + + prisma.agentSessionTree.findMany.mockResolvedValue(sessions); + prisma.agentSessionTree.count.mockResolvedValue(7); + + const result = await service.listSessions(undefined, 2); + + expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({ + where: undefined, + orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }], + take: 2, + }); + expect(prisma.agentSessionTree.count).toHaveBeenCalledWith(); + expect(result.sessions).toEqual(sessions); + expect(result.total).toBe(7); + expect(result.cursor).toBeTypeOf("string"); + }); + + it("applies cursor filter when provided", async () => { + prisma.agentSessionTree.findMany.mockResolvedValue([]); + prisma.agentSessionTree.count.mockResolvedValue(0); + + const cursorDate = "2026-03-07T10:00:00.000Z"; + const cursorSessionId = "agent-5"; + const cursor = Buffer.from( + JSON.stringify({ + spawnedAt: cursorDate, + sessionId: cursorSessionId, + }), + "utf8" + ).toString("base64url"); + + await service.listSessions(cursor, 25); + + expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({ + where: { + OR: [ + { + spawnedAt: { + lt: new Date(cursorDate), + }, + }, + { + spawnedAt: new Date(cursorDate), + sessionId: { + lt: cursorSessionId, + }, + }, + ], + }, + orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }], + take: 25, + }); + }); + + it("ignores invalid cursor values", async () => { + prisma.agentSessionTree.findMany.mockResolvedValue([]); + prisma.agentSessionTree.count.mockResolvedValue(0); + + await service.listSessions("invalid-cursor", 10); + + expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({ + where: undefined, + orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }], + take: 10, + }); + }); + }); + + describe("getSession", () => { + it("returns matching session entry", async () => { + const session = { + id: "tree-1", + sessionId: "agent-123", + parentSessionId: null, + provider: "internal", + missionId: null, + taskId: "task-1", + taskSource: "queue", + agentType: "worker", + status: "running", + spawnedAt: new Date("2026-03-07T11:00:00.000Z"), + completedAt: null, + metadata: {}, + }; + prisma.agentSessionTree.findUnique.mockResolvedValue(session); + + const result = await service.getSession("agent-123"); + + expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({ + where: { sessionId: "agent-123" }, + }); + expect(result).toEqual(session); + }); + + it("returns null when session does not exist", async () => { + prisma.agentSessionTree.findUnique.mockResolvedValue(null); + + const result = await service.getSession("agent-missing"); + + expect(result).toBeNull(); + }); + }); + describe("getTree", () => { it("returns mapped entries from Prisma", async () => { prisma.agentSessionTree.findMany.mockResolvedValue([ diff --git a/apps/orchestrator/src/api/agents/agent-tree.service.ts b/apps/orchestrator/src/api/agents/agent-tree.service.ts index 6a5ed68..2f724a9 100644 --- a/apps/orchestrator/src/api/agents/agent-tree.service.ts +++ b/apps/orchestrator/src/api/agents/agent-tree.service.ts @@ -1,11 +1,78 @@ import { Injectable } from "@nestjs/common"; -import { PrismaService } from "../../prisma/prisma.service"; +import type { AgentSessionTree, Prisma } from "@prisma/client"; import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto"; +import { PrismaService } from "../../prisma/prisma.service"; + +const DEFAULT_PAGE_LIMIT = 50; +const MAX_PAGE_LIMIT = 200; + +interface SessionCursor { + spawnedAt: Date; + sessionId: string; +} + +export interface AgentSessionTreeListResult { + sessions: AgentSessionTree[]; + total: number; + cursor?: string; +} @Injectable() export class AgentTreeService { constructor(private readonly prisma: PrismaService) {} + async listSessions( + cursor?: string, + limit = DEFAULT_PAGE_LIMIT + ): Promise { + const safeLimit = this.normalizeLimit(limit); + const parsedCursor = this.parseCursor(cursor); + + const where: Prisma.AgentSessionTreeWhereInput | undefined = parsedCursor + ? { + OR: [ + { + spawnedAt: { + lt: parsedCursor.spawnedAt, + }, + }, + { + spawnedAt: parsedCursor.spawnedAt, + sessionId: { + lt: parsedCursor.sessionId, + }, + }, + ], + } + : undefined; + + const [sessions, total] = await Promise.all([ + this.prisma.agentSessionTree.findMany({ + where, + orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }], + take: safeLimit, + }), + this.prisma.agentSessionTree.count(), + ]); + + const nextCursor = + sessions.length === safeLimit + ? this.serializeCursor(sessions[sessions.length - 1]) + : undefined; + + return { + sessions, + total, + ...(nextCursor !== undefined ? { cursor: nextCursor } : {}), + }; + } + + async getSession(sessionId: string): Promise { + return this.prisma.agentSessionTree.findUnique({ + where: { sessionId }, + }); + } + async getTree(): Promise { const entries = await this.prisma.agentSessionTree.findMany({ orderBy: { spawnedAt: "desc" }, @@ -27,4 +94,53 @@ export class AgentTreeService { return response; } + + private normalizeLimit(limit: number): number { + const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_PAGE_LIMIT; + if (normalized < 1) { + return 1; + } + + return Math.min(normalized, MAX_PAGE_LIMIT); + } + + private serializeCursor(entry: Pick): string { + return Buffer.from( + JSON.stringify({ + spawnedAt: entry.spawnedAt.toISOString(), + sessionId: entry.sessionId, + }), + "utf8" + ).toString("base64url"); + } + + private parseCursor(cursor?: string): SessionCursor | null { + if (!cursor) { + return null; + } + + try { + const decoded = Buffer.from(cursor, "base64url").toString("utf8"); + const parsed = JSON.parse(decoded) as { + spawnedAt?: string; + sessionId?: string; + }; + + if (typeof parsed.spawnedAt !== "string" || typeof parsed.sessionId !== "string") { + return null; + } + + const spawnedAt = new Date(parsed.spawnedAt); + if (Number.isNaN(spawnedAt.getTime())) { + return null; + } + + return { + spawnedAt, + sessionId: parsed.sessionId, + }; + } catch { + return null; + } + } } diff --git a/apps/orchestrator/src/api/agents/agents.module.ts b/apps/orchestrator/src/api/agents/agents.module.ts index a1d5d76..16b6857 100644 --- a/apps/orchestrator/src/api/agents/agents.module.ts +++ b/apps/orchestrator/src/api/agents/agents.module.ts @@ -10,6 +10,7 @@ import { PrismaModule } from "../../prisma/prisma.module"; import { AgentMessagesService } from "./agent-messages.service"; import { AgentControlService } from "./agent-control.service"; import { AgentTreeService } from "./agent-tree.service"; +import { InternalAgentProvider } from "./internal-agent.provider"; @Module({ imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule], @@ -20,6 +21,8 @@ import { AgentTreeService } from "./agent-tree.service"; AgentMessagesService, AgentControlService, AgentTreeService, + InternalAgentProvider, ], + exports: [InternalAgentProvider], }) export class AgentsModule {} diff --git a/apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts b/apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts new file mode 100644 index 0000000..97f1ca6 --- /dev/null +++ b/apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts @@ -0,0 +1,216 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client"; +import { AgentControlService } from "./agent-control.service"; +import { AgentMessagesService } from "./agent-messages.service"; +import { AgentTreeService } from "./agent-tree.service"; +import { InternalAgentProvider } from "./internal-agent.provider"; + +describe("InternalAgentProvider", () => { + let provider: InternalAgentProvider; + let messagesService: { + getMessages: ReturnType; + getReplayMessages: ReturnType; + getMessagesAfter: ReturnType; + }; + let controlService: { + injectMessage: ReturnType; + pauseAgent: ReturnType; + resumeAgent: ReturnType; + killAgent: ReturnType; + }; + let treeService: { + listSessions: ReturnType; + getSession: ReturnType; + }; + + beforeEach(() => { + messagesService = { + getMessages: vi.fn(), + getReplayMessages: vi.fn(), + getMessagesAfter: vi.fn(), + }; + + controlService = { + injectMessage: vi.fn().mockResolvedValue(undefined), + pauseAgent: vi.fn().mockResolvedValue(undefined), + resumeAgent: vi.fn().mockResolvedValue(undefined), + killAgent: vi.fn().mockResolvedValue(undefined), + }; + + treeService = { + listSessions: vi.fn(), + getSession: vi.fn(), + }; + + provider = new InternalAgentProvider( + messagesService as unknown as AgentMessagesService, + controlService as unknown as AgentControlService, + treeService as unknown as AgentTreeService + ); + }); + + it("maps paginated sessions", async () => { + const sessionEntry: AgentSessionTree = { + id: "tree-1", + sessionId: "session-1", + parentSessionId: "parent-1", + provider: "internal", + missionId: null, + taskId: "task-123", + taskSource: "queue", + agentType: "worker", + status: "running", + spawnedAt: new Date("2026-03-07T10:00:00.000Z"), + completedAt: null, + metadata: { branch: "feat/test" }, + }; + + treeService.listSessions.mockResolvedValue({ + sessions: [sessionEntry], + total: 1, + cursor: "next-cursor", + }); + + const result = await provider.listSessions("cursor-1", 25); + + expect(treeService.listSessions).toHaveBeenCalledWith("cursor-1", 25); + expect(result).toEqual({ + sessions: [ + { + id: "session-1", + providerId: "internal", + providerType: "internal", + label: "task-123", + status: "active", + parentSessionId: "parent-1", + createdAt: new Date("2026-03-07T10:00:00.000Z"), + updatedAt: new Date("2026-03-07T10:00:00.000Z"), + metadata: { branch: "feat/test" }, + }, + ], + total: 1, + cursor: "next-cursor", + }); + }); + + it("returns null for missing session", async () => { + treeService.getSession.mockResolvedValue(null); + + const result = await provider.getSession("missing-session"); + + expect(treeService.getSession).toHaveBeenCalledWith("missing-session"); + expect(result).toBeNull(); + }); + + it("maps message history and parses skip cursor", async () => { + const message: AgentConversationMessage = { + id: "msg-1", + sessionId: "session-1", + provider: "internal", + role: "agent", + content: "hello", + timestamp: new Date("2026-03-07T10:05:00.000Z"), + metadata: { tokens: 42 }, + }; + + messagesService.getMessages.mockResolvedValue({ + messages: [message], + total: 10, + }); + + const result = await provider.getMessages("session-1", 30, "2"); + + expect(messagesService.getMessages).toHaveBeenCalledWith("session-1", 30, 2); + expect(result).toEqual([ + { + id: "msg-1", + sessionId: "session-1", + role: "assistant", + content: "hello", + timestamp: new Date("2026-03-07T10:05:00.000Z"), + metadata: { tokens: 42 }, + }, + ]); + }); + + it("routes control operations through AgentControlService", async () => { + const injectResult = await provider.injectMessage("session-1", "new instruction"); + + await provider.pauseSession("session-1"); + await provider.resumeSession("session-1"); + await provider.killSession("session-1", false); + + expect(controlService.injectMessage).toHaveBeenCalledWith( + "session-1", + "internal-provider", + "new instruction" + ); + expect(injectResult).toEqual({ accepted: true }); + expect(controlService.pauseAgent).toHaveBeenCalledWith("session-1", "internal-provider"); + expect(controlService.resumeAgent).toHaveBeenCalledWith("session-1", "internal-provider"); + expect(controlService.killAgent).toHaveBeenCalledWith("session-1", "internal-provider", false); + }); + + it("streams replay and incremental messages", async () => { + const replayMessage: AgentConversationMessage = { + id: "msg-replay", + sessionId: "session-1", + provider: "internal", + role: "agent", + content: "replay", + timestamp: new Date("2026-03-07T10:00:00.000Z"), + metadata: {}, + }; + const incrementalMessage: AgentConversationMessage = { + id: "msg-live", + sessionId: "session-1", + provider: "internal", + role: "operator", + content: "live", + timestamp: new Date("2026-03-07T10:00:01.000Z"), + metadata: {}, + }; + + messagesService.getReplayMessages.mockResolvedValue([replayMessage]); + messagesService.getMessagesAfter + .mockResolvedValueOnce([incrementalMessage]) + .mockResolvedValueOnce([]); + + const iterator = provider.streamMessages("session-1")[Symbol.asyncIterator](); + + const first = await iterator.next(); + const second = await iterator.next(); + + expect(first.done).toBe(false); + expect(first.value).toEqual({ + id: "msg-replay", + sessionId: "session-1", + role: "assistant", + content: "replay", + timestamp: new Date("2026-03-07T10:00:00.000Z"), + metadata: {}, + }); + expect(second.done).toBe(false); + expect(second.value).toEqual({ + id: "msg-live", + sessionId: "session-1", + role: "user", + content: "live", + timestamp: new Date("2026-03-07T10:00:01.000Z"), + metadata: {}, + }); + + await iterator.return?.(); + + expect(messagesService.getReplayMessages).toHaveBeenCalledWith("session-1", 50); + expect(messagesService.getMessagesAfter).toHaveBeenCalledWith( + "session-1", + new Date("2026-03-07T10:00:00.000Z"), + "msg-replay" + ); + }); + + it("reports provider availability", async () => { + await expect(provider.isAvailable()).resolves.toBe(true); + }); +}); diff --git a/apps/orchestrator/src/api/agents/internal-agent.provider.ts b/apps/orchestrator/src/api/agents/internal-agent.provider.ts new file mode 100644 index 0000000..48bc9d3 --- /dev/null +++ b/apps/orchestrator/src/api/agents/internal-agent.provider.ts @@ -0,0 +1,218 @@ +import { Injectable } from "@nestjs/common"; +import type { + AgentMessage, + AgentMessageRole, + AgentSession, + AgentSessionList, + AgentSessionStatus, + IAgentProvider, + InjectResult, +} from "@mosaic/shared"; +import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client"; +import { AgentControlService } from "./agent-control.service"; +import { AgentMessagesService } from "./agent-messages.service"; +import { AgentTreeService } from "./agent-tree.service"; + +const DEFAULT_SESSION_LIMIT = 50; +const DEFAULT_MESSAGE_LIMIT = 50; +const MAX_MESSAGE_LIMIT = 200; +const STREAM_POLL_INTERVAL_MS = 1000; +const INTERNAL_OPERATOR_ID = "internal-provider"; + +@Injectable() +export class InternalAgentProvider implements IAgentProvider { + readonly providerId = "internal"; + readonly providerType = "internal"; + readonly displayName = "Internal Orchestrator"; + + constructor( + private readonly messagesService: AgentMessagesService, + private readonly controlService: AgentControlService, + private readonly treeService: AgentTreeService + ) {} + + async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise { + const { + sessions, + total, + cursor: nextCursor, + } = await this.treeService.listSessions(cursor, limit); + + return { + sessions: sessions.map((session) => this.toAgentSession(session)), + total, + ...(nextCursor !== undefined ? { cursor: nextCursor } : {}), + }; + } + + async getSession(sessionId: string): Promise { + const session = await this.treeService.getSession(sessionId); + return session ? this.toAgentSession(session) : null; + } + + async getMessages( + sessionId: string, + limit = DEFAULT_MESSAGE_LIMIT, + before?: string + ): Promise { + const safeLimit = this.normalizeMessageLimit(limit); + const skip = this.parseSkip(before); + + const result = await this.messagesService.getMessages(sessionId, safeLimit, skip); + return result.messages.map((message) => this.toAgentMessage(message)); + } + + async injectMessage(sessionId: string, content: string): Promise { + await this.controlService.injectMessage(sessionId, INTERNAL_OPERATOR_ID, content); + + return { + accepted: true, + }; + } + + async pauseSession(sessionId: string): Promise { + await this.controlService.pauseAgent(sessionId, INTERNAL_OPERATOR_ID); + } + + async resumeSession(sessionId: string): Promise { + await this.controlService.resumeAgent(sessionId, INTERNAL_OPERATOR_ID); + } + + async killSession(sessionId: string, force = true): Promise { + await this.controlService.killAgent(sessionId, INTERNAL_OPERATOR_ID, force); + } + + async *streamMessages(sessionId: string): AsyncIterable { + const replayMessages = await this.messagesService.getReplayMessages( + sessionId, + DEFAULT_MESSAGE_LIMIT + ); + + let lastSeenTimestamp = new Date(); + let lastSeenMessageId: string | null = null; + + for (const message of replayMessages) { + yield this.toAgentMessage(message); + lastSeenTimestamp = message.timestamp; + lastSeenMessageId = message.id; + } + + for (;;) { + const newMessages = await this.messagesService.getMessagesAfter( + sessionId, + lastSeenTimestamp, + lastSeenMessageId + ); + + for (const message of newMessages) { + yield this.toAgentMessage(message); + lastSeenTimestamp = message.timestamp; + lastSeenMessageId = message.id; + } + + await this.delay(STREAM_POLL_INTERVAL_MS); + } + } + + isAvailable(): Promise { + return Promise.resolve(true); + } + + private toAgentSession(session: AgentSessionTree): AgentSession { + const metadata = this.toMetadata(session.metadata); + + return { + id: session.sessionId, + providerId: this.providerId, + providerType: this.providerType, + ...(session.taskId !== null ? { label: session.taskId } : {}), + status: this.toSessionStatus(session.status), + ...(session.parentSessionId !== null ? { parentSessionId: session.parentSessionId } : {}), + createdAt: session.spawnedAt, + updatedAt: session.completedAt ?? session.spawnedAt, + ...(metadata !== undefined ? { metadata } : {}), + }; + } + + private toAgentMessage(message: AgentConversationMessage): AgentMessage { + const metadata = this.toMetadata(message.metadata); + + return { + id: message.id, + sessionId: message.sessionId, + role: this.toMessageRole(message.role), + content: message.content, + timestamp: message.timestamp, + ...(metadata !== undefined ? { metadata } : {}), + }; + } + + private toSessionStatus(status: string): AgentSessionStatus { + switch (status) { + case "running": + return "active"; + case "paused": + return "paused"; + case "completed": + return "completed"; + case "failed": + case "killed": + return "failed"; + case "spawning": + default: + return "idle"; + } + } + + private toMessageRole(role: string): AgentMessageRole { + switch (role) { + case "agent": + case "assistant": + return "assistant"; + case "system": + return "system"; + case "tool": + return "tool"; + case "operator": + case "user": + default: + return "user"; + } + } + + private normalizeMessageLimit(limit: number): number { + const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_MESSAGE_LIMIT; + if (normalized < 1) { + return 1; + } + + return Math.min(normalized, MAX_MESSAGE_LIMIT); + } + + private parseSkip(before?: string): number { + if (!before) { + return 0; + } + + const parsed = Number.parseInt(before, 10); + if (Number.isNaN(parsed) || parsed < 0) { + return 0; + } + + return parsed; + } + + private toMetadata(value: unknown): Record | undefined { + if (value !== null && typeof value === "object" && !Array.isArray(value)) { + return value as Record; + } + + return undefined; + } + + private async delay(ms: number): Promise { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } +}