Compare commits

...

3 Commits

Author SHA1 Message Date
1cf98a1729 feat(orchestrator): add MS23 per-agent message history and SSE stream endpoints
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
GET /agents/:id/messages - paginated message history
GET /agents/:id/messages/stream - SSE live stream with replay

Partial #693
2026-03-07 00:57:08 -06:00
b61554800b fix(orchestrator): add prisma CLI devDependency for prisma:generate (#704)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 06:56:42 +00:00
98e892f23c fix(orchestrator): Dockerfile prisma generate + vitest reflect-metadata setup (#703)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:45:17 +00:00
11 changed files with 362 additions and 11 deletions

View File

@@ -21,6 +21,8 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/
# Copy API prisma schema so prisma generate can run in the orchestrator build
COPY apps/api/prisma ./apps/api/prisma
# Copy npm configuration for native binary architecture hints
COPY .npmrc ./
@@ -46,6 +48,10 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# Copy API prisma schema and generate the Prisma client for the orchestrator
COPY apps/api/prisma ./apps/api/prisma
RUN pnpm --filter=@mosaic/orchestrator prisma:generate
# Build the orchestrator app using TurboRepo
RUN pnpm turbo build --filter=@mosaic/orchestrator

View File

@@ -3,19 +3,20 @@
"version": "0.0.20",
"private": true,
"scripts": {
"dev": "nest start --watch",
"build": "nest build",
"dev": "nest start --watch",
"lint": "eslint src/",
"lint:fix": "eslint src/ --fix",
"prisma:generate": "prisma generate --schema=../api/prisma/schema.prisma",
"start": "node dist/main.js",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:dev": "nest start --watch",
"start:prod": "node dist/main.js",
"test": "vitest",
"test:watch": "vitest watch",
"test:e2e": "vitest run --config tests/integration/vitest.config.ts",
"test:perf": "vitest run --config tests/performance/vitest.config.ts",
"typecheck": "tsc --noEmit",
"lint": "eslint src/",
"lint:fix": "eslint src/ --fix"
"test:watch": "vitest watch",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@anthropic-ai/sdk": "^0.72.1",
@@ -46,6 +47,7 @@
"@types/express": "^5.0.1",
"@types/node": "^22.13.4",
"@vitest/coverage-v8": "^4.0.18",
"prisma": "^6.19.2",
"ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2",

View File

@@ -0,0 +1,84 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -5,6 +5,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -27,6 +28,12 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -61,6 +68,13 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controller = new AgentsController(
@@ -68,7 +82,8 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService
mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService
);
});

View File

@@ -4,6 +4,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -30,6 +31,11 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -69,13 +75,20 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]),
};
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService
eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService
);
});
@@ -365,6 +378,52 @@ describe("AgentsController", () => {
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([

View File

@@ -15,6 +15,7 @@ import {
MessageEvent,
Query,
} from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
@@ -25,6 +26,8 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
/**
* Controller for agent management endpoints
@@ -47,7 +50,8 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService
private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService
) {}
/**
@@ -185,6 +189,107 @@ export class AgentsController {
}
}
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/**
* Get agent status
* @param agentId Agent ID to query
@@ -301,4 +406,24 @@ export class AgentsController {
throw error;
}
}
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
}

View File

@@ -6,10 +6,12 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService],
providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService],
})
export class AgentsModule {}

View File

@@ -0,0 +1,37 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -0,0 +1,17 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -4,6 +4,7 @@ export default defineConfig({
test: {
globals: true,
environment: "node",
setupFiles: ["reflect-metadata"],
exclude: ["**/node_modules/**", "**/dist/**", "**/tests/integration/**"],
include: ["src/**/*.spec.ts", "src/**/*.test.ts"],
coverage: {

3
pnpm-lock.yaml generated
View File

@@ -389,6 +389,9 @@ importers:
'@vitest/coverage-v8':
specifier: ^4.0.18
version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2))
prisma:
specifier: ^6.19.2
version: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
ts-node:
specifier: ^10.9.2
version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3)