Compare commits

..

6 Commits

Author SHA1 Message Date
04bbdf3308 fix(api): proxy mission-control routes to orchestrator
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 14:04:12 -05:00
a6f1438f40 fix(web): route Mission Control API calls through orchestrator proxy (#747)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 17:00:27 +00:00
523662656e Merge pull request 'fix(deploy): use consistent network alias for openbrain-brain-internal' (#746) from fix/compose-network-alias-consistency into main 2026-03-08 16:35:32 +00:00
27120ac3f2 fix(deploy): use consistent network alias for openbrain-brain-internal
All checks were successful
ci/woodpecker/push/ci Infra-only: compose YAML fix, no app code
Service definitions were using 'openbrain_brain-internal' (underscore) but the
networks block defines the alias as 'openbrain-brain-internal' (hyphen), with
name: openbrain_brain-internal pointing to the actual Docker network.

This caused 'undefined network' errors on every Portainer deploy for
orchestrator and synapse services.

Fixed: all service network references now use 'openbrain-brain-internal'.
2026-03-08 11:34:38 -05:00
ad9921107c fix(deploy): add DATABASE_URL and openbrain network to orchestrator + synapse (#745)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 15:51:22 +00:00
3c288f9849 fix(web): add ReactQueryProvider to root layout for Mission Control (#744)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 15:50:40 +00:00
16 changed files with 511 additions and 49 deletions

View File

@@ -60,6 +60,7 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
import { OnboardingModule } from "./onboarding/onboarding.module";
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
@Module({
@@ -142,6 +143,7 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
FleetSettingsModule,
OnboardingModule,
ChatProxyModule,
MissionControlProxyModule,
OrchestratorModule,
],
controllers: [AppController, CsrfController],

View File

@@ -0,0 +1,286 @@
import {
Body,
Controller,
Get,
Logger,
Param,
Post,
Req,
Res,
ServiceUnavailableException,
UseGuards,
} from "@nestjs/common";
import type { Request, Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard";
const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL";
const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY";
@Controller("mission-control")
@UseGuards(AuthGuard)
export class MissionControlProxyController {
private readonly logger = new Logger(MissionControlProxyController.name);
private readonly orchestratorUrl: string;
private readonly orchestratorApiKey: string;
constructor() {
this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY);
this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY);
}
@Get("sessions")
proxySessions(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "sessions", req, res);
}
@Get("sessions/:sessionId")
proxySession(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}`, req, res);
}
@Get("sessions/:sessionId/messages")
proxyMessages(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res);
}
@Get("audit-log")
proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "audit-log", req, res);
}
@Post("sessions/:sessionId/inject")
proxyInject(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body);
}
@Post("sessions/:sessionId/pause")
proxyPause(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res);
}
@Post("sessions/:sessionId/resume")
proxyResume(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res);
}
@Post("sessions/:sessionId/kill")
proxyKill(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body);
}
@Get("sessions/:sessionId/stream")
async proxySessionStream(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
"GET",
`sessions/${sessionId}/stream`,
req.query,
undefined,
abortController.signal
);
if (!upstream.ok || !upstream.body) {
await this.sendStandardResponse(upstream, res);
return;
}
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "connection");
res.setHeader("X-Accel-Buffering", "no");
if (typeof res.flushHeaders === "function") {
res.flushHeaders();
}
for await (const chunk of upstream.body as unknown as AsyncIterable<Uint8Array>) {
if (res.writableEnded || res.destroyed) {
break;
}
res.write(Buffer.from(chunk));
}
if (!res.writableEnded && !res.destroyed) {
res.end();
}
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control stream proxy request failed: ${message}`);
if (!res.headersSent) {
res.status(503).json({ message: "Failed to proxy Mission Control request" });
} else if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
}
private async proxyRequest(
method: "GET" | "POST",
path: string,
req: Request,
res: Response,
body?: unknown
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
method,
path,
req.query,
body,
abortController.signal
);
await this.sendStandardResponse(upstream, res);
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
this.handleProxyError(error);
}
}
private async fetchUpstream(
method: "GET" | "POST",
path: string,
query: Request["query"],
body: unknown,
signal: AbortSignal
): Promise<globalThis.Response> {
const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl);
this.appendQueryParams(url.searchParams, query);
const headers: Record<string, string> = {
"X-API-Key": this.orchestratorApiKey,
};
const requestInit: RequestInit = {
method,
headers,
signal,
};
if (method === "POST" && body !== undefined) {
headers["Content-Type"] = "application/json";
requestInit.body = JSON.stringify(body);
}
return fetch(url.toString(), requestInit);
}
private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void {
for (const [key, value] of Object.entries(query)) {
this.appendQueryValue(searchParams, key, value);
}
}
private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void {
if (value === undefined || value === null) {
return;
}
if (Array.isArray(value)) {
for (const item of value) {
this.appendQueryValue(searchParams, key, item);
}
return;
}
if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
searchParams.append(key, String(value));
}
}
private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise<void> {
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "location");
const responseText = await upstream.text();
if (responseText.length === 0) {
res.end();
return;
}
res.send(responseText);
}
private copyHeaderIfPresent(
upstream: globalThis.Response,
res: Response,
headerName: string
): void {
const value = upstream.headers.get(headerName);
if (value) {
res.setHeader(headerName, value);
}
}
private handleProxyError(error: unknown): never {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control proxy request failed: ${message}`);
throw new ServiceUnavailableException("Failed to proxy Mission Control request");
}
private isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
private requireEnv(key: string): string {
const value = process.env[key];
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`);
}
return value;
}
}

View File

@@ -0,0 +1,9 @@
import { Module } from "@nestjs/common";
import { AuthModule } from "../auth/auth.module";
import { MissionControlProxyController } from "./mission-control-proxy.controller";
@Module({
imports: [AuthModule],
controllers: [MissionControlProxyController],
})
export class MissionControlProxyModule {}

View File

@@ -0,0 +1,93 @@
import { type NextRequest, NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
/**
* Generic catch-all proxy for orchestrator API routes.
*
* Forwards any request to /api/orchestrator/<path> → ORCHESTRATOR_URL/<path>
* with the ORCHESTRATOR_API_KEY injected server-side so it never reaches the browser.
*
* Supports GET, POST, PATCH, DELETE, PUT.
*
* Example:
* GET /api/orchestrator/mission-control/sessions
* → GET ORCHESTRATOR_URL/api/mission-control/sessions
* POST /api/orchestrator/mission-control/sessions/abc/kill
* → POST ORCHESTRATOR_URL/api/mission-control/sessions/abc/kill
*/
async function proxyToOrchestrator(
request: NextRequest,
context: { params: Promise<{ path: string[] }> }
): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
const { path } = await context.params;
const upstreamPath = `/${path.join("/")}`;
const search = request.nextUrl.search;
const upstreamUrl = `${getOrchestratorUrl()}${upstreamPath}${search}`;
const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, 30_000);
try {
const headers: Record<string, string> = {
"X-API-Key": orchestratorApiKey,
};
const contentType = request.headers.get("Content-Type");
if (contentType) {
headers["Content-Type"] = contentType;
}
const hasBody = request.method !== "GET" && request.method !== "HEAD";
const body = hasBody ? await request.text() : null;
const upstream = await fetch(upstreamUrl, {
method: request.method,
headers,
...(body !== null ? { body } : {}),
cache: "no-store",
signal: controller.signal,
});
const responseText = await upstream.text();
return new NextResponse(responseText, {
status: upstream.status,
headers: {
"Content-Type": upstream.headers.get("Content-Type") ?? "application/json",
},
});
} catch (error) {
const message =
error instanceof Error && error.name === "AbortError"
? "Orchestrator request timed out."
: "Unable to reach orchestrator.";
return NextResponse.json({ error: message }, { status: 502 });
} finally {
clearTimeout(timeout);
}
}
export const GET = proxyToOrchestrator;
export const POST = proxyToOrchestrator;
export const PATCH = proxyToOrchestrator;
export const PUT = proxyToOrchestrator;
export const DELETE = proxyToOrchestrator;

View File

@@ -4,6 +4,7 @@ import { Outfit, Fira_Code } from "next/font/google";
import { AuthProvider } from "@/lib/auth/auth-context";
import { ErrorBoundary } from "@/components/error-boundary";
import { ThemeProvider } from "@/providers/ThemeProvider";
import { ReactQueryProvider } from "@/providers/ReactQueryProvider";
import "./globals.css";
export const dynamic = "force-dynamic";
@@ -56,9 +57,11 @@ export default function RootLayout({ children }: { children: ReactNode }): React
</head>
<body>
<ThemeProvider>
<ReactQueryProvider>
<ErrorBoundary>
<AuthProvider>{children}</AuthProvider>
</ErrorBoundary>
</ReactQueryProvider>
</ThemeProvider>
</body>
</html>

View File

@@ -158,7 +158,9 @@ async function fetchAuditLog(
}
try {
return await apiGet<AuditLogResponse>(`/api/mission-control/audit-log?${params.toString()}`);
return await apiGet<AuditLogResponse>(
`/api/orchestrator/api/mission-control/audit-log?${params.toString()}`
);
} catch (error) {
if (isRateLimitError(error)) {
return createEmptyAuditLogResponse(page, "Rate limited - retrying...");

View File

@@ -59,9 +59,12 @@ describe("BargeInInput", (): void => {
await user.click(screen.getByRole("button", { name: "Send" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-1/inject", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/session-1/inject",
{
content: "execute plan",
});
}
);
});
expect(onSent).toHaveBeenCalledTimes(1);
@@ -83,12 +86,18 @@ describe("BargeInInput", (): void => {
const calls = mockApiPost.mock.calls as [string, unknown?][];
expect(calls[0]).toEqual(["/api/mission-control/sessions/session-2/pause", undefined]);
expect(calls[0]).toEqual([
"/api/orchestrator/api/mission-control/sessions/session-2/pause",
undefined,
]);
expect(calls[1]).toEqual([
"/api/mission-control/sessions/session-2/inject",
"/api/orchestrator/api/mission-control/sessions/session-2/inject",
{ content: "hello world" },
]);
expect(calls[2]).toEqual(["/api/mission-control/sessions/session-2/resume", undefined]);
expect(calls[2]).toEqual([
"/api/orchestrator/api/mission-control/sessions/session-2/resume",
undefined,
]);
});
it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => {
@@ -105,9 +114,12 @@ describe("BargeInInput", (): void => {
fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false });
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-3/inject", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/session-3/inject",
{
content: "first",
});
}
);
});
});

View File

@@ -39,7 +39,7 @@ export function BargeInInput({ sessionId, onSent }: BargeInInputProps): React.JS
}
const encodedSessionId = encodeURIComponent(sessionId);
const baseEndpoint = `/api/mission-control/sessions/${encodedSessionId}`;
const baseEndpoint = `/api/orchestrator/api/mission-control/sessions/${encodedSessionId}`;
let didPause = false;
let didInject = false;

View File

@@ -177,9 +177,12 @@ describe("GlobalAgentRoster", (): void => {
fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/killme123456/kill", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/killme123456/kill",
{
force: false,
});
}
);
});
});

View File

@@ -83,7 +83,7 @@ function groupByProvider(sessions: MissionControlSession[]): ProviderSessionGrou
async function fetchSessions(): Promise<MissionControlSession[]> {
const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
"/api/mission-control/sessions"
"/api/orchestrator/api/mission-control/sessions"
);
return Array.isArray(payload) ? payload : payload.sessions;
}
@@ -118,9 +118,12 @@ export function GlobalAgentRoster({
const killMutation = useMutation({
mutationFn: async (sessionId: string): Promise<string> => {
await apiPost<{ message: string }>(`/api/mission-control/sessions/${sessionId}/kill`, {
await apiPost<{ message: string }>(
`/api/orchestrator/api/mission-control/sessions/${sessionId}/kill`,
{
force: false,
});
}
);
return sessionId;
},
onSuccess: (): void => {

View File

@@ -112,14 +112,20 @@ describe("KillAllDialog", (): void => {
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-1/kill", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/internal-1/kill",
{
force: true,
});
}
);
});
expect(mockApiPost).not.toHaveBeenCalledWith("/api/mission-control/sessions/external-1/kill", {
expect(mockApiPost).not.toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/external-1/kill",
{
force: true,
});
}
);
expect(onComplete).toHaveBeenCalledTimes(1);
});
@@ -141,12 +147,18 @@ describe("KillAllDialog", (): void => {
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-2/kill", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/internal-2/kill",
{
force: true,
});
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/external-2/kill", {
}
);
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/external-2/kill",
{
force: true,
});
}
);
});
});

View File

@@ -96,9 +96,12 @@ export function KillAllDialog({ sessions, onComplete }: KillAllDialogProps): Rea
const killRequests = targetSessions.map(async (session) => {
try {
await apiPost<{ message: string }>(`/api/mission-control/sessions/${session.id}/kill`, {
await apiPost<{ message: string }>(
`/api/orchestrator/api/mission-control/sessions/${session.id}/kill`,
{
force: true,
});
}
);
return true;
} catch {
return false;

View File

@@ -89,7 +89,7 @@ describe("PanelControls", (): void => {
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/mission-control/sessions/session%20with%20space/pause",
"/api/orchestrator/api/mission-control/sessions/session%20with%20space/pause",
undefined
);
});
@@ -114,9 +114,12 @@ describe("PanelControls", (): void => {
await user.click(screen.getByRole("button", { name: "Confirm" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-4/kill", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/session-4/kill",
{
force: false,
});
}
);
});
expect(onStatusChange).toHaveBeenCalledWith("killed");
@@ -137,9 +140,12 @@ describe("PanelControls", (): void => {
await user.click(screen.getByRole("button", { name: "Confirm" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-5/kill", {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/session-5/kill",
{
force: true,
});
}
);
});
expect(onStatusChange).toHaveBeenCalledWith("killed");

View File

@@ -50,23 +50,23 @@ export function PanelControls({
switch (action) {
case "pause":
await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
);
return { nextStatus: "paused" };
case "resume":
await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
);
return { nextStatus: "active" };
case "graceful-kill":
await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
{ force: false }
);
return { nextStatus: "killed" };
case "force-kill":
await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
{ force: true }
);
return { nextStatus: "killed" };

View File

@@ -0,0 +1,28 @@
"use client";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useState, type ReactNode } from "react";
interface ReactQueryProviderProps {
children: ReactNode;
}
export function ReactQueryProvider({ children }: ReactQueryProviderProps): React.JSX.Element {
// Create a stable QueryClient per component mount (one per app session)
const [queryClient] = useState(
() =>
new QueryClient({
defaultOptions: {
queries: {
// Don't refetch on window focus in a dashboard context
refetchOnWindowFocus: false,
// Stale time of 30s — short enough for live data, avoids hammering
staleTime: 30_000,
retry: 1,
},
},
})
);
return <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>;
}

View File

@@ -333,7 +333,7 @@ services:
start_period: 40s
networks:
- internal
- openbrain_brain-internal
- openbrain-brain-internal
cap_drop:
- ALL
cap_add:
@@ -406,7 +406,7 @@ services:
networks:
- internal
- traefik-public
- openbrain_brain-internal
- openbrain-brain-internal
deploy:
restart_policy:
condition: on-failure