From 1cf98a1729bcf550a1cd2f8409e04f7a5403b898 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 22:31:51 -0600 Subject: [PATCH] feat(orchestrator): add MS23 per-agent message history and SSE stream endpoints GET /agents/:id/messages - paginated message history GET /agents/:id/messages/stream - SSE live stream with replay Partial #693 --- .../src/api/agents/agent-messages.service.ts | 84 ++++++++++++ .../agents-killswitch.controller.spec.ts | 17 ++- .../src/api/agents/agents.controller.spec.ts | 61 ++++++++- .../src/api/agents/agents.controller.ts | 127 +++++++++++++++++- .../src/api/agents/agents.module.ts | 6 +- .../agents/dto/get-messages-query.dto.spec.ts | 37 +++++ .../api/agents/dto/get-messages-query.dto.ts | 17 +++ 7 files changed, 344 insertions(+), 5 deletions(-) create mode 100644 apps/orchestrator/src/api/agents/agent-messages.service.ts create mode 100644 apps/orchestrator/src/api/agents/dto/get-messages-query.dto.spec.ts create mode 100644 apps/orchestrator/src/api/agents/dto/get-messages-query.dto.ts diff --git a/apps/orchestrator/src/api/agents/agent-messages.service.ts b/apps/orchestrator/src/api/agents/agent-messages.service.ts new file mode 100644 index 0000000..e6a9da2 --- /dev/null +++ b/apps/orchestrator/src/api/agents/agent-messages.service.ts @@ -0,0 +1,84 @@ +import { Injectable } from "@nestjs/common"; +import { type AgentConversationMessage, type Prisma } from "@prisma/client"; +import { PrismaService } from "../../prisma/prisma.service"; + +@Injectable() +export class AgentMessagesService { + constructor(private readonly prisma: PrismaService) {} + + async getMessages( + sessionId: string, + limit: number, + skip: number + ): Promise<{ + messages: AgentConversationMessage[]; + total: number; + }> { + const where = { sessionId }; + + const [messages, total] = await Promise.all([ + this.prisma.agentConversationMessage.findMany({ + where, + orderBy: { + timestamp: "desc", + }, + take: limit, + skip, + }), + this.prisma.agentConversationMessage.count({ where }), + ]); + + return { + messages, + total, + }; + } + + async getReplayMessages(sessionId: string, limit = 50): Promise { + const messages = await this.prisma.agentConversationMessage.findMany({ + where: { sessionId }, + orderBy: { + timestamp: "desc", + }, + take: limit, + }); + + return messages.reverse(); + } + + async getMessagesAfter( + sessionId: string, + lastSeenTimestamp: Date, + lastSeenMessageId: string | null + ): Promise { + const where: Prisma.AgentConversationMessageWhereInput = { + sessionId, + ...(lastSeenMessageId + ? { + OR: [ + { + timestamp: { + gt: lastSeenTimestamp, + }, + }, + { + timestamp: lastSeenTimestamp, + id: { + gt: lastSeenMessageId, + }, + }, + ], + } + : { + timestamp: { + gt: lastSeenTimestamp, + }, + }), + }; + + return this.prisma.agentConversationMessage.findMany({ + where, + orderBy: [{ timestamp: "asc" }, { id: "asc" }], + }); + } +} diff --git a/apps/orchestrator/src/api/agents/agents-killswitch.controller.spec.ts b/apps/orchestrator/src/api/agents/agents-killswitch.controller.spec.ts index a10fb7e..399f1aa 100644 --- a/apps/orchestrator/src/api/agents/agents-killswitch.controller.spec.ts +++ b/apps/orchestrator/src/api/agents/agents-killswitch.controller.spec.ts @@ -5,6 +5,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { KillswitchService } from "../../killswitch/killswitch.service"; import { AgentEventsService } from "./agent-events.service"; +import { AgentMessagesService } from "./agent-messages.service"; import type { KillAllResult } from "../../killswitch/killswitch.service"; describe("AgentsController - Killswitch Endpoints", () => { @@ -27,6 +28,12 @@ describe("AgentsController - Killswitch Endpoints", () => { subscribe: ReturnType; getInitialSnapshot: ReturnType; createHeartbeat: ReturnType; + getRecentEvents: ReturnType; + }; + let mockMessagesService: { + getMessages: ReturnType; + getReplayMessages: ReturnType; + getMessagesAfter: ReturnType; }; beforeEach(() => { @@ -61,6 +68,13 @@ describe("AgentsController - Killswitch Endpoints", () => { timestamp: new Date().toISOString(), data: { heartbeat: true }, }), + getRecentEvents: vi.fn().mockReturnValue([]), + }; + + mockMessagesService = { + getMessages: vi.fn(), + getReplayMessages: vi.fn().mockResolvedValue([]), + getMessagesAfter: vi.fn().mockResolvedValue([]), }; controller = new AgentsController( @@ -68,7 +82,8 @@ describe("AgentsController - Killswitch Endpoints", () => { mockSpawnerService as unknown as AgentSpawnerService, mockLifecycleService as unknown as AgentLifecycleService, mockKillswitchService as unknown as KillswitchService, - mockEventsService as unknown as AgentEventsService + mockEventsService as unknown as AgentEventsService, + mockMessagesService as unknown as AgentMessagesService ); }); diff --git a/apps/orchestrator/src/api/agents/agents.controller.spec.ts b/apps/orchestrator/src/api/agents/agents.controller.spec.ts index 75393ec..baa2721 100644 --- a/apps/orchestrator/src/api/agents/agents.controller.spec.ts +++ b/apps/orchestrator/src/api/agents/agents.controller.spec.ts @@ -4,6 +4,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { KillswitchService } from "../../killswitch/killswitch.service"; import { AgentEventsService } from "./agent-events.service"; +import { AgentMessagesService } from "./agent-messages.service"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; describe("AgentsController", () => { @@ -30,6 +31,11 @@ describe("AgentsController", () => { createHeartbeat: ReturnType; getRecentEvents: ReturnType; }; + let messagesService: { + getMessages: ReturnType; + getReplayMessages: ReturnType; + getMessagesAfter: ReturnType; + }; beforeEach(() => { // Create mock services @@ -69,13 +75,20 @@ describe("AgentsController", () => { getRecentEvents: vi.fn().mockReturnValue([]), }; + messagesService = { + getMessages: vi.fn(), + getReplayMessages: vi.fn().mockResolvedValue([]), + getMessagesAfter: vi.fn().mockResolvedValue([]), + }; + // Create controller with mocked services controller = new AgentsController( queueService as unknown as QueueService, spawnerService as unknown as AgentSpawnerService, lifecycleService as unknown as AgentLifecycleService, killswitchService as unknown as KillswitchService, - eventsService as unknown as AgentEventsService + eventsService as unknown as AgentEventsService, + messagesService as unknown as AgentMessagesService ); }); @@ -365,6 +378,52 @@ describe("AgentsController", () => { }); }); + describe("getAgentMessages", () => { + it("should return paginated message history", async () => { + const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64"; + const query = { + limit: 25, + skip: 10, + }; + + const response = { + messages: [ + { + id: "msg-1", + sessionId: agentId, + role: "agent", + content: "hello", + provider: "internal", + timestamp: new Date("2026-03-07T03:00:00.000Z"), + metadata: {}, + }, + ], + total: 101, + }; + + messagesService.getMessages.mockResolvedValue(response); + + const result = await controller.getAgentMessages(agentId, query); + + expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10); + expect(result).toEqual(response); + }); + + it("should use default pagination values", async () => { + const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64"; + const query = { + limit: 50, + skip: 0, + }; + + messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 }); + + await controller.getAgentMessages(agentId, query); + + expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0); + }); + }); + describe("getRecentEvents", () => { it("should return recent events with default limit", () => { eventsService.getRecentEvents.mockReturnValue([ diff --git a/apps/orchestrator/src/api/agents/agents.controller.ts b/apps/orchestrator/src/api/agents/agents.controller.ts index 687129e..a94efac 100644 --- a/apps/orchestrator/src/api/agents/agents.controller.ts +++ b/apps/orchestrator/src/api/agents/agents.controller.ts @@ -15,6 +15,7 @@ import { MessageEvent, Query, } from "@nestjs/common"; +import type { AgentConversationMessage } from "@prisma/client"; import { Throttle } from "@nestjs/throttler"; import { Observable } from "rxjs"; import { QueueService } from "../../queue/queue.service"; @@ -25,6 +26,8 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"; import { AgentEventsService } from "./agent-events.service"; +import { GetMessagesQueryDto } from "./dto/get-messages-query.dto"; +import { AgentMessagesService } from "./agent-messages.service"; /** * Controller for agent management endpoints @@ -47,7 +50,8 @@ export class AgentsController { private readonly spawnerService: AgentSpawnerService, private readonly lifecycleService: AgentLifecycleService, private readonly killswitchService: KillswitchService, - private readonly eventsService: AgentEventsService + private readonly eventsService: AgentEventsService, + private readonly messagesService: AgentMessagesService ) {} /** @@ -185,6 +189,107 @@ export class AgentsController { } } + /** + * Get paginated message history for an agent. + */ + @Get(":agentId/messages") + @Throttle({ status: { limit: 200, ttl: 60000 } }) + @UsePipes(new ValidationPipe({ transform: true, whitelist: true })) + async getAgentMessages( + @Param("agentId", ParseUUIDPipe) agentId: string, + @Query() query: GetMessagesQueryDto + ): Promise<{ + messages: AgentConversationMessage[]; + total: number; + }> { + return this.messagesService.getMessages(agentId, query.limit, query.skip); + } + + /** + * Stream per-agent conversation messages as server-sent events (SSE). + */ + @Sse(":agentId/messages/stream") + @Throttle({ status: { limit: 200, ttl: 60000 } }) + streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable { + return new Observable((subscriber) => { + let isClosed = false; + let lastSeenTimestamp = new Date(); + let lastSeenMessageId: string | null = null; + + const emitMessage = (message: AgentConversationMessage): void => { + if (isClosed) { + return; + } + + subscriber.next({ + data: this.toMessageStreamPayload(message), + }); + + lastSeenTimestamp = message.timestamp; + lastSeenMessageId = message.id; + }; + + void this.messagesService + .getReplayMessages(agentId, 50) + .then((messages) => { + if (isClosed) { + return; + } + + messages.forEach((message) => { + emitMessage(message); + }); + + if (messages.length === 0) { + lastSeenTimestamp = new Date(); + lastSeenMessageId = null; + } + }) + .catch((error: unknown) => { + this.logger.error( + `Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}` + ); + lastSeenTimestamp = new Date(); + lastSeenMessageId = null; + }); + + const pollInterval = setInterval(() => { + if (isClosed) { + return; + } + + void this.messagesService + .getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId) + .then((messages) => { + if (isClosed || messages.length === 0) { + return; + } + + messages.forEach((message) => { + emitMessage(message); + }); + }) + .catch((error: unknown) => { + this.logger.error( + `Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}` + ); + }); + }, 1000); + + const heartbeat = setInterval(() => { + if (!isClosed) { + subscriber.next({ data: { type: "heartbeat" } }); + } + }, 15000); + + return () => { + isClosed = true; + clearInterval(pollInterval); + clearInterval(heartbeat); + }; + }); + } + /** * Get agent status * @param agentId Agent ID to query @@ -301,4 +406,24 @@ export class AgentsController { throw error; } } + + private toMessageStreamPayload(message: AgentConversationMessage): { + messageId: string; + sessionId: string; + role: string; + content: string; + provider: string; + timestamp: string; + metadata: unknown; + } { + return { + messageId: message.id, + sessionId: message.sessionId, + role: message.role, + content: message.content, + provider: message.provider, + timestamp: message.timestamp.toISOString(), + metadata: message.metadata, + }; + } } diff --git a/apps/orchestrator/src/api/agents/agents.module.ts b/apps/orchestrator/src/api/agents/agents.module.ts index 029c3c2..d59ce73 100644 --- a/apps/orchestrator/src/api/agents/agents.module.ts +++ b/apps/orchestrator/src/api/agents/agents.module.ts @@ -6,10 +6,12 @@ import { KillswitchModule } from "../../killswitch/killswitch.module"; import { ValkeyModule } from "../../valkey/valkey.module"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { AgentEventsService } from "./agent-events.service"; +import { PrismaModule } from "../../prisma/prisma.module"; +import { AgentMessagesService } from "./agent-messages.service"; @Module({ - imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule], + imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule], controllers: [AgentsController], - providers: [OrchestratorApiKeyGuard, AgentEventsService], + providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService], }) export class AgentsModule {} diff --git a/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.spec.ts b/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.spec.ts new file mode 100644 index 0000000..6436778 --- /dev/null +++ b/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.spec.ts @@ -0,0 +1,37 @@ +import { plainToInstance } from "class-transformer"; +import { validate } from "class-validator"; +import { describe, expect, it } from "vitest"; +import { GetMessagesQueryDto } from "./get-messages-query.dto"; + +describe("GetMessagesQueryDto", () => { + it("should use defaults when empty", async () => { + const dto = plainToInstance(GetMessagesQueryDto, {}); + const errors = await validate(dto); + + expect(errors).toHaveLength(0); + expect(dto.limit).toBe(50); + expect(dto.skip).toBe(0); + }); + + it("should reject limit greater than 200", async () => { + const dto = plainToInstance(GetMessagesQueryDto, { + limit: 201, + skip: 0, + }); + const errors = await validate(dto); + + expect(errors.length).toBeGreaterThan(0); + expect(errors.some((error) => error.property === "limit")).toBe(true); + }); + + it("should reject negative skip", async () => { + const dto = plainToInstance(GetMessagesQueryDto, { + limit: 50, + skip: -1, + }); + const errors = await validate(dto); + + expect(errors.length).toBeGreaterThan(0); + expect(errors.some((error) => error.property === "skip")).toBe(true); + }); +}); diff --git a/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.ts b/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.ts new file mode 100644 index 0000000..61bd8ce --- /dev/null +++ b/apps/orchestrator/src/api/agents/dto/get-messages-query.dto.ts @@ -0,0 +1,17 @@ +import { Type } from "class-transformer"; +import { IsInt, IsOptional, Max, Min } from "class-validator"; + +export class GetMessagesQueryDto { + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(1) + @Max(200) + limit = 50; + + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(0) + skip = 0; +}