Compare commits

...

1 Commits

Author SHA1 Message Date
52e7b0e6e7 feat(orchestrator): add mission control proxy api
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 13:34:57 -06:00
11 changed files with 679 additions and 0 deletions

View File

@@ -12,6 +12,7 @@ import { InternalAgentProvider } from "./internal-agent.provider";
type MockProvider = IAgentProvider & { type MockProvider = IAgentProvider & {
listSessions: ReturnType<typeof vi.fn>; listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
}; };
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> { const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
@@ -134,4 +135,68 @@ describe("AgentProviderRegistry", () => {
expect.stringContaining("Failed to list sessions for provider failing") expect.stringContaining("Failed to list sessions for provider failing")
); );
}); });
it("finds a provider for an existing session", async () => {
const targetSession: AgentSession = {
id: "session-found",
providerId: "openclaw",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T12:00:00.000Z"),
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
};
const openclawProvider = createProvider("openclaw");
openclawProvider.getSession.mockResolvedValue(targetSession);
registry.onModuleInit();
registry.registerProvider(openclawProvider);
const result = await registry.getProviderForSession(targetSession.id);
expect(result).toEqual({
provider: openclawProvider,
session: targetSession,
});
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
});
it("returns null when no provider has the requested session", async () => {
const openclawProvider = createProvider("openclaw");
registry.onModuleInit();
registry.registerProvider(openclawProvider);
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
});
it("continues searching providers when getSession throws", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const failingProvider = createProvider("failing");
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
const healthySession: AgentSession = {
id: "session-healthy",
providerId: "healthy",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T12:15:00.000Z"),
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
};
const healthyProvider = createProvider("healthy");
healthyProvider.getSession.mockResolvedValue(healthySession);
registry.onModuleInit();
registry.registerProvider(failingProvider);
registry.registerProvider(healthyProvider);
const result = await registry.getProviderForSession(healthySession.id);
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Failed to get session session-healthy for provider failing")
);
});
}); });

View File

@@ -26,6 +26,28 @@ export class AgentProviderRegistry implements OnModuleInit {
return this.providers.get(providerId) ?? null; return this.providers.get(providerId) ?? null;
} }
async getProviderForSession(
sessionId: string
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
for (const provider of this.providers.values()) {
try {
const session = await provider.getSession(sessionId);
if (session !== null) {
return {
provider,
session,
};
}
} catch (error) {
this.logger.warn(
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
);
}
}
return null;
}
async listAllSessions(): Promise<AgentSession[]> { async listAllSessions(): Promise<AgentSession[]> {
const providers = [...this.providers.values()]; const providers = [...this.providers.values()];
const sessionsByProvider = await Promise.all( const sessionsByProvider = await Promise.all(

View File

@@ -0,0 +1,15 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
export class GetMissionControlMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit?: number;
@IsOptional()
@IsString()
before?: string;
}

View File

@@ -0,0 +1,7 @@
import { IsBoolean, IsOptional } from "class-validator";
export class KillSessionDto {
@IsOptional()
@IsBoolean()
force?: boolean;
}

View File

@@ -0,0 +1,183 @@
import {
Body,
Controller,
Get,
Header,
HttpCode,
MessageEvent,
Param,
Post,
Query,
Request,
Sse,
UseGuards,
UsePipes,
ValidationPipe,
} from "@nestjs/common";
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
import { Observable } from "rxjs";
import { AuthGuard } from "../../auth/guards/auth.guard";
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
import { KillSessionDto } from "./dto/kill-session.dto";
import { MissionControlService } from "./mission-control.service";
const DEFAULT_OPERATOR_ID = "mission-control";
interface MissionControlRequest {
user?: {
id?: string;
};
}
@Controller("api/mission-control")
@UseGuards(AuthGuard)
export class MissionControlController {
constructor(private readonly missionControlService: MissionControlService) {}
@Get("sessions")
async listSessions(): Promise<{ sessions: AgentSession[] }> {
const sessions = await this.missionControlService.listSessions();
return { sessions };
}
@Get("sessions/:sessionId")
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
return this.missionControlService.getSession(sessionId);
}
@Get("sessions/:sessionId/messages")
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getMessages(
@Param("sessionId") sessionId: string,
@Query() query: GetMissionControlMessagesQueryDto
): Promise<{ messages: AgentMessage[] }> {
const messages = await this.missionControlService.getMessages(
sessionId,
query.limit,
query.before
);
return { messages };
}
@Post("sessions/:sessionId/inject")
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
injectMessage(
@Param("sessionId") sessionId: string,
@Body() dto: InjectAgentDto,
@Request() req: MissionControlRequest
): Promise<InjectResult> {
return this.missionControlService.injectMessage(
sessionId,
dto.message,
this.resolveOperatorId(req)
);
}
@Post("sessions/:sessionId/pause")
@HttpCode(200)
async pauseSession(
@Param("sessionId") sessionId: string,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
return { message: `Session ${sessionId} paused` };
}
@Post("sessions/:sessionId/resume")
@HttpCode(200)
async resumeSession(
@Param("sessionId") sessionId: string,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
return { message: `Session ${sessionId} resumed` };
}
@Post("sessions/:sessionId/kill")
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async killSession(
@Param("sessionId") sessionId: string,
@Body() dto: KillSessionDto,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.killSession(
sessionId,
dto.force ?? true,
this.resolveOperatorId(req)
);
return { message: `Session ${sessionId} killed` };
}
@Sse("sessions/:sessionId/stream")
@Header("Content-Type", "text/event-stream")
@Header("Cache-Control", "no-cache")
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let iterator: AsyncIterator<AgentMessage> | null = null;
void this.missionControlService
.streamMessages(sessionId)
.then(async (stream) => {
iterator = stream[Symbol.asyncIterator]();
for (;;) {
if (isClosed) {
break;
}
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
if (next.done) {
break;
}
subscriber.next({
data: this.toStreamPayload(next.value),
});
}
subscriber.complete();
})
.catch((error: unknown) => {
subscriber.error(error);
});
return () => {
isClosed = true;
void iterator?.return?.();
};
});
}
private resolveOperatorId(req: MissionControlRequest): string {
const operatorId = req.user?.id;
return typeof operatorId === "string" && operatorId.length > 0
? operatorId
: DEFAULT_OPERATOR_ID;
}
private toStreamPayload(message: AgentMessage): {
id: string;
sessionId: string;
role: string;
content: string;
timestamp: string;
metadata?: Record<string, unknown>;
} {
return {
id: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
timestamp: message.timestamp.toISOString(),
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
};
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { AgentsModule } from "../agents/agents.module";
import { AuthModule } from "../../auth/auth.module";
import { PrismaModule } from "../../prisma/prisma.module";
import { MissionControlController } from "./mission-control.controller";
import { MissionControlService } from "./mission-control.service";
@Module({
imports: [AgentsModule, AuthModule, PrismaModule],
controllers: [MissionControlController],
providers: [MissionControlService],
})
export class MissionControlModule {}

View File

@@ -0,0 +1,213 @@
import { NotFoundException } from "@nestjs/common";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
import type { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
import { MissionControlService } from "./mission-control.service";
type MockProvider = IAgentProvider & {
listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
getMessages: ReturnType<typeof vi.fn>;
injectMessage: ReturnType<typeof vi.fn>;
pauseSession: ReturnType<typeof vi.fn>;
resumeSession: ReturnType<typeof vi.fn>;
killSession: ReturnType<typeof vi.fn>;
streamMessages: ReturnType<typeof vi.fn>;
};
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
return;
};
const createProvider = (providerId = "internal"): MockProvider => ({
providerId,
providerType: providerId,
displayName: providerId,
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
getSession: vi.fn().mockResolvedValue(null),
getMessages: vi.fn().mockResolvedValue([]),
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
pauseSession: vi.fn().mockResolvedValue(undefined),
resumeSession: vi.fn().mockResolvedValue(undefined),
killSession: vi.fn().mockResolvedValue(undefined),
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
isAvailable: vi.fn().mockResolvedValue(true),
});
describe("MissionControlService", () => {
let service: MissionControlService;
let registry: {
listAllSessions: ReturnType<typeof vi.fn>;
getProviderForSession: ReturnType<typeof vi.fn>;
};
let prisma: {
operatorAuditLog: {
create: ReturnType<typeof vi.fn>;
};
};
const session: AgentSession = {
id: "session-1",
providerId: "internal",
providerType: "internal",
status: "active",
createdAt: new Date("2026-03-07T14:00:00.000Z"),
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
};
beforeEach(() => {
registry = {
listAllSessions: vi.fn().mockResolvedValue([session]),
getProviderForSession: vi.fn().mockResolvedValue(null),
};
prisma = {
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
service = new MissionControlService(
registry as unknown as AgentProviderRegistry,
prisma as unknown as PrismaService
);
});
it("lists sessions from the registry", async () => {
await expect(service.listSessions()).resolves.toEqual([session]);
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
});
it("returns a session when it is found", async () => {
const provider = createProvider("internal");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.getSession(session.id)).resolves.toEqual(session);
});
it("throws NotFoundException when session lookup fails", async () => {
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
});
it("gets messages from the resolved provider", async () => {
const provider = createProvider("openclaw");
const messages: AgentMessage[] = [
{
id: "message-1",
sessionId: session.id,
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T14:01:00.000Z"),
},
];
provider.getMessages.mockResolvedValue(messages);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
});
it("injects a message and writes an audit log", async () => {
const provider = createProvider("internal");
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
provider.injectMessage.mockResolvedValue(injectResult);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
injectResult
);
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: session.id,
userId: "operator-1",
provider: "internal",
action: "inject",
content: "ship it",
metadata: {
payload: { message: "ship it" },
},
},
});
});
it("pauses and resumes using default operator id", async () => {
const provider = createProvider("openclaw");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await service.pauseSession(session.id);
await service.resumeSession(session.id);
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
data: {
sessionId: session.id,
userId: "mission-control",
provider: "openclaw",
action: "pause",
metadata: {
payload: {},
},
},
});
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
data: {
sessionId: session.id,
userId: "mission-control",
provider: "openclaw",
action: "resume",
metadata: {
payload: {},
},
},
});
});
it("kills with provided force value and writes audit log", async () => {
const provider = createProvider("openclaw");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await service.killSession(session.id, false, "operator-2");
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: session.id,
userId: "operator-2",
provider: "openclaw",
action: "kill",
metadata: {
payload: { force: false },
},
},
});
});
it("resolves provider message stream", async () => {
const provider = createProvider("internal");
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
yield {
id: "message-1",
sessionId: session.id,
role: "assistant",
content: "stream",
timestamp: new Date("2026-03-07T14:03:00.000Z"),
};
})();
provider.streamMessages.mockReturnValue(messageStream);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
});
it("does not write audit log when session cannot be resolved", async () => {
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,139 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
const DEFAULT_OPERATOR_ID = "mission-control";
@Injectable()
export class MissionControlService {
constructor(
private readonly registry: AgentProviderRegistry,
private readonly prisma: PrismaService
) {}
listSessions(): Promise<AgentSession[]> {
return this.registry.listAllSessions();
}
async getSession(sessionId: string): Promise<AgentSession> {
const resolved = await this.registry.getProviderForSession(sessionId);
if (!resolved) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
return resolved.session;
}
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
return provider.getMessages(sessionId, limit, before);
}
async injectMessage(
sessionId: string,
message: string,
operatorId = DEFAULT_OPERATOR_ID
): Promise<InjectResult> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
const result = await provider.injectMessage(sessionId, message);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "inject",
content: message,
payload: { message },
});
return result;
}
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.pauseSession(sessionId);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "pause",
payload: {},
});
}
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.resumeSession(sessionId);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "resume",
payload: {},
});
}
async killSession(
sessionId: string,
force = true,
operatorId = DEFAULT_OPERATOR_ID
): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.killSession(sessionId, force);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "kill",
payload: { force },
});
}
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
return provider.streamMessages(sessionId);
}
private async getProviderForSessionOrThrow(
sessionId: string
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
const resolved = await this.registry.getProviderForSession(sessionId);
if (!resolved) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
return resolved;
}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
private async writeOperatorAuditLog(params: {
sessionId: string;
providerId: string;
operatorId: string;
action: MissionControlAction;
content?: string;
payload: Record<string, unknown>;
}): Promise<void> {
await this.prisma.operatorAuditLog.create({
data: {
sessionId: params.sessionId,
userId: params.operatorId,
provider: params.providerId,
action: params.action,
...(params.content !== undefined ? { content: params.content } : {}),
metadata: this.toJsonValue({ payload: params.payload }),
},
});
}
}

View File

@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
import { ThrottlerModule } from "@nestjs/throttler"; import { ThrottlerModule } from "@nestjs/throttler";
import { HealthModule } from "./api/health/health.module"; import { HealthModule } from "./api/health/health.module";
import { AgentsModule } from "./api/agents/agents.module"; import { AgentsModule } from "./api/agents/agents.module";
import { MissionControlModule } from "./api/mission-control/mission-control.module";
import { QueueApiModule } from "./api/queue/queue-api.module"; import { QueueApiModule } from "./api/queue/queue-api.module";
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module"; import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
import { CoordinatorModule } from "./coordinator/coordinator.module"; import { CoordinatorModule } from "./coordinator/coordinator.module";
@@ -53,6 +54,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
HealthModule, HealthModule,
AgentsModule, AgentsModule,
AgentProvidersModule, AgentProvidersModule,
MissionControlModule,
QueueApiModule, QueueApiModule,
CoordinatorModule, CoordinatorModule,
BudgetModule, BudgetModule,

View File

@@ -0,0 +1,9 @@
import { Module } from "@nestjs/common";
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
import { AuthGuard } from "./guards/auth.guard";
@Module({
providers: [OrchestratorApiKeyGuard, AuthGuard],
exports: [AuthGuard],
})
export class AuthModule {}

View File

@@ -0,0 +1,11 @@
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
@Injectable()
export class AuthGuard implements CanActivate {
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
return this.apiKeyGuard.canActivate(context);
}
}