Compare commits

..

6 Commits

Author SHA1 Message Date
fddbfef0be feat(orchestrator): add internal agent provider adapter
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 12:42:50 -06:00
364619b332 feat(shared): MS23-P1-001 IAgentProvider interface + agent types (#718)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:29:18 +00:00
18ed3a5411 chore(tasks): MS23 Phase 0 complete — all 6 P0 tasks done (#717)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:13:12 +00:00
79ff3a921f test(orchestrator): MS23-P0-006 service unit tests for AgentMessages/Control/Tree (#716)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:12:47 +00:00
76f06d0291 chore(tasks): MS23-P0-005 done, P0-006 in-progress (#715)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:58:20 +00:00
03dd25f028 feat(orchestrator): MS23-P0-005 subagent tree endpoint (#714)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:57:55 +00:00
11 changed files with 1168 additions and 5 deletions

View File

@@ -0,0 +1,172 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentControlService } from "./agent-control.service";
import { PrismaService } from "../../prisma/prisma.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
describe("AgentControlService", () => {
let service: AgentControlService;
let prisma: {
agentSessionTree: {
findUnique: ReturnType<typeof vi.fn>;
updateMany: ReturnType<typeof vi.fn>;
};
agentConversationMessage: {
create: ReturnType<typeof vi.fn>;
};
operatorAuditLog: {
create: ReturnType<typeof vi.fn>;
};
};
let killswitchService: {
killAgent: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findUnique: vi.fn(),
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
agentConversationMessage: {
create: vi.fn().mockResolvedValue(undefined),
},
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
killswitchService = {
killAgent: vi.fn().mockResolvedValue(undefined),
};
service = new AgentControlService(
prisma as unknown as PrismaService,
killswitchService as unknown as KillswitchService
);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("injectMessage", () => {
it("creates conversation message and audit log when tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue({ id: "tree-1" });
await service.injectMessage("agent-123", "operator-abc", "Please continue");
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
where: { sessionId: "agent-123" },
select: { id: true },
});
expect(prisma.agentConversationMessage.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
role: "operator",
content: "Please continue",
provider: "internal",
metadata: {},
},
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
userId: "operator-abc",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Please continue",
},
},
},
});
});
it("creates only audit log when no tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
await service.injectMessage("agent-456", "operator-def", "Nudge message");
expect(prisma.agentConversationMessage.create).not.toHaveBeenCalled();
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-456",
userId: "operator-def",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Nudge message",
},
},
},
});
});
});
describe("pauseAgent", () => {
it("updates tree status to paused and creates audit log", async () => {
await service.pauseAgent("agent-789", "operator-pause");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-789" },
data: { status: "paused" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-789",
userId: "operator-pause",
provider: "internal",
action: "pause",
metadata: {
payload: {},
},
},
});
});
});
describe("resumeAgent", () => {
it("updates tree status to running and creates audit log", async () => {
await service.resumeAgent("agent-321", "operator-resume");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-321" },
data: { status: "running" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-321",
userId: "operator-resume",
provider: "internal",
action: "resume",
metadata: {
payload: {},
},
},
});
});
});
describe("killAgent", () => {
it("delegates kill to killswitch and logs audit", async () => {
await service.killAgent("agent-654", "operator-kill", false);
expect(killswitchService.killAgent).toHaveBeenCalledWith("agent-654");
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-654",
userId: "operator-kill",
provider: "internal",
action: "kill",
metadata: {
payload: {
force: false,
},
},
},
});
});
});
});

View File

@@ -1,10 +1,14 @@
import { Injectable } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentControlService {
constructor(private readonly prisma: PrismaService) {}
constructor(
private readonly prisma: PrismaService,
private readonly killswitchService: KillswitchService
) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
@@ -13,7 +17,7 @@ export class AgentControlService {
private async createOperatorAuditLog(
agentId: string,
operatorId: string,
action: "inject" | "pause" | "resume",
action: "inject" | "pause" | "resume" | "kill",
payload: Record<string, unknown>
): Promise<void> {
await this.prisma.operatorAuditLog.create({
@@ -65,4 +69,9 @@ export class AgentControlService {
await this.createOperatorAuditLog(agentId, operatorId, "resume", {});
}
async killAgent(agentId: string, operatorId: string, force = true): Promise<void> {
await this.killswitchService.killAgent(agentId);
await this.createOperatorAuditLog(agentId, operatorId, "kill", { force });
}
}

View File

@@ -0,0 +1,103 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentMessagesService } from "./agent-messages.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentMessagesService", () => {
let service: AgentMessagesService;
let prisma: {
agentConversationMessage: {
findMany: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentConversationMessage: {
findMany: vi.fn(),
count: vi.fn(),
},
};
service = new AgentMessagesService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("getMessages", () => {
it("returns paginated messages from Prisma", async () => {
const sessionId = "agent-123";
const messages = [
{
id: "msg-1",
sessionId,
provider: "internal",
role: "assistant",
content: "First message",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
metadata: {},
},
{
id: "msg-2",
sessionId,
provider: "internal",
role: "user",
content: "Second message",
timestamp: new Date("2026-03-07T15:59:00.000Z"),
metadata: {},
},
];
prisma.agentConversationMessage.findMany.mockResolvedValue(messages);
prisma.agentConversationMessage.count.mockResolvedValue(2);
const result = await service.getMessages(sessionId, 50, 0);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: 50,
skip: 0,
});
expect(prisma.agentConversationMessage.count).toHaveBeenCalledWith({ where: { sessionId } });
expect(result).toEqual({
messages,
total: 2,
});
});
it("applies limit and cursor (skip) correctly", async () => {
const sessionId = "agent-456";
const limit = 10;
const cursor = 20;
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(42);
await service.getMessages(sessionId, limit, cursor);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: limit,
skip: cursor,
});
});
it("returns empty messages array when no messages exist", async () => {
const sessionId = "agent-empty";
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(0);
const result = await service.getMessages(sessionId, 25, 0);
expect(result).toEqual({
messages: [],
total: 0,
});
});
});
});

View File

@@ -0,0 +1,245 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentTreeService } from "./agent-tree.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentTreeService", () => {
let service: AgentTreeService;
let prisma: {
agentSessionTree: {
findMany: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
findUnique: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findMany: vi.fn(),
count: vi.fn(),
findUnique: vi.fn(),
},
};
service = new AgentTreeService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("listSessions", () => {
it("returns paginated sessions and cursor", async () => {
const sessions = [
{
id: "tree-2",
sessionId: "agent-2",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-2",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
completedAt: null,
metadata: {},
},
{
id: "tree-1",
sessionId: "agent-1",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: null,
metadata: {},
},
];
prisma.agentSessionTree.findMany.mockResolvedValue(sessions);
prisma.agentSessionTree.count.mockResolvedValue(7);
const result = await service.listSessions(undefined, 2);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: undefined,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 2,
});
expect(prisma.agentSessionTree.count).toHaveBeenCalledWith();
expect(result.sessions).toEqual(sessions);
expect(result.total).toBe(7);
expect(result.cursor).toBeTypeOf("string");
});
it("applies cursor filter when provided", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
prisma.agentSessionTree.count.mockResolvedValue(0);
const cursorDate = "2026-03-07T10:00:00.000Z";
const cursorSessionId = "agent-5";
const cursor = Buffer.from(
JSON.stringify({
spawnedAt: cursorDate,
sessionId: cursorSessionId,
}),
"utf8"
).toString("base64url");
await service.listSessions(cursor, 25);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: {
OR: [
{
spawnedAt: {
lt: new Date(cursorDate),
},
},
{
spawnedAt: new Date(cursorDate),
sessionId: {
lt: cursorSessionId,
},
},
],
},
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 25,
});
});
it("ignores invalid cursor values", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
prisma.agentSessionTree.count.mockResolvedValue(0);
await service.listSessions("invalid-cursor", 10);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: undefined,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 10,
});
});
});
describe("getSession", () => {
it("returns matching session entry", async () => {
const session = {
id: "tree-1",
sessionId: "agent-123",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
completedAt: null,
metadata: {},
};
prisma.agentSessionTree.findUnique.mockResolvedValue(session);
const result = await service.getSession("agent-123");
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
where: { sessionId: "agent-123" },
});
expect(result).toEqual(session);
});
it("returns null when session does not exist", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
const result = await service.getSession("agent-missing");
expect(result).toBeNull();
});
});
describe("getTree", () => {
it("returns mapped entries from Prisma", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-1",
sessionId: "agent-1",
parentSessionId: "agent-root",
provider: "internal",
missionId: "mission-1",
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: new Date("2026-03-07T11:00:00.000Z"),
metadata: {},
},
]);
const result = await service.getTree();
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
orderBy: { spawnedAt: "desc" },
take: 200,
});
expect(result).toEqual([
{
sessionId: "agent-1",
parentSessionId: "agent-root",
status: "running",
agentType: "worker",
taskSource: "queue",
spawnedAt: "2026-03-07T10:00:00.000Z",
completedAt: "2026-03-07T11:00:00.000Z",
},
]);
});
it("returns empty array when no entries exist", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
const result = await service.getTree();
expect(result).toEqual([]);
});
it("maps null parentSessionId and completedAt correctly", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-2",
sessionId: "agent-root",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: null,
taskSource: null,
agentType: null,
status: "spawning",
spawnedAt: new Date("2026-03-07T09:00:00.000Z"),
completedAt: null,
metadata: {},
},
]);
const result = await service.getTree();
expect(result).toEqual([
{
sessionId: "agent-root",
parentSessionId: null,
status: "spawning",
agentType: null,
taskSource: null,
spawnedAt: "2026-03-07T09:00:00.000Z",
completedAt: null,
},
]);
});
});
});

View File

@@ -1,11 +1,78 @@
import { Injectable } from "@nestjs/common";
import { PrismaService } from "../../prisma/prisma.service";
import type { AgentSessionTree, Prisma } from "@prisma/client";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
import { PrismaService } from "../../prisma/prisma.service";
const DEFAULT_PAGE_LIMIT = 50;
const MAX_PAGE_LIMIT = 200;
interface SessionCursor {
spawnedAt: Date;
sessionId: string;
}
export interface AgentSessionTreeListResult {
sessions: AgentSessionTree[];
total: number;
cursor?: string;
}
@Injectable()
export class AgentTreeService {
constructor(private readonly prisma: PrismaService) {}
async listSessions(
cursor?: string,
limit = DEFAULT_PAGE_LIMIT
): Promise<AgentSessionTreeListResult> {
const safeLimit = this.normalizeLimit(limit);
const parsedCursor = this.parseCursor(cursor);
const where: Prisma.AgentSessionTreeWhereInput | undefined = parsedCursor
? {
OR: [
{
spawnedAt: {
lt: parsedCursor.spawnedAt,
},
},
{
spawnedAt: parsedCursor.spawnedAt,
sessionId: {
lt: parsedCursor.sessionId,
},
},
],
}
: undefined;
const [sessions, total] = await Promise.all([
this.prisma.agentSessionTree.findMany({
where,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: safeLimit,
}),
this.prisma.agentSessionTree.count(),
]);
const nextCursor =
sessions.length === safeLimit
? this.serializeCursor(sessions[sessions.length - 1])
: undefined;
return {
sessions,
total,
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
};
}
async getSession(sessionId: string): Promise<AgentSessionTree | null> {
return this.prisma.agentSessionTree.findUnique({
where: { sessionId },
});
}
async getTree(): Promise<AgentTreeResponseDto[]> {
const entries = await this.prisma.agentSessionTree.findMany({
orderBy: { spawnedAt: "desc" },
@@ -27,4 +94,53 @@ export class AgentTreeService {
return response;
}
private normalizeLimit(limit: number): number {
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_PAGE_LIMIT;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_PAGE_LIMIT);
}
private serializeCursor(entry: Pick<AgentSessionTree, "spawnedAt" | "sessionId">): string {
return Buffer.from(
JSON.stringify({
spawnedAt: entry.spawnedAt.toISOString(),
sessionId: entry.sessionId,
}),
"utf8"
).toString("base64url");
}
private parseCursor(cursor?: string): SessionCursor | null {
if (!cursor) {
return null;
}
try {
const decoded = Buffer.from(cursor, "base64url").toString("utf8");
const parsed = JSON.parse(decoded) as {
spawnedAt?: string;
sessionId?: string;
};
if (typeof parsed.spawnedAt !== "string" || typeof parsed.sessionId !== "string") {
return null;
}
const spawnedAt = new Date(parsed.spawnedAt);
if (Number.isNaN(spawnedAt.getTime())) {
return null;
}
return {
spawnedAt,
sessionId: parsed.sessionId,
};
} catch {
return null;
}
}
}

View File

@@ -10,6 +10,7 @@ import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { InternalAgentProvider } from "./internal-agent.provider";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
@@ -20,6 +21,8 @@ import { AgentTreeService } from "./agent-tree.service";
AgentMessagesService,
AgentControlService,
AgentTreeService,
InternalAgentProvider,
],
exports: [InternalAgentProvider],
})
export class AgentsModule {}

View File

@@ -0,0 +1,216 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
import { AgentControlService } from "./agent-control.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentTreeService } from "./agent-tree.service";
import { InternalAgentProvider } from "./internal-agent.provider";
describe("InternalAgentProvider", () => {
let provider: InternalAgentProvider;
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let controlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
killAgent: ReturnType<typeof vi.fn>;
};
let treeService: {
listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn(),
getMessagesAfter: vi.fn(),
};
controlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
killAgent: vi.fn().mockResolvedValue(undefined),
};
treeService = {
listSessions: vi.fn(),
getSession: vi.fn(),
};
provider = new InternalAgentProvider(
messagesService as unknown as AgentMessagesService,
controlService as unknown as AgentControlService,
treeService as unknown as AgentTreeService
);
});
it("maps paginated sessions", async () => {
const sessionEntry: AgentSessionTree = {
id: "tree-1",
sessionId: "session-1",
parentSessionId: "parent-1",
provider: "internal",
missionId: null,
taskId: "task-123",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: null,
metadata: { branch: "feat/test" },
};
treeService.listSessions.mockResolvedValue({
sessions: [sessionEntry],
total: 1,
cursor: "next-cursor",
});
const result = await provider.listSessions("cursor-1", 25);
expect(treeService.listSessions).toHaveBeenCalledWith("cursor-1", 25);
expect(result).toEqual({
sessions: [
{
id: "session-1",
providerId: "internal",
providerType: "internal",
label: "task-123",
status: "active",
parentSessionId: "parent-1",
createdAt: new Date("2026-03-07T10:00:00.000Z"),
updatedAt: new Date("2026-03-07T10:00:00.000Z"),
metadata: { branch: "feat/test" },
},
],
total: 1,
cursor: "next-cursor",
});
});
it("returns null for missing session", async () => {
treeService.getSession.mockResolvedValue(null);
const result = await provider.getSession("missing-session");
expect(treeService.getSession).toHaveBeenCalledWith("missing-session");
expect(result).toBeNull();
});
it("maps message history and parses skip cursor", async () => {
const message: AgentConversationMessage = {
id: "msg-1",
sessionId: "session-1",
provider: "internal",
role: "agent",
content: "hello",
timestamp: new Date("2026-03-07T10:05:00.000Z"),
metadata: { tokens: 42 },
};
messagesService.getMessages.mockResolvedValue({
messages: [message],
total: 10,
});
const result = await provider.getMessages("session-1", 30, "2");
expect(messagesService.getMessages).toHaveBeenCalledWith("session-1", 30, 2);
expect(result).toEqual([
{
id: "msg-1",
sessionId: "session-1",
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T10:05:00.000Z"),
metadata: { tokens: 42 },
},
]);
});
it("routes control operations through AgentControlService", async () => {
const injectResult = await provider.injectMessage("session-1", "new instruction");
await provider.pauseSession("session-1");
await provider.resumeSession("session-1");
await provider.killSession("session-1", false);
expect(controlService.injectMessage).toHaveBeenCalledWith(
"session-1",
"internal-provider",
"new instruction"
);
expect(injectResult).toEqual({ accepted: true });
expect(controlService.pauseAgent).toHaveBeenCalledWith("session-1", "internal-provider");
expect(controlService.resumeAgent).toHaveBeenCalledWith("session-1", "internal-provider");
expect(controlService.killAgent).toHaveBeenCalledWith("session-1", "internal-provider", false);
});
it("streams replay and incremental messages", async () => {
const replayMessage: AgentConversationMessage = {
id: "msg-replay",
sessionId: "session-1",
provider: "internal",
role: "agent",
content: "replay",
timestamp: new Date("2026-03-07T10:00:00.000Z"),
metadata: {},
};
const incrementalMessage: AgentConversationMessage = {
id: "msg-live",
sessionId: "session-1",
provider: "internal",
role: "operator",
content: "live",
timestamp: new Date("2026-03-07T10:00:01.000Z"),
metadata: {},
};
messagesService.getReplayMessages.mockResolvedValue([replayMessage]);
messagesService.getMessagesAfter
.mockResolvedValueOnce([incrementalMessage])
.mockResolvedValueOnce([]);
const iterator = provider.streamMessages("session-1")[Symbol.asyncIterator]();
const first = await iterator.next();
const second = await iterator.next();
expect(first.done).toBe(false);
expect(first.value).toEqual({
id: "msg-replay",
sessionId: "session-1",
role: "assistant",
content: "replay",
timestamp: new Date("2026-03-07T10:00:00.000Z"),
metadata: {},
});
expect(second.done).toBe(false);
expect(second.value).toEqual({
id: "msg-live",
sessionId: "session-1",
role: "user",
content: "live",
timestamp: new Date("2026-03-07T10:00:01.000Z"),
metadata: {},
});
await iterator.return?.();
expect(messagesService.getReplayMessages).toHaveBeenCalledWith("session-1", 50);
expect(messagesService.getMessagesAfter).toHaveBeenCalledWith(
"session-1",
new Date("2026-03-07T10:00:00.000Z"),
"msg-replay"
);
});
it("reports provider availability", async () => {
await expect(provider.isAvailable()).resolves.toBe(true);
});
});

View File

@@ -0,0 +1,218 @@
import { Injectable } from "@nestjs/common";
import type {
AgentMessage,
AgentMessageRole,
AgentSession,
AgentSessionList,
AgentSessionStatus,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
import { AgentControlService } from "./agent-control.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentTreeService } from "./agent-tree.service";
const DEFAULT_SESSION_LIMIT = 50;
const DEFAULT_MESSAGE_LIMIT = 50;
const MAX_MESSAGE_LIMIT = 200;
const STREAM_POLL_INTERVAL_MS = 1000;
const INTERNAL_OPERATOR_ID = "internal-provider";
@Injectable()
export class InternalAgentProvider implements IAgentProvider {
readonly providerId = "internal";
readonly providerType = "internal";
readonly displayName = "Internal Orchestrator";
constructor(
private readonly messagesService: AgentMessagesService,
private readonly controlService: AgentControlService,
private readonly treeService: AgentTreeService
) {}
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
const {
sessions,
total,
cursor: nextCursor,
} = await this.treeService.listSessions(cursor, limit);
return {
sessions: sessions.map((session) => this.toAgentSession(session)),
total,
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
};
}
async getSession(sessionId: string): Promise<AgentSession | null> {
const session = await this.treeService.getSession(sessionId);
return session ? this.toAgentSession(session) : null;
}
async getMessages(
sessionId: string,
limit = DEFAULT_MESSAGE_LIMIT,
before?: string
): Promise<AgentMessage[]> {
const safeLimit = this.normalizeMessageLimit(limit);
const skip = this.parseSkip(before);
const result = await this.messagesService.getMessages(sessionId, safeLimit, skip);
return result.messages.map((message) => this.toAgentMessage(message));
}
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
await this.controlService.injectMessage(sessionId, INTERNAL_OPERATOR_ID, content);
return {
accepted: true,
};
}
async pauseSession(sessionId: string): Promise<void> {
await this.controlService.pauseAgent(sessionId, INTERNAL_OPERATOR_ID);
}
async resumeSession(sessionId: string): Promise<void> {
await this.controlService.resumeAgent(sessionId, INTERNAL_OPERATOR_ID);
}
async killSession(sessionId: string, force = true): Promise<void> {
await this.controlService.killAgent(sessionId, INTERNAL_OPERATOR_ID, force);
}
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
const replayMessages = await this.messagesService.getReplayMessages(
sessionId,
DEFAULT_MESSAGE_LIMIT
);
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
for (const message of replayMessages) {
yield this.toAgentMessage(message);
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
}
for (;;) {
const newMessages = await this.messagesService.getMessagesAfter(
sessionId,
lastSeenTimestamp,
lastSeenMessageId
);
for (const message of newMessages) {
yield this.toAgentMessage(message);
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
}
await this.delay(STREAM_POLL_INTERVAL_MS);
}
}
isAvailable(): Promise<boolean> {
return Promise.resolve(true);
}
private toAgentSession(session: AgentSessionTree): AgentSession {
const metadata = this.toMetadata(session.metadata);
return {
id: session.sessionId,
providerId: this.providerId,
providerType: this.providerType,
...(session.taskId !== null ? { label: session.taskId } : {}),
status: this.toSessionStatus(session.status),
...(session.parentSessionId !== null ? { parentSessionId: session.parentSessionId } : {}),
createdAt: session.spawnedAt,
updatedAt: session.completedAt ?? session.spawnedAt,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toAgentMessage(message: AgentConversationMessage): AgentMessage {
const metadata = this.toMetadata(message.metadata);
return {
id: message.id,
sessionId: message.sessionId,
role: this.toMessageRole(message.role),
content: message.content,
timestamp: message.timestamp,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toSessionStatus(status: string): AgentSessionStatus {
switch (status) {
case "running":
return "active";
case "paused":
return "paused";
case "completed":
return "completed";
case "failed":
case "killed":
return "failed";
case "spawning":
default:
return "idle";
}
}
private toMessageRole(role: string): AgentMessageRole {
switch (role) {
case "agent":
case "assistant":
return "assistant";
case "system":
return "system";
case "tool":
return "tool";
case "operator":
case "user":
default:
return "user";
}
}
private normalizeMessageLimit(limit: number): number {
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_MESSAGE_LIMIT;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_MESSAGE_LIMIT);
}
private parseSkip(before?: string): number {
if (!before) {
return 0;
}
const parsed = Number.parseInt(before, 10);
if (Number.isNaN(parsed) || parsed < 0) {
return 0;
}
return parsed;
}
private toMetadata(value: unknown): Record<string, unknown> | undefined {
if (value !== null && typeof value === "object" && !Array.isArray(value)) {
return value as Record<string, unknown>;
}
return undefined;
}
private async delay(ms: number): Promise<void> {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
}

View File

@@ -125,8 +125,8 @@ Target version: `v0.0.23`
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
| MS23-P0-005 | in-progress | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | not-started | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | — | — | — | 20K | — | Phase 0 gate: SSE stream verified via curl |
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | done | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | 2026-03-07 | 20K | — | Phase 0 gate: SSE stream verified via curl |
### Phase 1 — Provider Interface (Plugin Architecture)

View File

@@ -0,0 +1,78 @@
// Agent message roles
export type AgentMessageRole = "user" | "assistant" | "system" | "tool";
// A single message in an agent conversation
export interface AgentMessage {
id: string;
sessionId: string;
role: AgentMessageRole;
content: string;
timestamp: Date;
metadata?: Record<string, unknown>;
}
// Session lifecycle status
export type AgentSessionStatus = "active" | "paused" | "completed" | "failed" | "idle";
// An agent session (conversation thread)
export interface AgentSession {
id: string;
providerId: string; // which provider owns this session
providerType: string; // "internal" | "openclaw" | etc.
label?: string;
status: AgentSessionStatus;
parentSessionId?: string; // for subagent trees
createdAt: Date;
updatedAt: Date;
metadata?: Record<string, unknown>;
}
// Result of listing sessions
export interface AgentSessionList {
sessions: AgentSession[];
total: number;
cursor?: string;
}
// Result of injecting a message
export interface InjectResult {
accepted: boolean;
messageId?: string;
}
// The IAgentProvider interface — every provider (internal, OpenClaw, future) implements this
export interface IAgentProvider {
readonly providerId: string;
readonly providerType: string;
readonly displayName: string;
// Session management
listSessions(cursor?: string, limit?: number): Promise<AgentSessionList>;
getSession(sessionId: string): Promise<AgentSession | null>;
getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]>;
// Control operations
injectMessage(sessionId: string, content: string): Promise<InjectResult>;
pauseSession(sessionId: string): Promise<void>;
resumeSession(sessionId: string): Promise<void>;
killSession(sessionId: string, force?: boolean): Promise<void>;
// SSE streaming — returns an AsyncIterable of AgentMessage events
streamMessages(sessionId: string): AsyncIterable<AgentMessage>;
// Health
isAvailable(): Promise<boolean>;
}
// Provider configuration stored in DB (AgentProviderConfig model from P0 schema)
export interface AgentProviderConfig {
id: string;
providerId: string;
providerType: string;
displayName: string;
baseUrl?: string;
apiToken?: string;
enabled: boolean;
createdAt: Date;
updatedAt: Date;
}

View File

@@ -134,3 +134,6 @@ export * from "./widget.types";
// Export WebSocket types
export * from "./websocket.types";
// Export agent provider types
export * from "./agent-provider.types";