diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index bddc35b..717f611 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -60,6 +60,7 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module"; import { OnboardingModule } from "./onboarding/onboarding.module"; import { ChatProxyModule } from "./chat-proxy/chat-proxy.module"; +import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module"; import { OrchestratorModule } from "./orchestrator/orchestrator.module"; @Module({ @@ -142,6 +143,7 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module"; FleetSettingsModule, OnboardingModule, ChatProxyModule, + MissionControlProxyModule, OrchestratorModule, ], controllers: [AppController, CsrfController], diff --git a/apps/api/src/mission-control-proxy/mission-control-proxy.controller.ts b/apps/api/src/mission-control-proxy/mission-control-proxy.controller.ts new file mode 100644 index 0000000..ccaeac8 --- /dev/null +++ b/apps/api/src/mission-control-proxy/mission-control-proxy.controller.ts @@ -0,0 +1,286 @@ +import { + Body, + Controller, + Get, + Logger, + Param, + Post, + Req, + Res, + ServiceUnavailableException, + UseGuards, +} from "@nestjs/common"; +import type { Request, Response } from "express"; +import { AuthGuard } from "../auth/guards/auth.guard"; + +const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL"; +const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY"; + +@Controller("mission-control") +@UseGuards(AuthGuard) +export class MissionControlProxyController { + private readonly logger = new Logger(MissionControlProxyController.name); + private readonly orchestratorUrl: string; + private readonly orchestratorApiKey: string; + + constructor() { + this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY); + this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY); + } + + @Get("sessions") + proxySessions(@Req() req: Request, @Res() res: Response): Promise { + return this.proxyRequest("GET", "sessions", req, res); + } + + @Get("sessions/:sessionId") + proxySession( + @Param("sessionId") sessionId: string, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("GET", `sessions/${sessionId}`, req, res); + } + + @Get("sessions/:sessionId/messages") + proxyMessages( + @Param("sessionId") sessionId: string, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res); + } + + @Get("audit-log") + proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise { + return this.proxyRequest("GET", "audit-log", req, res); + } + + @Post("sessions/:sessionId/inject") + proxyInject( + @Param("sessionId") sessionId: string, + @Body() body: unknown, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body); + } + + @Post("sessions/:sessionId/pause") + proxyPause( + @Param("sessionId") sessionId: string, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res); + } + + @Post("sessions/:sessionId/resume") + proxyResume( + @Param("sessionId") sessionId: string, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res); + } + + @Post("sessions/:sessionId/kill") + proxyKill( + @Param("sessionId") sessionId: string, + @Body() body: unknown, + @Req() req: Request, + @Res() res: Response + ): Promise { + return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body); + } + + @Get("sessions/:sessionId/stream") + async proxySessionStream( + @Param("sessionId") sessionId: string, + @Req() req: Request, + @Res() res: Response + ): Promise { + const abortController = new AbortController(); + req.once("close", () => { + abortController.abort(); + }); + + try { + const upstream = await this.fetchUpstream( + "GET", + `sessions/${sessionId}/stream`, + req.query, + undefined, + abortController.signal + ); + + if (!upstream.ok || !upstream.body) { + await this.sendStandardResponse(upstream, res); + return; + } + + res.status(upstream.status); + this.copyHeaderIfPresent(upstream, res, "content-type"); + this.copyHeaderIfPresent(upstream, res, "cache-control"); + this.copyHeaderIfPresent(upstream, res, "connection"); + res.setHeader("X-Accel-Buffering", "no"); + + if (typeof res.flushHeaders === "function") { + res.flushHeaders(); + } + + for await (const chunk of upstream.body as unknown as AsyncIterable) { + if (res.writableEnded || res.destroyed) { + break; + } + + res.write(Buffer.from(chunk)); + } + + if (!res.writableEnded && !res.destroyed) { + res.end(); + } + } catch (error: unknown) { + if (this.isAbortError(error)) { + return; + } + + const message = error instanceof Error ? error.message : String(error); + this.logger.warn(`Mission Control stream proxy request failed: ${message}`); + + if (!res.headersSent) { + res.status(503).json({ message: "Failed to proxy Mission Control request" }); + } else if (!res.writableEnded && !res.destroyed) { + res.end(); + } + } + } + + private async proxyRequest( + method: "GET" | "POST", + path: string, + req: Request, + res: Response, + body?: unknown + ): Promise { + const abortController = new AbortController(); + req.once("close", () => { + abortController.abort(); + }); + + try { + const upstream = await this.fetchUpstream( + method, + path, + req.query, + body, + abortController.signal + ); + await this.sendStandardResponse(upstream, res); + } catch (error: unknown) { + if (this.isAbortError(error)) { + return; + } + + this.handleProxyError(error); + } + } + + private async fetchUpstream( + method: "GET" | "POST", + path: string, + query: Request["query"], + body: unknown, + signal: AbortSignal + ): Promise { + const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl); + this.appendQueryParams(url.searchParams, query); + + const headers: Record = { + "X-API-Key": this.orchestratorApiKey, + }; + + const requestInit: RequestInit = { + method, + headers, + signal, + }; + + if (method === "POST" && body !== undefined) { + headers["Content-Type"] = "application/json"; + requestInit.body = JSON.stringify(body); + } + + return fetch(url.toString(), requestInit); + } + + private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void { + for (const [key, value] of Object.entries(query)) { + this.appendQueryValue(searchParams, key, value); + } + } + + private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void { + if (value === undefined || value === null) { + return; + } + + if (Array.isArray(value)) { + for (const item of value) { + this.appendQueryValue(searchParams, key, item); + } + return; + } + + if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") { + searchParams.append(key, String(value)); + } + } + + private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise { + res.status(upstream.status); + this.copyHeaderIfPresent(upstream, res, "content-type"); + this.copyHeaderIfPresent(upstream, res, "cache-control"); + this.copyHeaderIfPresent(upstream, res, "location"); + + const responseText = await upstream.text(); + + if (responseText.length === 0) { + res.end(); + return; + } + + res.send(responseText); + } + + private copyHeaderIfPresent( + upstream: globalThis.Response, + res: Response, + headerName: string + ): void { + const value = upstream.headers.get(headerName); + if (value) { + res.setHeader(headerName, value); + } + } + + private handleProxyError(error: unknown): never { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn(`Mission Control proxy request failed: ${message}`); + throw new ServiceUnavailableException("Failed to proxy Mission Control request"); + } + + private isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === "AbortError"; + } + + private requireEnv(key: string): string { + const value = process.env[key]; + + if (typeof value !== "string" || value.trim().length === 0) { + throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`); + } + + return value; + } +} diff --git a/apps/api/src/mission-control-proxy/mission-control-proxy.module.ts b/apps/api/src/mission-control-proxy/mission-control-proxy.module.ts new file mode 100644 index 0000000..83580a1 --- /dev/null +++ b/apps/api/src/mission-control-proxy/mission-control-proxy.module.ts @@ -0,0 +1,9 @@ +import { Module } from "@nestjs/common"; +import { AuthModule } from "../auth/auth.module"; +import { MissionControlProxyController } from "./mission-control-proxy.controller"; + +@Module({ + imports: [AuthModule], + controllers: [MissionControlProxyController], +}) +export class MissionControlProxyModule {}