Compare commits

..

3 Commits

Author SHA1 Message Date
7e299f74b5 fix(orchestrator): copy apps/api/package.json into Dockerfile for prisma generate
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Prisma generate needs a package.json in the inferred project root to identify
the workspace. Without it, Prisma tries to auto-install and fails.

Copying apps/api/package.json alongside the schema file provides the required
project root marker.
2026-03-07 09:38:35 -06:00
b61554800b fix(orchestrator): add prisma CLI devDependency for prisma:generate (#704)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 06:56:42 +00:00
98e892f23c fix(orchestrator): Dockerfile prisma generate + vitest reflect-metadata setup (#703)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:45:17 +00:00
11 changed files with 25 additions and 350 deletions

View File

@@ -21,6 +21,9 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/ COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/ COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/ COPY apps/orchestrator/package.json ./apps/orchestrator/
# Copy API package.json + prisma schema so prisma generate can find the project root
COPY apps/api/package.json ./apps/api/package.json
COPY apps/api/prisma ./apps/api/prisma
# Copy npm configuration for native binary architecture hints # Copy npm configuration for native binary architecture hints
COPY .npmrc ./ COPY .npmrc ./
@@ -46,6 +49,11 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# Copy API package.json + prisma schema so Prisma can find the project root
COPY apps/api/package.json ./apps/api/package.json
COPY apps/api/prisma ./apps/api/prisma
RUN pnpm --filter=@mosaic/orchestrator prisma:generate
# Build the orchestrator app using TurboRepo # Build the orchestrator app using TurboRepo
RUN pnpm turbo build --filter=@mosaic/orchestrator RUN pnpm turbo build --filter=@mosaic/orchestrator

View File

@@ -3,19 +3,20 @@
"version": "0.0.20", "version": "0.0.20",
"private": true, "private": true,
"scripts": { "scripts": {
"dev": "nest start --watch",
"build": "nest build", "build": "nest build",
"dev": "nest start --watch",
"lint": "eslint src/",
"lint:fix": "eslint src/ --fix",
"prisma:generate": "prisma generate --schema=../api/prisma/schema.prisma",
"start": "node dist/main.js", "start": "node dist/main.js",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch", "start:debug": "nest start --debug --watch",
"start:dev": "nest start --watch",
"start:prod": "node dist/main.js", "start:prod": "node dist/main.js",
"test": "vitest", "test": "vitest",
"test:watch": "vitest watch",
"test:e2e": "vitest run --config tests/integration/vitest.config.ts", "test:e2e": "vitest run --config tests/integration/vitest.config.ts",
"test:perf": "vitest run --config tests/performance/vitest.config.ts", "test:perf": "vitest run --config tests/performance/vitest.config.ts",
"typecheck": "tsc --noEmit", "test:watch": "vitest watch",
"lint": "eslint src/", "typecheck": "tsc --noEmit"
"lint:fix": "eslint src/ --fix"
}, },
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.72.1", "@anthropic-ai/sdk": "^0.72.1",
@@ -46,6 +47,7 @@
"@types/express": "^5.0.1", "@types/express": "^5.0.1",
"@types/node": "^22.13.4", "@types/node": "^22.13.4",
"@vitest/coverage-v8": "^4.0.18", "@vitest/coverage-v8": "^4.0.18",
"prisma": "^6.19.2",
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0", "tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2", "typescript": "^5.8.2",

View File

@@ -1,84 +0,0 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -5,7 +5,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import type { KillAllResult } from "../../killswitch/killswitch.service"; import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => { describe("AgentsController - Killswitch Endpoints", () => {
@@ -28,12 +27,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>; subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>; getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
}; };
beforeEach(() => { beforeEach(() => {
@@ -68,13 +61,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
data: { heartbeat: true }, data: { heartbeat: true },
}), }),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
}; };
controller = new AgentsController( controller = new AgentsController(
@@ -82,8 +68,7 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService, mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService, mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService, mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService, mockEventsService as unknown as AgentEventsService
mockMessagesService as unknown as AgentMessagesService
); );
}); });

View File

@@ -4,7 +4,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => { describe("AgentsController", () => {
@@ -31,11 +30,6 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>; getRecentEvents: ReturnType<typeof vi.fn>;
}; };
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => { beforeEach(() => {
// Create mock services // Create mock services
@@ -75,20 +69,13 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]), getRecentEvents: vi.fn().mockReturnValue([]),
}; };
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services // Create controller with mocked services
controller = new AgentsController( controller = new AgentsController(
queueService as unknown as QueueService, queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService, spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService, lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService, killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService, eventsService as unknown as AgentEventsService
messagesService as unknown as AgentMessagesService
); );
}); });
@@ -378,52 +365,6 @@ describe("AgentsController", () => {
}); });
}); });
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => { describe("getRecentEvents", () => {
it("should return recent events with default limit", () => { it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([ eventsService.getRecentEvents.mockReturnValue([

View File

@@ -15,7 +15,6 @@ import {
MessageEvent, MessageEvent,
Query, Query,
} from "@nestjs/common"; } from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler"; import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs"; import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service"; import { QueueService } from "../../queue/queue.service";
@@ -26,8 +25,6 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"; import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
/** /**
* Controller for agent management endpoints * Controller for agent management endpoints
@@ -50,8 +47,7 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService, private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService, private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService, private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService, private readonly eventsService: AgentEventsService
private readonly messagesService: AgentMessagesService
) {} ) {}
/** /**
@@ -189,107 +185,6 @@ export class AgentsController {
} }
} }
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/** /**
* Get agent status * Get agent status
* @param agentId Agent ID to query * @param agentId Agent ID to query
@@ -406,24 +301,4 @@ export class AgentsController {
throw error; throw error;
} }
} }
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
} }

View File

@@ -6,12 +6,10 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module"; import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
@Module({ @Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule], imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
controllers: [AgentsController], controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService], providers: [OrchestratorApiKeyGuard, AgentEventsService],
}) })
export class AgentsModule {} export class AgentsModule {}

View File

@@ -1,37 +0,0 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -1,17 +0,0 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -4,6 +4,7 @@ export default defineConfig({
test: { test: {
globals: true, globals: true,
environment: "node", environment: "node",
setupFiles: ["reflect-metadata"],
exclude: ["**/node_modules/**", "**/dist/**", "**/tests/integration/**"], exclude: ["**/node_modules/**", "**/dist/**", "**/tests/integration/**"],
include: ["src/**/*.spec.ts", "src/**/*.test.ts"], include: ["src/**/*.spec.ts", "src/**/*.test.ts"],
coverage: { coverage: {

3
pnpm-lock.yaml generated
View File

@@ -389,6 +389,9 @@ importers:
'@vitest/coverage-v8': '@vitest/coverage-v8':
specifier: ^4.0.18 specifier: ^4.0.18
version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2)) version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2))
prisma:
specifier: ^6.19.2
version: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
ts-node: ts-node:
specifier: ^10.9.2 specifier: ^10.9.2
version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3) version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3)