import {
  Body,
  Controller,
  Get,
  Header,
  HttpCode,
  MessageEvent,
  Param,
  Post,
  Query,
  Request,
  Sse,
  UseGuards,
  UsePipes,
  ValidationPipe,
} from "@nestjs/common";
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
import { Observable } from "rxjs";
import { AuthGuard } from "../../auth/guards/auth.guard";
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
import { GetMissionControlAuditLogQueryDto } from "./dto/get-mission-control-audit-log-query.dto";
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
import { KillSessionDto } from "./dto/kill-session.dto";
import { MissionControlService, type MissionControlAuditLogPage } from "./mission-control.service";

/** Operator id passed to the service when the request carries no usable `user.id`. */
const DEFAULT_OPERATOR_ID = "mission-control";

/** Minimal view of the incoming request: only the optional user id is read here. */
interface MissionControlRequest {
  user?: {
    id?: string;
  };
}

/**
 * HTTP endpoints under `api/mission-control`.
 *
 * Every route is protected by {@link AuthGuard} and delegates to
 * {@link MissionControlService}; the controller only maps parameters, resolves the
 * acting operator id, and shapes the response body.
 */
@Controller("api/mission-control")
@UseGuards(AuthGuard)
export class MissionControlController {
  constructor(private readonly missionControlService: MissionControlService) {}

  /**
   * `GET sessions` — lists sessions.
   *
   * @returns The service result wrapped as `{ sessions }`.
   */
  @Get("sessions")
  async listSessions(): Promise<{ sessions: AgentSession[] }> {
    const sessions = await this.missionControlService.listSessions();
    return { sessions };
  }

  /**
   * `GET sessions/:sessionId` — fetches a single session.
   *
   * @param sessionId Session id taken from the route.
   * @returns Whatever the service resolves for that id.
   */
  @Get("sessions/:sessionId")
  getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
    return this.missionControlService.getSession(sessionId);
  }

  /**
   * `GET sessions/:sessionId/messages` — fetches messages of a session.
   *
   * The query string is run through a `ValidationPipe` with `transform` and
   * `whitelist` enabled before reaching this handler.
   *
   * @param sessionId Session id taken from the route.
   * @param query Supplies `limit` and `before`, forwarded to the service as-is.
   * @returns The service result wrapped as `{ messages }`.
   */
  @Get("sessions/:sessionId/messages")
  @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
  async getMessages(
    @Param("sessionId") sessionId: string,
    @Query() query: GetMissionControlMessagesQueryDto
  ): Promise<{ messages: AgentMessage[] }> {
    const messages = await this.missionControlService.getMessages(
      sessionId,
      query.limit,
      query.before
    );
    return { messages };
  }

  /**
   * `GET audit-log` — fetches a page of the audit log.
   *
   * @param query Supplies `sessionId`, `page` and `limit`, forwarded to the service.
   * @returns The page resolved by the service.
   */
  @Get("audit-log")
  @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
  getAuditLog(
    @Query() query: GetMissionControlAuditLogQueryDto
  ): Promise<MissionControlAuditLogPage> {
    return this.missionControlService.getAuditLog(query.sessionId, query.page, query.limit);
  }

  /**
   * `POST sessions/:sessionId/inject` — injects `dto.message` into a session.
   * Responds with HTTP 200.
   *
   * @param sessionId Session id taken from the route.
   * @param dto Validated request body; only `message` is read.
   * @param req Request used to resolve the acting operator id.
   * @returns The result resolved by the service.
   */
  @Post("sessions/:sessionId/inject")
  @HttpCode(200)
  @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
  injectMessage(
    @Param("sessionId") sessionId: string,
    @Body() dto: InjectAgentDto,
    @Request() req: MissionControlRequest
  ): Promise<InjectResult> {
    return this.missionControlService.injectMessage(
      sessionId,
      dto.message,
      this.resolveOperatorId(req)
    );
  }

  /**
   * `POST sessions/:sessionId/pause` — asks the service to pause a session.
   * Responds with HTTP 200.
   *
   * @param sessionId Session id taken from the route.
   * @param req Request used to resolve the acting operator id.
   * @returns A confirmation message naming the session.
   */
  @Post("sessions/:sessionId/pause")
  @HttpCode(200)
  async pauseSession(
    @Param("sessionId") sessionId: string,
    @Request() req: MissionControlRequest
  ): Promise<{ message: string }> {
    await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
    return { message: `Session ${sessionId} paused` };
  }

  /**
   * `POST sessions/:sessionId/resume` — asks the service to resume a session.
   * Responds with HTTP 200.
   *
   * @param sessionId Session id taken from the route.
   * @param req Request used to resolve the acting operator id.
   * @returns A confirmation message naming the session.
   */
  @Post("sessions/:sessionId/resume")
  @HttpCode(200)
  async resumeSession(
    @Param("sessionId") sessionId: string,
    @Request() req: MissionControlRequest
  ): Promise<{ message: string }> {
    await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
    return { message: `Session ${sessionId} resumed` };
  }

  /**
   * `POST sessions/:sessionId/kill` — asks the service to kill a session.
   * Responds with HTTP 200.
   *
   * @param sessionId Session id taken from the route.
   * @param dto Validated request body; `force` defaults to `true` when absent.
   * @param req Request used to resolve the acting operator id.
   * @returns A confirmation message naming the session.
   */
  @Post("sessions/:sessionId/kill")
  @HttpCode(200)
  @UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
  async killSession(
    @Param("sessionId") sessionId: string,
    @Body() dto: KillSessionDto,
    @Request() req: MissionControlRequest
  ): Promise<{ message: string }> {
    await this.missionControlService.killSession(
      sessionId,
      dto.force ?? true,
      this.resolveOperatorId(req)
    );
    return { message: `Session ${sessionId} killed` };
  }

  /**
   * `SSE sessions/:sessionId/stream` — relays the service's async message stream as
   * server-sent events, one event per message.
   *
   * The observable completes when the underlying iterator is exhausted and errors if
   * obtaining the stream, advancing it, or serialising a message throws. The iterator
   * is released (`return()`) whenever the subscriber goes away or an error occurs,
   * including the case where the subscriber leaves before the stream was obtained.
   *
   * @param sessionId Session id taken from the route.
   * @returns A cold observable; each subscription opens its own stream.
   */
  @Sse("sessions/:sessionId/stream")
  @Header("Content-Type", "text/event-stream")
  @Header("Cache-Control", "no-cache")
  streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
    return new Observable<MessageEvent>((subscriber) => {
      let isClosed = false;
      let iterator: AsyncIterator<AgentMessage> | null = null;

      // Idempotent: detaches the current iterator (if any) and asks it to finish.
      const releaseIterator = (): void => {
        const active = iterator;
        iterator = null;
        // Best-effort cleanup: nobody is listening any more, so a failing return()
        // is deliberately ignored instead of surfacing as an unhandled rejection.
        void active?.return?.()?.catch(() => undefined);
      };

      const pump = async (): Promise<void> => {
        const stream = await this.missionControlService.streamMessages(sessionId);
        const source: AsyncIterator<AgentMessage> = stream[Symbol.asyncIterator]();
        iterator = source;

        while (!isClosed) {
          const next = await source.next();
          if (next.done) {
            break;
          }
          subscriber.next({ data: this.toStreamPayload(next.value) });
        }

        if (isClosed) {
          // Teardown may have run before `iterator` was assigned; release it now.
          releaseIterator();
        }
        subscriber.complete();
      };

      void pump().catch((error: unknown) => {
        releaseIterator();
        subscriber.error(error);
      });

      return () => {
        isClosed = true;
        releaseIterator();
      };
    });
  }

  /**
   * Picks the operator id recorded for an action.
   *
   * @param req Incoming request.
   * @returns `req.user.id` when it is a non-empty string, otherwise
   * {@link DEFAULT_OPERATOR_ID}.
   */
  private resolveOperatorId(req: MissionControlRequest): string {
    const operatorId = req.user?.id;
    if (typeof operatorId === "string" && operatorId.length > 0) {
      return operatorId;
    }
    return DEFAULT_OPERATOR_ID;
  }

  /**
   * Converts a message into the JSON-friendly SSE payload.
   *
   * @param message Message produced by the service stream.
   * @returns A plain object with `timestamp` as an ISO-8601 string; `metadata` is
   * included only when it is defined on the message.
   */
  private toStreamPayload(message: AgentMessage): {
    id: string;
    sessionId: string;
    role: string;
    content: string;
    timestamp: string;
    metadata?: NonNullable<AgentMessage["metadata"]>;
  } {
    return {
      id: message.id,
      sessionId: message.sessionId,
      role: message.role,
      content: message.content,
      timestamp: message.timestamp.toISOString(),
      ...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
    };
  }
}