Compare commits

..

12 Commits

Author SHA1 Message Date
5a2e404f03 test(orchestrator): add service unit tests for agent services
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 12:02:59 -06:00
76f06d0291 chore(tasks): MS23-P0-005 done, P0-006 in-progress (#715)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:58:20 +00:00
03dd25f028 feat(orchestrator): MS23-P0-005 subagent tree endpoint (#714)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:57:55 +00:00
f3726de54e chore(tasks): MS23-P0-004 done, P0-005 in-progress (#713)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:43:35 +00:00
d0c6622de5 feat(orchestrator): MS23-P0-004 operator inject/pause/resume endpoints (#712)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:43:11 +00:00
4749f52668 chore(tasks): MS23-P0-002 done, P0-003 done, P0-004 in-progress (#711)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:15:57 +00:00
e0b28c91c3 feat(orchestrator): MS23 per-agent message history and SSE stream (#702)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:15:30 +00:00
fa7837af3e fix(orchestrator): rm symlink before schema COPY in Docker builder (#710)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:00:38 +00:00
123cbce5cd fix(orchestrator): copy schema to overwrite dangling symlink in Docker (#709)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:45:54 +00:00
7d47e5ff99 fix(orchestrator): use symlink path ./prisma/schema.prisma in generate script (#708)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:31:49 +00:00
ef674206e7 fix(orchestrator): symlink prisma/schema.prisma to resolve Docker build root detection (#707)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:17:20 +00:00
977747599f fix(orchestrator): local prisma schema copy for Docker generate (#706)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:00:07 +00:00
23 changed files with 1020 additions and 21 deletions

View File

@@ -21,7 +21,7 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/
# (API prisma schema is copied in builder stage directly into orchestrator/prisma/)
# API schema is available via apps/orchestrator/prisma/schema.prisma symlink
# Copy npm configuration for native binary architecture hints
COPY .npmrc ./
@@ -47,11 +47,14 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# Copy API prisma schema into orchestrator's own prisma/ dir.
# Running generate from within the orchestrator package avoids cross-package
# project-root detection issues (Prisma was looking for package.json in /app/apps/).
# The repo has apps/orchestrator/prisma/schema.prisma as a symlink for CI use.
# Kaniko resolves destination symlinks on COPY, which fails because the symlink
# target (../../api/prisma/schema.prisma) doesn't exist in the container.
# Fix: remove the dangling symlink first, then copy the real schema file there.
RUN rm -f apps/orchestrator/prisma/schema.prisma
COPY apps/api/prisma/schema.prisma ./apps/orchestrator/prisma/schema.prisma
RUN cd apps/orchestrator && ./node_modules/.bin/prisma generate --schema=./prisma/schema.prisma
# pnpm turbo build runs prisma:generate (--schema=./prisma/schema.prisma) from the
# orchestrator package context — no cross-package project-root issues.
# Build the orchestrator app using TurboRepo
RUN pnpm turbo build --filter=@mosaic/orchestrator

View File

@@ -52,8 +52,5 @@
"tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2",
"vitest": "^4.0.18"
},
"prisma": {
"schema": "prisma/schema.prisma"
}
}

View File

@@ -0,0 +1 @@
../../api/prisma/schema.prisma

View File

@@ -25,14 +25,14 @@ export class AgentIngestionService {
where: { sessionId: agentId },
create: {
sessionId: agentId,
parentSessionId: parentAgentId,
parentSessionId: parentAgentId ?? null,
missionId,
taskId,
agentType,
status: "spawning",
},
update: {
parentSessionId: parentAgentId,
parentSessionId: parentAgentId ?? null,
missionId,
taskId,
agentType,

View File

@@ -0,0 +1,140 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentControlService } from "./agent-control.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentControlService", () => {
let service: AgentControlService;
let prisma: {
agentSessionTree: {
findUnique: ReturnType<typeof vi.fn>;
updateMany: ReturnType<typeof vi.fn>;
};
agentConversationMessage: {
create: ReturnType<typeof vi.fn>;
};
operatorAuditLog: {
create: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findUnique: vi.fn(),
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
agentConversationMessage: {
create: vi.fn().mockResolvedValue(undefined),
},
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
service = new AgentControlService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("injectMessage", () => {
it("creates conversation message and audit log when tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue({ id: "tree-1" });
await service.injectMessage("agent-123", "operator-abc", "Please continue");
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
where: { sessionId: "agent-123" },
select: { id: true },
});
expect(prisma.agentConversationMessage.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
role: "operator",
content: "Please continue",
provider: "internal",
metadata: {},
},
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
userId: "operator-abc",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Please continue",
},
},
},
});
});
it("creates only audit log when no tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
await service.injectMessage("agent-456", "operator-def", "Nudge message");
expect(prisma.agentConversationMessage.create).not.toHaveBeenCalled();
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-456",
userId: "operator-def",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Nudge message",
},
},
},
});
});
});
describe("pauseAgent", () => {
it("updates tree status to paused and creates audit log", async () => {
await service.pauseAgent("agent-789", "operator-pause");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-789" },
data: { status: "paused" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-789",
userId: "operator-pause",
provider: "internal",
action: "pause",
metadata: {
payload: {},
},
},
});
});
});
describe("resumeAgent", () => {
it("updates tree status to running and creates audit log", async () => {
await service.resumeAgent("agent-321", "operator-resume");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-321" },
data: { status: "running" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-321",
userId: "operator-resume",
provider: "internal",
action: "resume",
metadata: {
payload: {},
},
},
});
});
});
});

View File

@@ -0,0 +1,68 @@
import { Injectable } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentControlService {
constructor(private readonly prisma: PrismaService) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
private async createOperatorAuditLog(
agentId: string,
operatorId: string,
action: "inject" | "pause" | "resume",
payload: Record<string, unknown>
): Promise<void> {
await this.prisma.operatorAuditLog.create({
data: {
sessionId: agentId,
userId: operatorId,
provider: "internal",
action,
metadata: this.toJsonValue({ payload }),
},
});
}
async injectMessage(agentId: string, operatorId: string, message: string): Promise<void> {
const treeEntry = await this.prisma.agentSessionTree.findUnique({
where: { sessionId: agentId },
select: { id: true },
});
if (treeEntry) {
await this.prisma.agentConversationMessage.create({
data: {
sessionId: agentId,
role: "operator",
content: message,
provider: "internal",
metadata: this.toJsonValue({}),
},
});
}
await this.createOperatorAuditLog(agentId, operatorId, "inject", { message });
}
async pauseAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "paused" },
});
await this.createOperatorAuditLog(agentId, operatorId, "pause", {});
}
async resumeAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "running" },
});
await this.createOperatorAuditLog(agentId, operatorId, "resume", {});
}
}

View File

@@ -0,0 +1,103 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentMessagesService } from "./agent-messages.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentMessagesService", () => {
let service: AgentMessagesService;
let prisma: {
agentConversationMessage: {
findMany: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentConversationMessage: {
findMany: vi.fn(),
count: vi.fn(),
},
};
service = new AgentMessagesService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("getMessages", () => {
it("returns paginated messages from Prisma", async () => {
const sessionId = "agent-123";
const messages = [
{
id: "msg-1",
sessionId,
provider: "internal",
role: "assistant",
content: "First message",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
metadata: {},
},
{
id: "msg-2",
sessionId,
provider: "internal",
role: "user",
content: "Second message",
timestamp: new Date("2026-03-07T15:59:00.000Z"),
metadata: {},
},
];
prisma.agentConversationMessage.findMany.mockResolvedValue(messages);
prisma.agentConversationMessage.count.mockResolvedValue(2);
const result = await service.getMessages(sessionId, 50, 0);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: 50,
skip: 0,
});
expect(prisma.agentConversationMessage.count).toHaveBeenCalledWith({ where: { sessionId } });
expect(result).toEqual({
messages,
total: 2,
});
});
it("applies limit and cursor (skip) correctly", async () => {
const sessionId = "agent-456";
const limit = 10;
const cursor = 20;
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(42);
await service.getMessages(sessionId, limit, cursor);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: limit,
skip: cursor,
});
});
it("returns empty messages array when no messages exist", async () => {
const sessionId = "agent-empty";
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(0);
const result = await service.getMessages(sessionId, 25, 0);
expect(result).toEqual({
messages: [],
total: 0,
});
});
});
});

View File

@@ -0,0 +1,84 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -0,0 +1,106 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentTreeService } from "./agent-tree.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentTreeService", () => {
let service: AgentTreeService;
let prisma: {
agentSessionTree: {
findMany: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findMany: vi.fn(),
},
};
service = new AgentTreeService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("getTree", () => {
it("returns mapped entries from Prisma", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-1",
sessionId: "agent-1",
parentSessionId: "agent-root",
provider: "internal",
missionId: "mission-1",
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: new Date("2026-03-07T11:00:00.000Z"),
metadata: {},
},
]);
const result = await service.getTree();
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
orderBy: { spawnedAt: "desc" },
take: 200,
});
expect(result).toEqual([
{
sessionId: "agent-1",
parentSessionId: "agent-root",
status: "running",
agentType: "worker",
taskSource: "queue",
spawnedAt: "2026-03-07T10:00:00.000Z",
completedAt: "2026-03-07T11:00:00.000Z",
},
]);
});
it("returns empty array when no entries exist", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
const result = await service.getTree();
expect(result).toEqual([]);
});
it("maps null parentSessionId and completedAt correctly", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-2",
sessionId: "agent-root",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: null,
taskSource: null,
agentType: null,
status: "spawning",
spawnedAt: new Date("2026-03-07T09:00:00.000Z"),
completedAt: null,
metadata: {},
},
]);
const result = await service.getTree();
expect(result).toEqual([
{
sessionId: "agent-root",
parentSessionId: null,
status: "spawning",
agentType: null,
taskSource: null,
spawnedAt: "2026-03-07T09:00:00.000Z",
completedAt: null,
},
]);
});
});
});

View File

@@ -0,0 +1,30 @@
import { Injectable } from "@nestjs/common";
import { PrismaService } from "../../prisma/prisma.service";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
@Injectable()
export class AgentTreeService {
constructor(private readonly prisma: PrismaService) {}
async getTree(): Promise<AgentTreeResponseDto[]> {
const entries = await this.prisma.agentSessionTree.findMany({
orderBy: { spawnedAt: "desc" },
take: 200,
});
const response: AgentTreeResponseDto[] = [];
for (const entry of entries) {
response.push({
sessionId: entry.sessionId,
parentSessionId: entry.parentSessionId ?? null,
status: entry.status,
agentType: entry.agentType ?? null,
taskSource: entry.taskSource ?? null,
spawnedAt: entry.spawnedAt.toISOString(),
completedAt: entry.completedAt?.toISOString() ?? null,
});
}
return response;
}
}

View File

@@ -5,6 +5,9 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -27,6 +30,20 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let mockControlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let mockTreeService: {
getTree: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -61,6 +78,23 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
mockControlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
mockTreeService = {
getTree: vi.fn().mockResolvedValue([]),
};
controller = new AgentsController(
@@ -68,7 +102,10 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService
mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService,
mockControlService as unknown as AgentControlService,
mockTreeService as unknown as AgentTreeService
);
});

View File

@@ -4,6 +4,9 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -30,6 +33,19 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let controlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let treeService: {
getTree: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -69,13 +85,32 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]),
};
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
treeService = {
getTree: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService
eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService,
controlService as unknown as AgentControlService,
treeService as unknown as AgentTreeService
);
});
@@ -87,6 +122,27 @@ describe("AgentsController", () => {
expect(controller).toBeDefined();
});
describe("getAgentTree", () => {
it("should return tree entries", async () => {
const entries = [
{
sessionId: "agent-1",
parentSessionId: null,
status: "running",
agentType: "worker",
taskSource: "internal",
spawnedAt: "2026-03-07T00:00:00.000Z",
completedAt: null,
},
];
treeService.getTree.mockResolvedValue(entries);
await expect(controller.getAgentTree()).resolves.toEqual(entries);
expect(treeService.getTree).toHaveBeenCalledTimes(1);
});
});
describe("listAgents", () => {
it("should return empty array when no agents exist", () => {
// Arrange
@@ -365,6 +421,93 @@ describe("AgentsController", () => {
});
});
describe("agent control endpoints", () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
it("should inject an operator message", async () => {
const req = { apiKey: "control-key" };
const result = await controller.injectAgentMessage(
agentId,
{ message: "pause and summarize" },
req
);
expect(controlService.injectMessage).toHaveBeenCalledWith(
agentId,
"control-key",
"pause and summarize"
);
expect(result).toEqual({ message: `Message injected into agent ${agentId}` });
});
it("should default operator id when request api key is missing", async () => {
await controller.injectAgentMessage(agentId, { message: "continue" }, {});
expect(controlService.injectMessage).toHaveBeenCalledWith(agentId, "operator", "continue");
});
it("should pause an agent", async () => {
const result = await controller.pauseAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.pauseAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} paused` });
});
it("should resume an agent", async () => {
const result = await controller.resumeAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.resumeAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} resumed` });
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([

View File

@@ -14,7 +14,9 @@ import {
Sse,
MessageEvent,
Query,
Request,
} from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
@@ -25,6 +27,13 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
import { InjectAgentDto } from "./dto/inject-agent.dto";
import { PauseAgentDto, ResumeAgentDto } from "./dto/control-agent.dto";
/**
* Controller for agent management endpoints
@@ -47,7 +56,10 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService
private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService,
private readonly agentControlService: AgentControlService,
private readonly agentTreeService: AgentTreeService
) {}
/**
@@ -69,6 +81,7 @@ export class AgentsController {
// Spawn agent using spawner service
const spawnResponse = this.spawnerService.spawnAgent({
taskId: dto.taskId,
...(dto.parentAgentId !== undefined ? { parentAgentId: dto.parentAgentId } : {}),
agentType: dto.agentType,
context: dto.context,
});
@@ -143,6 +156,13 @@ export class AgentsController {
};
}
@Get("tree")
@UseGuards(OrchestratorApiKeyGuard)
@Throttle({ default: { limit: 200, ttl: 60000 } })
async getAgentTree(): Promise<AgentTreeResponseDto[]> {
return this.agentTreeService.getTree();
}
/**
* List all agents
* @returns Array of all agent sessions with their status
@@ -185,6 +205,107 @@ export class AgentsController {
}
}
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/**
* Get agent status
* @param agentId Agent ID to query
@@ -269,6 +390,57 @@ export class AgentsController {
}
}
@Post(":agentId/inject")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async injectAgentMessage(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() dto: InjectAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.injectMessage(agentId, operatorId, dto.message);
return {
message: `Message injected into agent ${agentId}`,
};
}
@Post(":agentId/pause")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async pauseAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: PauseAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.pauseAgent(agentId, operatorId);
return {
message: `Agent ${agentId} paused`,
};
}
@Post(":agentId/resume")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async resumeAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: ResumeAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.resumeAgent(agentId, operatorId);
return {
message: `Agent ${agentId} resumed`,
};
}
/**
* Kill all active agents
* @returns Summary of kill operation
@@ -301,4 +473,24 @@ export class AgentsController {
throw error;
}
}
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
}

View File

@@ -6,10 +6,20 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService],
providers: [
OrchestratorApiKeyGuard,
AgentEventsService,
AgentMessagesService,
AgentControlService,
AgentTreeService,
],
})
export class AgentsModule {}

View File

@@ -0,0 +1,9 @@
export class AgentTreeResponseDto {
sessionId!: string;
parentSessionId!: string | null;
status!: string;
agentType!: string | null;
taskSource!: string | null;
spawnedAt!: string;
completedAt!: string | null;
}

View File

@@ -0,0 +1,3 @@
export class PauseAgentDto {}
export class ResumeAgentDto {}

View File

@@ -0,0 +1,37 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -0,0 +1,17 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -0,0 +1,7 @@
import { IsNotEmpty, IsString } from "class-validator";
export class InjectAgentDto {
@IsString()
@IsNotEmpty()
message!: string;
}

View File

@@ -116,6 +116,10 @@ export class SpawnAgentDto {
@IsOptional()
@IsIn(["strict", "standard", "minimal", "custom"])
gateProfile?: GateProfileType;
@IsOptional()
@IsString()
parentAgentId?: string;
}
/**

View File

@@ -115,7 +115,13 @@ export class AgentSpawnerService implements OnModuleDestroy {
}
void this.agentIngestionService
.recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType)
.recordAgentSpawned(
agentId,
request.parentAgentId,
undefined,
request.taskId,
request.agentType
)
.catch((error: unknown) => {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`);

View File

@@ -40,6 +40,8 @@ export interface SpawnAgentOptions {
export interface SpawnAgentRequest {
/** Unique task identifier */
taskId: string;
/** Optional parent session identifier for subagent lineage */
parentAgentId?: string;
/** Type of agent to spawn */
agentType: AgentType;
/** Context for task execution */

View File

@@ -122,11 +122,11 @@ Target version: `v0.0.23`
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | in-progress | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | | 20K | — | |
| MS23-P0-003 | not-started | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | |
| MS23-P0-004 | not-started | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-005 | not-started | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | not-started | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | — | — | — | 20K | — | Phase 0 gate: SSE stream verified via curl |
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | in-progress | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | — | 20K | — | Phase 0 gate: SSE stream verified via curl |
### Phase 1 — Provider Interface (Plugin Architecture)