diff --git a/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts b/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts index 0241c6a..baf68ec 100644 --- a/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts +++ b/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts @@ -12,6 +12,7 @@ import { InternalAgentProvider } from "./internal-agent.provider"; type MockProvider = IAgentProvider & { listSessions: ReturnType; + getSession: ReturnType; }; const emptyMessageStream = async function* (): AsyncIterable { @@ -134,4 +135,68 @@ describe("AgentProviderRegistry", () => { expect.stringContaining("Failed to list sessions for provider failing") ); }); + + it("finds a provider for an existing session", async () => { + const targetSession: AgentSession = { + id: "session-found", + providerId: "openclaw", + providerType: "external", + status: "active", + createdAt: new Date("2026-03-07T12:00:00.000Z"), + updatedAt: new Date("2026-03-07T12:10:00.000Z"), + }; + + const openclawProvider = createProvider("openclaw"); + openclawProvider.getSession.mockResolvedValue(targetSession); + + registry.onModuleInit(); + registry.registerProvider(openclawProvider); + + const result = await registry.getProviderForSession(targetSession.id); + + expect(result).toEqual({ + provider: openclawProvider, + session: targetSession, + }); + expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id); + expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id); + }); + + it("returns null when no provider has the requested session", async () => { + const openclawProvider = createProvider("openclaw"); + + registry.onModuleInit(); + registry.registerProvider(openclawProvider); + + await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull(); + }); + + it("continues searching providers when getSession throws", async () => { + const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined); + const failingProvider = createProvider("failing"); + failingProvider.getSession.mockRejectedValue(new Error("provider timeout")); + + const healthySession: AgentSession = { + id: "session-healthy", + providerId: "healthy", + providerType: "external", + status: "active", + createdAt: new Date("2026-03-07T12:15:00.000Z"), + updatedAt: new Date("2026-03-07T12:16:00.000Z"), + }; + + const healthyProvider = createProvider("healthy"); + healthyProvider.getSession.mockResolvedValue(healthySession); + + registry.onModuleInit(); + registry.registerProvider(failingProvider); + registry.registerProvider(healthyProvider); + + const result = await registry.getProviderForSession(healthySession.id); + + expect(result).toEqual({ provider: healthyProvider, session: healthySession }); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("Failed to get session session-healthy for provider failing") + ); + }); }); diff --git a/apps/orchestrator/src/api/agents/agent-provider.registry.ts b/apps/orchestrator/src/api/agents/agent-provider.registry.ts index 526a65e..cc15c21 100644 --- a/apps/orchestrator/src/api/agents/agent-provider.registry.ts +++ b/apps/orchestrator/src/api/agents/agent-provider.registry.ts @@ -26,6 +26,28 @@ export class AgentProviderRegistry implements OnModuleInit { return this.providers.get(providerId) ?? null; } + async getProviderForSession( + sessionId: string + ): Promise<{ provider: IAgentProvider; session: AgentSession } | null> { + for (const provider of this.providers.values()) { + try { + const session = await provider.getSession(sessionId); + if (session !== null) { + return { + provider, + session, + }; + } + } catch (error) { + this.logger.warn( + `Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}` + ); + } + } + + return null; + } + async listAllSessions(): Promise { const providers = [...this.providers.values()]; const sessionsByProvider = await Promise.all( diff --git a/apps/orchestrator/src/api/mission-control/dto/get-mission-control-messages-query.dto.ts b/apps/orchestrator/src/api/mission-control/dto/get-mission-control-messages-query.dto.ts new file mode 100644 index 0000000..fb610f0 --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/dto/get-mission-control-messages-query.dto.ts @@ -0,0 +1,15 @@ +import { Type } from "class-transformer"; +import { IsInt, IsOptional, IsString, Max, Min } from "class-validator"; + +export class GetMissionControlMessagesQueryDto { + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(1) + @Max(200) + limit?: number; + + @IsOptional() + @IsString() + before?: string; +} diff --git a/apps/orchestrator/src/api/mission-control/dto/kill-session.dto.ts b/apps/orchestrator/src/api/mission-control/dto/kill-session.dto.ts new file mode 100644 index 0000000..6993abc --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/dto/kill-session.dto.ts @@ -0,0 +1,7 @@ +import { IsBoolean, IsOptional } from "class-validator"; + +export class KillSessionDto { + @IsOptional() + @IsBoolean() + force?: boolean; +} diff --git a/apps/orchestrator/src/api/mission-control/mission-control.controller.ts b/apps/orchestrator/src/api/mission-control/mission-control.controller.ts new file mode 100644 index 0000000..f7f3798 --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/mission-control.controller.ts @@ -0,0 +1,183 @@ +import { + Body, + Controller, + Get, + Header, + HttpCode, + MessageEvent, + Param, + Post, + Query, + Request, + Sse, + UseGuards, + UsePipes, + ValidationPipe, +} from "@nestjs/common"; +import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared"; +import { Observable } from "rxjs"; +import { AuthGuard } from "../../auth/guards/auth.guard"; +import { InjectAgentDto } from "../agents/dto/inject-agent.dto"; +import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto"; +import { KillSessionDto } from "./dto/kill-session.dto"; +import { MissionControlService } from "./mission-control.service"; + +const DEFAULT_OPERATOR_ID = "mission-control"; + +interface MissionControlRequest { + user?: { + id?: string; + }; +} + +@Controller("api/mission-control") +@UseGuards(AuthGuard) +export class MissionControlController { + constructor(private readonly missionControlService: MissionControlService) {} + + @Get("sessions") + async listSessions(): Promise<{ sessions: AgentSession[] }> { + const sessions = await this.missionControlService.listSessions(); + return { sessions }; + } + + @Get("sessions/:sessionId") + getSession(@Param("sessionId") sessionId: string): Promise { + return this.missionControlService.getSession(sessionId); + } + + @Get("sessions/:sessionId/messages") + @UsePipes(new ValidationPipe({ transform: true, whitelist: true })) + async getMessages( + @Param("sessionId") sessionId: string, + @Query() query: GetMissionControlMessagesQueryDto + ): Promise<{ messages: AgentMessage[] }> { + const messages = await this.missionControlService.getMessages( + sessionId, + query.limit, + query.before + ); + + return { messages }; + } + + @Post("sessions/:sessionId/inject") + @HttpCode(200) + @UsePipes(new ValidationPipe({ transform: true, whitelist: true })) + injectMessage( + @Param("sessionId") sessionId: string, + @Body() dto: InjectAgentDto, + @Request() req: MissionControlRequest + ): Promise { + return this.missionControlService.injectMessage( + sessionId, + dto.message, + this.resolveOperatorId(req) + ); + } + + @Post("sessions/:sessionId/pause") + @HttpCode(200) + async pauseSession( + @Param("sessionId") sessionId: string, + @Request() req: MissionControlRequest + ): Promise<{ message: string }> { + await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req)); + + return { message: `Session ${sessionId} paused` }; + } + + @Post("sessions/:sessionId/resume") + @HttpCode(200) + async resumeSession( + @Param("sessionId") sessionId: string, + @Request() req: MissionControlRequest + ): Promise<{ message: string }> { + await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req)); + + return { message: `Session ${sessionId} resumed` }; + } + + @Post("sessions/:sessionId/kill") + @HttpCode(200) + @UsePipes(new ValidationPipe({ transform: true, whitelist: true })) + async killSession( + @Param("sessionId") sessionId: string, + @Body() dto: KillSessionDto, + @Request() req: MissionControlRequest + ): Promise<{ message: string }> { + await this.missionControlService.killSession( + sessionId, + dto.force ?? true, + this.resolveOperatorId(req) + ); + + return { message: `Session ${sessionId} killed` }; + } + + @Sse("sessions/:sessionId/stream") + @Header("Content-Type", "text/event-stream") + @Header("Cache-Control", "no-cache") + streamSessionMessages(@Param("sessionId") sessionId: string): Observable { + return new Observable((subscriber) => { + let isClosed = false; + let iterator: AsyncIterator | null = null; + + void this.missionControlService + .streamMessages(sessionId) + .then(async (stream) => { + iterator = stream[Symbol.asyncIterator](); + + for (;;) { + if (isClosed) { + break; + } + + const next = (await iterator.next()) as { done: boolean; value: AgentMessage }; + if (next.done) { + break; + } + + subscriber.next({ + data: this.toStreamPayload(next.value), + }); + } + + subscriber.complete(); + }) + .catch((error: unknown) => { + subscriber.error(error); + }); + + return () => { + isClosed = true; + void iterator?.return?.(); + }; + }); + } + + private resolveOperatorId(req: MissionControlRequest): string { + const operatorId = req.user?.id; + return typeof operatorId === "string" && operatorId.length > 0 + ? operatorId + : DEFAULT_OPERATOR_ID; + } + + private toStreamPayload(message: AgentMessage): { + id: string; + sessionId: string; + role: string; + content: string; + timestamp: string; + metadata?: Record; + } { + return { + id: message.id, + sessionId: message.sessionId, + role: message.role, + content: message.content, + timestamp: message.timestamp.toISOString(), + ...(message.metadata !== undefined ? { metadata: message.metadata } : {}), + }; + } +} diff --git a/apps/orchestrator/src/api/mission-control/mission-control.module.ts b/apps/orchestrator/src/api/mission-control/mission-control.module.ts new file mode 100644 index 0000000..7dca566 --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/mission-control.module.ts @@ -0,0 +1,13 @@ +import { Module } from "@nestjs/common"; +import { AgentsModule } from "../agents/agents.module"; +import { AuthModule } from "../../auth/auth.module"; +import { PrismaModule } from "../../prisma/prisma.module"; +import { MissionControlController } from "./mission-control.controller"; +import { MissionControlService } from "./mission-control.service"; + +@Module({ + imports: [AgentsModule, AuthModule, PrismaModule], + controllers: [MissionControlController], + providers: [MissionControlService], +}) +export class MissionControlModule {} diff --git a/apps/orchestrator/src/api/mission-control/mission-control.service.spec.ts b/apps/orchestrator/src/api/mission-control/mission-control.service.spec.ts new file mode 100644 index 0000000..b8c0258 --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/mission-control.service.spec.ts @@ -0,0 +1,213 @@ +import { NotFoundException } from "@nestjs/common"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared"; +import type { PrismaService } from "../../prisma/prisma.service"; +import { AgentProviderRegistry } from "../agents/agent-provider.registry"; +import { MissionControlService } from "./mission-control.service"; + +type MockProvider = IAgentProvider & { + listSessions: ReturnType; + getSession: ReturnType; + getMessages: ReturnType; + injectMessage: ReturnType; + pauseSession: ReturnType; + resumeSession: ReturnType; + killSession: ReturnType; + streamMessages: ReturnType; +}; + +const emptyMessageStream = async function* (): AsyncIterable { + return; +}; + +const createProvider = (providerId = "internal"): MockProvider => ({ + providerId, + providerType: providerId, + displayName: providerId, + listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }), + getSession: vi.fn().mockResolvedValue(null), + getMessages: vi.fn().mockResolvedValue([]), + injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult), + pauseSession: vi.fn().mockResolvedValue(undefined), + resumeSession: vi.fn().mockResolvedValue(undefined), + killSession: vi.fn().mockResolvedValue(undefined), + streamMessages: vi.fn().mockReturnValue(emptyMessageStream()), + isAvailable: vi.fn().mockResolvedValue(true), +}); + +describe("MissionControlService", () => { + let service: MissionControlService; + let registry: { + listAllSessions: ReturnType; + getProviderForSession: ReturnType; + }; + let prisma: { + operatorAuditLog: { + create: ReturnType; + }; + }; + + const session: AgentSession = { + id: "session-1", + providerId: "internal", + providerType: "internal", + status: "active", + createdAt: new Date("2026-03-07T14:00:00.000Z"), + updatedAt: new Date("2026-03-07T14:01:00.000Z"), + }; + + beforeEach(() => { + registry = { + listAllSessions: vi.fn().mockResolvedValue([session]), + getProviderForSession: vi.fn().mockResolvedValue(null), + }; + + prisma = { + operatorAuditLog: { + create: vi.fn().mockResolvedValue(undefined), + }, + }; + + service = new MissionControlService( + registry as unknown as AgentProviderRegistry, + prisma as unknown as PrismaService + ); + }); + + it("lists sessions from the registry", async () => { + await expect(service.listSessions()).resolves.toEqual([session]); + expect(registry.listAllSessions).toHaveBeenCalledTimes(1); + }); + + it("returns a session when it is found", async () => { + const provider = createProvider("internal"); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await expect(service.getSession(session.id)).resolves.toEqual(session); + }); + + it("throws NotFoundException when session lookup fails", async () => { + await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException); + }); + + it("gets messages from the resolved provider", async () => { + const provider = createProvider("openclaw"); + const messages: AgentMessage[] = [ + { + id: "message-1", + sessionId: session.id, + role: "assistant", + content: "hello", + timestamp: new Date("2026-03-07T14:01:00.000Z"), + }, + ]; + + provider.getMessages.mockResolvedValue(messages); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages); + expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10"); + }); + + it("injects a message and writes an audit log", async () => { + const provider = createProvider("internal"); + const injectResult: InjectResult = { accepted: true, messageId: "msg-1" }; + provider.injectMessage.mockResolvedValue(injectResult); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual( + injectResult + ); + + expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it"); + expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({ + data: { + sessionId: session.id, + userId: "operator-1", + provider: "internal", + action: "inject", + content: "ship it", + metadata: { + payload: { message: "ship it" }, + }, + }, + }); + }); + + it("pauses and resumes using default operator id", async () => { + const provider = createProvider("openclaw"); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await service.pauseSession(session.id); + await service.resumeSession(session.id); + + expect(provider.pauseSession).toHaveBeenCalledWith(session.id); + expect(provider.resumeSession).toHaveBeenCalledWith(session.id); + expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, { + data: { + sessionId: session.id, + userId: "mission-control", + provider: "openclaw", + action: "pause", + metadata: { + payload: {}, + }, + }, + }); + expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, { + data: { + sessionId: session.id, + userId: "mission-control", + provider: "openclaw", + action: "resume", + metadata: { + payload: {}, + }, + }, + }); + }); + + it("kills with provided force value and writes audit log", async () => { + const provider = createProvider("openclaw"); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await service.killSession(session.id, false, "operator-2"); + + expect(provider.killSession).toHaveBeenCalledWith(session.id, false); + expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({ + data: { + sessionId: session.id, + userId: "operator-2", + provider: "openclaw", + action: "kill", + metadata: { + payload: { force: false }, + }, + }, + }); + }); + + it("resolves provider message stream", async () => { + const provider = createProvider("internal"); + const messageStream = (async function* (): AsyncIterable { + yield { + id: "message-1", + sessionId: session.id, + role: "assistant", + content: "stream", + timestamp: new Date("2026-03-07T14:03:00.000Z"), + }; + })(); + + provider.streamMessages.mockReturnValue(messageStream); + registry.getProviderForSession.mockResolvedValue({ provider, session }); + + await expect(service.streamMessages(session.id)).resolves.toBe(messageStream); + expect(provider.streamMessages).toHaveBeenCalledWith(session.id); + }); + + it("does not write audit log when session cannot be resolved", async () => { + await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException); + expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/orchestrator/src/api/mission-control/mission-control.service.ts b/apps/orchestrator/src/api/mission-control/mission-control.service.ts new file mode 100644 index 0000000..2e8060f --- /dev/null +++ b/apps/orchestrator/src/api/mission-control/mission-control.service.ts @@ -0,0 +1,139 @@ +import { Injectable, NotFoundException } from "@nestjs/common"; +import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared"; +import type { Prisma } from "@prisma/client"; +import { PrismaService } from "../../prisma/prisma.service"; +import { AgentProviderRegistry } from "../agents/agent-provider.registry"; + +type MissionControlAction = "inject" | "pause" | "resume" | "kill"; + +const DEFAULT_OPERATOR_ID = "mission-control"; + +@Injectable() +export class MissionControlService { + constructor( + private readonly registry: AgentProviderRegistry, + private readonly prisma: PrismaService + ) {} + + listSessions(): Promise { + return this.registry.listAllSessions(); + } + + async getSession(sessionId: string): Promise { + const resolved = await this.registry.getProviderForSession(sessionId); + if (!resolved) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + return resolved.session; + } + + async getMessages(sessionId: string, limit?: number, before?: string): Promise { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + return provider.getMessages(sessionId, limit, before); + } + + async injectMessage( + sessionId: string, + message: string, + operatorId = DEFAULT_OPERATOR_ID + ): Promise { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + const result = await provider.injectMessage(sessionId, message); + + await this.writeOperatorAuditLog({ + sessionId, + providerId: provider.providerId, + operatorId, + action: "inject", + content: message, + payload: { message }, + }); + + return result; + } + + async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + await provider.pauseSession(sessionId); + + await this.writeOperatorAuditLog({ + sessionId, + providerId: provider.providerId, + operatorId, + action: "pause", + payload: {}, + }); + } + + async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + await provider.resumeSession(sessionId); + + await this.writeOperatorAuditLog({ + sessionId, + providerId: provider.providerId, + operatorId, + action: "resume", + payload: {}, + }); + } + + async killSession( + sessionId: string, + force = true, + operatorId = DEFAULT_OPERATOR_ID + ): Promise { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + await provider.killSession(sessionId, force); + + await this.writeOperatorAuditLog({ + sessionId, + providerId: provider.providerId, + operatorId, + action: "kill", + payload: { force }, + }); + } + + async streamMessages(sessionId: string): Promise> { + const { provider } = await this.getProviderForSessionOrThrow(sessionId); + return provider.streamMessages(sessionId); + } + + private async getProviderForSessionOrThrow( + sessionId: string + ): Promise<{ provider: IAgentProvider; session: AgentSession }> { + const resolved = await this.registry.getProviderForSession(sessionId); + + if (!resolved) { + throw new NotFoundException(`Session ${sessionId} not found`); + } + + return resolved; + } + + private toJsonValue(value: Record): Prisma.InputJsonValue { + return value as Prisma.InputJsonValue; + } + + private async writeOperatorAuditLog(params: { + sessionId: string; + providerId: string; + operatorId: string; + action: MissionControlAction; + content?: string; + payload: Record; + }): Promise { + await this.prisma.operatorAuditLog.create({ + data: { + sessionId: params.sessionId, + userId: params.operatorId, + provider: params.providerId, + action: params.action, + ...(params.content !== undefined ? { content: params.content } : {}), + metadata: this.toJsonValue({ payload: params.payload }), + }, + }); + } +} diff --git a/apps/orchestrator/src/app.module.ts b/apps/orchestrator/src/app.module.ts index 3b337cb..6aeb7bd 100644 --- a/apps/orchestrator/src/app.module.ts +++ b/apps/orchestrator/src/app.module.ts @@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq"; import { ThrottlerModule } from "@nestjs/throttler"; import { HealthModule } from "./api/health/health.module"; import { AgentsModule } from "./api/agents/agents.module"; +import { MissionControlModule } from "./api/mission-control/mission-control.module"; import { QueueApiModule } from "./api/queue/queue-api.module"; import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module"; import { CoordinatorModule } from "./coordinator/coordinator.module"; @@ -53,6 +54,7 @@ import { orchestratorConfig } from "./config/orchestrator.config"; HealthModule, AgentsModule, AgentProvidersModule, + MissionControlModule, QueueApiModule, CoordinatorModule, BudgetModule, diff --git a/apps/orchestrator/src/auth/auth.module.ts b/apps/orchestrator/src/auth/auth.module.ts new file mode 100644 index 0000000..d26bd75 --- /dev/null +++ b/apps/orchestrator/src/auth/auth.module.ts @@ -0,0 +1,9 @@ +import { Module } from "@nestjs/common"; +import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard"; +import { AuthGuard } from "./guards/auth.guard"; + +@Module({ + providers: [OrchestratorApiKeyGuard, AuthGuard], + exports: [AuthGuard], +}) +export class AuthModule {} diff --git a/apps/orchestrator/src/auth/guards/auth.guard.ts b/apps/orchestrator/src/auth/guards/auth.guard.ts new file mode 100644 index 0000000..e787f7a --- /dev/null +++ b/apps/orchestrator/src/auth/guards/auth.guard.ts @@ -0,0 +1,11 @@ +import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common"; +import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; + +@Injectable() +export class AuthGuard implements CanActivate { + constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {} + + canActivate(context: ExecutionContext): boolean | Promise { + return this.apiKeyGuard.canActivate(context); + } +}