Compare commits
5 Commits
feat/ms23-
...
feat/ms23-
| Author | SHA1 | Date | |
|---|---|---|---|
| 52e7b0e6e7 | |||
| 81bf349270 | |||
| bcada71e88 | |||
| 9cc82e7fcf | |||
| 4b135ae1f0 |
@@ -0,0 +1,54 @@
|
|||||||
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Delete,
|
||||||
|
Get,
|
||||||
|
Param,
|
||||||
|
Patch,
|
||||||
|
Post,
|
||||||
|
UseGuards,
|
||||||
|
UsePipes,
|
||||||
|
ValidationPipe,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||||
|
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
|
||||||
|
import { AgentProvidersService } from "./agent-providers.service";
|
||||||
|
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||||
|
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||||
|
|
||||||
|
@Controller("agent-providers")
|
||||||
|
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
|
||||||
|
export class AgentProvidersController {
|
||||||
|
constructor(private readonly agentProvidersService: AgentProvidersService) {}
|
||||||
|
|
||||||
|
@Get()
|
||||||
|
async list(): Promise<AgentProviderConfig[]> {
|
||||||
|
return this.agentProvidersService.list();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get(":id")
|
||||||
|
async getById(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||||
|
return this.agentProvidersService.getById(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post()
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async create(@Body() dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||||
|
return this.agentProvidersService.create(dto);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Patch(":id")
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async update(
|
||||||
|
@Param("id") id: string,
|
||||||
|
@Body() dto: UpdateAgentProviderDto
|
||||||
|
): Promise<AgentProviderConfig> {
|
||||||
|
return this.agentProvidersService.update(id, dto);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Delete(":id")
|
||||||
|
async delete(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||||
|
return this.agentProvidersService.delete(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { PrismaModule } from "../../prisma/prisma.module";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||||
|
import { AgentProvidersController } from "./agent-providers.controller";
|
||||||
|
import { AgentProvidersService } from "./agent-providers.service";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [PrismaModule],
|
||||||
|
controllers: [AgentProvidersController],
|
||||||
|
providers: [OrchestratorApiKeyGuard, AgentProvidersService],
|
||||||
|
})
|
||||||
|
export class AgentProvidersModule {}
|
||||||
@@ -0,0 +1,211 @@
|
|||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { NotFoundException } from "@nestjs/common";
|
||||||
|
import { AgentProvidersService } from "./agent-providers.service";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
|
||||||
|
describe("AgentProvidersService", () => {
|
||||||
|
let service: AgentProvidersService;
|
||||||
|
let prisma: {
|
||||||
|
agentProviderConfig: {
|
||||||
|
findMany: ReturnType<typeof vi.fn>;
|
||||||
|
findUnique: ReturnType<typeof vi.fn>;
|
||||||
|
create: ReturnType<typeof vi.fn>;
|
||||||
|
update: ReturnType<typeof vi.fn>;
|
||||||
|
delete: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
prisma = {
|
||||||
|
agentProviderConfig: {
|
||||||
|
findMany: vi.fn(),
|
||||||
|
findUnique: vi.fn(),
|
||||||
|
create: vi.fn(),
|
||||||
|
update: vi.fn(),
|
||||||
|
delete: vi.fn(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
service = new AgentProvidersService(prisma as unknown as PrismaService);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("lists all provider configs", async () => {
|
||||||
|
const expected = [
|
||||||
|
{
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Primary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
prisma.agentProviderConfig.findMany.mockResolvedValue(expected);
|
||||||
|
|
||||||
|
const result = await service.list();
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
|
||||||
|
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||||
|
});
|
||||||
|
expect(result).toEqual(expected);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns a single provider config", async () => {
|
||||||
|
const expected = {
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Primary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: { apiKeyRef: "vault:openai" },
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
};
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue(expected);
|
||||||
|
|
||||||
|
const result = await service.getById("cfg-1");
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.findUnique).toHaveBeenCalledWith({
|
||||||
|
where: { id: "cfg-1" },
|
||||||
|
});
|
||||||
|
expect(result).toEqual(expected);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when provider config is missing", async () => {
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||||
|
|
||||||
|
await expect(service.getById("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("creates a provider config with default credentials", async () => {
|
||||||
|
const created = {
|
||||||
|
id: "cfg-created",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "New Provider",
|
||||||
|
provider: "claude",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
};
|
||||||
|
prisma.agentProviderConfig.create.mockResolvedValue(created);
|
||||||
|
|
||||||
|
const result = await service.create({
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "New Provider",
|
||||||
|
provider: "claude",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
|
||||||
|
data: {
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "New Provider",
|
||||||
|
provider: "claude",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(result).toEqual(created);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("updates a provider config", async () => {
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Primary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
});
|
||||||
|
|
||||||
|
const updated = {
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Secondary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway2.example.com",
|
||||||
|
credentials: { apiKeyRef: "vault:new" },
|
||||||
|
isActive: false,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
|
||||||
|
};
|
||||||
|
prisma.agentProviderConfig.update.mockResolvedValue(updated);
|
||||||
|
|
||||||
|
const result = await service.update("cfg-1", {
|
||||||
|
name: "Secondary",
|
||||||
|
gatewayUrl: "https://gateway2.example.com",
|
||||||
|
credentials: { apiKeyRef: "vault:new" },
|
||||||
|
isActive: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
|
||||||
|
where: { id: "cfg-1" },
|
||||||
|
data: {
|
||||||
|
name: "Secondary",
|
||||||
|
gatewayUrl: "https://gateway2.example.com",
|
||||||
|
credentials: { apiKeyRef: "vault:new" },
|
||||||
|
isActive: false,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(result).toEqual(updated);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when updating a missing provider config", async () => {
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||||
|
|
||||||
|
await expect(service.update("missing", { name: "Updated" })).rejects.toBeInstanceOf(
|
||||||
|
NotFoundException
|
||||||
|
);
|
||||||
|
expect(prisma.agentProviderConfig.update).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("deletes a provider config", async () => {
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Primary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
});
|
||||||
|
|
||||||
|
const deleted = {
|
||||||
|
id: "cfg-1",
|
||||||
|
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||||
|
name: "Primary",
|
||||||
|
provider: "openai",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: {},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||||
|
};
|
||||||
|
prisma.agentProviderConfig.delete.mockResolvedValue(deleted);
|
||||||
|
|
||||||
|
const result = await service.delete("cfg-1");
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.delete).toHaveBeenCalledWith({
|
||||||
|
where: { id: "cfg-1" },
|
||||||
|
});
|
||||||
|
expect(result).toEqual(deleted);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when deleting a missing provider config", async () => {
|
||||||
|
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||||
|
|
||||||
|
await expect(service.delete("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
expect(prisma.agentProviderConfig.delete).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,71 @@
|
|||||||
|
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig, Prisma } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||||
|
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AgentProvidersService {
|
||||||
|
constructor(private readonly prisma: PrismaService) {}
|
||||||
|
|
||||||
|
async list(): Promise<AgentProviderConfig[]> {
|
||||||
|
return this.prisma.agentProviderConfig.findMany({
|
||||||
|
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async getById(id: string): Promise<AgentProviderConfig> {
|
||||||
|
const providerConfig = await this.prisma.agentProviderConfig.findUnique({
|
||||||
|
where: { id },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!providerConfig) {
|
||||||
|
throw new NotFoundException(`Agent provider config with id ${id} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return providerConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
async create(dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||||
|
return this.prisma.agentProviderConfig.create({
|
||||||
|
data: {
|
||||||
|
workspaceId: dto.workspaceId,
|
||||||
|
name: dto.name,
|
||||||
|
provider: dto.provider,
|
||||||
|
gatewayUrl: dto.gatewayUrl,
|
||||||
|
credentials: this.toJsonValue(dto.credentials ?? {}),
|
||||||
|
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async update(id: string, dto: UpdateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||||
|
await this.getById(id);
|
||||||
|
|
||||||
|
const data: Prisma.AgentProviderConfigUpdateInput = {
|
||||||
|
...(dto.workspaceId !== undefined ? { workspaceId: dto.workspaceId } : {}),
|
||||||
|
...(dto.name !== undefined ? { name: dto.name } : {}),
|
||||||
|
...(dto.provider !== undefined ? { provider: dto.provider } : {}),
|
||||||
|
...(dto.gatewayUrl !== undefined ? { gatewayUrl: dto.gatewayUrl } : {}),
|
||||||
|
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||||
|
...(dto.credentials !== undefined ? { credentials: this.toJsonValue(dto.credentials) } : {}),
|
||||||
|
};
|
||||||
|
|
||||||
|
return this.prisma.agentProviderConfig.update({
|
||||||
|
where: { id },
|
||||||
|
data,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async delete(id: string): Promise<AgentProviderConfig> {
|
||||||
|
await this.getById(id);
|
||||||
|
|
||||||
|
return this.prisma.agentProviderConfig.delete({
|
||||||
|
where: { id },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||||
|
return value as Prisma.InputJsonValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||||
|
|
||||||
|
export class CreateAgentProviderDto {
|
||||||
|
@IsUUID()
|
||||||
|
workspaceId!: string;
|
||||||
|
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
name!: string;
|
||||||
|
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
provider!: string;
|
||||||
|
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
gatewayUrl!: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsObject()
|
||||||
|
credentials?: Record<string, unknown>;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
isActive?: boolean;
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||||
|
|
||||||
|
export class UpdateAgentProviderDto {
|
||||||
|
@IsOptional()
|
||||||
|
@IsUUID()
|
||||||
|
workspaceId?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
name?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
provider?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@IsNotEmpty()
|
||||||
|
gatewayUrl?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsObject()
|
||||||
|
credentials?: Record<string, unknown>;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
isActive?: boolean;
|
||||||
|
}
|
||||||
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import type {
|
||||||
|
AgentMessage,
|
||||||
|
AgentSession,
|
||||||
|
AgentSessionList,
|
||||||
|
IAgentProvider,
|
||||||
|
InjectResult,
|
||||||
|
} from "@mosaic/shared";
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||||
|
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||||
|
|
||||||
|
type MockProvider = IAgentProvider & {
|
||||||
|
listSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getSession: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
const createProvider = (providerId: string, sessions: AgentSession[] = []): MockProvider => {
|
||||||
|
return {
|
||||||
|
providerId,
|
||||||
|
providerType: providerId,
|
||||||
|
displayName: providerId,
|
||||||
|
listSessions: vi.fn().mockResolvedValue({
|
||||||
|
sessions,
|
||||||
|
total: sessions.length,
|
||||||
|
} as AgentSessionList),
|
||||||
|
getSession: vi.fn().mockResolvedValue(null),
|
||||||
|
getMessages: vi.fn().mockResolvedValue([]),
|
||||||
|
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||||
|
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
killSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
describe("AgentProviderRegistry", () => {
|
||||||
|
let registry: AgentProviderRegistry;
|
||||||
|
let internalProvider: MockProvider;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
internalProvider = createProvider("internal");
|
||||||
|
registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("registers InternalAgentProvider on module init", () => {
|
||||||
|
registry.onModuleInit();
|
||||||
|
|
||||||
|
expect(registry.getProvider("internal")).toBe(internalProvider);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("registers providers and returns null for unknown provider ids", () => {
|
||||||
|
const externalProvider = createProvider("openclaw");
|
||||||
|
|
||||||
|
registry.registerProvider(externalProvider);
|
||||||
|
|
||||||
|
expect(registry.getProvider("openclaw")).toBe(externalProvider);
|
||||||
|
expect(registry.getProvider("missing")).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("aggregates and sorts sessions from all providers", async () => {
|
||||||
|
const internalSessions: AgentSession[] = [
|
||||||
|
{
|
||||||
|
id: "session-older",
|
||||||
|
providerId: "internal",
|
||||||
|
providerType: "internal",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T10:10:00.000Z"),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
const externalSessions: AgentSession[] = [
|
||||||
|
{
|
||||||
|
id: "session-newer",
|
||||||
|
providerId: "openclaw",
|
||||||
|
providerType: "external",
|
||||||
|
status: "paused",
|
||||||
|
createdAt: new Date("2026-03-07T09:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T10:20:00.000Z"),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
internalProvider.listSessions.mockResolvedValue({
|
||||||
|
sessions: internalSessions,
|
||||||
|
total: internalSessions.length,
|
||||||
|
} as AgentSessionList);
|
||||||
|
|
||||||
|
const externalProvider = createProvider("openclaw", externalSessions);
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(externalProvider);
|
||||||
|
|
||||||
|
const result = await registry.listAllSessions();
|
||||||
|
|
||||||
|
expect(result.map((session) => session.id)).toEqual(["session-newer", "session-older"]);
|
||||||
|
expect(internalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||||
|
expect(externalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips provider failures and logs warning", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
|
||||||
|
const healthyProvider = createProvider("healthy", [
|
||||||
|
{
|
||||||
|
id: "session-1",
|
||||||
|
providerId: "healthy",
|
||||||
|
providerType: "external",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
const failingProvider = createProvider("failing");
|
||||||
|
failingProvider.listSessions.mockRejectedValue(new Error("provider offline"));
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(healthyProvider);
|
||||||
|
registry.registerProvider(failingProvider);
|
||||||
|
|
||||||
|
const result = await registry.listAllSessions();
|
||||||
|
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]?.id).toBe("session-1");
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Failed to list sessions for provider failing")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("finds a provider for an existing session", async () => {
|
||||||
|
const targetSession: AgentSession = {
|
||||||
|
id: "session-found",
|
||||||
|
providerId: "openclaw",
|
||||||
|
providerType: "external",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T12:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
const openclawProvider = createProvider("openclaw");
|
||||||
|
openclawProvider.getSession.mockResolvedValue(targetSession);
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(openclawProvider);
|
||||||
|
|
||||||
|
const result = await registry.getProviderForSession(targetSession.id);
|
||||||
|
|
||||||
|
expect(result).toEqual({
|
||||||
|
provider: openclawProvider,
|
||||||
|
session: targetSession,
|
||||||
|
});
|
||||||
|
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||||
|
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null when no provider has the requested session", async () => {
|
||||||
|
const openclawProvider = createProvider("openclaw");
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(openclawProvider);
|
||||||
|
|
||||||
|
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("continues searching providers when getSession throws", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const failingProvider = createProvider("failing");
|
||||||
|
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
|
||||||
|
|
||||||
|
const healthySession: AgentSession = {
|
||||||
|
id: "session-healthy",
|
||||||
|
providerId: "healthy",
|
||||||
|
providerType: "external",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T12:15:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
const healthyProvider = createProvider("healthy");
|
||||||
|
healthyProvider.getSession.mockResolvedValue(healthySession);
|
||||||
|
|
||||||
|
registry.onModuleInit();
|
||||||
|
registry.registerProvider(failingProvider);
|
||||||
|
registry.registerProvider(healthyProvider);
|
||||||
|
|
||||||
|
const result = await registry.getProviderForSession(healthySession.id);
|
||||||
|
|
||||||
|
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Failed to get session session-healthy for provider failing")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
|
||||||
|
import type { AgentSession, IAgentProvider } from "@mosaic/shared";
|
||||||
|
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AgentProviderRegistry implements OnModuleInit {
|
||||||
|
private readonly logger = new Logger(AgentProviderRegistry.name);
|
||||||
|
private readonly providers = new Map<string, IAgentProvider>();
|
||||||
|
|
||||||
|
constructor(private readonly internalProvider: InternalAgentProvider) {}
|
||||||
|
|
||||||
|
onModuleInit(): void {
|
||||||
|
this.registerProvider(this.internalProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
registerProvider(provider: IAgentProvider): void {
|
||||||
|
const existingProvider = this.providers.get(provider.providerId);
|
||||||
|
if (existingProvider !== undefined) {
|
||||||
|
this.logger.warn(`Replacing existing provider registration for ${provider.providerId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.providers.set(provider.providerId, provider);
|
||||||
|
}
|
||||||
|
|
||||||
|
getProvider(providerId: string): IAgentProvider | null {
|
||||||
|
return this.providers.get(providerId) ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getProviderForSession(
|
||||||
|
sessionId: string
|
||||||
|
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
|
||||||
|
for (const provider of this.providers.values()) {
|
||||||
|
try {
|
||||||
|
const session = await provider.getSession(sessionId);
|
||||||
|
if (session !== null) {
|
||||||
|
return {
|
||||||
|
provider,
|
||||||
|
session,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async listAllSessions(): Promise<AgentSession[]> {
|
||||||
|
const providers = [...this.providers.values()];
|
||||||
|
const sessionsByProvider = await Promise.all(
|
||||||
|
providers.map(async (provider) => {
|
||||||
|
try {
|
||||||
|
const { sessions } = await provider.listSessions();
|
||||||
|
return sessions;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to list sessions for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||||
|
);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
return sessionsByProvider
|
||||||
|
.flat()
|
||||||
|
.sort((left, right) => right.updatedAt.getTime() - left.updatedAt.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
private toErrorMessage(error: unknown): string {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
return error.message;
|
||||||
|
}
|
||||||
|
|
||||||
|
return String(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ import { AgentMessagesService } from "./agent-messages.service";
|
|||||||
import { AgentControlService } from "./agent-control.service";
|
import { AgentControlService } from "./agent-control.service";
|
||||||
import { AgentTreeService } from "./agent-tree.service";
|
import { AgentTreeService } from "./agent-tree.service";
|
||||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||||
|
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
|
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
|
||||||
@@ -22,7 +23,8 @@ import { InternalAgentProvider } from "./internal-agent.provider";
|
|||||||
AgentControlService,
|
AgentControlService,
|
||||||
AgentTreeService,
|
AgentTreeService,
|
||||||
InternalAgentProvider,
|
InternalAgentProvider,
|
||||||
|
AgentProviderRegistry,
|
||||||
],
|
],
|
||||||
exports: [InternalAgentProvider],
|
exports: [InternalAgentProvider, AgentProviderRegistry],
|
||||||
})
|
})
|
||||||
export class AgentsModule {}
|
export class AgentsModule {}
|
||||||
|
|||||||
@@ -0,0 +1,15 @@
|
|||||||
|
import { Type } from "class-transformer";
|
||||||
|
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
|
||||||
|
|
||||||
|
export class GetMissionControlMessagesQueryDto {
|
||||||
|
@IsOptional()
|
||||||
|
@Type(() => Number)
|
||||||
|
@IsInt()
|
||||||
|
@Min(1)
|
||||||
|
@Max(200)
|
||||||
|
limit?: number;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
before?: string;
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
import { IsBoolean, IsOptional } from "class-validator";
|
||||||
|
|
||||||
|
export class KillSessionDto {
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
force?: boolean;
|
||||||
|
}
|
||||||
@@ -0,0 +1,183 @@
|
|||||||
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Get,
|
||||||
|
Header,
|
||||||
|
HttpCode,
|
||||||
|
MessageEvent,
|
||||||
|
Param,
|
||||||
|
Post,
|
||||||
|
Query,
|
||||||
|
Request,
|
||||||
|
Sse,
|
||||||
|
UseGuards,
|
||||||
|
UsePipes,
|
||||||
|
ValidationPipe,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
|
||||||
|
import { Observable } from "rxjs";
|
||||||
|
import { AuthGuard } from "../../auth/guards/auth.guard";
|
||||||
|
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
|
||||||
|
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
|
||||||
|
import { KillSessionDto } from "./dto/kill-session.dto";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||||
|
|
||||||
|
interface MissionControlRequest {
|
||||||
|
user?: {
|
||||||
|
id?: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Controller("api/mission-control")
|
||||||
|
@UseGuards(AuthGuard)
|
||||||
|
export class MissionControlController {
|
||||||
|
constructor(private readonly missionControlService: MissionControlService) {}
|
||||||
|
|
||||||
|
@Get("sessions")
|
||||||
|
async listSessions(): Promise<{ sessions: AgentSession[] }> {
|
||||||
|
const sessions = await this.missionControlService.listSessions();
|
||||||
|
return { sessions };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId")
|
||||||
|
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
|
||||||
|
return this.missionControlService.getSession(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/messages")
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async getMessages(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Query() query: GetMissionControlMessagesQueryDto
|
||||||
|
): Promise<{ messages: AgentMessage[] }> {
|
||||||
|
const messages = await this.missionControlService.getMessages(
|
||||||
|
sessionId,
|
||||||
|
query.limit,
|
||||||
|
query.before
|
||||||
|
);
|
||||||
|
|
||||||
|
return { messages };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/inject")
|
||||||
|
@HttpCode(200)
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
injectMessage(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() dto: InjectAgentDto,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<InjectResult> {
|
||||||
|
return this.missionControlService.injectMessage(
|
||||||
|
sessionId,
|
||||||
|
dto.message,
|
||||||
|
this.resolveOperatorId(req)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/pause")
|
||||||
|
@HttpCode(200)
|
||||||
|
async pauseSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} paused` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/resume")
|
||||||
|
@HttpCode(200)
|
||||||
|
async resumeSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} resumed` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/kill")
|
||||||
|
@HttpCode(200)
|
||||||
|
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||||
|
async killSession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() dto: KillSessionDto,
|
||||||
|
@Request() req: MissionControlRequest
|
||||||
|
): Promise<{ message: string }> {
|
||||||
|
await this.missionControlService.killSession(
|
||||||
|
sessionId,
|
||||||
|
dto.force ?? true,
|
||||||
|
this.resolveOperatorId(req)
|
||||||
|
);
|
||||||
|
|
||||||
|
return { message: `Session ${sessionId} killed` };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Sse("sessions/:sessionId/stream")
|
||||||
|
@Header("Content-Type", "text/event-stream")
|
||||||
|
@Header("Cache-Control", "no-cache")
|
||||||
|
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
|
||||||
|
return new Observable<MessageEvent>((subscriber) => {
|
||||||
|
let isClosed = false;
|
||||||
|
let iterator: AsyncIterator<AgentMessage> | null = null;
|
||||||
|
|
||||||
|
void this.missionControlService
|
||||||
|
.streamMessages(sessionId)
|
||||||
|
.then(async (stream) => {
|
||||||
|
iterator = stream[Symbol.asyncIterator]();
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
if (isClosed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
|
||||||
|
if (next.done) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.next({
|
||||||
|
data: this.toStreamPayload(next.value),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
subscriber.complete();
|
||||||
|
})
|
||||||
|
.catch((error: unknown) => {
|
||||||
|
subscriber.error(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
isClosed = true;
|
||||||
|
void iterator?.return?.();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveOperatorId(req: MissionControlRequest): string {
|
||||||
|
const operatorId = req.user?.id;
|
||||||
|
return typeof operatorId === "string" && operatorId.length > 0
|
||||||
|
? operatorId
|
||||||
|
: DEFAULT_OPERATOR_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toStreamPayload(message: AgentMessage): {
|
||||||
|
id: string;
|
||||||
|
sessionId: string;
|
||||||
|
role: string;
|
||||||
|
content: string;
|
||||||
|
timestamp: string;
|
||||||
|
metadata?: Record<string, unknown>;
|
||||||
|
} {
|
||||||
|
return {
|
||||||
|
id: message.id,
|
||||||
|
sessionId: message.sessionId,
|
||||||
|
role: message.role,
|
||||||
|
content: message.content,
|
||||||
|
timestamp: message.timestamp.toISOString(),
|
||||||
|
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { AgentsModule } from "../agents/agents.module";
|
||||||
|
import { AuthModule } from "../../auth/auth.module";
|
||||||
|
import { PrismaModule } from "../../prisma/prisma.module";
|
||||||
|
import { MissionControlController } from "./mission-control.controller";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [AgentsModule, AuthModule, PrismaModule],
|
||||||
|
controllers: [MissionControlController],
|
||||||
|
providers: [MissionControlService],
|
||||||
|
})
|
||||||
|
export class MissionControlModule {}
|
||||||
@@ -0,0 +1,213 @@
|
|||||||
|
import { NotFoundException } from "@nestjs/common";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||||
|
import type { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
import { MissionControlService } from "./mission-control.service";
|
||||||
|
|
||||||
|
type MockProvider = IAgentProvider & {
|
||||||
|
listSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getSession: ReturnType<typeof vi.fn>;
|
||||||
|
getMessages: ReturnType<typeof vi.fn>;
|
||||||
|
injectMessage: ReturnType<typeof vi.fn>;
|
||||||
|
pauseSession: ReturnType<typeof vi.fn>;
|
||||||
|
resumeSession: ReturnType<typeof vi.fn>;
|
||||||
|
killSession: ReturnType<typeof vi.fn>;
|
||||||
|
streamMessages: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
const createProvider = (providerId = "internal"): MockProvider => ({
|
||||||
|
providerId,
|
||||||
|
providerType: providerId,
|
||||||
|
displayName: providerId,
|
||||||
|
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
|
||||||
|
getSession: vi.fn().mockResolvedValue(null),
|
||||||
|
getMessages: vi.fn().mockResolvedValue([]),
|
||||||
|
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||||
|
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
killSession: vi.fn().mockResolvedValue(undefined),
|
||||||
|
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(true),
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("MissionControlService", () => {
|
||||||
|
let service: MissionControlService;
|
||||||
|
let registry: {
|
||||||
|
listAllSessions: ReturnType<typeof vi.fn>;
|
||||||
|
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
let prisma: {
|
||||||
|
operatorAuditLog: {
|
||||||
|
create: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
const session: AgentSession = {
|
||||||
|
id: "session-1",
|
||||||
|
providerId: "internal",
|
||||||
|
providerType: "internal",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T14:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
registry = {
|
||||||
|
listAllSessions: vi.fn().mockResolvedValue([session]),
|
||||||
|
getProviderForSession: vi.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
|
|
||||||
|
prisma = {
|
||||||
|
operatorAuditLog: {
|
||||||
|
create: vi.fn().mockResolvedValue(undefined),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
service = new MissionControlService(
|
||||||
|
registry as unknown as AgentProviderRegistry,
|
||||||
|
prisma as unknown as PrismaService
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("lists sessions from the registry", async () => {
|
||||||
|
await expect(service.listSessions()).resolves.toEqual([session]);
|
||||||
|
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns a session when it is found", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.getSession(session.id)).resolves.toEqual(session);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when session lookup fails", async () => {
|
||||||
|
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("gets messages from the resolved provider", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
const messages: AgentMessage[] = [
|
||||||
|
{
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: session.id,
|
||||||
|
role: "assistant",
|
||||||
|
content: "hello",
|
||||||
|
timestamp: new Date("2026-03-07T14:01:00.000Z"),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
provider.getMessages.mockResolvedValue(messages);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
|
||||||
|
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("injects a message and writes an audit log", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
|
||||||
|
provider.injectMessage.mockResolvedValue(injectResult);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
|
||||||
|
injectResult
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "operator-1",
|
||||||
|
provider: "internal",
|
||||||
|
action: "inject",
|
||||||
|
content: "ship it",
|
||||||
|
metadata: {
|
||||||
|
payload: { message: "ship it" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("pauses and resumes using default operator id", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await service.pauseSession(session.id);
|
||||||
|
await service.resumeSession(session.id);
|
||||||
|
|
||||||
|
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
|
||||||
|
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "mission-control",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "pause",
|
||||||
|
metadata: {
|
||||||
|
payload: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "mission-control",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "resume",
|
||||||
|
metadata: {
|
||||||
|
payload: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("kills with provided force value and writes audit log", async () => {
|
||||||
|
const provider = createProvider("openclaw");
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await service.killSession(session.id, false, "operator-2");
|
||||||
|
|
||||||
|
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
|
||||||
|
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||||
|
data: {
|
||||||
|
sessionId: session.id,
|
||||||
|
userId: "operator-2",
|
||||||
|
provider: "openclaw",
|
||||||
|
action: "kill",
|
||||||
|
metadata: {
|
||||||
|
payload: { force: false },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves provider message stream", async () => {
|
||||||
|
const provider = createProvider("internal");
|
||||||
|
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
|
||||||
|
yield {
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: session.id,
|
||||||
|
role: "assistant",
|
||||||
|
content: "stream",
|
||||||
|
timestamp: new Date("2026-03-07T14:03:00.000Z"),
|
||||||
|
};
|
||||||
|
})();
|
||||||
|
|
||||||
|
provider.streamMessages.mockReturnValue(messageStream);
|
||||||
|
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||||
|
|
||||||
|
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
|
||||||
|
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not write audit log when session cannot be resolved", async () => {
|
||||||
|
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||||
|
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,139 @@
|
|||||||
|
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||||
|
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||||
|
import type { Prisma } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
|
||||||
|
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
|
||||||
|
|
||||||
|
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class MissionControlService {
|
||||||
|
constructor(
|
||||||
|
private readonly registry: AgentProviderRegistry,
|
||||||
|
private readonly prisma: PrismaService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
listSessions(): Promise<AgentSession[]> {
|
||||||
|
return this.registry.listAllSessions();
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSession(sessionId: string): Promise<AgentSession> {
|
||||||
|
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||||
|
if (!resolved) {
|
||||||
|
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resolved.session;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
return provider.getMessages(sessionId, limit, before);
|
||||||
|
}
|
||||||
|
|
||||||
|
async injectMessage(
|
||||||
|
sessionId: string,
|
||||||
|
message: string,
|
||||||
|
operatorId = DEFAULT_OPERATOR_ID
|
||||||
|
): Promise<InjectResult> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
const result = await provider.injectMessage(sessionId, message);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "inject",
|
||||||
|
content: message,
|
||||||
|
payload: { message },
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.pauseSession(sessionId);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "pause",
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.resumeSession(sessionId);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "resume",
|
||||||
|
payload: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async killSession(
|
||||||
|
sessionId: string,
|
||||||
|
force = true,
|
||||||
|
operatorId = DEFAULT_OPERATOR_ID
|
||||||
|
): Promise<void> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
await provider.killSession(sessionId, force);
|
||||||
|
|
||||||
|
await this.writeOperatorAuditLog({
|
||||||
|
sessionId,
|
||||||
|
providerId: provider.providerId,
|
||||||
|
operatorId,
|
||||||
|
action: "kill",
|
||||||
|
payload: { force },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
|
||||||
|
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||||
|
return provider.streamMessages(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getProviderForSessionOrThrow(
|
||||||
|
sessionId: string
|
||||||
|
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
|
||||||
|
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||||
|
|
||||||
|
if (!resolved) {
|
||||||
|
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resolved;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||||
|
return value as Prisma.InputJsonValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async writeOperatorAuditLog(params: {
|
||||||
|
sessionId: string;
|
||||||
|
providerId: string;
|
||||||
|
operatorId: string;
|
||||||
|
action: MissionControlAction;
|
||||||
|
content?: string;
|
||||||
|
payload: Record<string, unknown>;
|
||||||
|
}): Promise<void> {
|
||||||
|
await this.prisma.operatorAuditLog.create({
|
||||||
|
data: {
|
||||||
|
sessionId: params.sessionId,
|
||||||
|
userId: params.operatorId,
|
||||||
|
provider: params.providerId,
|
||||||
|
action: params.action,
|
||||||
|
...(params.content !== undefined ? { content: params.content } : {}),
|
||||||
|
metadata: this.toJsonValue({ payload: params.payload }),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,7 +4,9 @@ import { BullModule } from "@nestjs/bullmq";
|
|||||||
import { ThrottlerModule } from "@nestjs/throttler";
|
import { ThrottlerModule } from "@nestjs/throttler";
|
||||||
import { HealthModule } from "./api/health/health.module";
|
import { HealthModule } from "./api/health/health.module";
|
||||||
import { AgentsModule } from "./api/agents/agents.module";
|
import { AgentsModule } from "./api/agents/agents.module";
|
||||||
|
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
||||||
import { QueueApiModule } from "./api/queue/queue-api.module";
|
import { QueueApiModule } from "./api/queue/queue-api.module";
|
||||||
|
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
||||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||||
import { BudgetModule } from "./budget/budget.module";
|
import { BudgetModule } from "./budget/budget.module";
|
||||||
import { CIModule } from "./ci";
|
import { CIModule } from "./ci";
|
||||||
@@ -51,6 +53,8 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
|||||||
]),
|
]),
|
||||||
HealthModule,
|
HealthModule,
|
||||||
AgentsModule,
|
AgentsModule,
|
||||||
|
AgentProvidersModule,
|
||||||
|
MissionControlModule,
|
||||||
QueueApiModule,
|
QueueApiModule,
|
||||||
CoordinatorModule,
|
CoordinatorModule,
|
||||||
BudgetModule,
|
BudgetModule,
|
||||||
|
|||||||
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
import { AuthGuard } from "./guards/auth.guard";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
providers: [OrchestratorApiKeyGuard, AuthGuard],
|
||||||
|
exports: [AuthGuard],
|
||||||
|
})
|
||||||
|
export class AuthModule {}
|
||||||
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
|
||||||
|
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AuthGuard implements CanActivate {
|
||||||
|
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
|
||||||
|
|
||||||
|
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
|
||||||
|
return this.apiKeyGuard.canActivate(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user