Compare commits

..

6 Commits

Author SHA1 Message Date
1cf98a1729 feat(orchestrator): add MS23 per-agent message history and SSE stream endpoints
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
GET /agents/:id/messages - paginated message history
GET /agents/:id/messages/stream - SSE live stream with replay

Partial #693
2026-03-07 00:57:08 -06:00
b61554800b fix(orchestrator): add prisma CLI devDependency for prisma:generate (#704)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 06:56:42 +00:00
98e892f23c fix(orchestrator): Dockerfile prisma generate + vitest reflect-metadata setup (#703)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:45:17 +00:00
de6faf659e feat(orchestrator): MS23 agent lifecycle ingestion service (#701)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:21:26 +00:00
49fa958444 chore(orchestrator): MS23 P0-001 done, P0-002 in-progress (#700)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 02:10:59 +00:00
8d6abd72bb feat(api): MS23 mission control Prisma schema (#699)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 01:18:06 +00:00
19 changed files with 626 additions and 33 deletions

View File

@@ -21,6 +21,8 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/ COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/ COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/ COPY apps/orchestrator/package.json ./apps/orchestrator/
# Copy API prisma schema so prisma generate can run in the orchestrator build
COPY apps/api/prisma ./apps/api/prisma
# Copy npm configuration for native binary architecture hints # Copy npm configuration for native binary architecture hints
COPY .npmrc ./ COPY .npmrc ./
@@ -46,6 +48,10 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# Copy API prisma schema and generate the Prisma client for the orchestrator
COPY apps/api/prisma ./apps/api/prisma
RUN pnpm --filter=@mosaic/orchestrator prisma:generate
# Build the orchestrator app using TurboRepo # Build the orchestrator app using TurboRepo
RUN pnpm turbo build --filter=@mosaic/orchestrator RUN pnpm turbo build --filter=@mosaic/orchestrator

View File

@@ -3,19 +3,20 @@
"version": "0.0.20", "version": "0.0.20",
"private": true, "private": true,
"scripts": { "scripts": {
"dev": "nest start --watch",
"build": "nest build", "build": "nest build",
"dev": "nest start --watch",
"lint": "eslint src/",
"lint:fix": "eslint src/ --fix",
"prisma:generate": "prisma generate --schema=../api/prisma/schema.prisma",
"start": "node dist/main.js", "start": "node dist/main.js",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch", "start:debug": "nest start --debug --watch",
"start:dev": "nest start --watch",
"start:prod": "node dist/main.js", "start:prod": "node dist/main.js",
"test": "vitest", "test": "vitest",
"test:watch": "vitest watch",
"test:e2e": "vitest run --config tests/integration/vitest.config.ts", "test:e2e": "vitest run --config tests/integration/vitest.config.ts",
"test:perf": "vitest run --config tests/performance/vitest.config.ts", "test:perf": "vitest run --config tests/performance/vitest.config.ts",
"typecheck": "tsc --noEmit", "test:watch": "vitest watch",
"lint": "eslint src/", "typecheck": "tsc --noEmit"
"lint:fix": "eslint src/ --fix"
}, },
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.72.1", "@anthropic-ai/sdk": "^0.72.1",
@@ -27,6 +28,7 @@
"@nestjs/core": "^11.1.12", "@nestjs/core": "^11.1.12",
"@nestjs/platform-express": "^11.1.12", "@nestjs/platform-express": "^11.1.12",
"@nestjs/throttler": "^6.5.0", "@nestjs/throttler": "^6.5.0",
"@prisma/client": "^6.19.2",
"bullmq": "^5.67.2", "bullmq": "^5.67.2",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.1", "class-validator": "^0.14.1",
@@ -45,6 +47,7 @@
"@types/express": "^5.0.1", "@types/express": "^5.0.1",
"@types/node": "^22.13.4", "@types/node": "^22.13.4",
"@vitest/coverage-v8": "^4.0.18", "@vitest/coverage-v8": "^4.0.18",
"prisma": "^6.19.2",
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0", "tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2", "typescript": "^5.8.2",

View File

@@ -0,0 +1,10 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../prisma/prisma.module";
import { AgentIngestionService } from "./agent-ingestion.service";
@Module({
imports: [PrismaModule],
providers: [AgentIngestionService],
exports: [AgentIngestionService],
})
export class AgentIngestionModule {}

View File

@@ -0,0 +1,141 @@
import { Injectable, Logger } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
export type AgentConversationRole = "agent" | "user" | "system" | "operator";
@Injectable()
export class AgentIngestionService {
private readonly logger = new Logger(AgentIngestionService.name);
constructor(private readonly prisma: PrismaService) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
async recordAgentSpawned(
agentId: string,
parentAgentId?: string,
missionId?: string,
taskId?: string,
agentType?: string
): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,
status: "spawning",
},
update: {
parentSessionId: parentAgentId,
missionId,
taskId,
agentType,
status: "spawning",
completedAt: null,
},
});
this.logger.debug(`Recorded spawned state for agent ${agentId}`);
}
async recordAgentStarted(agentId: string): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "running",
},
update: {
status: "running",
},
});
this.logger.debug(`Recorded running state for agent ${agentId}`);
}
async recordAgentCompleted(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "completed",
completedAt,
},
update: {
status: "completed",
completedAt,
},
});
this.logger.debug(`Recorded completed state for agent ${agentId}`);
}
async recordAgentFailed(agentId: string, error?: string): Promise<void> {
const completedAt = new Date();
const metadata = error ? this.toJsonValue({ error }) : undefined;
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "failed",
completedAt,
...(metadata && { metadata }),
},
update: {
status: "failed",
completedAt,
...(metadata && { metadata }),
},
});
this.logger.debug(`Recorded failed state for agent ${agentId}`);
}
async recordAgentKilled(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "killed",
completedAt,
},
update: {
status: "killed",
completedAt,
},
});
this.logger.debug(`Recorded killed state for agent ${agentId}`);
}
async recordMessage(
sessionId: string,
role: AgentConversationRole,
content: string,
provider = "internal",
metadata?: Record<string, unknown>
): Promise<void> {
await this.prisma.agentConversationMessage.create({
data: {
sessionId,
role,
content,
provider,
...(metadata && { metadata: this.toJsonValue(metadata) }),
},
});
this.logger.debug(`Recorded message for session ${sessionId}`);
}
}

View File

@@ -0,0 +1,84 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -5,6 +5,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import type { KillAllResult } from "../../killswitch/killswitch.service"; import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => { describe("AgentsController - Killswitch Endpoints", () => {
@@ -27,6 +28,12 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>; subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>; getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
}; };
beforeEach(() => { beforeEach(() => {
@@ -61,6 +68,13 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
data: { heartbeat: true }, data: { heartbeat: true },
}), }),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
}; };
controller = new AgentsController( controller = new AgentsController(
@@ -68,7 +82,8 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService, mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService, mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService, mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService
); );
}); });

View File

@@ -4,6 +4,7 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => { describe("AgentsController", () => {
@@ -30,6 +31,11 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>; getRecentEvents: ReturnType<typeof vi.fn>;
}; };
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => { beforeEach(() => {
// Create mock services // Create mock services
@@ -69,13 +75,20 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]), getRecentEvents: vi.fn().mockReturnValue([]),
}; };
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services // Create controller with mocked services
controller = new AgentsController( controller = new AgentsController(
queueService as unknown as QueueService, queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService, spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService, lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService, killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService
); );
}); });
@@ -365,6 +378,52 @@ describe("AgentsController", () => {
}); });
}); });
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => { describe("getRecentEvents", () => {
it("should return recent events with default limit", () => { it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([ eventsService.getRecentEvents.mockReturnValue([

View File

@@ -15,6 +15,7 @@ import {
MessageEvent, MessageEvent,
Query, Query,
} from "@nestjs/common"; } from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler"; import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs"; import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service"; import { QueueService } from "../../queue/queue.service";
@@ -25,6 +26,8 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"; import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
/** /**
* Controller for agent management endpoints * Controller for agent management endpoints
@@ -47,7 +50,8 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService, private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService, private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService, private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService
) {} ) {}
/** /**
@@ -185,6 +189,107 @@ export class AgentsController {
} }
} }
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/** /**
* Get agent status * Get agent status
* @param agentId Agent ID to query * @param agentId Agent ID to query
@@ -301,4 +406,24 @@ export class AgentsController {
throw error; throw error;
} }
} }
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
} }

View File

@@ -6,10 +6,12 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module"; import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
@Module({ @Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule], imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
controllers: [AgentsController], controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService], providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService],
}) })
export class AgentsModule {} export class AgentsModule {}

View File

@@ -0,0 +1,37 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -0,0 +1,17 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from "@nestjs/common";
import { PrismaService } from "./prisma.service";
@Global()
@Module({
providers: [PrismaService],
exports: [PrismaService],
})
export class PrismaModule {}

View File

@@ -0,0 +1,26 @@
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { PrismaClient } from "@prisma/client";
/**
* Lightweight Prisma service for orchestrator ingestion persistence.
*/
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
constructor() {
super({
log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"],
});
}
async onModuleInit(): Promise<void> {
await this.$connect();
this.logger.log("Database connection established");
}
async onModuleDestroy(): Promise<void> {
await this.$disconnect();
this.logger.log("Database connection closed");
}
}

View File

@@ -1,6 +1,7 @@
import { Injectable, Logger, Inject, forwardRef } from "@nestjs/common"; import { Injectable, Logger, Inject, Optional, forwardRef } from "@nestjs/common";
import { ValkeyService } from "../valkey/valkey.service"; import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "./agent-spawner.service"; import { AgentSpawnerService } from "./agent-spawner.service";
import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service";
import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types"; import type { AgentState, AgentStatus, AgentEvent } from "../valkey/types";
import { isValidAgentTransition } from "../valkey/types/state.types"; import { isValidAgentTransition } from "../valkey/types/state.types";
@@ -32,7 +33,8 @@ export class AgentLifecycleService {
constructor( constructor(
private readonly valkeyService: ValkeyService, private readonly valkeyService: ValkeyService,
@Inject(forwardRef(() => AgentSpawnerService)) @Inject(forwardRef(() => AgentSpawnerService))
private readonly spawnerService: AgentSpawnerService private readonly spawnerService: AgentSpawnerService,
@Optional() private readonly agentIngestionService?: AgentIngestionService
) { ) {
this.logger.log("AgentLifecycleService initialized"); this.logger.log("AgentLifecycleService initialized");
} }
@@ -55,6 +57,25 @@ export class AgentLifecycleService {
return createdState; return createdState;
} }
private async recordLifecycleIngestion(
agentId: string,
event: "started" | "completed" | "failed" | "killed",
record: (ingestionService: AgentIngestionService) => Promise<void>
): Promise<void> {
if (!this.agentIngestionService) {
return;
}
try {
await record(this.agentIngestionService);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(
`Failed to record agent ${event} ingestion for ${agentId}: ${errorMessage}`
);
}
}
/** /**
* Acquire a per-agent mutex to serialize state transitions. * Acquire a per-agent mutex to serialize state transitions.
* Uses promise chaining: each caller chains onto the previous lock, * Uses promise chaining: each caller chains onto the previous lock,
@@ -118,6 +139,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.running", updatedState); await this.publishStateChangeEvent("agent.running", updatedState);
await this.recordLifecycleIngestion(agentId, "started", (ingestionService) =>
ingestionService.recordAgentStarted(agentId)
);
this.logger.log(`Agent ${agentId} transitioned to running`); this.logger.log(`Agent ${agentId} transitioned to running`);
return updatedState; return updatedState;
}); });
@@ -155,6 +180,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.completed", updatedState); await this.publishStateChangeEvent("agent.completed", updatedState);
await this.recordLifecycleIngestion(agentId, "completed", (ingestionService) =>
ingestionService.recordAgentCompleted(agentId)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);
@@ -192,6 +221,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.failed", updatedState, error); await this.publishStateChangeEvent("agent.failed", updatedState, error);
await this.recordLifecycleIngestion(agentId, "failed", (ingestionService) =>
ingestionService.recordAgentFailed(agentId, error)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);
@@ -228,6 +261,10 @@ export class AgentLifecycleService {
// Emit event // Emit event
await this.publishStateChangeEvent("agent.killed", updatedState); await this.publishStateChangeEvent("agent.killed", updatedState);
await this.recordLifecycleIngestion(agentId, "killed", (ingestionService) =>
ingestionService.recordAgentKilled(agentId)
);
// Schedule session cleanup // Schedule session cleanup
this.spawnerService.scheduleSessionCleanup(agentId); this.spawnerService.scheduleSessionCleanup(agentId);

View File

@@ -1,4 +1,11 @@
import { Injectable, Logger, HttpException, HttpStatus, OnModuleDestroy } from "@nestjs/common"; import {
Injectable,
Logger,
HttpException,
HttpStatus,
OnModuleDestroy,
Optional,
} from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import Anthropic from "@anthropic-ai/sdk"; import Anthropic from "@anthropic-ai/sdk";
import { randomUUID } from "crypto"; import { randomUUID } from "crypto";
@@ -8,6 +15,7 @@ import {
AgentSession, AgentSession,
AgentType, AgentType,
} from "./types/agent-spawner.types"; } from "./types/agent-spawner.types";
import { AgentIngestionService } from "../agent-ingestion/agent-ingestion.service";
/** /**
* Default delay in milliseconds before cleaning up sessions after terminal states * Default delay in milliseconds before cleaning up sessions after terminal states
@@ -30,7 +38,10 @@ export class AgentSpawnerService implements OnModuleDestroy {
private readonly sessionCleanupDelayMs: number; private readonly sessionCleanupDelayMs: number;
private readonly cleanupTimers = new Map<string, NodeJS.Timeout>(); private readonly cleanupTimers = new Map<string, NodeJS.Timeout>();
constructor(private readonly configService: ConfigService) { constructor(
private readonly configService: ConfigService,
@Optional() private readonly agentIngestionService?: AgentIngestionService
) {
const configuredProvider = this.configService.get<string>("orchestrator.aiProvider"); const configuredProvider = this.configService.get<string>("orchestrator.aiProvider");
this.aiProvider = this.normalizeAiProvider(configuredProvider); this.aiProvider = this.normalizeAiProvider(configuredProvider);
@@ -98,6 +109,19 @@ export class AgentSpawnerService implements OnModuleDestroy {
this.cleanupTimers.clear(); this.cleanupTimers.clear();
} }
private recordSpawnedAgentIngestion(agentId: string, request: SpawnAgentRequest): void {
if (!this.agentIngestionService) {
return;
}
void this.agentIngestionService
.recordAgentSpawned(agentId, undefined, undefined, request.taskId, request.agentType)
.catch((error: unknown) => {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to record spawned ingestion for ${agentId}: ${errorMessage}`);
});
}
/** /**
* Spawn a new agent with the given configuration * Spawn a new agent with the given configuration
* @param request Agent spawn request * @param request Agent spawn request
@@ -130,6 +154,8 @@ export class AgentSpawnerService implements OnModuleDestroy {
// Store session // Store session
this.sessions.set(agentId, session); this.sessions.set(agentId, session);
this.recordSpawnedAgentIngestion(agentId, request);
this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`); this.logger.log(`Agent spawned successfully: ${agentId} (type: ${request.agentType})`);
// NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD) // NOTE: Actual Claude SDK integration will be implemented in next iteration (see issue #TBD)

View File

@@ -3,9 +3,10 @@ import { AgentSpawnerService } from "./agent-spawner.service";
import { AgentLifecycleService } from "./agent-lifecycle.service"; import { AgentLifecycleService } from "./agent-lifecycle.service";
import { DockerSandboxService } from "./docker-sandbox.service"; import { DockerSandboxService } from "./docker-sandbox.service";
import { ValkeyModule } from "../valkey/valkey.module"; import { ValkeyModule } from "../valkey/valkey.module";
import { AgentIngestionModule } from "../agent-ingestion/agent-ingestion.module";
@Module({ @Module({
imports: [ValkeyModule], imports: [ValkeyModule, AgentIngestionModule],
providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], providers: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService],
exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService], exports: [AgentSpawnerService, AgentLifecycleService, DockerSandboxService],
}) })

View File

@@ -4,6 +4,7 @@ export default defineConfig({
test: { test: {
globals: true, globals: true,
environment: "node", environment: "node",
setupFiles: ["reflect-metadata"],
exclude: ["**/node_modules/**", "**/dist/**", "**/tests/integration/**"], exclude: ["**/node_modules/**", "**/dist/**", "**/tests/integration/**"],
include: ["src/**/*.spec.ts", "src/**/*.test.ts"], include: ["src/**/*.spec.ts", "src/**/*.test.ts"],
coverage: { coverage: {

View File

@@ -121,8 +121,8 @@ Target version: `v0.0.23`
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | | id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- | | ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | not-started | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | — | — | — | 15K | — | taskSource field per mosaic-queue note in PRD | | MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | not-started | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | | | MS23-P0-002 | in-progress | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | — | 20K | — | |
| MS23-P0-003 | not-started | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | | | MS23-P0-003 | not-started | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | |
| MS23-P0-004 | not-started | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | | | MS23-P0-004 | not-started | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-005 | not-started | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | | | MS23-P0-005 | not-started | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |

24
pnpm-lock.yaml generated
View File

@@ -337,6 +337,9 @@ importers:
'@nestjs/throttler': '@nestjs/throttler':
specifier: ^6.5.0 specifier: ^6.5.0
version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2) version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2)
'@prisma/client':
specifier: ^6.19.2
version: 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3)
bullmq: bullmq:
specifier: ^5.67.2 specifier: ^5.67.2
version: 5.67.2 version: 5.67.2
@@ -386,6 +389,9 @@ importers:
'@vitest/coverage-v8': '@vitest/coverage-v8':
specifier: ^4.0.18 specifier: ^4.0.18
version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2)) version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2))
prisma:
specifier: ^6.19.2
version: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
ts-node: ts-node:
specifier: ^10.9.2 specifier: ^10.9.2
version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3) version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3)
@@ -8005,7 +8011,7 @@ snapshots:
chalk: 5.6.2 chalk: 5.6.2
commander: 12.1.0 commander: 12.1.0
dotenv: 17.2.4 dotenv: 17.2.4
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
open: 10.2.0 open: 10.2.0
pg: 8.17.2 pg: 8.17.2
prettier: 3.8.1 prettier: 3.8.1
@@ -11345,7 +11351,7 @@ snapshots:
optionalDependencies: optionalDependencies:
'@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) '@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
better-sqlite3: 12.6.2 better-sqlite3: 12.6.2
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
pg: 8.17.2 pg: 8.17.2
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
@@ -11370,7 +11376,7 @@ snapshots:
optionalDependencies: optionalDependencies:
'@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3) '@prisma/client': 6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3)
better-sqlite3: 12.6.2 better-sqlite3: 12.6.2
drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)) drizzle-orm: 0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next: 16.1.6(@babel/core@7.28.6)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
pg: 8.17.2 pg: 8.17.2
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
@@ -12194,17 +12200,6 @@ snapshots:
dotenv@17.2.4: {} dotenv@17.2.4: {}
drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)):
optionalDependencies:
'@opentelemetry/api': 1.9.0
'@prisma/client': 5.22.0(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))
'@types/pg': 8.16.0
better-sqlite3: 12.6.2
kysely: 0.28.10
pg: 8.17.2
postgres: 3.4.8
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)): drizzle-orm@0.41.0(@opentelemetry/api@1.9.0)(@prisma/client@6.19.2(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3))(typescript@5.9.3))(@types/pg@8.16.0)(better-sqlite3@12.6.2)(kysely@0.28.10)(pg@8.17.2)(postgres@3.4.8)(prisma@6.19.2(magicast@0.3.5)(typescript@5.9.3)):
optionalDependencies: optionalDependencies:
'@opentelemetry/api': 1.9.0 '@opentelemetry/api': 1.9.0
@@ -12215,7 +12210,6 @@ snapshots:
pg: 8.17.2 pg: 8.17.2
postgres: 3.4.8 postgres: 3.4.8
prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3) prisma: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
optional: true
dunder-proto@1.0.1: dunder-proto@1.0.1:
dependencies: dependencies: