Compare commits

..

2 Commits

Author SHA1 Message Date
7e299f74b5 fix(orchestrator): copy apps/api/package.json into Dockerfile for prisma generate
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Prisma generate needs a package.json in the inferred project root to identify
the workspace. Without it, Prisma tries to auto-install and fails.

Copying apps/api/package.json alongside the schema file provides the required
project root marker.
2026-03-07 09:38:35 -06:00
b61554800b fix(orchestrator): add prisma CLI devDependency for prisma:generate (#704)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 06:56:42 +00:00
10 changed files with 13 additions and 346 deletions

View File

@@ -21,7 +21,8 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/
# Copy API prisma schema so prisma generate can run in the orchestrator build
# Copy API package.json + prisma schema so prisma generate can find the project root
COPY apps/api/package.json ./apps/api/package.json
COPY apps/api/prisma ./apps/api/prisma
# Copy npm configuration for native binary architecture hints
@@ -48,7 +49,8 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# Copy API prisma schema and generate the Prisma client for the orchestrator
# Copy API package.json + prisma schema so Prisma can find the project root
COPY apps/api/package.json ./apps/api/package.json
COPY apps/api/prisma ./apps/api/prisma
RUN pnpm --filter=@mosaic/orchestrator prisma:generate

View File

@@ -47,6 +47,7 @@
"@types/express": "^5.0.1",
"@types/node": "^22.13.4",
"@vitest/coverage-v8": "^4.0.18",
"prisma": "^6.19.2",
"ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2",

View File

@@ -1,84 +0,0 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -5,7 +5,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -28,12 +27,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -68,13 +61,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controller = new AgentsController(
@@ -82,8 +68,7 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService
mockEventsService as unknown as AgentEventsService
);
});

View File

@@ -4,7 +4,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -31,11 +30,6 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -75,20 +69,13 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]),
};
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService
eventsService as unknown as AgentEventsService
);
});
@@ -378,52 +365,6 @@ describe("AgentsController", () => {
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([

View File

@@ -15,7 +15,6 @@ import {
MessageEvent,
Query,
} from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
@@ -26,8 +25,6 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
/**
* Controller for agent management endpoints
@@ -50,8 +47,7 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService
private readonly eventsService: AgentEventsService
) {}
/**
@@ -189,107 +185,6 @@ export class AgentsController {
}
}
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/**
* Get agent status
* @param agentId Agent ID to query
@@ -406,24 +301,4 @@ export class AgentsController {
throw error;
}
}
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
}

View File

@@ -6,12 +6,10 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService],
providers: [OrchestratorApiKeyGuard, AgentEventsService],
})
export class AgentsModule {}

View File

@@ -1,37 +0,0 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -1,17 +0,0 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

3
pnpm-lock.yaml generated
View File

@@ -389,6 +389,9 @@ importers:
'@vitest/coverage-v8':
specifier: ^4.0.18
version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@22.19.7)(jiti@2.6.1)(jsdom@26.1.0)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2))
prisma:
specifier: ^6.19.2
version: 6.19.2(magicast@0.3.5)(typescript@5.9.3)
ts-node:
specifier: ^10.9.2
version: 10.9.2(@swc/core@1.15.11)(@types/node@22.19.7)(typescript@5.9.3)