Files
stack/apps/orchestrator/src/api/agents/internal-agent.provider.ts
Jason Woltje 4b135ae1f0
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
feat(orchestrator): MS23-P1-002 InternalAgentProvider (#719)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 19:06:36 +00:00

219 lines
6.1 KiB
TypeScript

import { Injectable } from "@nestjs/common";
import type {
AgentMessage,
AgentMessageRole,
AgentSession,
AgentSessionList,
AgentSessionStatus,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
import { AgentControlService } from "./agent-control.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentTreeService } from "./agent-tree.service";
/** Value passed as the operator argument on every control-service call made by this provider. */
const INTERNAL_OPERATOR_ID = "internal-provider";

/** Page size used by `listSessions` when the caller does not supply a limit. */
const DEFAULT_SESSION_LIMIT = 50;

/** Default for `getMessages`, and the count requested for the replay in `streamMessages`. */
const DEFAULT_MESSAGE_LIMIT = 50;

/** Upper bound applied to a caller-supplied message limit. */
const MAX_MESSAGE_LIMIT = 200;

/** Pause, in milliseconds, between successive polls in `streamMessages`. */
const STREAM_POLL_INTERVAL_MS = 1_000;
/**
 * `IAgentProvider` implementation that answers from the orchestrator's own
 * tree, message, and control services and converts their records into the
 * shared `AgentSession` / `AgentMessage` shapes.
 */
@Injectable()
export class InternalAgentProvider implements IAgentProvider {
  readonly providerId = "internal";
  readonly providerType = "internal";
  readonly displayName = "Internal Orchestrator";

  constructor(
    private readonly messagesService: AgentMessagesService,
    private readonly controlService: AgentControlService,
    private readonly treeService: AgentTreeService
  ) {}

  /**
   * Lists sessions through the tree service and converts each one.
   *
   * @param cursor - Forwarded unchanged to the tree service.
   * @param limit - Forwarded unchanged to the tree service; defaults to `DEFAULT_SESSION_LIMIT`.
   * @returns The converted sessions and the reported total. `cursor` is only
   * present on the result when the tree service returned one.
   */
  async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
    const {
      sessions,
      total,
      cursor: nextCursor,
    } = await this.treeService.listSessions(cursor, limit);

    return {
      sessions: sessions.map((session) => this.toAgentSession(session)),
      total,
      // Omit the key entirely rather than emitting `cursor: undefined`.
      ...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
    };
  }

  /**
   * Looks up one session.
   *
   * @returns The converted session, or `null` when the tree service returns a falsy value.
   */
  async getSession(sessionId: string): Promise<AgentSession | null> {
    const session = await this.treeService.getSession(sessionId);
    return session ? this.toAgentSession(session) : null;
  }

  /**
   * Fetches one page of messages for a session.
   *
   * @param sessionId - Session whose messages are requested.
   * @param limit - Requested page size; truncated and clamped to `1..MAX_MESSAGE_LIMIT`.
   * @param before - Interpreted by this provider as a non-negative decimal
   * offset (number of messages to skip). Any other value is treated as 0.
   * @returns The converted messages in the order the messages service returned them.
   */
  async getMessages(
    sessionId: string,
    limit = DEFAULT_MESSAGE_LIMIT,
    before?: string
  ): Promise<AgentMessage[]> {
    const safeLimit = this.normalizeMessageLimit(limit);
    const skip = this.parseSkip(before);
    const result = await this.messagesService.getMessages(sessionId, safeLimit, skip);
    return result.messages.map((message) => this.toAgentMessage(message));
  }

  /**
   * Forwards a message to the control service with `INTERNAL_OPERATOR_ID` as the operator.
   *
   * @returns `{ accepted: true }` once the control service call resolves; a
   * rejection from the control service propagates to the caller.
   */
  async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
    await this.controlService.injectMessage(sessionId, INTERNAL_OPERATOR_ID, content);
    return {
      accepted: true,
    };
  }

  /** Asks the control service to pause the session's agent. */
  async pauseSession(sessionId: string): Promise<void> {
    await this.controlService.pauseAgent(sessionId, INTERNAL_OPERATOR_ID);
  }

  /** Asks the control service to resume the session's agent. */
  async resumeSession(sessionId: string): Promise<void> {
    await this.controlService.resumeAgent(sessionId, INTERNAL_OPERATOR_ID);
  }

  /**
   * Asks the control service to kill the session's agent.
   *
   * @param force - Forwarded unchanged to the control service; defaults to `true`.
   */
  async killSession(sessionId: string, force = true): Promise<void> {
    await this.controlService.killAgent(sessionId, INTERNAL_OPERATOR_ID, force);
  }

  /**
   * Streams a session's messages: first the replay set (up to
   * `DEFAULT_MESSAGE_LIMIT`), then whatever `getMessagesAfter` returns on each
   * poll, pausing `STREAM_POLL_INTERVAL_MS` between polls.
   *
   * The generator never completes on its own; the consumer ends it by stopping
   * iteration. A rejection from the messages service propagates to the consumer.
   */
  async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
    // Take the baseline BEFORE the replay query. If the replay comes back empty,
    // a message stored while that query was in flight must still be newer than
    // the baseline, otherwise the first poll would skip it for good.
    // NOTE(review): relies on getMessagesAfter returning only messages newer than
    // the given timestamp/id pair — confirm against AgentMessagesService.
    let lastSeenTimestamp = new Date();
    let lastSeenMessageId: string | null = null;

    const replayMessages = await this.messagesService.getReplayMessages(
      sessionId,
      DEFAULT_MESSAGE_LIMIT
    );

    for (const message of replayMessages) {
      yield this.toAgentMessage(message);
      // Replayed messages replace the baseline, so they are not fetched again.
      lastSeenTimestamp = message.timestamp;
      lastSeenMessageId = message.id;
    }

    for (;;) {
      const newMessages = await this.messagesService.getMessagesAfter(
        sessionId,
        lastSeenTimestamp,
        lastSeenMessageId
      );

      for (const message of newMessages) {
        yield this.toAgentMessage(message);
        lastSeenTimestamp = message.timestamp;
        lastSeenMessageId = message.id;
      }

      await this.delay(STREAM_POLL_INTERVAL_MS);
    }
  }

  /** Always resolves to `true`. */
  isAvailable(): Promise<boolean> {
    return Promise.resolve(true);
  }

  /**
   * Converts a stored session-tree row to the shared `AgentSession` shape.
   * `label`, `parentSessionId`, and `metadata` are left off the result when the
   * row has no usable value for them.
   */
  private toAgentSession(session: AgentSessionTree): AgentSession {
    const metadata = this.toMetadata(session.metadata);

    return {
      id: session.sessionId,
      providerId: this.providerId,
      providerType: this.providerType,
      // The task id doubles as the session label.
      ...(session.taskId !== null ? { label: session.taskId } : {}),
      status: this.toSessionStatus(session.status),
      ...(session.parentSessionId !== null ? { parentSessionId: session.parentSessionId } : {}),
      createdAt: session.spawnedAt,
      // Falls back to the spawn time while the session has no completion time.
      updatedAt: session.completedAt ?? session.spawnedAt,
      ...(metadata !== undefined ? { metadata } : {}),
    };
  }

  /** Converts a stored conversation message to the shared `AgentMessage` shape. */
  private toAgentMessage(message: AgentConversationMessage): AgentMessage {
    const metadata = this.toMetadata(message.metadata);

    return {
      id: message.id,
      sessionId: message.sessionId,
      role: this.toMessageRole(message.role),
      content: message.content,
      timestamp: message.timestamp,
      ...(metadata !== undefined ? { metadata } : {}),
    };
  }

  /**
   * Maps a stored status string to `AgentSessionStatus`.
   * `"failed"` and `"killed"` both map to `"failed"`; `"spawning"` and any
   * unrecognised value map to `"idle"`.
   */
  private toSessionStatus(status: string): AgentSessionStatus {
    switch (status) {
      case "running":
        return "active";
      case "paused":
        return "paused";
      case "completed":
        return "completed";
      case "failed":
      case "killed":
        return "failed";
      case "spawning":
      default:
        return "idle";
    }
  }

  /**
   * Maps a stored role string to `AgentMessageRole`.
   * `"agent"` is treated as `"assistant"`; `"operator"` and any unrecognised
   * value are treated as `"user"`.
   */
  private toMessageRole(role: string): AgentMessageRole {
    switch (role) {
      case "agent":
      case "assistant":
        return "assistant";
      case "system":
        return "system";
      case "tool":
        return "tool";
      case "operator":
      case "user":
      default:
        return "user";
    }
  }

  /**
   * Truncates `limit` to an integer and clamps it to `1..MAX_MESSAGE_LIMIT`.
   * Non-finite input (NaN, ±Infinity) is replaced by `DEFAULT_MESSAGE_LIMIT`.
   */
  private normalizeMessageLimit(limit: number): number {
    const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_MESSAGE_LIMIT;
    if (normalized < 1) {
      return 1;
    }
    return Math.min(normalized, MAX_MESSAGE_LIMIT);
  }

  /**
   * Converts the `before` argument into a skip count.
   *
   * @returns The parsed offset when `before` (ignoring surrounding whitespace)
   * consists solely of decimal digits and is a safe integer; otherwise 0.
   */
  private parseSkip(before?: string): number {
    if (!before) {
      return 0;
    }

    const candidate = before.trim();
    // parseInt on its own accepts a numeric prefix ("12ab-…" parses as 12), so a
    // non-offset value that merely starts with digits would silently skip rows.
    if (!/^\d+$/.test(candidate)) {
      return 0;
    }

    const parsed = Number.parseInt(candidate, 10);
    // Very long digit strings parse to imprecise values; treat them as invalid.
    return Number.isSafeInteger(parsed) ? parsed : 0;
  }

  /**
   * Returns `value` as a plain record when it is a non-null, non-array object;
   * returns `undefined` for everything else (primitives, `null`, arrays).
   */
  private toMetadata(value: unknown): Record<string, unknown> | undefined {
    if (value !== null && typeof value === "object" && !Array.isArray(value)) {
      return value as Record<string, unknown>;
    }
    return undefined;
  }

  /** Resolves after `ms` milliseconds. */
  private async delay(ms: number): Promise<void> {
    await new Promise((resolve) => {
      setTimeout(resolve, ms);
    });
  }
}