Compare commits

..

1 Commits

Author SHA1 Message Date
4f3a9b9521 feat(orchestrator): add MS23 per-agent message history and SSE stream endpoints
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
GET /agents/:id/messages - paginated message history
GET /agents/:id/messages/stream - SSE live stream with replay

Partial #693
2026-03-07 11:00:45 -06:00
14 changed files with 12 additions and 322 deletions

View File

@@ -25,14 +25,14 @@ export class AgentIngestionService {
where: { sessionId: agentId },
create: {
sessionId: agentId,
parentSessionId: parentAgentId ?? null,
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,
status: "spawning",
},
update: {
parentSessionId: parentAgentId ?? null,
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,

View File

@@ -1,68 +0,0 @@
import { Injectable } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentControlService {
constructor(private readonly prisma: PrismaService) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
private async createOperatorAuditLog(
agentId: string,
operatorId: string,
action: "inject" | "pause" | "resume",
payload: Record<string, unknown>
): Promise<void> {
await this.prisma.operatorAuditLog.create({
data: {
sessionId: agentId,
userId: operatorId,
provider: "internal",
action,
metadata: this.toJsonValue({ payload }),
},
});
}
async injectMessage(agentId: string, operatorId: string, message: string): Promise<void> {
const treeEntry = await this.prisma.agentSessionTree.findUnique({
where: { sessionId: agentId },
select: { id: true },
});
if (treeEntry) {
await this.prisma.agentConversationMessage.create({
data: {
sessionId: agentId,
role: "operator",
content: message,
provider: "internal",
metadata: this.toJsonValue({}),
},
});
}
await this.createOperatorAuditLog(agentId, operatorId, "inject", { message });
}
async pauseAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "paused" },
});
await this.createOperatorAuditLog(agentId, operatorId, "pause", {});
}
async resumeAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "running" },
});
await this.createOperatorAuditLog(agentId, operatorId, "resume", {});
}
}

View File

@@ -1,30 +0,0 @@
import { Injectable } from "@nestjs/common";
import { PrismaService } from "../../prisma/prisma.service";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
@Injectable()
export class AgentTreeService {
constructor(private readonly prisma: PrismaService) {}
async getTree(): Promise<AgentTreeResponseDto[]> {
const entries = await this.prisma.agentSessionTree.findMany({
orderBy: { spawnedAt: "desc" },
take: 200,
});
const response: AgentTreeResponseDto[] = [];
for (const entry of entries) {
response.push({
sessionId: entry.sessionId,
parentSessionId: entry.parentSessionId ?? null,
status: entry.status,
agentType: entry.agentType ?? null,
taskSource: entry.taskSource ?? null,
spawnedAt: entry.spawnedAt.toISOString(),
completedAt: entry.completedAt?.toISOString() ?? null,
});
}
return response;
}
}

View File

@@ -6,8 +6,6 @@ import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -37,14 +35,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let mockControlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let mockTreeService: {
getTree: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
mockKillswitchService = {
@@ -87,25 +77,13 @@ describe("AgentsController - Killswitch Endpoints", () => {
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
mockControlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
mockTreeService = {
getTree: vi.fn().mockResolvedValue([]),
};
controller = new AgentsController(
mockQueueService as unknown as QueueService,
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService,
mockControlService as unknown as AgentControlService,
mockTreeService as unknown as AgentTreeService
mockMessagesService as unknown as AgentMessagesService
);
});

View File

@@ -5,8 +5,6 @@ import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -38,14 +36,6 @@ describe("AgentsController", () => {
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let controlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let treeService: {
getTree: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -91,16 +81,6 @@ describe("AgentsController", () => {
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
treeService = {
getTree: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
@@ -108,9 +88,7 @@ describe("AgentsController", () => {
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService,
controlService as unknown as AgentControlService,
treeService as unknown as AgentTreeService
messagesService as unknown as AgentMessagesService
);
});
@@ -122,27 +100,6 @@ describe("AgentsController", () => {
expect(controller).toBeDefined();
});
describe("getAgentTree", () => {
it("should return tree entries", async () => {
const entries = [
{
sessionId: "agent-1",
parentSessionId: null,
status: "running",
agentType: "worker",
taskSource: "internal",
spawnedAt: "2026-03-07T00:00:00.000Z",
completedAt: null,
},
];
treeService.getTree.mockResolvedValue(entries);
await expect(controller.getAgentTree()).resolves.toEqual(entries);
expect(treeService.getTree).toHaveBeenCalledTimes(1);
});
});
describe("listAgents", () => {
it("should return empty array when no agents exist", () => {
// Arrange
@@ -421,47 +378,6 @@ describe("AgentsController", () => {
});
});
describe("agent control endpoints", () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
it("should inject an operator message", async () => {
const req = { apiKey: "control-key" };
const result = await controller.injectAgentMessage(
agentId,
{ message: "pause and summarize" },
req
);
expect(controlService.injectMessage).toHaveBeenCalledWith(
agentId,
"control-key",
"pause and summarize"
);
expect(result).toEqual({ message: `Message injected into agent ${agentId}` });
});
it("should default operator id when request api key is missing", async () => {
await controller.injectAgentMessage(agentId, { message: "continue" }, {});
expect(controlService.injectMessage).toHaveBeenCalledWith(agentId, "operator", "continue");
});
it("should pause an agent", async () => {
const result = await controller.pauseAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.pauseAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} paused` });
});
it("should resume an agent", async () => {
const result = await controller.resumeAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.resumeAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} resumed` });
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";

View File

@@ -14,7 +14,6 @@ import {
Sse,
MessageEvent,
Query,
Request,
} from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler";
@@ -29,11 +28,6 @@ import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"
import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
import { InjectAgentDto } from "./dto/inject-agent.dto";
import { PauseAgentDto, ResumeAgentDto } from "./dto/control-agent.dto";
/**
* Controller for agent management endpoints
@@ -57,9 +51,7 @@ export class AgentsController {
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService,
private readonly agentControlService: AgentControlService,
private readonly agentTreeService: AgentTreeService
private readonly messagesService: AgentMessagesService
) {}
/**
@@ -81,7 +73,6 @@ export class AgentsController {
// Spawn agent using spawner service
const spawnResponse = this.spawnerService.spawnAgent({
taskId: dto.taskId,
...(dto.parentAgentId !== undefined ? { parentAgentId: dto.parentAgentId } : {}),
agentType: dto.agentType,
context: dto.context,
});
@@ -156,13 +147,6 @@ export class AgentsController {
};
}
@Get("tree")
@UseGuards(OrchestratorApiKeyGuard)
@Throttle({ default: { limit: 200, ttl: 60000 } })
async getAgentTree(): Promise<AgentTreeResponseDto[]> {
return this.agentTreeService.getTree();
}
/**
* List all agents
* @returns Array of all agent sessions with their status
@@ -390,57 +374,6 @@ export class AgentsController {
}
}
@Post(":agentId/inject")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async injectAgentMessage(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() dto: InjectAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.injectMessage(agentId, operatorId, dto.message);
return {
message: `Message injected into agent ${agentId}`,
};
}
@Post(":agentId/pause")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async pauseAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: PauseAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.pauseAgent(agentId, operatorId);
return {
message: `Agent ${agentId} paused`,
};
}
@Post(":agentId/resume")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async resumeAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: ResumeAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.resumeAgent(agentId, operatorId);
return {
message: `Agent ${agentId} resumed`,
};
}
/**
* Kill all active agents
* @returns Summary of kill operation

View File

@@ -8,18 +8,10 @@ import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
controllers: [AgentsController],
providers: [
OrchestratorApiKeyGuard,
AgentEventsService,
AgentMessagesService,
AgentControlService,
AgentTreeService,
],
providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService],
})
export class AgentsModule {}

View File

@@ -1,9 +0,0 @@
export class AgentTreeResponseDto {
sessionId!: string;
parentSessionId!: string | null;
status!: string;
agentType!: string | null;
taskSource!: string | null;
spawnedAt!: string;
completedAt!: string | null;
}

View File

@@ -1,3 +0,0 @@
export class PauseAgentDto {}
export class ResumeAgentDto {}

View File

@@ -1,7 +0,0 @@
import { IsNotEmpty, IsString } from "class-validator";
export class InjectAgentDto {
@IsString()
@IsNotEmpty()
message!: string;
}

View File

@@ -116,10 +116,6 @@ export class SpawnAgentDto {
@IsOptional()
@IsIn(["strict", "standard", "minimal", "custom"])
gateProfile?: GateProfileType;
@IsOptional()
@IsString()
parentAgentId?: string;
}
/**

View File

@@ -115,13 +115,7 @@ export class AgentSpawnerService implements OnModuleDestroy {
}
void this.agentIngestionService
.recordAgentSpawned(
agentId,
request.parentAgentId,
undefined,
request.taskId,
request.agentType
)
.recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType)
.catch((error: unknown) => {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`);

View File

@@ -40,8 +40,6 @@ export interface SpawnAgentOptions {
export interface SpawnAgentRequest {
/** Unique task identifier */
taskId: string;
/** Optional parent session identifier for subagent lineage */
parentAgentId?: string;
/** Type of agent to spawn */
agentType: AgentType;
/** Context for task execution */

View File

@@ -122,11 +122,11 @@ Target version: `v0.0.23`
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | in-progress | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | — | 20K | — | Phase 0 gate: SSE stream verified via curl |
| MS23-P0-002 | in-progress | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | | 20K | — | |
| MS23-P0-003 | not-started | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | |
| MS23-P0-004 | not-started | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-005 | not-started | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | not-started | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | — | — | — | 20K | — | Phase 0 gate: SSE stream verified via curl |
### Phase 1 — Provider Interface (Plugin Architecture)