Compare commits
7 Commits
feat/ms23-
...
feat/ms23-
| Author | SHA1 | Date | |
|---|---|---|---|
| 487aac6903 | |||
| 544e828e58 | |||
| 9489bc63f8 | |||
| ad644799aa | |||
| 81bf349270 | |||
| bcada71e88 | |||
| 9cc82e7fcf |
@@ -0,0 +1,54 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Delete,
|
||||
Get,
|
||||
Param,
|
||||
Patch,
|
||||
Post,
|
||||
UseGuards,
|
||||
UsePipes,
|
||||
ValidationPipe,
|
||||
} from "@nestjs/common";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||
|
||||
@Controller("agent-providers")
|
||||
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
|
||||
export class AgentProvidersController {
|
||||
constructor(private readonly agentProvidersService: AgentProvidersService) {}
|
||||
|
||||
@Get()
|
||||
async list(): Promise<AgentProviderConfig[]> {
|
||||
return this.agentProvidersService.list();
|
||||
}
|
||||
|
||||
@Get(":id")
|
||||
async getById(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.getById(id);
|
||||
}
|
||||
|
||||
@Post()
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async create(@Body() dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.create(dto);
|
||||
}
|
||||
|
||||
@Patch(":id")
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async update(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: UpdateAgentProviderDto
|
||||
): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.update(id, dto);
|
||||
}
|
||||
|
||||
@Delete(":id")
|
||||
async delete(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.delete(id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
import { AgentProvidersController } from "./agent-providers.controller";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
|
||||
@Module({
|
||||
imports: [PrismaModule],
|
||||
controllers: [AgentProvidersController],
|
||||
providers: [OrchestratorApiKeyGuard, AgentProvidersService],
|
||||
})
|
||||
export class AgentProvidersModule {}
|
||||
@@ -0,0 +1,211 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
|
||||
describe("AgentProvidersService", () => {
|
||||
let service: AgentProvidersService;
|
||||
let prisma: {
|
||||
agentProviderConfig: {
|
||||
findMany: ReturnType<typeof vi.fn>;
|
||||
findUnique: ReturnType<typeof vi.fn>;
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
update: ReturnType<typeof vi.fn>;
|
||||
delete: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = {
|
||||
agentProviderConfig: {
|
||||
findMany: vi.fn(),
|
||||
findUnique: vi.fn(),
|
||||
create: vi.fn(),
|
||||
update: vi.fn(),
|
||||
delete: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
service = new AgentProvidersService(prisma as unknown as PrismaService);
|
||||
});
|
||||
|
||||
it("lists all provider configs", async () => {
|
||||
const expected = [
|
||||
{
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
},
|
||||
];
|
||||
prisma.agentProviderConfig.findMany.mockResolvedValue(expected);
|
||||
|
||||
const result = await service.list();
|
||||
|
||||
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
|
||||
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||
});
|
||||
expect(result).toEqual(expected);
|
||||
});
|
||||
|
||||
it("returns a single provider config", async () => {
|
||||
const expected = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: { apiKeyRef: "vault:openai" },
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(expected);
|
||||
|
||||
const result = await service.getById("cfg-1");
|
||||
|
||||
expect(prisma.agentProviderConfig.findUnique).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
});
|
||||
expect(result).toEqual(expected);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when provider config is missing", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.getById("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||
});
|
||||
|
||||
it("creates a provider config with default credentials", async () => {
|
||||
const created = {
|
||||
id: "cfg-created",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.create.mockResolvedValue(created);
|
||||
|
||||
const result = await service.create({
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
});
|
||||
|
||||
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(created);
|
||||
});
|
||||
|
||||
it("updates a provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
});
|
||||
|
||||
const updated = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Secondary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.update.mockResolvedValue(updated);
|
||||
|
||||
const result = await service.update("cfg-1", {
|
||||
name: "Secondary",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
});
|
||||
|
||||
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
data: {
|
||||
name: "Secondary",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(updated);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when updating a missing provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.update("missing", { name: "Updated" })).rejects.toBeInstanceOf(
|
||||
NotFoundException
|
||||
);
|
||||
expect(prisma.agentProviderConfig.update).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("deletes a provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
});
|
||||
|
||||
const deleted = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.delete.mockResolvedValue(deleted);
|
||||
|
||||
const result = await service.delete("cfg-1");
|
||||
|
||||
expect(prisma.agentProviderConfig.delete).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
});
|
||||
expect(result).toEqual(deleted);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when deleting a missing provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.delete("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||
expect(prisma.agentProviderConfig.delete).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,71 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import type { AgentProviderConfig, Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||
|
||||
@Injectable()
|
||||
export class AgentProvidersService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async list(): Promise<AgentProviderConfig[]> {
|
||||
return this.prisma.agentProviderConfig.findMany({
|
||||
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||
});
|
||||
}
|
||||
|
||||
async getById(id: string): Promise<AgentProviderConfig> {
|
||||
const providerConfig = await this.prisma.agentProviderConfig.findUnique({
|
||||
where: { id },
|
||||
});
|
||||
|
||||
if (!providerConfig) {
|
||||
throw new NotFoundException(`Agent provider config with id ${id} not found`);
|
||||
}
|
||||
|
||||
return providerConfig;
|
||||
}
|
||||
|
||||
async create(dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
return this.prisma.agentProviderConfig.create({
|
||||
data: {
|
||||
workspaceId: dto.workspaceId,
|
||||
name: dto.name,
|
||||
provider: dto.provider,
|
||||
gatewayUrl: dto.gatewayUrl,
|
||||
credentials: this.toJsonValue(dto.credentials ?? {}),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async update(id: string, dto: UpdateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
await this.getById(id);
|
||||
|
||||
const data: Prisma.AgentProviderConfigUpdateInput = {
|
||||
...(dto.workspaceId !== undefined ? { workspaceId: dto.workspaceId } : {}),
|
||||
...(dto.name !== undefined ? { name: dto.name } : {}),
|
||||
...(dto.provider !== undefined ? { provider: dto.provider } : {}),
|
||||
...(dto.gatewayUrl !== undefined ? { gatewayUrl: dto.gatewayUrl } : {}),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
...(dto.credentials !== undefined ? { credentials: this.toJsonValue(dto.credentials) } : {}),
|
||||
};
|
||||
|
||||
return this.prisma.agentProviderConfig.update({
|
||||
where: { id },
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
async delete(id: string): Promise<AgentProviderConfig> {
|
||||
await this.getById(id);
|
||||
|
||||
return this.prisma.agentProviderConfig.delete({
|
||||
where: { id },
|
||||
});
|
||||
}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||
|
||||
export class CreateAgentProviderDto {
|
||||
@IsUUID()
|
||||
workspaceId!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
name!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
provider!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
gatewayUrl!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsObject()
|
||||
credentials?: Record<string, unknown>;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
isActive?: boolean;
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||
|
||||
export class UpdateAgentProviderDto {
|
||||
@IsOptional()
|
||||
@IsUUID()
|
||||
workspaceId?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
name?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
provider?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
gatewayUrl?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsObject()
|
||||
credentials?: Record<string, unknown>;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
isActive?: boolean;
|
||||
}
|
||||
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
@@ -0,0 +1,202 @@
|
||||
import { Logger } from "@nestjs/common";
|
||||
import type {
|
||||
AgentMessage,
|
||||
AgentSession,
|
||||
AgentSessionList,
|
||||
IAgentProvider,
|
||||
InjectResult,
|
||||
} from "@mosaic/shared";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
|
||||
type MockProvider = IAgentProvider & {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||
return;
|
||||
};
|
||||
|
||||
const createProvider = (providerId: string, sessions: AgentSession[] = []): MockProvider => {
|
||||
return {
|
||||
providerId,
|
||||
providerType: providerId,
|
||||
displayName: providerId,
|
||||
listSessions: vi.fn().mockResolvedValue({
|
||||
sessions,
|
||||
total: sessions.length,
|
||||
} as AgentSessionList),
|
||||
getSession: vi.fn().mockResolvedValue(null),
|
||||
getMessages: vi.fn().mockResolvedValue([]),
|
||||
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||
killSession: vi.fn().mockResolvedValue(undefined),
|
||||
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||
isAvailable: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
};
|
||||
|
||||
describe("AgentProviderRegistry", () => {
|
||||
let registry: AgentProviderRegistry;
|
||||
let internalProvider: MockProvider;
|
||||
|
||||
beforeEach(() => {
|
||||
internalProvider = createProvider("internal");
|
||||
registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("registers InternalAgentProvider on module init", () => {
|
||||
registry.onModuleInit();
|
||||
|
||||
expect(registry.getProvider("internal")).toBe(internalProvider);
|
||||
});
|
||||
|
||||
it("registers providers and returns null for unknown provider ids", () => {
|
||||
const externalProvider = createProvider("openclaw");
|
||||
|
||||
registry.registerProvider(externalProvider);
|
||||
|
||||
expect(registry.getProvider("openclaw")).toBe(externalProvider);
|
||||
expect(registry.getProvider("missing")).toBeNull();
|
||||
});
|
||||
|
||||
it("aggregates and sorts sessions from all providers", async () => {
|
||||
const internalSessions: AgentSession[] = [
|
||||
{
|
||||
id: "session-older",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T10:10:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
const externalSessions: AgentSession[] = [
|
||||
{
|
||||
id: "session-newer",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "paused",
|
||||
createdAt: new Date("2026-03-07T09:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T10:20:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
internalProvider.listSessions.mockResolvedValue({
|
||||
sessions: internalSessions,
|
||||
total: internalSessions.length,
|
||||
} as AgentSessionList);
|
||||
|
||||
const externalProvider = createProvider("openclaw", externalSessions);
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(externalProvider);
|
||||
|
||||
const result = await registry.listAllSessions();
|
||||
|
||||
expect(result.map((session) => session.id)).toEqual(["session-newer", "session-older"]);
|
||||
expect(internalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||
expect(externalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("skips provider failures and logs warning", async () => {
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
|
||||
const healthyProvider = createProvider("healthy", [
|
||||
{
|
||||
id: "session-1",
|
||||
providerId: "healthy",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
const failingProvider = createProvider("failing");
|
||||
failingProvider.listSessions.mockRejectedValue(new Error("provider offline"));
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(healthyProvider);
|
||||
registry.registerProvider(failingProvider);
|
||||
|
||||
const result = await registry.listAllSessions();
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.id).toBe("session-1");
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Failed to list sessions for provider failing")
|
||||
);
|
||||
});
|
||||
|
||||
it("finds a provider for an existing session", async () => {
|
||||
const targetSession: AgentSession = {
|
||||
id: "session-found",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T12:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
|
||||
};
|
||||
|
||||
const openclawProvider = createProvider("openclaw");
|
||||
openclawProvider.getSession.mockResolvedValue(targetSession);
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(openclawProvider);
|
||||
|
||||
const result = await registry.getProviderForSession(targetSession.id);
|
||||
|
||||
expect(result).toEqual({
|
||||
provider: openclawProvider,
|
||||
session: targetSession,
|
||||
});
|
||||
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||
});
|
||||
|
||||
it("returns null when no provider has the requested session", async () => {
|
||||
const openclawProvider = createProvider("openclaw");
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(openclawProvider);
|
||||
|
||||
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it("continues searching providers when getSession throws", async () => {
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
const failingProvider = createProvider("failing");
|
||||
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
|
||||
|
||||
const healthySession: AgentSession = {
|
||||
id: "session-healthy",
|
||||
providerId: "healthy",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T12:15:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
|
||||
};
|
||||
|
||||
const healthyProvider = createProvider("healthy");
|
||||
healthyProvider.getSession.mockResolvedValue(healthySession);
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(failingProvider);
|
||||
registry.registerProvider(healthyProvider);
|
||||
|
||||
const result = await registry.getProviderForSession(healthySession.id);
|
||||
|
||||
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Failed to get session session-healthy for provider failing")
|
||||
);
|
||||
});
|
||||
});
|
||||
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
|
||||
import type { AgentSession, IAgentProvider } from "@mosaic/shared";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
|
||||
@Injectable()
|
||||
export class AgentProviderRegistry implements OnModuleInit {
|
||||
private readonly logger = new Logger(AgentProviderRegistry.name);
|
||||
private readonly providers = new Map<string, IAgentProvider>();
|
||||
|
||||
constructor(private readonly internalProvider: InternalAgentProvider) {}
|
||||
|
||||
onModuleInit(): void {
|
||||
this.registerProvider(this.internalProvider);
|
||||
}
|
||||
|
||||
registerProvider(provider: IAgentProvider): void {
|
||||
const existingProvider = this.providers.get(provider.providerId);
|
||||
if (existingProvider !== undefined) {
|
||||
this.logger.warn(`Replacing existing provider registration for ${provider.providerId}`);
|
||||
}
|
||||
|
||||
this.providers.set(provider.providerId, provider);
|
||||
}
|
||||
|
||||
getProvider(providerId: string): IAgentProvider | null {
|
||||
return this.providers.get(providerId) ?? null;
|
||||
}
|
||||
|
||||
async getProviderForSession(
|
||||
sessionId: string
|
||||
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
|
||||
for (const provider of this.providers.values()) {
|
||||
try {
|
||||
const session = await provider.getSession(sessionId);
|
||||
if (session !== null) {
|
||||
return {
|
||||
provider,
|
||||
session,
|
||||
};
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async listAllSessions(): Promise<AgentSession[]> {
|
||||
const providers = [...this.providers.values()];
|
||||
const sessionsByProvider = await Promise.all(
|
||||
providers.map(async (provider) => {
|
||||
try {
|
||||
const { sessions } = await provider.listSessions();
|
||||
return sessions;
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to list sessions for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||
);
|
||||
return [];
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
return sessionsByProvider
|
||||
.flat()
|
||||
.sort((left, right) => right.updatedAt.getTime() - left.updatedAt.getTime());
|
||||
}
|
||||
|
||||
private toErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
|
||||
return String(error);
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import { AgentMessagesService } from "./agent-messages.service";
|
||||
import { AgentControlService } from "./agent-control.service";
|
||||
import { AgentTreeService } from "./agent-tree.service";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||
|
||||
@Module({
|
||||
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
|
||||
@@ -22,7 +23,8 @@ import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
AgentControlService,
|
||||
AgentTreeService,
|
||||
InternalAgentProvider,
|
||||
AgentProviderRegistry,
|
||||
],
|
||||
exports: [InternalAgentProvider],
|
||||
exports: [InternalAgentProvider, AgentProviderRegistry],
|
||||
})
|
||||
export class AgentsModule {}
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
import { Type } from "class-transformer";
|
||||
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
|
||||
|
||||
export class GetMissionControlMessagesQueryDto {
|
||||
@IsOptional()
|
||||
@Type(() => Number)
|
||||
@IsInt()
|
||||
@Min(1)
|
||||
@Max(200)
|
||||
limit?: number;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
before?: string;
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
import { IsBoolean, IsOptional } from "class-validator";
|
||||
|
||||
export class KillSessionDto {
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
force?: boolean;
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AgentSession } from "@mosaic/shared";
|
||||
import type { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
import { MissionControlController } from "./mission-control.controller";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
describe("MissionControlController", () => {
|
||||
let controller: MissionControlController;
|
||||
let registry: {
|
||||
listAllSessions: ReturnType<typeof vi.fn>;
|
||||
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
registry = {
|
||||
listAllSessions: vi.fn(),
|
||||
getProviderForSession: vi.fn(),
|
||||
};
|
||||
|
||||
const prisma = {
|
||||
operatorAuditLog: {
|
||||
create: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
|
||||
const service = new MissionControlService(
|
||||
registry as unknown as AgentProviderRegistry,
|
||||
prisma as unknown as PrismaService
|
||||
);
|
||||
|
||||
controller = new MissionControlController(service);
|
||||
});
|
||||
|
||||
it("Phase 1 gate: unified sessions endpoint returns internal provider sessions", async () => {
|
||||
const internalSession: AgentSession = {
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T20:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T20:01:00.000Z"),
|
||||
};
|
||||
|
||||
const externalSession: AgentSession = {
|
||||
id: "session-openclaw-1",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T20:02:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T20:03:00.000Z"),
|
||||
};
|
||||
|
||||
registry.listAllSessions.mockResolvedValue([internalSession, externalSession]);
|
||||
|
||||
const response = await controller.listSessions();
|
||||
|
||||
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||
expect(response.sessions).toEqual([internalSession, externalSession]);
|
||||
expect(response.sessions).toContainEqual(
|
||||
expect.objectContaining({
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,183 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Get,
|
||||
Header,
|
||||
HttpCode,
|
||||
MessageEvent,
|
||||
Param,
|
||||
Post,
|
||||
Query,
|
||||
Request,
|
||||
Sse,
|
||||
UseGuards,
|
||||
UsePipes,
|
||||
ValidationPipe,
|
||||
} from "@nestjs/common";
|
||||
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
|
||||
import { Observable } from "rxjs";
|
||||
import { AuthGuard } from "../../auth/guards/auth.guard";
|
||||
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
|
||||
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
|
||||
import { KillSessionDto } from "./dto/kill-session.dto";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||
|
||||
interface MissionControlRequest {
|
||||
user?: {
|
||||
id?: string;
|
||||
};
|
||||
}
|
||||
|
||||
@Controller("api/mission-control")
|
||||
@UseGuards(AuthGuard)
|
||||
export class MissionControlController {
|
||||
constructor(private readonly missionControlService: MissionControlService) {}
|
||||
|
||||
@Get("sessions")
|
||||
async listSessions(): Promise<{ sessions: AgentSession[] }> {
|
||||
const sessions = await this.missionControlService.listSessions();
|
||||
return { sessions };
|
||||
}
|
||||
|
||||
@Get("sessions/:sessionId")
|
||||
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
|
||||
return this.missionControlService.getSession(sessionId);
|
||||
}
|
||||
|
||||
@Get("sessions/:sessionId/messages")
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async getMessages(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Query() query: GetMissionControlMessagesQueryDto
|
||||
): Promise<{ messages: AgentMessage[] }> {
|
||||
const messages = await this.missionControlService.getMessages(
|
||||
sessionId,
|
||||
query.limit,
|
||||
query.before
|
||||
);
|
||||
|
||||
return { messages };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/inject")
|
||||
@HttpCode(200)
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
injectMessage(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Body() dto: InjectAgentDto,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<InjectResult> {
|
||||
return this.missionControlService.injectMessage(
|
||||
sessionId,
|
||||
dto.message,
|
||||
this.resolveOperatorId(req)
|
||||
);
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/pause")
|
||||
@HttpCode(200)
|
||||
async pauseSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
|
||||
|
||||
return { message: `Session ${sessionId} paused` };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/resume")
|
||||
@HttpCode(200)
|
||||
async resumeSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
|
||||
|
||||
return { message: `Session ${sessionId} resumed` };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/kill")
|
||||
@HttpCode(200)
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async killSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Body() dto: KillSessionDto,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.killSession(
|
||||
sessionId,
|
||||
dto.force ?? true,
|
||||
this.resolveOperatorId(req)
|
||||
);
|
||||
|
||||
return { message: `Session ${sessionId} killed` };
|
||||
}
|
||||
|
||||
@Sse("sessions/:sessionId/stream")
|
||||
@Header("Content-Type", "text/event-stream")
|
||||
@Header("Cache-Control", "no-cache")
|
||||
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
|
||||
return new Observable<MessageEvent>((subscriber) => {
|
||||
let isClosed = false;
|
||||
let iterator: AsyncIterator<AgentMessage> | null = null;
|
||||
|
||||
void this.missionControlService
|
||||
.streamMessages(sessionId)
|
||||
.then(async (stream) => {
|
||||
iterator = stream[Symbol.asyncIterator]();
|
||||
|
||||
for (;;) {
|
||||
if (isClosed) {
|
||||
break;
|
||||
}
|
||||
|
||||
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
|
||||
if (next.done) {
|
||||
break;
|
||||
}
|
||||
|
||||
subscriber.next({
|
||||
data: this.toStreamPayload(next.value),
|
||||
});
|
||||
}
|
||||
|
||||
subscriber.complete();
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
subscriber.error(error);
|
||||
});
|
||||
|
||||
return () => {
|
||||
isClosed = true;
|
||||
void iterator?.return?.();
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private resolveOperatorId(req: MissionControlRequest): string {
|
||||
const operatorId = req.user?.id;
|
||||
return typeof operatorId === "string" && operatorId.length > 0
|
||||
? operatorId
|
||||
: DEFAULT_OPERATOR_ID;
|
||||
}
|
||||
|
||||
private toStreamPayload(message: AgentMessage): {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
role: string;
|
||||
content: string;
|
||||
timestamp: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
} {
|
||||
return {
|
||||
id: message.id,
|
||||
sessionId: message.sessionId,
|
||||
role: message.role,
|
||||
content: message.content,
|
||||
timestamp: message.timestamp.toISOString(),
|
||||
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { AgentsModule } from "../agents/agents.module";
|
||||
import { AuthModule } from "../../auth/auth.module";
|
||||
import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { MissionControlController } from "./mission-control.controller";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
@Module({
|
||||
imports: [AgentsModule, AuthModule, PrismaModule],
|
||||
controllers: [MissionControlController],
|
||||
providers: [MissionControlService],
|
||||
})
|
||||
export class MissionControlModule {}
|
||||
@@ -0,0 +1,213 @@
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||
import type { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
type MockProvider = IAgentProvider & {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
getMessages: ReturnType<typeof vi.fn>;
|
||||
injectMessage: ReturnType<typeof vi.fn>;
|
||||
pauseSession: ReturnType<typeof vi.fn>;
|
||||
resumeSession: ReturnType<typeof vi.fn>;
|
||||
killSession: ReturnType<typeof vi.fn>;
|
||||
streamMessages: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||
return;
|
||||
};
|
||||
|
||||
const createProvider = (providerId = "internal"): MockProvider => ({
|
||||
providerId,
|
||||
providerType: providerId,
|
||||
displayName: providerId,
|
||||
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
|
||||
getSession: vi.fn().mockResolvedValue(null),
|
||||
getMessages: vi.fn().mockResolvedValue([]),
|
||||
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||
killSession: vi.fn().mockResolvedValue(undefined),
|
||||
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||
isAvailable: vi.fn().mockResolvedValue(true),
|
||||
});
|
||||
|
||||
describe("MissionControlService", () => {
|
||||
let service: MissionControlService;
|
||||
let registry: {
|
||||
listAllSessions: ReturnType<typeof vi.fn>;
|
||||
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let prisma: {
|
||||
operatorAuditLog: {
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
const session: AgentSession = {
|
||||
id: "session-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T14:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
registry = {
|
||||
listAllSessions: vi.fn().mockResolvedValue([session]),
|
||||
getProviderForSession: vi.fn().mockResolvedValue(null),
|
||||
};
|
||||
|
||||
prisma = {
|
||||
operatorAuditLog: {
|
||||
create: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
|
||||
service = new MissionControlService(
|
||||
registry as unknown as AgentProviderRegistry,
|
||||
prisma as unknown as PrismaService
|
||||
);
|
||||
});
|
||||
|
||||
it("lists sessions from the registry", async () => {
|
||||
await expect(service.listSessions()).resolves.toEqual([session]);
|
||||
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("returns a session when it is found", async () => {
|
||||
const provider = createProvider("internal");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.getSession(session.id)).resolves.toEqual(session);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when session lookup fails", async () => {
|
||||
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||
});
|
||||
|
||||
it("gets messages from the resolved provider", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
const messages: AgentMessage[] = [
|
||||
{
|
||||
id: "message-1",
|
||||
sessionId: session.id,
|
||||
role: "assistant",
|
||||
content: "hello",
|
||||
timestamp: new Date("2026-03-07T14:01:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
provider.getMessages.mockResolvedValue(messages);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
|
||||
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
|
||||
});
|
||||
|
||||
it("injects a message and writes an audit log", async () => {
|
||||
const provider = createProvider("internal");
|
||||
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
|
||||
provider.injectMessage.mockResolvedValue(injectResult);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
|
||||
injectResult
|
||||
);
|
||||
|
||||
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "operator-1",
|
||||
provider: "internal",
|
||||
action: "inject",
|
||||
content: "ship it",
|
||||
metadata: {
|
||||
payload: { message: "ship it" },
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("pauses and resumes using default operator id", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await service.pauseSession(session.id);
|
||||
await service.resumeSession(session.id);
|
||||
|
||||
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
|
||||
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "mission-control",
|
||||
provider: "openclaw",
|
||||
action: "pause",
|
||||
metadata: {
|
||||
payload: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "mission-control",
|
||||
provider: "openclaw",
|
||||
action: "resume",
|
||||
metadata: {
|
||||
payload: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("kills with provided force value and writes audit log", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await service.killSession(session.id, false, "operator-2");
|
||||
|
||||
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "operator-2",
|
||||
provider: "openclaw",
|
||||
action: "kill",
|
||||
metadata: {
|
||||
payload: { force: false },
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves provider message stream", async () => {
|
||||
const provider = createProvider("internal");
|
||||
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
|
||||
yield {
|
||||
id: "message-1",
|
||||
sessionId: session.id,
|
||||
role: "assistant",
|
||||
content: "stream",
|
||||
timestamp: new Date("2026-03-07T14:03:00.000Z"),
|
||||
};
|
||||
})();
|
||||
|
||||
provider.streamMessages.mockReturnValue(messageStream);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
|
||||
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
|
||||
});
|
||||
|
||||
it("does not write audit log when session cannot be resolved", async () => {
|
||||
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,139 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||
import type { Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
|
||||
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
|
||||
|
||||
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||
|
||||
@Injectable()
|
||||
export class MissionControlService {
|
||||
constructor(
|
||||
private readonly registry: AgentProviderRegistry,
|
||||
private readonly prisma: PrismaService
|
||||
) {}
|
||||
|
||||
listSessions(): Promise<AgentSession[]> {
|
||||
return this.registry.listAllSessions();
|
||||
}
|
||||
|
||||
async getSession(sessionId: string): Promise<AgentSession> {
|
||||
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||
if (!resolved) {
|
||||
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||
}
|
||||
|
||||
return resolved.session;
|
||||
}
|
||||
|
||||
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
return provider.getMessages(sessionId, limit, before);
|
||||
}
|
||||
|
||||
async injectMessage(
|
||||
sessionId: string,
|
||||
message: string,
|
||||
operatorId = DEFAULT_OPERATOR_ID
|
||||
): Promise<InjectResult> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
const result = await provider.injectMessage(sessionId, message);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "inject",
|
||||
content: message,
|
||||
payload: { message },
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.pauseSession(sessionId);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "pause",
|
||||
payload: {},
|
||||
});
|
||||
}
|
||||
|
||||
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.resumeSession(sessionId);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "resume",
|
||||
payload: {},
|
||||
});
|
||||
}
|
||||
|
||||
async killSession(
|
||||
sessionId: string,
|
||||
force = true,
|
||||
operatorId = DEFAULT_OPERATOR_ID
|
||||
): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.killSession(sessionId, force);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "kill",
|
||||
payload: { force },
|
||||
});
|
||||
}
|
||||
|
||||
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
return provider.streamMessages(sessionId);
|
||||
}
|
||||
|
||||
private async getProviderForSessionOrThrow(
|
||||
sessionId: string
|
||||
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
|
||||
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||
|
||||
if (!resolved) {
|
||||
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||
}
|
||||
|
||||
return resolved;
|
||||
}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
}
|
||||
|
||||
private async writeOperatorAuditLog(params: {
|
||||
sessionId: string;
|
||||
providerId: string;
|
||||
operatorId: string;
|
||||
action: MissionControlAction;
|
||||
content?: string;
|
||||
payload: Record<string, unknown>;
|
||||
}): Promise<void> {
|
||||
await this.prisma.operatorAuditLog.create({
|
||||
data: {
|
||||
sessionId: params.sessionId,
|
||||
userId: params.operatorId,
|
||||
provider: params.providerId,
|
||||
action: params.action,
|
||||
...(params.content !== undefined ? { content: params.content } : {}),
|
||||
metadata: this.toJsonValue({ payload: params.payload }),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,9 @@ import { BullModule } from "@nestjs/bullmq";
|
||||
import { ThrottlerModule } from "@nestjs/throttler";
|
||||
import { HealthModule } from "./api/health/health.module";
|
||||
import { AgentsModule } from "./api/agents/agents.module";
|
||||
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
||||
import { QueueApiModule } from "./api/queue/queue-api.module";
|
||||
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||
import { BudgetModule } from "./budget/budget.module";
|
||||
import { CIModule } from "./ci";
|
||||
@@ -51,6 +53,8 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
||||
]),
|
||||
HealthModule,
|
||||
AgentsModule,
|
||||
AgentProvidersModule,
|
||||
MissionControlModule,
|
||||
QueueApiModule,
|
||||
CoordinatorModule,
|
||||
BudgetModule,
|
||||
|
||||
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import { AuthGuard } from "./guards/auth.guard";
|
||||
|
||||
@Module({
|
||||
providers: [OrchestratorApiKeyGuard, AuthGuard],
|
||||
exports: [AuthGuard],
|
||||
})
|
||||
export class AuthModule {}
|
||||
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
|
||||
@Injectable()
|
||||
export class AuthGuard implements CanActivate {
|
||||
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
|
||||
|
||||
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
|
||||
return this.apiKeyGuard.canActivate(context);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
import { MissionControlLayout } from "@/components/mission-control/MissionControlLayout";
|
||||
|
||||
export default function MissionControlPage(): React.JSX.Element {
|
||||
return <MissionControlLayout />;
|
||||
}
|
||||
@@ -156,6 +156,26 @@ function IconTerminal(): React.JSX.Element {
|
||||
);
|
||||
}
|
||||
|
||||
function IconMissionControl(): React.JSX.Element {
|
||||
return (
|
||||
<svg
|
||||
width="16"
|
||||
height="16"
|
||||
viewBox="0 0 16 16"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
strokeWidth="1.5"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<circle cx="8" cy="8" r="1.5" />
|
||||
<path d="M11 5a4.25 4.25 0 0 1 0 6" />
|
||||
<path d="M5 5a4.25 4.25 0 0 0 0 6" />
|
||||
<path d="M13.5 2.5a7.75 7.75 0 0 1 0 11" />
|
||||
<path d="M2.5 2.5a7.75 7.75 0 0 0 0 11" />
|
||||
</svg>
|
||||
);
|
||||
}
|
||||
|
||||
function IconSettings(): React.JSX.Element {
|
||||
return (
|
||||
<svg
|
||||
@@ -260,6 +280,11 @@ const NAV_GROUPS: NavGroup[] = [
|
||||
label: "Terminal",
|
||||
icon: <IconTerminal />,
|
||||
},
|
||||
{
|
||||
href: "/mission-control",
|
||||
label: "Mission Control",
|
||||
icon: <IconMissionControl />,
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
"use client";
|
||||
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
|
||||
export function GlobalAgentRoster(): React.JSX.Element {
|
||||
return (
|
||||
<Card className="flex h-full min-h-0 flex-col">
|
||||
<CardHeader>
|
||||
<CardTitle className="text-base">Agent Roster</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="flex flex-1 items-center justify-center text-sm text-muted-foreground">
|
||||
No active agents
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
"use client";
|
||||
|
||||
import { GlobalAgentRoster } from "@/components/mission-control/GlobalAgentRoster";
|
||||
import { MissionControlPanel } from "@/components/mission-control/MissionControlPanel";
|
||||
|
||||
const DEFAULT_PANEL_SLOTS = ["panel-1", "panel-2", "panel-3", "panel-4"] as const;
|
||||
|
||||
export function MissionControlLayout(): React.JSX.Element {
|
||||
return (
|
||||
<section className="h-full min-h-0 overflow-hidden" aria-label="Mission Control">
|
||||
<div className="grid h-full min-h-0 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]">
|
||||
<aside className="h-full min-h-0">
|
||||
<GlobalAgentRoster />
|
||||
</aside>
|
||||
<main className="h-full min-h-0 overflow-hidden">
|
||||
<MissionControlPanel panels={DEFAULT_PANEL_SLOTS} />
|
||||
</main>
|
||||
</div>
|
||||
</section>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
"use client";
|
||||
|
||||
import { OrchestratorPanel } from "@/components/mission-control/OrchestratorPanel";
|
||||
|
||||
interface MissionControlPanelProps {
|
||||
panels: readonly string[];
|
||||
}
|
||||
|
||||
export function MissionControlPanel({ panels }: MissionControlPanelProps): React.JSX.Element {
|
||||
return (
|
||||
<div className="grid h-full min-h-0 auto-rows-fr grid-cols-1 gap-4 overflow-y-auto pr-1 md:grid-cols-2">
|
||||
{panels.map((panelId) => (
|
||||
<OrchestratorPanel key={panelId} />
|
||||
))}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
"use client";
|
||||
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
|
||||
export function OrchestratorPanel(): React.JSX.Element {
|
||||
return (
|
||||
<Card className="flex h-full min-h-[220px] flex-col">
|
||||
<CardHeader>
|
||||
<CardTitle className="text-base">Orchestrator Panel</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="flex flex-1 items-center justify-center text-sm text-muted-foreground">
|
||||
Select an agent
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
10
apps/web/src/hooks/useMissionControl.ts
Normal file
10
apps/web/src/hooks/useMissionControl.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
interface UseMissionControlResult {
|
||||
sessions: [];
|
||||
loading: boolean;
|
||||
error: null;
|
||||
}
|
||||
|
||||
// Stub — will be wired in P2-002
|
||||
export function useMissionControl(): UseMissionControlResult {
|
||||
return { sessions: [], loading: false, error: null };
|
||||
}
|
||||
Reference in New Issue
Block a user