Compare commits
1 Commits
fix/ms23-p
...
feat/ms23-
| Author | SHA1 | Date | |
|---|---|---|---|
| 52e7b0e6e7 |
@@ -12,6 +12,7 @@ import { InternalAgentProvider } from "./internal-agent.provider";
|
|||||||
|
|
||||||
type MockProvider = IAgentProvider & {
|
type MockProvider = IAgentProvider & {
|
||||||
listSessions: ReturnType<typeof vi.fn>;
|
listSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getSession: ReturnType<typeof vi.fn>;
|
||||||
};
|
};
|
||||||
|
|
||||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||||
@@ -134,4 +135,68 @@ describe("AgentProviderRegistry", () => {
|
|||||||
expect.stringContaining("Failed to list sessions for provider failing")
|
expect.stringContaining("Failed to list sessions for provider failing")
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("finds a provider for an existing session", async () => {
|
||||||
|
const targetSession: AgentSession = {
|
||||||
|
id: "session-found",
|
||||||
|
providerId: "openclaw",
|
||||||
|
providerType: "external",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T12:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
const openclawProvider = createProvider("openclaw");
|
||||||
|
openclawProvider.getSession.mockResolvedValue(targetSession);
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(openclawProvider);
|
||||||
|
|
||||||
|
const result = await registry.getProviderForSession(targetSession.id);
|
||||||
|
|
||||||
|
expect(result).toEqual({
|
||||||
|
provider: openclawProvider,
|
||||||
|
session: targetSession,
|
||||||
|
});
|
||||||
|
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||||
|
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null when no provider has the requested session", async () => {
|
||||||
|
const openclawProvider = createProvider("openclaw");
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(openclawProvider);
|
||||||
|
|
||||||
|
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("continues searching providers when getSession throws", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const failingProvider = createProvider("failing");
|
||||||
|
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
|
||||||
|
|
||||||
|
const healthySession: AgentSession = {
|
||||||
|
id: "session-healthy",
|
||||||
|
providerId: "healthy",
|
||||||
|
providerType: "external",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T12:15:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
const healthyProvider = createProvider("healthy");
|
||||||
|
healthyProvider.getSession.mockResolvedValue(healthySession);
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(failingProvider);
|
||||||
|
registry.registerProvider(healthyProvider);
|
||||||
|
|
||||||
|
const result = await registry.getProviderForSession(healthySession.id);
|
||||||
|
|
||||||
|
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Failed to get session session-healthy for provider failing")
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -26,6 +26,28 @@ export class AgentProviderRegistry implements OnModuleInit {
|
|||||||
return this.providers.get(providerId) ?? null;
|
return this.providers.get(providerId) ?? null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getProviderForSession(
|
||||||
|
sessionId: string
|
||||||
|
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
|
||||||
|
for (const provider of this.providers.values()) {
|
||||||
|
try {
|
||||||
|
const session = await provider.getSession(sessionId);
|
||||||
|
if (session !== null) {
|
||||||
|
return {
|
||||||
|
provider,
|
||||||
|
session,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
async listAllSessions(): Promise<AgentSession[]> {
|
async listAllSessions(): Promise<AgentSession[]> {
|
||||||
const providers = [...this.providers.values()];
|
const providers = [...this.providers.values()];
|
||||||
const sessionsByProvider = await Promise.all(
|
const sessionsByProvider = await Promise.all(
|
||||||
|
|||||||
@@ -0,0 +1,15 @@
|
|||||||
|
import { Type } from "class-transformer";
|
||||||
|
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
|
||||||
|
|
||||||
|
export class GetMissionControlMessagesQueryDto {
|
||||||
|
@IsOptional()
|
||||||
|
@Type(() => Number)
|
||||||
|
@IsInt()
|
||||||
|
@Min(1)
|
||||||
|
@Max(200)
|
||||||
|
limit?: number;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
before?: string;
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
import { IsBoolean, IsOptional } from "class-validator";
|
||||||
|
|
||||||
|
export class KillSessionDto {
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
force?: boolean;
|
||||||
|
}
|
||||||
@@ -0,0 +1,183 @@
|
|||||||
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Get,
|
||||||
|
Header,
|
||||||
|
HttpCode,
|
||||||
|
MessageEvent,
|
||||||
|
Param,
|
||||||
|
Post,
|
||||||
|
Query,
|
||||||
|
Request,
|
||||||
|
Sse,
|
||||||
|
UseGuards,
|
||||||
|
UsePipes,
|
||||||
|
ValidationPipe,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
|
||||||
|
import { Observable } from "rxjs";
|
||||||
|
import { AuthGuard } from "../../auth/guards/auth.guard";
|
||||||
|
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
|
||||||
|
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
|
||||||
|
import { KillSessionDto } from "./dto/kill-session.dto";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||||
|
|
||||||
|
interface MissionControlRequest {
|
||||||
|
user?: {
|
||||||
|
id?: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Controller("api/mission-control")
|
||||||
|
@UseGuards(AuthGuard)
|
||||||
|
export class MissionControlController {
|
||||||
|
constructor(private readonly missionControlService: MissionControlService) {}
|
||||||
|
|
||||||
|
@Get("sessions")
|
||||||
|
async listSessions(): Promise<{ sessions: AgentSession[] }> {
|
||||||
|
const sessions = await this.missionControlService.listSessions();
|
||||||
|
return { sessions };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId")
|
||||||
|
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
|
||||||
|
return this.missionControlService.getSession(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/messages")
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async getMessages(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Query() query: GetMissionControlMessagesQueryDto
|
||||||
|
): Promise<{ messages: AgentMessage[] }> {
|
||||||
|
const messages = await this.missionControlService.getMessages(
|
||||||
|
sessionId,
|
||||||
|
query.limit,
|
||||||
|
query.before
|
||||||
|
);
|
||||||
|
|
||||||
|
return { messages };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/inject")
|
||||||
|
@HttpCode(200)
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
injectMessage(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() dto: InjectAgentDto,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<InjectResult> {
|
||||||
|
return this.missionControlService.injectMessage(
|
||||||
|
sessionId,
|
||||||
|
dto.message,
|
||||||
|
this.resolveOperatorId(req)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/pause")
|
||||||
|
@HttpCode(200)
|
||||||
|
async pauseSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} paused` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/resume")
|
||||||
|
@HttpCode(200)
|
||||||
|
async resumeSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} resumed` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/kill")
|
||||||
|
@HttpCode(200)
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async killSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() dto: KillSessionDto,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.killSession(
|
||||||
|
sessionId,
|
||||||
|
dto.force ?? true,
|
||||||
|
this.resolveOperatorId(req)
|
||||||
|
);
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} killed` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Sse("sessions/:sessionId/stream")
|
||||||
|
@Header("Content-Type", "text/event-stream")
|
||||||
|
@Header("Cache-Control", "no-cache")
|
||||||
|
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
|
||||||
|
return new Observable<MessageEvent>((subscriber) => {
|
||||||
|
let isClosed = false;
|
||||||
|
let iterator: AsyncIterator<AgentMessage> | null = null;
|
||||||
|
|
||||||
|
void this.missionControlService
|
||||||
|
.streamMessages(sessionId)
|
||||||
|
.then(async (stream) => {
|
||||||
|
iterator = stream[Symbol.asyncIterator]();
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
if (isClosed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
|
||||||
|
if (next.done) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.next({
|
||||||
|
data: this.toStreamPayload(next.value),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.complete();
|
||||||
|
})
|
||||||
|
.catch((error: unknown) => {
|
||||||
|
subscriber.error(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
isClosed = true;
|
||||||
|
void iterator?.return?.();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveOperatorId(req: MissionControlRequest): string {
|
||||||
|
const operatorId = req.user?.id;
|
||||||
|
return typeof operatorId === "string" && operatorId.length > 0
|
||||||
|
? operatorId
|
||||||
|
: DEFAULT_OPERATOR_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toStreamPayload(message: AgentMessage): {
|
||||||
|
id: string;
|
||||||
|
sessionId: string;
|
||||||
|
role: string;
|
||||||
|
content: string;
|
||||||
|
timestamp: string;
|
||||||
|
metadata?: Record<string, unknown>;
|
||||||
|
} {
|
||||||
|
return {
|
||||||
|
id: message.id,
|
||||||
|
sessionId: message.sessionId,
|
||||||
|
role: message.role,
|
||||||
|
content: message.content,
|
||||||
|
timestamp: message.timestamp.toISOString(),
|
||||||
|
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { AgentsModule } from "../agents/agents.module";
|
||||||
|
import { AuthModule } from "../../auth/auth.module";
|
||||||
|
import { PrismaModule } from "../../prisma/prisma.module";
|
||||||
|
import { MissionControlController } from "./mission-control.controller";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [AgentsModule, AuthModule, PrismaModule],
|
||||||
|
controllers: [MissionControlController],
|
||||||
|
providers: [MissionControlService],
|
||||||
|
})
|
||||||
|
export class MissionControlModule {}
|
||||||
@@ -0,0 +1,213 @@
|
|||||||
|
import { NotFoundException } from "@nestjs/common";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||||
|
import type { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
type MockProvider = IAgentProvider & {
|
||||||
|
listSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getSession: ReturnType<typeof vi.fn>;
|
||||||
|
getMessages: ReturnType<typeof vi.fn>;
|
||||||
|
injectMessage: ReturnType<typeof vi.fn>;
|
||||||
|
pauseSession: ReturnType<typeof vi.fn>;
|
||||||
|
resumeSession: ReturnType<typeof vi.fn>;
|
||||||
|
killSession: ReturnType<typeof vi.fn>;
|
||||||
|
streamMessages: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
const createProvider = (providerId = "internal"): MockProvider => ({
|
||||||
|
providerId,
|
||||||
|
providerType: providerId,
|
||||||
|
displayName: providerId,
|
||||||
|
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
|
||||||
|
getSession: vi.fn().mockResolvedValue(null),
|
||||||
|
getMessages: vi.fn().mockResolvedValue([]),
|
||||||
|
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||||
|
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
killSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(true),
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("MissionControlService", () => {
|
||||||
|
let service: MissionControlService;
|
||||||
|
let registry: {
|
||||||
|
listAllSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
let prisma: {
|
||||||
|
operatorAuditLog: {
|
||||||
|
create: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
const session: AgentSession = {
|
||||||
|
id: "session-1",
|
||||||
|
providerId: "internal",
|
||||||
|
providerType: "internal",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T14:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
registry = {
|
||||||
|
listAllSessions: vi.fn().mockResolvedValue([session]),
|
||||||
|
getProviderForSession: vi.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
|
|
||||||
|
prisma = {
|
||||||
|
operatorAuditLog: {
|
||||||
|
create: vi.fn().mockResolvedValue(undefined),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
service = new MissionControlService(
|
||||||
|
registry as unknown as AgentProviderRegistry,
|
||||||
|
prisma as unknown as PrismaService
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("lists sessions from the registry", async () => {
|
||||||
|
await expect(service.listSessions()).resolves.toEqual([session]);
|
||||||
|
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns a session when it is found", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.getSession(session.id)).resolves.toEqual(session);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when session lookup fails", async () => {
|
||||||
|
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("gets messages from the resolved provider", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
const messages: AgentMessage[] = [
|
||||||
|
{
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: session.id,
|
||||||
|
role: "assistant",
|
||||||
|
content: "hello",
|
||||||
|
timestamp: new Date("2026-03-07T14:01:00.000Z"),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
provider.getMessages.mockResolvedValue(messages);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
|
||||||
|
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("injects a message and writes an audit log", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
|
||||||
|
provider.injectMessage.mockResolvedValue(injectResult);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
|
||||||
|
injectResult
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "operator-1",
|
||||||
|
provider: "internal",
|
||||||
|
action: "inject",
|
||||||
|
content: "ship it",
|
||||||
|
metadata: {
|
||||||
|
payload: { message: "ship it" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("pauses and resumes using default operator id", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await service.pauseSession(session.id);
|
||||||
|
await service.resumeSession(session.id);
|
||||||
|
|
||||||
|
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
|
||||||
|
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "mission-control",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "pause",
|
||||||
|
metadata: {
|
||||||
|
payload: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "mission-control",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "resume",
|
||||||
|
metadata: {
|
||||||
|
payload: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("kills with provided force value and writes audit log", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await service.killSession(session.id, false, "operator-2");
|
||||||
|
|
||||||
|
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "operator-2",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "kill",
|
||||||
|
metadata: {
|
||||||
|
payload: { force: false },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves provider message stream", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
|
||||||
|
yield {
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: session.id,
|
||||||
|
role: "assistant",
|
||||||
|
content: "stream",
|
||||||
|
timestamp: new Date("2026-03-07T14:03:00.000Z"),
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
provider.streamMessages.mockReturnValue(messageStream);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
|
||||||
|
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not write audit log when session cannot be resolved", async () => {
|
||||||
|
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,139 @@
|
|||||||
|
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||||
|
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||||
|
import type { Prisma } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
|
||||||
|
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
|
||||||
|
|
||||||
|
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class MissionControlService {
|
||||||
|
constructor(
|
||||||
|
private readonly registry: AgentProviderRegistry,
|
||||||
|
private readonly prisma: PrismaService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
listSessions(): Promise<AgentSession[]> {
|
||||||
|
return this.registry.listAllSessions();
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSession(sessionId: string): Promise<AgentSession> {
|
||||||
|
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||||
|
if (!resolved) {
|
||||||
|
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resolved.session;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
return provider.getMessages(sessionId, limit, before);
|
||||||
|
}
|
||||||
|
|
||||||
|
async injectMessage(
|
||||||
|
sessionId: string,
|
||||||
|
message: string,
|
||||||
|
operatorId = DEFAULT_OPERATOR_ID
|
||||||
|
): Promise<InjectResult> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
const result = await provider.injectMessage(sessionId, message);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "inject",
|
||||||
|
content: message,
|
||||||
|
payload: { message },
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.pauseSession(sessionId);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "pause",
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.resumeSession(sessionId);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "resume",
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async killSession(
|
||||||
|
sessionId: string,
|
||||||
|
force = true,
|
||||||
|
operatorId = DEFAULT_OPERATOR_ID
|
||||||
|
): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.killSession(sessionId, force);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "kill",
|
||||||
|
payload: { force },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
return provider.streamMessages(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getProviderForSessionOrThrow(
|
||||||
|
sessionId: string
|
||||||
|
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
|
||||||
|
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||||
|
|
||||||
|
if (!resolved) {
|
||||||
|
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resolved;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||||
|
return value as Prisma.InputJsonValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async writeOperatorAuditLog(params: {
|
||||||
|
sessionId: string;
|
||||||
|
providerId: string;
|
||||||
|
operatorId: string;
|
||||||
|
action: MissionControlAction;
|
||||||
|
content?: string;
|
||||||
|
payload: Record<string, unknown>;
|
||||||
|
}): Promise<void> {
|
||||||
|
await this.prisma.operatorAuditLog.create({
|
||||||
|
data: {
|
||||||
|
sessionId: params.sessionId,
|
||||||
|
userId: params.operatorId,
|
||||||
|
provider: params.providerId,
|
||||||
|
action: params.action,
|
||||||
|
...(params.content !== undefined ? { content: params.content } : {}),
|
||||||
|
metadata: this.toJsonValue({ payload: params.payload }),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
|
|||||||
import { ThrottlerModule } from "@nestjs/throttler";
|
import { ThrottlerModule } from "@nestjs/throttler";
|
||||||
import { HealthModule } from "./api/health/health.module";
|
import { HealthModule } from "./api/health/health.module";
|
||||||
import { AgentsModule } from "./api/agents/agents.module";
|
import { AgentsModule } from "./api/agents/agents.module";
|
||||||
|
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
||||||
import { QueueApiModule } from "./api/queue/queue-api.module";
|
import { QueueApiModule } from "./api/queue/queue-api.module";
|
||||||
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
||||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||||
@@ -53,6 +54,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
|||||||
HealthModule,
|
HealthModule,
|
||||||
AgentsModule,
|
AgentsModule,
|
||||||
AgentProvidersModule,
|
AgentProvidersModule,
|
||||||
|
MissionControlModule,
|
||||||
QueueApiModule,
|
QueueApiModule,
|
||||||
CoordinatorModule,
|
CoordinatorModule,
|
||||||
BudgetModule,
|
BudgetModule,
|
||||||
|
|||||||
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
import { AuthGuard } from "./guards/auth.guard";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
providers: [OrchestratorApiKeyGuard, AuthGuard],
|
||||||
|
exports: [AuthGuard],
|
||||||
|
})
|
||||||
|
export class AuthModule {}
|
||||||
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AuthGuard implements CanActivate {
|
||||||
|
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
|
||||||
|
|
||||||
|
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
|
||||||
|
return this.apiKeyGuard.canActivate(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user