Compare commits

..

1 Commits

Author SHA1 Message Date
945f4ba12a fix(orchestrator): rm dangling symlink before COPY in Docker builder
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Kaniko resolves destination symlinks during COPY. Writing to
apps/orchestrator/prisma/schema.prisma (a symlink → ../../api/prisma/schema.prisma)
causes kaniko to try to resolve /app/apps/api which does not exist → build fails.

Fix: RUN rm -f the symlink first so the destination path is clean,
then COPY the real schema file to that location.

CI is unaffected (symlink resolves correctly in the monorepo checkout).
2026-03-07 11:00:27 -06:00
8 changed files with 8 additions and 347 deletions

View File

@@ -1,84 +0,0 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -5,7 +5,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => {
@@ -28,12 +27,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -68,13 +61,6 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controller = new AgentsController(
@@ -82,8 +68,7 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService
mockEventsService as unknown as AgentEventsService
);
});

View File

@@ -4,7 +4,6 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => {
@@ -31,11 +30,6 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
// Create mock services
@@ -75,20 +69,13 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]),
};
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services
controller = new AgentsController(
queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService
eventsService as unknown as AgentEventsService
);
});
@@ -378,52 +365,6 @@ describe("AgentsController", () => {
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([

View File

@@ -15,7 +15,6 @@ import {
MessageEvent,
Query,
} from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service";
@@ -26,8 +25,6 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
/**
* Controller for agent management endpoints
@@ -50,8 +47,7 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService
private readonly eventsService: AgentEventsService
) {}
/**
@@ -189,107 +185,6 @@ export class AgentsController {
}
}
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/**
* Get agent status
* @param agentId Agent ID to query
@@ -406,24 +301,4 @@ export class AgentsController {
throw error;
}
}
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
}

View File

@@ -6,12 +6,10 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
@Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService, AgentMessagesService],
providers: [OrchestratorApiKeyGuard, AgentEventsService],
})
export class AgentsModule {}

View File

@@ -1,37 +0,0 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -1,17 +0,0 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -122,9 +122,9 @@ Target version: `v0.0.23`
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-004 | in-progress | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | — | 15K | — | |
| MS23-P0-002 | in-progress | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | | 20K | — | |
| MS23-P0-003 | not-started | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | — | — | — | 20K | — | |
| MS23-P0-004 | not-started | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-005 | not-started | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | not-started | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | — | — | — | 20K | — | Phase 0 gate: SSE stream verified via curl |