Compare commits
16 Commits
chore/ms23
...
feat/ms23-
| Author | SHA1 | Date | |
|---|---|---|---|
| b2c751caca | |||
| 2c36569f85 | |||
| 7c086db7e4 | |||
| 577e6141e0 | |||
| 631ba499e3 | |||
| a61106c24a | |||
| 487aac6903 | |||
| 544e828e58 | |||
| 9489bc63f8 | |||
| ad644799aa | |||
| 81bf349270 | |||
| bcada71e88 | |||
| 9cc82e7fcf | |||
| 4b135ae1f0 | |||
| 364619b332 | |||
| 18ed3a5411 |
@@ -0,0 +1,54 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Delete,
|
||||
Get,
|
||||
Param,
|
||||
Patch,
|
||||
Post,
|
||||
UseGuards,
|
||||
UsePipes,
|
||||
ValidationPipe,
|
||||
} from "@nestjs/common";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||
|
||||
@Controller("agent-providers")
|
||||
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
|
||||
export class AgentProvidersController {
|
||||
constructor(private readonly agentProvidersService: AgentProvidersService) {}
|
||||
|
||||
@Get()
|
||||
async list(): Promise<AgentProviderConfig[]> {
|
||||
return this.agentProvidersService.list();
|
||||
}
|
||||
|
||||
@Get(":id")
|
||||
async getById(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.getById(id);
|
||||
}
|
||||
|
||||
@Post()
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async create(@Body() dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.create(dto);
|
||||
}
|
||||
|
||||
@Patch(":id")
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async update(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: UpdateAgentProviderDto
|
||||
): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.update(id, dto);
|
||||
}
|
||||
|
||||
@Delete(":id")
|
||||
async delete(@Param("id") id: string): Promise<AgentProviderConfig> {
|
||||
return this.agentProvidersService.delete(id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
import { AgentProvidersController } from "./agent-providers.controller";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
|
||||
@Module({
|
||||
imports: [PrismaModule],
|
||||
controllers: [AgentProvidersController],
|
||||
providers: [OrchestratorApiKeyGuard, AgentProvidersService],
|
||||
})
|
||||
export class AgentProvidersModule {}
|
||||
@@ -0,0 +1,211 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
|
||||
describe("AgentProvidersService", () => {
|
||||
let service: AgentProvidersService;
|
||||
let prisma: {
|
||||
agentProviderConfig: {
|
||||
findMany: ReturnType<typeof vi.fn>;
|
||||
findUnique: ReturnType<typeof vi.fn>;
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
update: ReturnType<typeof vi.fn>;
|
||||
delete: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = {
|
||||
agentProviderConfig: {
|
||||
findMany: vi.fn(),
|
||||
findUnique: vi.fn(),
|
||||
create: vi.fn(),
|
||||
update: vi.fn(),
|
||||
delete: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
service = new AgentProvidersService(prisma as unknown as PrismaService);
|
||||
});
|
||||
|
||||
it("lists all provider configs", async () => {
|
||||
const expected = [
|
||||
{
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
},
|
||||
];
|
||||
prisma.agentProviderConfig.findMany.mockResolvedValue(expected);
|
||||
|
||||
const result = await service.list();
|
||||
|
||||
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
|
||||
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||
});
|
||||
expect(result).toEqual(expected);
|
||||
});
|
||||
|
||||
it("returns a single provider config", async () => {
|
||||
const expected = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: { apiKeyRef: "vault:openai" },
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(expected);
|
||||
|
||||
const result = await service.getById("cfg-1");
|
||||
|
||||
expect(prisma.agentProviderConfig.findUnique).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
});
|
||||
expect(result).toEqual(expected);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when provider config is missing", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.getById("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||
});
|
||||
|
||||
it("creates a provider config with default credentials", async () => {
|
||||
const created = {
|
||||
id: "cfg-created",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.create.mockResolvedValue(created);
|
||||
|
||||
const result = await service.create({
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
});
|
||||
|
||||
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "New Provider",
|
||||
provider: "claude",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(created);
|
||||
});
|
||||
|
||||
it("updates a provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
});
|
||||
|
||||
const updated = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Secondary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.update.mockResolvedValue(updated);
|
||||
|
||||
const result = await service.update("cfg-1", {
|
||||
name: "Secondary",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
});
|
||||
|
||||
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
data: {
|
||||
name: "Secondary",
|
||||
gatewayUrl: "https://gateway2.example.com",
|
||||
credentials: { apiKeyRef: "vault:new" },
|
||||
isActive: false,
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(updated);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when updating a missing provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.update("missing", { name: "Updated" })).rejects.toBeInstanceOf(
|
||||
NotFoundException
|
||||
);
|
||||
expect(prisma.agentProviderConfig.update).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("deletes a provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
});
|
||||
|
||||
const deleted = {
|
||||
id: "cfg-1",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "Primary",
|
||||
provider: "openai",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.delete.mockResolvedValue(deleted);
|
||||
|
||||
const result = await service.delete("cfg-1");
|
||||
|
||||
expect(prisma.agentProviderConfig.delete).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-1" },
|
||||
});
|
||||
expect(result).toEqual(deleted);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when deleting a missing provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(service.delete("missing")).rejects.toBeInstanceOf(NotFoundException);
|
||||
expect(prisma.agentProviderConfig.delete).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,71 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import type { AgentProviderConfig, Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||
|
||||
@Injectable()
|
||||
export class AgentProvidersService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async list(): Promise<AgentProviderConfig[]> {
|
||||
return this.prisma.agentProviderConfig.findMany({
|
||||
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
|
||||
});
|
||||
}
|
||||
|
||||
async getById(id: string): Promise<AgentProviderConfig> {
|
||||
const providerConfig = await this.prisma.agentProviderConfig.findUnique({
|
||||
where: { id },
|
||||
});
|
||||
|
||||
if (!providerConfig) {
|
||||
throw new NotFoundException(`Agent provider config with id ${id} not found`);
|
||||
}
|
||||
|
||||
return providerConfig;
|
||||
}
|
||||
|
||||
async create(dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
return this.prisma.agentProviderConfig.create({
|
||||
data: {
|
||||
workspaceId: dto.workspaceId,
|
||||
name: dto.name,
|
||||
provider: dto.provider,
|
||||
gatewayUrl: dto.gatewayUrl,
|
||||
credentials: this.toJsonValue(dto.credentials ?? {}),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async update(id: string, dto: UpdateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
await this.getById(id);
|
||||
|
||||
const data: Prisma.AgentProviderConfigUpdateInput = {
|
||||
...(dto.workspaceId !== undefined ? { workspaceId: dto.workspaceId } : {}),
|
||||
...(dto.name !== undefined ? { name: dto.name } : {}),
|
||||
...(dto.provider !== undefined ? { provider: dto.provider } : {}),
|
||||
...(dto.gatewayUrl !== undefined ? { gatewayUrl: dto.gatewayUrl } : {}),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
...(dto.credentials !== undefined ? { credentials: this.toJsonValue(dto.credentials) } : {}),
|
||||
};
|
||||
|
||||
return this.prisma.agentProviderConfig.update({
|
||||
where: { id },
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
async delete(id: string): Promise<AgentProviderConfig> {
|
||||
await this.getById(id);
|
||||
|
||||
return this.prisma.agentProviderConfig.delete({
|
||||
where: { id },
|
||||
});
|
||||
}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||
|
||||
export class CreateAgentProviderDto {
|
||||
@IsUUID()
|
||||
workspaceId!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
name!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
provider!: string;
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
gatewayUrl!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsObject()
|
||||
credentials?: Record<string, unknown>;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
isActive?: boolean;
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
|
||||
|
||||
export class UpdateAgentProviderDto {
|
||||
@IsOptional()
|
||||
@IsUUID()
|
||||
workspaceId?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
name?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
provider?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
gatewayUrl?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsObject()
|
||||
credentials?: Record<string, unknown>;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
isActive?: boolean;
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||
import { AgentControlService } from "./agent-control.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { KillswitchService } from "../../killswitch/killswitch.service";
|
||||
|
||||
describe("AgentControlService", () => {
|
||||
let service: AgentControlService;
|
||||
@@ -16,6 +17,9 @@ describe("AgentControlService", () => {
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
let killswitchService: {
|
||||
killAgent: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = {
|
||||
@@ -31,7 +35,14 @@ describe("AgentControlService", () => {
|
||||
},
|
||||
};
|
||||
|
||||
service = new AgentControlService(prisma as unknown as PrismaService);
|
||||
killswitchService = {
|
||||
killAgent: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
service = new AgentControlService(
|
||||
prisma as unknown as PrismaService,
|
||||
killswitchService as unknown as KillswitchService
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -137,4 +148,25 @@ describe("AgentControlService", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("killAgent", () => {
|
||||
it("delegates kill to killswitch and logs audit", async () => {
|
||||
await service.killAgent("agent-654", "operator-kill", false);
|
||||
|
||||
expect(killswitchService.killAgent).toHaveBeenCalledWith("agent-654");
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
sessionId: "agent-654",
|
||||
userId: "operator-kill",
|
||||
provider: "internal",
|
||||
action: "kill",
|
||||
metadata: {
|
||||
payload: {
|
||||
force: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import type { Prisma } from "@prisma/client";
|
||||
import { KillswitchService } from "../../killswitch/killswitch.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
|
||||
@Injectable()
|
||||
export class AgentControlService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly killswitchService: KillswitchService
|
||||
) {}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
@@ -13,7 +17,7 @@ export class AgentControlService {
|
||||
private async createOperatorAuditLog(
|
||||
agentId: string,
|
||||
operatorId: string,
|
||||
action: "inject" | "pause" | "resume",
|
||||
action: "inject" | "pause" | "resume" | "kill",
|
||||
payload: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
await this.prisma.operatorAuditLog.create({
|
||||
@@ -65,4 +69,9 @@ export class AgentControlService {
|
||||
|
||||
await this.createOperatorAuditLog(agentId, operatorId, "resume", {});
|
||||
}
|
||||
|
||||
async killAgent(agentId: string, operatorId: string, force = true): Promise<void> {
|
||||
await this.killswitchService.killAgent(agentId);
|
||||
await this.createOperatorAuditLog(agentId, operatorId, "kill", { force });
|
||||
}
|
||||
}
|
||||
|
||||
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
202
apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts
Normal file
@@ -0,0 +1,202 @@
|
||||
import { Logger } from "@nestjs/common";
|
||||
import type {
|
||||
AgentMessage,
|
||||
AgentSession,
|
||||
AgentSessionList,
|
||||
IAgentProvider,
|
||||
InjectResult,
|
||||
} from "@mosaic/shared";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
|
||||
type MockProvider = IAgentProvider & {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||
return;
|
||||
};
|
||||
|
||||
const createProvider = (providerId: string, sessions: AgentSession[] = []): MockProvider => {
|
||||
return {
|
||||
providerId,
|
||||
providerType: providerId,
|
||||
displayName: providerId,
|
||||
listSessions: vi.fn().mockResolvedValue({
|
||||
sessions,
|
||||
total: sessions.length,
|
||||
} as AgentSessionList),
|
||||
getSession: vi.fn().mockResolvedValue(null),
|
||||
getMessages: vi.fn().mockResolvedValue([]),
|
||||
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||
killSession: vi.fn().mockResolvedValue(undefined),
|
||||
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||
isAvailable: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
};
|
||||
|
||||
describe("AgentProviderRegistry", () => {
|
||||
let registry: AgentProviderRegistry;
|
||||
let internalProvider: MockProvider;
|
||||
|
||||
beforeEach(() => {
|
||||
internalProvider = createProvider("internal");
|
||||
registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("registers InternalAgentProvider on module init", () => {
|
||||
registry.onModuleInit();
|
||||
|
||||
expect(registry.getProvider("internal")).toBe(internalProvider);
|
||||
});
|
||||
|
||||
it("registers providers and returns null for unknown provider ids", () => {
|
||||
const externalProvider = createProvider("openclaw");
|
||||
|
||||
registry.registerProvider(externalProvider);
|
||||
|
||||
expect(registry.getProvider("openclaw")).toBe(externalProvider);
|
||||
expect(registry.getProvider("missing")).toBeNull();
|
||||
});
|
||||
|
||||
it("aggregates and sorts sessions from all providers", async () => {
|
||||
const internalSessions: AgentSession[] = [
|
||||
{
|
||||
id: "session-older",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T10:10:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
const externalSessions: AgentSession[] = [
|
||||
{
|
||||
id: "session-newer",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "paused",
|
||||
createdAt: new Date("2026-03-07T09:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T10:20:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
internalProvider.listSessions.mockResolvedValue({
|
||||
sessions: internalSessions,
|
||||
total: internalSessions.length,
|
||||
} as AgentSessionList);
|
||||
|
||||
const externalProvider = createProvider("openclaw", externalSessions);
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(externalProvider);
|
||||
|
||||
const result = await registry.listAllSessions();
|
||||
|
||||
expect(result.map((session) => session.id)).toEqual(["session-newer", "session-older"]);
|
||||
expect(internalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||
expect(externalProvider.listSessions).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("skips provider failures and logs warning", async () => {
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
|
||||
const healthyProvider = createProvider("healthy", [
|
||||
{
|
||||
id: "session-1",
|
||||
providerId: "healthy",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
const failingProvider = createProvider("failing");
|
||||
failingProvider.listSessions.mockRejectedValue(new Error("provider offline"));
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(healthyProvider);
|
||||
registry.registerProvider(failingProvider);
|
||||
|
||||
const result = await registry.listAllSessions();
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.id).toBe("session-1");
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Failed to list sessions for provider failing")
|
||||
);
|
||||
});
|
||||
|
||||
it("finds a provider for an existing session", async () => {
|
||||
const targetSession: AgentSession = {
|
||||
id: "session-found",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T12:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
|
||||
};
|
||||
|
||||
const openclawProvider = createProvider("openclaw");
|
||||
openclawProvider.getSession.mockResolvedValue(targetSession);
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(openclawProvider);
|
||||
|
||||
const result = await registry.getProviderForSession(targetSession.id);
|
||||
|
||||
expect(result).toEqual({
|
||||
provider: openclawProvider,
|
||||
session: targetSession,
|
||||
});
|
||||
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
|
||||
});
|
||||
|
||||
it("returns null when no provider has the requested session", async () => {
|
||||
const openclawProvider = createProvider("openclaw");
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(openclawProvider);
|
||||
|
||||
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it("continues searching providers when getSession throws", async () => {
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
const failingProvider = createProvider("failing");
|
||||
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
|
||||
|
||||
const healthySession: AgentSession = {
|
||||
id: "session-healthy",
|
||||
providerId: "healthy",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T12:15:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
|
||||
};
|
||||
|
||||
const healthyProvider = createProvider("healthy");
|
||||
healthyProvider.getSession.mockResolvedValue(healthySession);
|
||||
|
||||
registry.onModuleInit();
|
||||
registry.registerProvider(failingProvider);
|
||||
registry.registerProvider(healthyProvider);
|
||||
|
||||
const result = await registry.getProviderForSession(healthySession.id);
|
||||
|
||||
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Failed to get session session-healthy for provider failing")
|
||||
);
|
||||
});
|
||||
});
|
||||
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
79
apps/orchestrator/src/api/agents/agent-provider.registry.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
|
||||
import type { AgentSession, IAgentProvider } from "@mosaic/shared";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
|
||||
@Injectable()
|
||||
export class AgentProviderRegistry implements OnModuleInit {
|
||||
private readonly logger = new Logger(AgentProviderRegistry.name);
|
||||
private readonly providers = new Map<string, IAgentProvider>();
|
||||
|
||||
constructor(private readonly internalProvider: InternalAgentProvider) {}
|
||||
|
||||
onModuleInit(): void {
|
||||
this.registerProvider(this.internalProvider);
|
||||
}
|
||||
|
||||
registerProvider(provider: IAgentProvider): void {
|
||||
const existingProvider = this.providers.get(provider.providerId);
|
||||
if (existingProvider !== undefined) {
|
||||
this.logger.warn(`Replacing existing provider registration for ${provider.providerId}`);
|
||||
}
|
||||
|
||||
this.providers.set(provider.providerId, provider);
|
||||
}
|
||||
|
||||
getProvider(providerId: string): IAgentProvider | null {
|
||||
return this.providers.get(providerId) ?? null;
|
||||
}
|
||||
|
||||
async getProviderForSession(
|
||||
sessionId: string
|
||||
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
|
||||
for (const provider of this.providers.values()) {
|
||||
try {
|
||||
const session = await provider.getSession(sessionId);
|
||||
if (session !== null) {
|
||||
return {
|
||||
provider,
|
||||
session,
|
||||
};
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async listAllSessions(): Promise<AgentSession[]> {
|
||||
const providers = [...this.providers.values()];
|
||||
const sessionsByProvider = await Promise.all(
|
||||
providers.map(async (provider) => {
|
||||
try {
|
||||
const { sessions } = await provider.listSessions();
|
||||
return sessions;
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to list sessions for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
|
||||
);
|
||||
return [];
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
return sessionsByProvider
|
||||
.flat()
|
||||
.sort((left, right) => right.updatedAt.getTime() - left.updatedAt.getTime());
|
||||
}
|
||||
|
||||
private toErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
|
||||
return String(error);
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@ describe("AgentTreeService", () => {
|
||||
let prisma: {
|
||||
agentSessionTree: {
|
||||
findMany: ReturnType<typeof vi.fn>;
|
||||
count: ReturnType<typeof vi.fn>;
|
||||
findUnique: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
@@ -14,6 +16,8 @@ describe("AgentTreeService", () => {
|
||||
prisma = {
|
||||
agentSessionTree: {
|
||||
findMany: vi.fn(),
|
||||
count: vi.fn(),
|
||||
findUnique: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -24,6 +28,141 @@ describe("AgentTreeService", () => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("listSessions", () => {
|
||||
it("returns paginated sessions and cursor", async () => {
|
||||
const sessions = [
|
||||
{
|
||||
id: "tree-2",
|
||||
sessionId: "agent-2",
|
||||
parentSessionId: null,
|
||||
provider: "internal",
|
||||
missionId: null,
|
||||
taskId: "task-2",
|
||||
taskSource: "queue",
|
||||
agentType: "worker",
|
||||
status: "running",
|
||||
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
completedAt: null,
|
||||
metadata: {},
|
||||
},
|
||||
{
|
||||
id: "tree-1",
|
||||
sessionId: "agent-1",
|
||||
parentSessionId: null,
|
||||
provider: "internal",
|
||||
missionId: null,
|
||||
taskId: "task-1",
|
||||
taskSource: "queue",
|
||||
agentType: "worker",
|
||||
status: "running",
|
||||
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
completedAt: null,
|
||||
metadata: {},
|
||||
},
|
||||
];
|
||||
|
||||
prisma.agentSessionTree.findMany.mockResolvedValue(sessions);
|
||||
prisma.agentSessionTree.count.mockResolvedValue(7);
|
||||
|
||||
const result = await service.listSessions(undefined, 2);
|
||||
|
||||
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
|
||||
where: undefined,
|
||||
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
|
||||
take: 2,
|
||||
});
|
||||
expect(prisma.agentSessionTree.count).toHaveBeenCalledWith();
|
||||
expect(result.sessions).toEqual(sessions);
|
||||
expect(result.total).toBe(7);
|
||||
expect(result.cursor).toBeTypeOf("string");
|
||||
});
|
||||
|
||||
it("applies cursor filter when provided", async () => {
|
||||
prisma.agentSessionTree.findMany.mockResolvedValue([]);
|
||||
prisma.agentSessionTree.count.mockResolvedValue(0);
|
||||
|
||||
const cursorDate = "2026-03-07T10:00:00.000Z";
|
||||
const cursorSessionId = "agent-5";
|
||||
const cursor = Buffer.from(
|
||||
JSON.stringify({
|
||||
spawnedAt: cursorDate,
|
||||
sessionId: cursorSessionId,
|
||||
}),
|
||||
"utf8"
|
||||
).toString("base64url");
|
||||
|
||||
await service.listSessions(cursor, 25);
|
||||
|
||||
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
|
||||
where: {
|
||||
OR: [
|
||||
{
|
||||
spawnedAt: {
|
||||
lt: new Date(cursorDate),
|
||||
},
|
||||
},
|
||||
{
|
||||
spawnedAt: new Date(cursorDate),
|
||||
sessionId: {
|
||||
lt: cursorSessionId,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
|
||||
take: 25,
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores invalid cursor values", async () => {
|
||||
prisma.agentSessionTree.findMany.mockResolvedValue([]);
|
||||
prisma.agentSessionTree.count.mockResolvedValue(0);
|
||||
|
||||
await service.listSessions("invalid-cursor", 10);
|
||||
|
||||
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
|
||||
where: undefined,
|
||||
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
|
||||
take: 10,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("getSession", () => {
|
||||
it("returns matching session entry", async () => {
|
||||
const session = {
|
||||
id: "tree-1",
|
||||
sessionId: "agent-123",
|
||||
parentSessionId: null,
|
||||
provider: "internal",
|
||||
missionId: null,
|
||||
taskId: "task-1",
|
||||
taskSource: "queue",
|
||||
agentType: "worker",
|
||||
status: "running",
|
||||
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
|
||||
completedAt: null,
|
||||
metadata: {},
|
||||
};
|
||||
prisma.agentSessionTree.findUnique.mockResolvedValue(session);
|
||||
|
||||
const result = await service.getSession("agent-123");
|
||||
|
||||
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
|
||||
where: { sessionId: "agent-123" },
|
||||
});
|
||||
expect(result).toEqual(session);
|
||||
});
|
||||
|
||||
it("returns null when session does not exist", async () => {
|
||||
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
|
||||
|
||||
const result = await service.getSession("agent-missing");
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("getTree", () => {
|
||||
it("returns mapped entries from Prisma", async () => {
|
||||
prisma.agentSessionTree.findMany.mockResolvedValue([
|
||||
|
||||
@@ -1,11 +1,78 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import type { AgentSessionTree, Prisma } from "@prisma/client";
|
||||
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
|
||||
const DEFAULT_PAGE_LIMIT = 50;
|
||||
const MAX_PAGE_LIMIT = 200;
|
||||
|
||||
interface SessionCursor {
|
||||
spawnedAt: Date;
|
||||
sessionId: string;
|
||||
}
|
||||
|
||||
export interface AgentSessionTreeListResult {
|
||||
sessions: AgentSessionTree[];
|
||||
total: number;
|
||||
cursor?: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class AgentTreeService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async listSessions(
|
||||
cursor?: string,
|
||||
limit = DEFAULT_PAGE_LIMIT
|
||||
): Promise<AgentSessionTreeListResult> {
|
||||
const safeLimit = this.normalizeLimit(limit);
|
||||
const parsedCursor = this.parseCursor(cursor);
|
||||
|
||||
const where: Prisma.AgentSessionTreeWhereInput | undefined = parsedCursor
|
||||
? {
|
||||
OR: [
|
||||
{
|
||||
spawnedAt: {
|
||||
lt: parsedCursor.spawnedAt,
|
||||
},
|
||||
},
|
||||
{
|
||||
spawnedAt: parsedCursor.spawnedAt,
|
||||
sessionId: {
|
||||
lt: parsedCursor.sessionId,
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
: undefined;
|
||||
|
||||
const [sessions, total] = await Promise.all([
|
||||
this.prisma.agentSessionTree.findMany({
|
||||
where,
|
||||
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
|
||||
take: safeLimit,
|
||||
}),
|
||||
this.prisma.agentSessionTree.count(),
|
||||
]);
|
||||
|
||||
const nextCursor =
|
||||
sessions.length === safeLimit
|
||||
? this.serializeCursor(sessions[sessions.length - 1])
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
sessions,
|
||||
total,
|
||||
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async getSession(sessionId: string): Promise<AgentSessionTree | null> {
|
||||
return this.prisma.agentSessionTree.findUnique({
|
||||
where: { sessionId },
|
||||
});
|
||||
}
|
||||
|
||||
async getTree(): Promise<AgentTreeResponseDto[]> {
|
||||
const entries = await this.prisma.agentSessionTree.findMany({
|
||||
orderBy: { spawnedAt: "desc" },
|
||||
@@ -27,4 +94,53 @@ export class AgentTreeService {
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private normalizeLimit(limit: number): number {
|
||||
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_PAGE_LIMIT;
|
||||
if (normalized < 1) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return Math.min(normalized, MAX_PAGE_LIMIT);
|
||||
}
|
||||
|
||||
private serializeCursor(entry: Pick<AgentSessionTree, "spawnedAt" | "sessionId">): string {
|
||||
return Buffer.from(
|
||||
JSON.stringify({
|
||||
spawnedAt: entry.spawnedAt.toISOString(),
|
||||
sessionId: entry.sessionId,
|
||||
}),
|
||||
"utf8"
|
||||
).toString("base64url");
|
||||
}
|
||||
|
||||
private parseCursor(cursor?: string): SessionCursor | null {
|
||||
if (!cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const decoded = Buffer.from(cursor, "base64url").toString("utf8");
|
||||
const parsed = JSON.parse(decoded) as {
|
||||
spawnedAt?: string;
|
||||
sessionId?: string;
|
||||
};
|
||||
|
||||
if (typeof parsed.spawnedAt !== "string" || typeof parsed.sessionId !== "string") {
|
||||
return null;
|
||||
}
|
||||
|
||||
const spawnedAt = new Date(parsed.spawnedAt);
|
||||
if (Number.isNaN(spawnedAt.getTime())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
spawnedAt,
|
||||
sessionId: parsed.sessionId,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { AgentMessagesService } from "./agent-messages.service";
|
||||
import { AgentControlService } from "./agent-control.service";
|
||||
import { AgentTreeService } from "./agent-tree.service";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
import { AgentProviderRegistry } from "./agent-provider.registry";
|
||||
|
||||
@Module({
|
||||
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
|
||||
@@ -20,6 +22,9 @@ import { AgentTreeService } from "./agent-tree.service";
|
||||
AgentMessagesService,
|
||||
AgentControlService,
|
||||
AgentTreeService,
|
||||
InternalAgentProvider,
|
||||
AgentProviderRegistry,
|
||||
],
|
||||
exports: [InternalAgentProvider, AgentProviderRegistry],
|
||||
})
|
||||
export class AgentsModule {}
|
||||
|
||||
216
apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts
Normal file
216
apps/orchestrator/src/api/agents/internal-agent.provider.spec.ts
Normal file
@@ -0,0 +1,216 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
|
||||
import { AgentControlService } from "./agent-control.service";
|
||||
import { AgentMessagesService } from "./agent-messages.service";
|
||||
import { AgentTreeService } from "./agent-tree.service";
|
||||
import { InternalAgentProvider } from "./internal-agent.provider";
|
||||
|
||||
describe("InternalAgentProvider", () => {
|
||||
let provider: InternalAgentProvider;
|
||||
let messagesService: {
|
||||
getMessages: ReturnType<typeof vi.fn>;
|
||||
getReplayMessages: ReturnType<typeof vi.fn>;
|
||||
getMessagesAfter: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let controlService: {
|
||||
injectMessage: ReturnType<typeof vi.fn>;
|
||||
pauseAgent: ReturnType<typeof vi.fn>;
|
||||
resumeAgent: ReturnType<typeof vi.fn>;
|
||||
killAgent: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let treeService: {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
messagesService = {
|
||||
getMessages: vi.fn(),
|
||||
getReplayMessages: vi.fn(),
|
||||
getMessagesAfter: vi.fn(),
|
||||
};
|
||||
|
||||
controlService = {
|
||||
injectMessage: vi.fn().mockResolvedValue(undefined),
|
||||
pauseAgent: vi.fn().mockResolvedValue(undefined),
|
||||
resumeAgent: vi.fn().mockResolvedValue(undefined),
|
||||
killAgent: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
treeService = {
|
||||
listSessions: vi.fn(),
|
||||
getSession: vi.fn(),
|
||||
};
|
||||
|
||||
provider = new InternalAgentProvider(
|
||||
messagesService as unknown as AgentMessagesService,
|
||||
controlService as unknown as AgentControlService,
|
||||
treeService as unknown as AgentTreeService
|
||||
);
|
||||
});
|
||||
|
||||
it("maps paginated sessions", async () => {
|
||||
const sessionEntry: AgentSessionTree = {
|
||||
id: "tree-1",
|
||||
sessionId: "session-1",
|
||||
parentSessionId: "parent-1",
|
||||
provider: "internal",
|
||||
missionId: null,
|
||||
taskId: "task-123",
|
||||
taskSource: "queue",
|
||||
agentType: "worker",
|
||||
status: "running",
|
||||
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
completedAt: null,
|
||||
metadata: { branch: "feat/test" },
|
||||
};
|
||||
|
||||
treeService.listSessions.mockResolvedValue({
|
||||
sessions: [sessionEntry],
|
||||
total: 1,
|
||||
cursor: "next-cursor",
|
||||
});
|
||||
|
||||
const result = await provider.listSessions("cursor-1", 25);
|
||||
|
||||
expect(treeService.listSessions).toHaveBeenCalledWith("cursor-1", 25);
|
||||
expect(result).toEqual({
|
||||
sessions: [
|
||||
{
|
||||
id: "session-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
label: "task-123",
|
||||
status: "active",
|
||||
parentSessionId: "parent-1",
|
||||
createdAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T10:00:00.000Z"),
|
||||
metadata: { branch: "feat/test" },
|
||||
},
|
||||
],
|
||||
total: 1,
|
||||
cursor: "next-cursor",
|
||||
});
|
||||
});
|
||||
|
||||
it("returns null for missing session", async () => {
|
||||
treeService.getSession.mockResolvedValue(null);
|
||||
|
||||
const result = await provider.getSession("missing-session");
|
||||
|
||||
expect(treeService.getSession).toHaveBeenCalledWith("missing-session");
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("maps message history and parses skip cursor", async () => {
|
||||
const message: AgentConversationMessage = {
|
||||
id: "msg-1",
|
||||
sessionId: "session-1",
|
||||
provider: "internal",
|
||||
role: "agent",
|
||||
content: "hello",
|
||||
timestamp: new Date("2026-03-07T10:05:00.000Z"),
|
||||
metadata: { tokens: 42 },
|
||||
};
|
||||
|
||||
messagesService.getMessages.mockResolvedValue({
|
||||
messages: [message],
|
||||
total: 10,
|
||||
});
|
||||
|
||||
const result = await provider.getMessages("session-1", 30, "2");
|
||||
|
||||
expect(messagesService.getMessages).toHaveBeenCalledWith("session-1", 30, 2);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: "msg-1",
|
||||
sessionId: "session-1",
|
||||
role: "assistant",
|
||||
content: "hello",
|
||||
timestamp: new Date("2026-03-07T10:05:00.000Z"),
|
||||
metadata: { tokens: 42 },
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("routes control operations through AgentControlService", async () => {
|
||||
const injectResult = await provider.injectMessage("session-1", "new instruction");
|
||||
|
||||
await provider.pauseSession("session-1");
|
||||
await provider.resumeSession("session-1");
|
||||
await provider.killSession("session-1", false);
|
||||
|
||||
expect(controlService.injectMessage).toHaveBeenCalledWith(
|
||||
"session-1",
|
||||
"internal-provider",
|
||||
"new instruction"
|
||||
);
|
||||
expect(injectResult).toEqual({ accepted: true });
|
||||
expect(controlService.pauseAgent).toHaveBeenCalledWith("session-1", "internal-provider");
|
||||
expect(controlService.resumeAgent).toHaveBeenCalledWith("session-1", "internal-provider");
|
||||
expect(controlService.killAgent).toHaveBeenCalledWith("session-1", "internal-provider", false);
|
||||
});
|
||||
|
||||
it("streams replay and incremental messages", async () => {
|
||||
const replayMessage: AgentConversationMessage = {
|
||||
id: "msg-replay",
|
||||
sessionId: "session-1",
|
||||
provider: "internal",
|
||||
role: "agent",
|
||||
content: "replay",
|
||||
timestamp: new Date("2026-03-07T10:00:00.000Z"),
|
||||
metadata: {},
|
||||
};
|
||||
const incrementalMessage: AgentConversationMessage = {
|
||||
id: "msg-live",
|
||||
sessionId: "session-1",
|
||||
provider: "internal",
|
||||
role: "operator",
|
||||
content: "live",
|
||||
timestamp: new Date("2026-03-07T10:00:01.000Z"),
|
||||
metadata: {},
|
||||
};
|
||||
|
||||
messagesService.getReplayMessages.mockResolvedValue([replayMessage]);
|
||||
messagesService.getMessagesAfter
|
||||
.mockResolvedValueOnce([incrementalMessage])
|
||||
.mockResolvedValueOnce([]);
|
||||
|
||||
const iterator = provider.streamMessages("session-1")[Symbol.asyncIterator]();
|
||||
|
||||
const first = await iterator.next();
|
||||
const second = await iterator.next();
|
||||
|
||||
expect(first.done).toBe(false);
|
||||
expect(first.value).toEqual({
|
||||
id: "msg-replay",
|
||||
sessionId: "session-1",
|
||||
role: "assistant",
|
||||
content: "replay",
|
||||
timestamp: new Date("2026-03-07T10:00:00.000Z"),
|
||||
metadata: {},
|
||||
});
|
||||
expect(second.done).toBe(false);
|
||||
expect(second.value).toEqual({
|
||||
id: "msg-live",
|
||||
sessionId: "session-1",
|
||||
role: "user",
|
||||
content: "live",
|
||||
timestamp: new Date("2026-03-07T10:00:01.000Z"),
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
await iterator.return?.();
|
||||
|
||||
expect(messagesService.getReplayMessages).toHaveBeenCalledWith("session-1", 50);
|
||||
expect(messagesService.getMessagesAfter).toHaveBeenCalledWith(
|
||||
"session-1",
|
||||
new Date("2026-03-07T10:00:00.000Z"),
|
||||
"msg-replay"
|
||||
);
|
||||
});
|
||||
|
||||
it("reports provider availability", async () => {
|
||||
await expect(provider.isAvailable()).resolves.toBe(true);
|
||||
});
|
||||
});
|
||||
218
apps/orchestrator/src/api/agents/internal-agent.provider.ts
Normal file
218
apps/orchestrator/src/api/agents/internal-agent.provider.ts
Normal file
@@ -0,0 +1,218 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import type {
|
||||
AgentMessage,
|
||||
AgentMessageRole,
|
||||
AgentSession,
|
||||
AgentSessionList,
|
||||
AgentSessionStatus,
|
||||
IAgentProvider,
|
||||
InjectResult,
|
||||
} from "@mosaic/shared";
|
||||
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
|
||||
import { AgentControlService } from "./agent-control.service";
|
||||
import { AgentMessagesService } from "./agent-messages.service";
|
||||
import { AgentTreeService } from "./agent-tree.service";
|
||||
|
||||
const DEFAULT_SESSION_LIMIT = 50;
|
||||
const DEFAULT_MESSAGE_LIMIT = 50;
|
||||
const MAX_MESSAGE_LIMIT = 200;
|
||||
const STREAM_POLL_INTERVAL_MS = 1000;
|
||||
const INTERNAL_OPERATOR_ID = "internal-provider";
|
||||
|
||||
@Injectable()
|
||||
export class InternalAgentProvider implements IAgentProvider {
|
||||
readonly providerId = "internal";
|
||||
readonly providerType = "internal";
|
||||
readonly displayName = "Internal Orchestrator";
|
||||
|
||||
constructor(
|
||||
private readonly messagesService: AgentMessagesService,
|
||||
private readonly controlService: AgentControlService,
|
||||
private readonly treeService: AgentTreeService
|
||||
) {}
|
||||
|
||||
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
|
||||
const {
|
||||
sessions,
|
||||
total,
|
||||
cursor: nextCursor,
|
||||
} = await this.treeService.listSessions(cursor, limit);
|
||||
|
||||
return {
|
||||
sessions: sessions.map((session) => this.toAgentSession(session)),
|
||||
total,
|
||||
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async getSession(sessionId: string): Promise<AgentSession | null> {
|
||||
const session = await this.treeService.getSession(sessionId);
|
||||
return session ? this.toAgentSession(session) : null;
|
||||
}
|
||||
|
||||
async getMessages(
|
||||
sessionId: string,
|
||||
limit = DEFAULT_MESSAGE_LIMIT,
|
||||
before?: string
|
||||
): Promise<AgentMessage[]> {
|
||||
const safeLimit = this.normalizeMessageLimit(limit);
|
||||
const skip = this.parseSkip(before);
|
||||
|
||||
const result = await this.messagesService.getMessages(sessionId, safeLimit, skip);
|
||||
return result.messages.map((message) => this.toAgentMessage(message));
|
||||
}
|
||||
|
||||
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
|
||||
await this.controlService.injectMessage(sessionId, INTERNAL_OPERATOR_ID, content);
|
||||
|
||||
return {
|
||||
accepted: true,
|
||||
};
|
||||
}
|
||||
|
||||
async pauseSession(sessionId: string): Promise<void> {
|
||||
await this.controlService.pauseAgent(sessionId, INTERNAL_OPERATOR_ID);
|
||||
}
|
||||
|
||||
async resumeSession(sessionId: string): Promise<void> {
|
||||
await this.controlService.resumeAgent(sessionId, INTERNAL_OPERATOR_ID);
|
||||
}
|
||||
|
||||
async killSession(sessionId: string, force = true): Promise<void> {
|
||||
await this.controlService.killAgent(sessionId, INTERNAL_OPERATOR_ID, force);
|
||||
}
|
||||
|
||||
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
|
||||
const replayMessages = await this.messagesService.getReplayMessages(
|
||||
sessionId,
|
||||
DEFAULT_MESSAGE_LIMIT
|
||||
);
|
||||
|
||||
let lastSeenTimestamp = new Date();
|
||||
let lastSeenMessageId: string | null = null;
|
||||
|
||||
for (const message of replayMessages) {
|
||||
yield this.toAgentMessage(message);
|
||||
lastSeenTimestamp = message.timestamp;
|
||||
lastSeenMessageId = message.id;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
const newMessages = await this.messagesService.getMessagesAfter(
|
||||
sessionId,
|
||||
lastSeenTimestamp,
|
||||
lastSeenMessageId
|
||||
);
|
||||
|
||||
for (const message of newMessages) {
|
||||
yield this.toAgentMessage(message);
|
||||
lastSeenTimestamp = message.timestamp;
|
||||
lastSeenMessageId = message.id;
|
||||
}
|
||||
|
||||
await this.delay(STREAM_POLL_INTERVAL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
isAvailable(): Promise<boolean> {
|
||||
return Promise.resolve(true);
|
||||
}
|
||||
|
||||
private toAgentSession(session: AgentSessionTree): AgentSession {
|
||||
const metadata = this.toMetadata(session.metadata);
|
||||
|
||||
return {
|
||||
id: session.sessionId,
|
||||
providerId: this.providerId,
|
||||
providerType: this.providerType,
|
||||
...(session.taskId !== null ? { label: session.taskId } : {}),
|
||||
status: this.toSessionStatus(session.status),
|
||||
...(session.parentSessionId !== null ? { parentSessionId: session.parentSessionId } : {}),
|
||||
createdAt: session.spawnedAt,
|
||||
updatedAt: session.completedAt ?? session.spawnedAt,
|
||||
...(metadata !== undefined ? { metadata } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
private toAgentMessage(message: AgentConversationMessage): AgentMessage {
|
||||
const metadata = this.toMetadata(message.metadata);
|
||||
|
||||
return {
|
||||
id: message.id,
|
||||
sessionId: message.sessionId,
|
||||
role: this.toMessageRole(message.role),
|
||||
content: message.content,
|
||||
timestamp: message.timestamp,
|
||||
...(metadata !== undefined ? { metadata } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
private toSessionStatus(status: string): AgentSessionStatus {
|
||||
switch (status) {
|
||||
case "running":
|
||||
return "active";
|
||||
case "paused":
|
||||
return "paused";
|
||||
case "completed":
|
||||
return "completed";
|
||||
case "failed":
|
||||
case "killed":
|
||||
return "failed";
|
||||
case "spawning":
|
||||
default:
|
||||
return "idle";
|
||||
}
|
||||
}
|
||||
|
||||
private toMessageRole(role: string): AgentMessageRole {
|
||||
switch (role) {
|
||||
case "agent":
|
||||
case "assistant":
|
||||
return "assistant";
|
||||
case "system":
|
||||
return "system";
|
||||
case "tool":
|
||||
return "tool";
|
||||
case "operator":
|
||||
case "user":
|
||||
default:
|
||||
return "user";
|
||||
}
|
||||
}
|
||||
|
||||
private normalizeMessageLimit(limit: number): number {
|
||||
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_MESSAGE_LIMIT;
|
||||
if (normalized < 1) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return Math.min(normalized, MAX_MESSAGE_LIMIT);
|
||||
}
|
||||
|
||||
private parseSkip(before?: string): number {
|
||||
if (!before) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const parsed = Number.parseInt(before, 10);
|
||||
if (Number.isNaN(parsed) || parsed < 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return parsed;
|
||||
}
|
||||
|
||||
private toMetadata(value: unknown): Record<string, unknown> | undefined {
|
||||
if (value !== null && typeof value === "object" && !Array.isArray(value)) {
|
||||
return value as Record<string, unknown>;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private async delay(ms: number): Promise<void> {
|
||||
await new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
import { Type } from "class-transformer";
|
||||
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
|
||||
|
||||
export class GetMissionControlMessagesQueryDto {
|
||||
@IsOptional()
|
||||
@Type(() => Number)
|
||||
@IsInt()
|
||||
@Min(1)
|
||||
@Max(200)
|
||||
limit?: number;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
before?: string;
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
import { IsBoolean, IsOptional } from "class-validator";
|
||||
|
||||
export class KillSessionDto {
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
force?: boolean;
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AgentSession } from "@mosaic/shared";
|
||||
import type { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
import { MissionControlController } from "./mission-control.controller";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
describe("MissionControlController", () => {
|
||||
let controller: MissionControlController;
|
||||
let registry: {
|
||||
listAllSessions: ReturnType<typeof vi.fn>;
|
||||
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
registry = {
|
||||
listAllSessions: vi.fn(),
|
||||
getProviderForSession: vi.fn(),
|
||||
};
|
||||
|
||||
const prisma = {
|
||||
operatorAuditLog: {
|
||||
create: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
|
||||
const service = new MissionControlService(
|
||||
registry as unknown as AgentProviderRegistry,
|
||||
prisma as unknown as PrismaService
|
||||
);
|
||||
|
||||
controller = new MissionControlController(service);
|
||||
});
|
||||
|
||||
it("Phase 1 gate: unified sessions endpoint returns internal provider sessions", async () => {
|
||||
const internalSession: AgentSession = {
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T20:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T20:01:00.000Z"),
|
||||
};
|
||||
|
||||
const externalSession: AgentSession = {
|
||||
id: "session-openclaw-1",
|
||||
providerId: "openclaw",
|
||||
providerType: "external",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T20:02:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T20:03:00.000Z"),
|
||||
};
|
||||
|
||||
registry.listAllSessions.mockResolvedValue([internalSession, externalSession]);
|
||||
|
||||
const response = await controller.listSessions();
|
||||
|
||||
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||
expect(response.sessions).toEqual([internalSession, externalSession]);
|
||||
expect(response.sessions).toContainEqual(
|
||||
expect.objectContaining({
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,183 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Get,
|
||||
Header,
|
||||
HttpCode,
|
||||
MessageEvent,
|
||||
Param,
|
||||
Post,
|
||||
Query,
|
||||
Request,
|
||||
Sse,
|
||||
UseGuards,
|
||||
UsePipes,
|
||||
ValidationPipe,
|
||||
} from "@nestjs/common";
|
||||
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
|
||||
import { Observable } from "rxjs";
|
||||
import { AuthGuard } from "../../auth/guards/auth.guard";
|
||||
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
|
||||
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
|
||||
import { KillSessionDto } from "./dto/kill-session.dto";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||
|
||||
interface MissionControlRequest {
|
||||
user?: {
|
||||
id?: string;
|
||||
};
|
||||
}
|
||||
|
||||
@Controller("api/mission-control")
|
||||
@UseGuards(AuthGuard)
|
||||
export class MissionControlController {
|
||||
constructor(private readonly missionControlService: MissionControlService) {}
|
||||
|
||||
@Get("sessions")
|
||||
async listSessions(): Promise<{ sessions: AgentSession[] }> {
|
||||
const sessions = await this.missionControlService.listSessions();
|
||||
return { sessions };
|
||||
}
|
||||
|
||||
@Get("sessions/:sessionId")
|
||||
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
|
||||
return this.missionControlService.getSession(sessionId);
|
||||
}
|
||||
|
||||
@Get("sessions/:sessionId/messages")
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async getMessages(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Query() query: GetMissionControlMessagesQueryDto
|
||||
): Promise<{ messages: AgentMessage[] }> {
|
||||
const messages = await this.missionControlService.getMessages(
|
||||
sessionId,
|
||||
query.limit,
|
||||
query.before
|
||||
);
|
||||
|
||||
return { messages };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/inject")
|
||||
@HttpCode(200)
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
injectMessage(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Body() dto: InjectAgentDto,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<InjectResult> {
|
||||
return this.missionControlService.injectMessage(
|
||||
sessionId,
|
||||
dto.message,
|
||||
this.resolveOperatorId(req)
|
||||
);
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/pause")
|
||||
@HttpCode(200)
|
||||
async pauseSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
|
||||
|
||||
return { message: `Session ${sessionId} paused` };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/resume")
|
||||
@HttpCode(200)
|
||||
async resumeSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
|
||||
|
||||
return { message: `Session ${sessionId} resumed` };
|
||||
}
|
||||
|
||||
@Post("sessions/:sessionId/kill")
|
||||
@HttpCode(200)
|
||||
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
|
||||
async killSession(
|
||||
@Param("sessionId") sessionId: string,
|
||||
@Body() dto: KillSessionDto,
|
||||
@Request() req: MissionControlRequest
|
||||
): Promise<{ message: string }> {
|
||||
await this.missionControlService.killSession(
|
||||
sessionId,
|
||||
dto.force ?? true,
|
||||
this.resolveOperatorId(req)
|
||||
);
|
||||
|
||||
return { message: `Session ${sessionId} killed` };
|
||||
}
|
||||
|
||||
@Sse("sessions/:sessionId/stream")
|
||||
@Header("Content-Type", "text/event-stream")
|
||||
@Header("Cache-Control", "no-cache")
|
||||
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
|
||||
return new Observable<MessageEvent>((subscriber) => {
|
||||
let isClosed = false;
|
||||
let iterator: AsyncIterator<AgentMessage> | null = null;
|
||||
|
||||
void this.missionControlService
|
||||
.streamMessages(sessionId)
|
||||
.then(async (stream) => {
|
||||
iterator = stream[Symbol.asyncIterator]();
|
||||
|
||||
for (;;) {
|
||||
if (isClosed) {
|
||||
break;
|
||||
}
|
||||
|
||||
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
|
||||
if (next.done) {
|
||||
break;
|
||||
}
|
||||
|
||||
subscriber.next({
|
||||
data: this.toStreamPayload(next.value),
|
||||
});
|
||||
}
|
||||
|
||||
subscriber.complete();
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
subscriber.error(error);
|
||||
});
|
||||
|
||||
return () => {
|
||||
isClosed = true;
|
||||
void iterator?.return?.();
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private resolveOperatorId(req: MissionControlRequest): string {
|
||||
const operatorId = req.user?.id;
|
||||
return typeof operatorId === "string" && operatorId.length > 0
|
||||
? operatorId
|
||||
: DEFAULT_OPERATOR_ID;
|
||||
}
|
||||
|
||||
private toStreamPayload(message: AgentMessage): {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
role: string;
|
||||
content: string;
|
||||
timestamp: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
} {
|
||||
return {
|
||||
id: message.id,
|
||||
sessionId: message.sessionId,
|
||||
role: message.role,
|
||||
content: message.content,
|
||||
timestamp: message.timestamp.toISOString(),
|
||||
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { AgentsModule } from "../agents/agents.module";
|
||||
import { AuthModule } from "../../auth/auth.module";
|
||||
import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { MissionControlController } from "./mission-control.controller";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
@Module({
|
||||
imports: [AgentsModule, AuthModule, PrismaModule],
|
||||
controllers: [MissionControlController],
|
||||
providers: [MissionControlService],
|
||||
})
|
||||
export class MissionControlModule {}
|
||||
@@ -0,0 +1,213 @@
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||
import type { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
import { MissionControlService } from "./mission-control.service";
|
||||
|
||||
type MockProvider = IAgentProvider & {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
getMessages: ReturnType<typeof vi.fn>;
|
||||
injectMessage: ReturnType<typeof vi.fn>;
|
||||
pauseSession: ReturnType<typeof vi.fn>;
|
||||
resumeSession: ReturnType<typeof vi.fn>;
|
||||
killSession: ReturnType<typeof vi.fn>;
|
||||
streamMessages: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||
return;
|
||||
};
|
||||
|
||||
const createProvider = (providerId = "internal"): MockProvider => ({
|
||||
providerId,
|
||||
providerType: providerId,
|
||||
displayName: providerId,
|
||||
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
|
||||
getSession: vi.fn().mockResolvedValue(null),
|
||||
getMessages: vi.fn().mockResolvedValue([]),
|
||||
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||
killSession: vi.fn().mockResolvedValue(undefined),
|
||||
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||
isAvailable: vi.fn().mockResolvedValue(true),
|
||||
});
|
||||
|
||||
describe("MissionControlService", () => {
|
||||
let service: MissionControlService;
|
||||
let registry: {
|
||||
listAllSessions: ReturnType<typeof vi.fn>;
|
||||
getProviderForSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let prisma: {
|
||||
operatorAuditLog: {
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
const session: AgentSession = {
|
||||
id: "session-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T14:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
registry = {
|
||||
listAllSessions: vi.fn().mockResolvedValue([session]),
|
||||
getProviderForSession: vi.fn().mockResolvedValue(null),
|
||||
};
|
||||
|
||||
prisma = {
|
||||
operatorAuditLog: {
|
||||
create: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
|
||||
service = new MissionControlService(
|
||||
registry as unknown as AgentProviderRegistry,
|
||||
prisma as unknown as PrismaService
|
||||
);
|
||||
});
|
||||
|
||||
it("lists sessions from the registry", async () => {
|
||||
await expect(service.listSessions()).resolves.toEqual([session]);
|
||||
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("returns a session when it is found", async () => {
|
||||
const provider = createProvider("internal");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.getSession(session.id)).resolves.toEqual(session);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when session lookup fails", async () => {
|
||||
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||
});
|
||||
|
||||
it("gets messages from the resolved provider", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
const messages: AgentMessage[] = [
|
||||
{
|
||||
id: "message-1",
|
||||
sessionId: session.id,
|
||||
role: "assistant",
|
||||
content: "hello",
|
||||
timestamp: new Date("2026-03-07T14:01:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
provider.getMessages.mockResolvedValue(messages);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
|
||||
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
|
||||
});
|
||||
|
||||
it("injects a message and writes an audit log", async () => {
|
||||
const provider = createProvider("internal");
|
||||
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
|
||||
provider.injectMessage.mockResolvedValue(injectResult);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
|
||||
injectResult
|
||||
);
|
||||
|
||||
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "operator-1",
|
||||
provider: "internal",
|
||||
action: "inject",
|
||||
content: "ship it",
|
||||
metadata: {
|
||||
payload: { message: "ship it" },
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("pauses and resumes using default operator id", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await service.pauseSession(session.id);
|
||||
await service.resumeSession(session.id);
|
||||
|
||||
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
|
||||
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "mission-control",
|
||||
provider: "openclaw",
|
||||
action: "pause",
|
||||
metadata: {
|
||||
payload: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "mission-control",
|
||||
provider: "openclaw",
|
||||
action: "resume",
|
||||
metadata: {
|
||||
payload: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("kills with provided force value and writes audit log", async () => {
|
||||
const provider = createProvider("openclaw");
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await service.killSession(session.id, false, "operator-2");
|
||||
|
||||
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
sessionId: session.id,
|
||||
userId: "operator-2",
|
||||
provider: "openclaw",
|
||||
action: "kill",
|
||||
metadata: {
|
||||
payload: { force: false },
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves provider message stream", async () => {
|
||||
const provider = createProvider("internal");
|
||||
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
|
||||
yield {
|
||||
id: "message-1",
|
||||
sessionId: session.id,
|
||||
role: "assistant",
|
||||
content: "stream",
|
||||
timestamp: new Date("2026-03-07T14:03:00.000Z"),
|
||||
};
|
||||
})();
|
||||
|
||||
provider.streamMessages.mockReturnValue(messageStream);
|
||||
registry.getProviderForSession.mockResolvedValue({ provider, session });
|
||||
|
||||
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
|
||||
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
|
||||
});
|
||||
|
||||
it("does not write audit log when session cannot be resolved", async () => {
|
||||
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
|
||||
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,139 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
|
||||
import type { Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
|
||||
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
|
||||
|
||||
const DEFAULT_OPERATOR_ID = "mission-control";
|
||||
|
||||
@Injectable()
|
||||
export class MissionControlService {
|
||||
constructor(
|
||||
private readonly registry: AgentProviderRegistry,
|
||||
private readonly prisma: PrismaService
|
||||
) {}
|
||||
|
||||
listSessions(): Promise<AgentSession[]> {
|
||||
return this.registry.listAllSessions();
|
||||
}
|
||||
|
||||
async getSession(sessionId: string): Promise<AgentSession> {
|
||||
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||
if (!resolved) {
|
||||
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||
}
|
||||
|
||||
return resolved.session;
|
||||
}
|
||||
|
||||
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
return provider.getMessages(sessionId, limit, before);
|
||||
}
|
||||
|
||||
async injectMessage(
|
||||
sessionId: string,
|
||||
message: string,
|
||||
operatorId = DEFAULT_OPERATOR_ID
|
||||
): Promise<InjectResult> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
const result = await provider.injectMessage(sessionId, message);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "inject",
|
||||
content: message,
|
||||
payload: { message },
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.pauseSession(sessionId);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "pause",
|
||||
payload: {},
|
||||
});
|
||||
}
|
||||
|
||||
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.resumeSession(sessionId);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "resume",
|
||||
payload: {},
|
||||
});
|
||||
}
|
||||
|
||||
async killSession(
|
||||
sessionId: string,
|
||||
force = true,
|
||||
operatorId = DEFAULT_OPERATOR_ID
|
||||
): Promise<void> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
await provider.killSession(sessionId, force);
|
||||
|
||||
await this.writeOperatorAuditLog({
|
||||
sessionId,
|
||||
providerId: provider.providerId,
|
||||
operatorId,
|
||||
action: "kill",
|
||||
payload: { force },
|
||||
});
|
||||
}
|
||||
|
||||
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
|
||||
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
|
||||
return provider.streamMessages(sessionId);
|
||||
}
|
||||
|
||||
private async getProviderForSessionOrThrow(
|
||||
sessionId: string
|
||||
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
|
||||
const resolved = await this.registry.getProviderForSession(sessionId);
|
||||
|
||||
if (!resolved) {
|
||||
throw new NotFoundException(`Session ${sessionId} not found`);
|
||||
}
|
||||
|
||||
return resolved;
|
||||
}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
}
|
||||
|
||||
private async writeOperatorAuditLog(params: {
|
||||
sessionId: string;
|
||||
providerId: string;
|
||||
operatorId: string;
|
||||
action: MissionControlAction;
|
||||
content?: string;
|
||||
payload: Record<string, unknown>;
|
||||
}): Promise<void> {
|
||||
await this.prisma.operatorAuditLog.create({
|
||||
data: {
|
||||
sessionId: params.sessionId,
|
||||
userId: params.operatorId,
|
||||
provider: params.providerId,
|
||||
action: params.action,
|
||||
...(params.content !== undefined ? { content: params.content } : {}),
|
||||
metadata: this.toJsonValue({ payload: params.payload }),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,9 @@ import { BullModule } from "@nestjs/bullmq";
|
||||
import { ThrottlerModule } from "@nestjs/throttler";
|
||||
import { HealthModule } from "./api/health/health.module";
|
||||
import { AgentsModule } from "./api/agents/agents.module";
|
||||
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
||||
import { QueueApiModule } from "./api/queue/queue-api.module";
|
||||
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||
import { BudgetModule } from "./budget/budget.module";
|
||||
import { CIModule } from "./ci";
|
||||
@@ -51,6 +53,8 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
||||
]),
|
||||
HealthModule,
|
||||
AgentsModule,
|
||||
AgentProvidersModule,
|
||||
MissionControlModule,
|
||||
QueueApiModule,
|
||||
CoordinatorModule,
|
||||
BudgetModule,
|
||||
|
||||
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
9
apps/orchestrator/src/auth/auth.module.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import { AuthGuard } from "./guards/auth.guard";
|
||||
|
||||
@Module({
|
||||
providers: [OrchestratorApiKeyGuard, AuthGuard],
|
||||
exports: [AuthGuard],
|
||||
})
|
||||
export class AuthModule {}
|
||||
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
11
apps/orchestrator/src/auth/guards/auth.guard.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
|
||||
@Injectable()
|
||||
export class AuthGuard implements CanActivate {
|
||||
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
|
||||
|
||||
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
|
||||
return this.apiKeyGuard.canActivate(context);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
import { MissionControlLayout } from "@/components/mission-control/MissionControlLayout";
|
||||
|
||||
export default function MissionControlPage(): React.JSX.Element {
|
||||
return <MissionControlLayout />;
|
||||
}
|
||||
@@ -156,6 +156,26 @@ function IconTerminal(): React.JSX.Element {
|
||||
);
|
||||
}
|
||||
|
||||
function IconMissionControl(): React.JSX.Element {
|
||||
return (
|
||||
<svg
|
||||
width="16"
|
||||
height="16"
|
||||
viewBox="0 0 16 16"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
strokeWidth="1.5"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<circle cx="8" cy="8" r="1.5" />
|
||||
<path d="M11 5a4.25 4.25 0 0 1 0 6" />
|
||||
<path d="M5 5a4.25 4.25 0 0 0 0 6" />
|
||||
<path d="M13.5 2.5a7.75 7.75 0 0 1 0 11" />
|
||||
<path d="M2.5 2.5a7.75 7.75 0 0 0 0 11" />
|
||||
</svg>
|
||||
);
|
||||
}
|
||||
|
||||
function IconSettings(): React.JSX.Element {
|
||||
return (
|
||||
<svg
|
||||
@@ -260,6 +280,11 @@ const NAV_GROUPS: NavGroup[] = [
|
||||
label: "Terminal",
|
||||
icon: <IconTerminal />,
|
||||
},
|
||||
{
|
||||
href: "/mission-control",
|
||||
label: "Mission Control",
|
||||
icon: <IconMissionControl />,
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
147
apps/web/src/components/mission-control/BargeInInput.tsx
Normal file
147
apps/web/src/components/mission-control/BargeInInput.tsx
Normal file
@@ -0,0 +1,147 @@
|
||||
"use client";
|
||||
|
||||
import { useCallback, useState, type KeyboardEvent } from "react";
|
||||
import { Loader2 } from "lucide-react";
|
||||
import { useToast } from "@mosaic/ui";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { apiPost } from "@/lib/api/client";
|
||||
|
||||
const MAX_ROWS = 4;
|
||||
const TEXTAREA_MAX_HEIGHT_REM = 6.5;
|
||||
|
||||
interface BargeInMutationResponse {
|
||||
message?: string;
|
||||
}
|
||||
|
||||
export interface BargeInInputProps {
|
||||
sessionId: string;
|
||||
onSent?: () => void;
|
||||
}
|
||||
|
||||
function getErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error && error.message.trim().length > 0) {
|
||||
return error.message;
|
||||
}
|
||||
return "Failed to send message to the session.";
|
||||
}
|
||||
|
||||
export function BargeInInput({ sessionId, onSent }: BargeInInputProps): React.JSX.Element {
|
||||
const { showToast } = useToast();
|
||||
const [content, setContent] = useState("");
|
||||
const [pauseBeforeSend, setPauseBeforeSend] = useState(false);
|
||||
const [isSending, setIsSending] = useState(false);
|
||||
const [errorMessage, setErrorMessage] = useState<string | null>(null);
|
||||
|
||||
const handleSend = useCallback(async (): Promise<void> => {
|
||||
const trimmedContent = content.trim();
|
||||
if (!trimmedContent || isSending) {
|
||||
return;
|
||||
}
|
||||
|
||||
const encodedSessionId = encodeURIComponent(sessionId);
|
||||
const baseEndpoint = `/api/mission-control/sessions/${encodedSessionId}`;
|
||||
let didPause = false;
|
||||
let didInject = false;
|
||||
|
||||
setIsSending(true);
|
||||
setErrorMessage(null);
|
||||
|
||||
try {
|
||||
if (pauseBeforeSend) {
|
||||
await apiPost<BargeInMutationResponse>(`${baseEndpoint}/pause`);
|
||||
didPause = true;
|
||||
}
|
||||
|
||||
await apiPost<BargeInMutationResponse>(`${baseEndpoint}/inject`, { content: trimmedContent });
|
||||
didInject = true;
|
||||
setContent("");
|
||||
onSent?.();
|
||||
} catch (error) {
|
||||
const message = getErrorMessage(error);
|
||||
setErrorMessage(message);
|
||||
showToast(message, "error");
|
||||
} finally {
|
||||
if (didPause) {
|
||||
try {
|
||||
await apiPost<BargeInMutationResponse>(`${baseEndpoint}/resume`);
|
||||
} catch (resumeError) {
|
||||
const resumeMessage = getErrorMessage(resumeError);
|
||||
const message = didInject
|
||||
? `Message sent, but failed to resume session: ${resumeMessage}`
|
||||
: `Failed to resume session: ${resumeMessage}`;
|
||||
setErrorMessage(message);
|
||||
showToast(message, "error");
|
||||
}
|
||||
}
|
||||
|
||||
setIsSending(false);
|
||||
}
|
||||
}, [content, isSending, onSent, pauseBeforeSend, sessionId, showToast]);
|
||||
|
||||
const handleKeyDown = useCallback(
|
||||
(event: KeyboardEvent<HTMLTextAreaElement>): void => {
|
||||
if (event.key === "Enter" && !event.shiftKey) {
|
||||
event.preventDefault();
|
||||
void handleSend();
|
||||
}
|
||||
},
|
||||
[handleSend]
|
||||
);
|
||||
|
||||
const isSendDisabled = isSending || content.trim().length === 0;
|
||||
|
||||
return (
|
||||
<div className="space-y-2">
|
||||
<textarea
|
||||
value={content}
|
||||
onChange={(event) => {
|
||||
setContent(event.target.value);
|
||||
}}
|
||||
onKeyDown={handleKeyDown}
|
||||
disabled={isSending}
|
||||
rows={MAX_ROWS}
|
||||
placeholder="Inject a message into this session..."
|
||||
className="block w-full resize-y rounded-md border border-border bg-background px-3 py-2 text-sm leading-5 text-foreground outline-none placeholder:text-muted-foreground disabled:cursor-not-allowed disabled:opacity-60"
|
||||
style={{ maxHeight: `${String(TEXTAREA_MAX_HEIGHT_REM)}rem` }}
|
||||
aria-label="Inject message"
|
||||
/>
|
||||
<div className="flex items-center justify-between gap-3">
|
||||
<label className="flex items-center gap-2 text-sm text-muted-foreground">
|
||||
<input
|
||||
type="checkbox"
|
||||
checked={pauseBeforeSend}
|
||||
onChange={(event) => {
|
||||
setPauseBeforeSend(event.target.checked);
|
||||
}}
|
||||
disabled={isSending}
|
||||
className="h-4 w-4 rounded border-border"
|
||||
/>
|
||||
<span>Pause before send</span>
|
||||
</label>
|
||||
<Button
|
||||
type="button"
|
||||
variant="primary"
|
||||
size="sm"
|
||||
disabled={isSendDisabled}
|
||||
onClick={() => {
|
||||
void handleSend();
|
||||
}}
|
||||
>
|
||||
{isSending ? (
|
||||
<span className="flex items-center gap-2">
|
||||
<Loader2 className="h-4 w-4 animate-spin" aria-hidden="true" />
|
||||
Sending...
|
||||
</span>
|
||||
) : (
|
||||
"Send"
|
||||
)}
|
||||
</Button>
|
||||
</div>
|
||||
{errorMessage ? (
|
||||
<p role="alert" className="text-sm text-red-500">
|
||||
{errorMessage}
|
||||
</p>
|
||||
) : null}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
260
apps/web/src/components/mission-control/GlobalAgentRoster.tsx
Normal file
260
apps/web/src/components/mission-control/GlobalAgentRoster.tsx
Normal file
@@ -0,0 +1,260 @@
|
||||
"use client";
|
||||
|
||||
import { useMemo, useState } from "react";
|
||||
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
|
||||
import type { AgentSession } from "@mosaic/shared";
|
||||
import { ChevronRight, Loader2, X } from "lucide-react";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import type { BadgeVariant } from "@/components/ui/badge";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
import { Collapsible } from "@/components/ui/collapsible";
|
||||
import { ScrollArea } from "@/components/ui/scroll-area";
|
||||
import { Skeleton } from "@/components/ui/skeleton";
|
||||
import { apiGet, apiPost } from "@/lib/api/client";
|
||||
|
||||
const SESSIONS_QUERY_KEY = ["mission-control", "sessions"] as const;
|
||||
const SESSIONS_POLL_INTERVAL_MS = 5_000;
|
||||
|
||||
type MissionControlSessionStatus = AgentSession["status"] | "killed";
|
||||
|
||||
interface MissionControlSession extends Omit<AgentSession, "status" | "createdAt" | "updatedAt"> {
|
||||
status: MissionControlSessionStatus;
|
||||
createdAt: string | Date;
|
||||
updatedAt: string | Date;
|
||||
}
|
||||
|
||||
interface SessionsPayload {
|
||||
sessions: MissionControlSession[];
|
||||
}
|
||||
|
||||
interface ProviderSessionGroup {
|
||||
providerId: string;
|
||||
providerType: string;
|
||||
sessions: MissionControlSession[];
|
||||
}
|
||||
|
||||
export interface GlobalAgentRosterProps {
|
||||
onSelectSession?: (sessionId: string) => void;
|
||||
selectedSessionId?: string;
|
||||
}
|
||||
|
||||
function getStatusVariant(status: MissionControlSessionStatus): BadgeVariant {
|
||||
switch (status) {
|
||||
case "active":
|
||||
return "status-success";
|
||||
case "paused":
|
||||
return "status-warning";
|
||||
case "killed":
|
||||
return "status-error";
|
||||
default:
|
||||
return "status-neutral";
|
||||
}
|
||||
}
|
||||
|
||||
function truncateSessionId(sessionId: string): string {
|
||||
return sessionId.slice(0, 8);
|
||||
}
|
||||
|
||||
function resolveProviderName(providerId: string, providerType: string): string {
|
||||
return providerId === providerType ? providerId : `${providerId} (${providerType})`;
|
||||
}
|
||||
|
||||
function groupByProvider(sessions: MissionControlSession[]): ProviderSessionGroup[] {
|
||||
const grouped = new Map<string, ProviderSessionGroup>();
|
||||
|
||||
for (const session of sessions) {
|
||||
const existing = grouped.get(session.providerId);
|
||||
if (existing) {
|
||||
existing.sessions.push(session);
|
||||
continue;
|
||||
}
|
||||
|
||||
grouped.set(session.providerId, {
|
||||
providerId: session.providerId,
|
||||
providerType: session.providerType,
|
||||
sessions: [session],
|
||||
});
|
||||
}
|
||||
|
||||
return Array.from(grouped.values()).sort((a, b) => a.providerId.localeCompare(b.providerId));
|
||||
}
|
||||
|
||||
async function fetchSessions(): Promise<MissionControlSession[]> {
|
||||
const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
|
||||
"/api/mission-control/sessions"
|
||||
);
|
||||
return Array.isArray(payload) ? payload : payload.sessions;
|
||||
}
|
||||
|
||||
export function GlobalAgentRoster({
|
||||
onSelectSession,
|
||||
selectedSessionId,
|
||||
}: GlobalAgentRosterProps): React.JSX.Element {
|
||||
const queryClient = useQueryClient();
|
||||
const [openProviders, setOpenProviders] = useState<Record<string, boolean>>({});
|
||||
|
||||
const sessionsQuery = useQuery<MissionControlSession[]>({
|
||||
queryKey: SESSIONS_QUERY_KEY,
|
||||
queryFn: fetchSessions,
|
||||
refetchInterval: SESSIONS_POLL_INTERVAL_MS,
|
||||
});
|
||||
|
||||
const killMutation = useMutation({
|
||||
mutationFn: async (sessionId: string): Promise<string> => {
|
||||
await apiPost<{ message: string }>(`/api/mission-control/sessions/${sessionId}/kill`, {
|
||||
force: false,
|
||||
});
|
||||
return sessionId;
|
||||
},
|
||||
onSuccess: (): void => {
|
||||
void queryClient.invalidateQueries({ queryKey: SESSIONS_QUERY_KEY });
|
||||
},
|
||||
});
|
||||
|
||||
const groupedSessions = useMemo(
|
||||
() => groupByProvider(sessionsQuery.data ?? []),
|
||||
[sessionsQuery.data]
|
||||
);
|
||||
|
||||
const pendingKillSessionId = killMutation.isPending ? killMutation.variables : undefined;
|
||||
|
||||
const toggleProvider = (providerId: string): void => {
|
||||
setOpenProviders((prev) => ({
|
||||
...prev,
|
||||
[providerId]: !(prev[providerId] ?? true),
|
||||
}));
|
||||
};
|
||||
|
||||
const isProviderOpen = (providerId: string): boolean => openProviders[providerId] ?? true;
|
||||
|
||||
return (
|
||||
<Card className="flex h-full min-h-0 flex-col">
|
||||
<CardHeader className="pb-2">
|
||||
<CardTitle className="flex items-center justify-between text-base">
|
||||
<span>Agent Roster</span>
|
||||
{sessionsQuery.isFetching && !sessionsQuery.isLoading ? (
|
||||
<Loader2 className="h-4 w-4 animate-spin text-muted-foreground" aria-hidden="true" />
|
||||
) : null}
|
||||
</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="min-h-0 flex-1 px-3 pb-3">
|
||||
{sessionsQuery.isLoading ? (
|
||||
<ScrollArea className="h-full">
|
||||
<div className="space-y-2 pr-1">
|
||||
{Array.from({ length: 6 }).map((_, index) => (
|
||||
<Skeleton key={`roster-skeleton-${String(index)}`} className="h-10 w-full" />
|
||||
))}
|
||||
</div>
|
||||
</ScrollArea>
|
||||
) : sessionsQuery.error ? (
|
||||
<div className="flex h-full items-center justify-center text-center text-sm text-red-500">
|
||||
Failed to load agents: {sessionsQuery.error.message}
|
||||
</div>
|
||||
) : groupedSessions.length === 0 ? (
|
||||
<div className="flex h-full items-center justify-center text-sm text-muted-foreground">
|
||||
No active agents
|
||||
</div>
|
||||
) : (
|
||||
<ScrollArea className="h-full">
|
||||
<div className="space-y-3 pr-1">
|
||||
{groupedSessions.map((group) => {
|
||||
const providerOpen = isProviderOpen(group.providerId);
|
||||
|
||||
return (
|
||||
<Collapsible key={group.providerId} open={providerOpen} className="space-y-1">
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => {
|
||||
toggleProvider(group.providerId);
|
||||
}}
|
||||
className="flex w-full items-center gap-2 rounded-md px-1 py-1 text-left text-sm hover:bg-muted/40"
|
||||
aria-expanded={providerOpen}
|
||||
>
|
||||
<ChevronRight
|
||||
className={`h-4 w-4 text-muted-foreground transition-transform ${providerOpen ? "rotate-90" : ""}`}
|
||||
aria-hidden="true"
|
||||
/>
|
||||
<span className="font-medium">
|
||||
{resolveProviderName(group.providerId, group.providerType)}
|
||||
</span>
|
||||
<span className="ml-auto text-xs text-muted-foreground">
|
||||
{group.sessions.length}
|
||||
</span>
|
||||
</button>
|
||||
{providerOpen ? (
|
||||
<div className="space-y-1 pl-2">
|
||||
{group.sessions.map((session) => {
|
||||
const isSelected = selectedSessionId === session.id;
|
||||
const isKilling = pendingKillSessionId === session.id;
|
||||
|
||||
return (
|
||||
<div
|
||||
key={session.id}
|
||||
role="button"
|
||||
tabIndex={0}
|
||||
onClick={() => {
|
||||
onSelectSession?.(session.id);
|
||||
}}
|
||||
onKeyDown={(event) => {
|
||||
if (event.key === "Enter" || event.key === " ") {
|
||||
event.preventDefault();
|
||||
onSelectSession?.(session.id);
|
||||
}
|
||||
}}
|
||||
className="flex items-center justify-between gap-2 rounded-md border border-transparent px-2 py-1.5 transition-colors hover:bg-muted/40"
|
||||
style={
|
||||
isSelected
|
||||
? {
|
||||
borderColor: "rgba(47, 128, 255, 0.35)",
|
||||
backgroundColor: "rgba(47, 128, 255, 0.08)",
|
||||
}
|
||||
: undefined
|
||||
}
|
||||
>
|
||||
<div className="flex min-w-0 items-center gap-2">
|
||||
<span className="font-mono text-xs" title={session.id}>
|
||||
{truncateSessionId(session.id)}
|
||||
</span>
|
||||
<Badge
|
||||
variant={getStatusVariant(session.status)}
|
||||
className="capitalize"
|
||||
>
|
||||
{session.status}
|
||||
</Badge>
|
||||
</div>
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
className="h-7 min-h-7 w-7 min-w-7 p-0"
|
||||
disabled={isKilling}
|
||||
onClick={(event) => {
|
||||
event.stopPropagation();
|
||||
killMutation.mutate(session.id);
|
||||
}}
|
||||
aria-label={`Kill session ${truncateSessionId(session.id)}`}
|
||||
>
|
||||
{isKilling ? (
|
||||
<Loader2
|
||||
className="h-3.5 w-3.5 animate-spin"
|
||||
aria-hidden="true"
|
||||
/>
|
||||
) : (
|
||||
<X className="h-3.5 w-3.5" aria-hidden="true" />
|
||||
)}
|
||||
</Button>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
) : null}
|
||||
</Collapsible>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
</ScrollArea>
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
"use client";
|
||||
|
||||
import { useState } from "react";
|
||||
import { GlobalAgentRoster } from "@/components/mission-control/GlobalAgentRoster";
|
||||
import { MissionControlPanel } from "@/components/mission-control/MissionControlPanel";
|
||||
import { useSessions } from "@/hooks/useMissionControl";
|
||||
|
||||
const DEFAULT_PANEL_SLOTS = ["panel-1", "panel-2", "panel-3", "panel-4"] as const;
|
||||
|
||||
export function MissionControlLayout(): React.JSX.Element {
|
||||
const { sessions } = useSessions();
|
||||
const [selectedSessionId, setSelectedSessionId] = useState<string>();
|
||||
|
||||
// First panel: selected session (from roster click) or first available session
|
||||
const firstPanelSessionId = selectedSessionId ?? sessions[0]?.id;
|
||||
const panelSessionIds = [firstPanelSessionId, undefined, undefined, undefined] as const;
|
||||
|
||||
return (
|
||||
<section className="h-full min-h-0 overflow-hidden" aria-label="Mission Control">
|
||||
<div className="grid h-full min-h-0 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]">
|
||||
<aside className="h-full min-h-0">
|
||||
<GlobalAgentRoster
|
||||
onSelectSession={setSelectedSessionId}
|
||||
{...(selectedSessionId ? { selectedSessionId } : {})}
|
||||
/>
|
||||
</aside>
|
||||
<main className="h-full min-h-0 overflow-hidden">
|
||||
<MissionControlPanel panels={DEFAULT_PANEL_SLOTS} panelSessionIds={panelSessionIds} />
|
||||
</main>
|
||||
</div>
|
||||
</section>
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
"use client";
|
||||
|
||||
import { OrchestratorPanel } from "@/components/mission-control/OrchestratorPanel";
|
||||
|
||||
interface MissionControlPanelProps {
|
||||
panels: readonly string[];
|
||||
panelSessionIds?: readonly (string | undefined)[];
|
||||
}
|
||||
|
||||
export function MissionControlPanel({
|
||||
panels,
|
||||
panelSessionIds,
|
||||
}: MissionControlPanelProps): React.JSX.Element {
|
||||
return (
|
||||
<div className="grid h-full min-h-0 auto-rows-fr grid-cols-1 gap-4 overflow-y-auto pr-1 md:grid-cols-2">
|
||||
{panels.map((panelId, index) => {
|
||||
const sessionId = panelSessionIds?.[index];
|
||||
|
||||
if (sessionId === undefined) {
|
||||
return <OrchestratorPanel key={panelId} />;
|
||||
}
|
||||
|
||||
return <OrchestratorPanel key={panelId} sessionId={sessionId} />;
|
||||
})}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
124
apps/web/src/components/mission-control/OrchestratorPanel.tsx
Normal file
124
apps/web/src/components/mission-control/OrchestratorPanel.tsx
Normal file
@@ -0,0 +1,124 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useRef } from "react";
|
||||
import { formatDistanceToNow } from "date-fns";
|
||||
import { BargeInInput } from "@/components/mission-control/BargeInInput";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import type { BadgeVariant } from "@/components/ui/badge";
|
||||
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
import { ScrollArea } from "@/components/ui/scroll-area";
|
||||
import {
|
||||
useSessionStream,
|
||||
type MissionControlConnectionStatus,
|
||||
type MissionControlMessageRole,
|
||||
} from "@/hooks/useMissionControl";
|
||||
|
||||
const ROLE_BADGE_VARIANT: Record<MissionControlMessageRole, BadgeVariant> = {
|
||||
user: "badge-blue",
|
||||
assistant: "status-success",
|
||||
tool: "badge-amber",
|
||||
system: "badge-muted",
|
||||
};
|
||||
|
||||
const CONNECTION_DOT_CLASS: Record<MissionControlConnectionStatus, string> = {
|
||||
connected: "bg-emerald-500",
|
||||
connecting: "bg-amber-500",
|
||||
error: "bg-red-500",
|
||||
};
|
||||
|
||||
const CONNECTION_TEXT: Record<MissionControlConnectionStatus, string> = {
|
||||
connected: "Connected",
|
||||
connecting: "Connecting",
|
||||
error: "Error",
|
||||
};
|
||||
|
||||
export interface OrchestratorPanelProps {
|
||||
sessionId?: string;
|
||||
}
|
||||
|
||||
function formatRelativeTimestamp(timestamp: string): string {
|
||||
const parsedDate = new Date(timestamp);
|
||||
if (Number.isNaN(parsedDate.getTime())) {
|
||||
return "just now";
|
||||
}
|
||||
|
||||
return formatDistanceToNow(parsedDate, { addSuffix: true });
|
||||
}
|
||||
|
||||
export function OrchestratorPanel({ sessionId }: OrchestratorPanelProps): React.JSX.Element {
|
||||
const { messages, status, error } = useSessionStream(sessionId ?? "");
|
||||
const bottomAnchorRef = useRef<HTMLDivElement | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
bottomAnchorRef.current?.scrollIntoView({ block: "end" });
|
||||
}, [messages.length]);
|
||||
|
||||
if (!sessionId) {
|
||||
return (
|
||||
<Card className="flex h-full min-h-[220px] flex-col">
|
||||
<CardHeader>
|
||||
<CardTitle className="text-base">Orchestrator Panel</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="flex flex-1 items-center justify-center text-sm text-muted-foreground">
|
||||
Select an agent to view its stream
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<Card className="flex h-full min-h-[220px] flex-col">
|
||||
<CardHeader className="space-y-2">
|
||||
<div className="flex items-center justify-between gap-2">
|
||||
<CardTitle className="text-base">Orchestrator Panel</CardTitle>
|
||||
<div className="flex items-center gap-2 text-xs text-muted-foreground">
|
||||
<span
|
||||
className={`h-2.5 w-2.5 rounded-full ${CONNECTION_DOT_CLASS[status]} ${
|
||||
status === "connecting" ? "animate-pulse" : ""
|
||||
}`}
|
||||
aria-hidden="true"
|
||||
/>
|
||||
<span>{CONNECTION_TEXT[status]}</span>
|
||||
</div>
|
||||
</div>
|
||||
<p className="truncate text-xs text-muted-foreground">Session: {sessionId}</p>
|
||||
</CardHeader>
|
||||
<CardContent className="flex min-h-0 flex-1 flex-col p-0">
|
||||
<div className="min-h-0 flex-1">
|
||||
<ScrollArea className="h-full w-full">
|
||||
<div className="flex min-h-full flex-col gap-3 p-4">
|
||||
{messages.length === 0 ? (
|
||||
<p className="mt-6 text-center text-sm text-muted-foreground">
|
||||
{error ?? "Waiting for messages..."}
|
||||
</p>
|
||||
) : (
|
||||
messages.map((message) => (
|
||||
<article
|
||||
key={message.id}
|
||||
className="rounded-lg border border-border/70 bg-card px-3 py-2"
|
||||
>
|
||||
<div className="mb-2 flex items-center justify-between gap-2">
|
||||
<Badge variant={ROLE_BADGE_VARIANT[message.role]} className="uppercase">
|
||||
{message.role}
|
||||
</Badge>
|
||||
<time className="text-xs text-muted-foreground">
|
||||
{formatRelativeTimestamp(message.timestamp)}
|
||||
</time>
|
||||
</div>
|
||||
<p className="whitespace-pre-wrap break-words text-sm text-foreground">
|
||||
{message.content}
|
||||
</p>
|
||||
</article>
|
||||
))
|
||||
)}
|
||||
<div ref={bottomAnchorRef} />
|
||||
</div>
|
||||
</ScrollArea>
|
||||
</div>
|
||||
<div className="border-t border-border/70 p-3">
|
||||
<BargeInInput sessionId={sessionId} />
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
);
|
||||
}
|
||||
13
apps/web/src/components/ui/collapsible.tsx
Normal file
13
apps/web/src/components/ui/collapsible.tsx
Normal file
@@ -0,0 +1,13 @@
|
||||
import * as React from "react";
|
||||
|
||||
export interface CollapsibleProps extends React.HTMLAttributes<HTMLDivElement> {
|
||||
open?: boolean;
|
||||
}
|
||||
|
||||
export function Collapsible({
|
||||
open = true,
|
||||
className = "",
|
||||
...props
|
||||
}: CollapsibleProps): React.JSX.Element {
|
||||
return <div data-state={open ? "open" : "closed"} className={className} {...props} />;
|
||||
}
|
||||
15
apps/web/src/components/ui/scroll-area.tsx
Normal file
15
apps/web/src/components/ui/scroll-area.tsx
Normal file
@@ -0,0 +1,15 @@
|
||||
import * as React from "react";
|
||||
|
||||
export type ScrollAreaProps = React.HTMLAttributes<HTMLDivElement>;
|
||||
|
||||
export const ScrollArea = React.forwardRef<HTMLDivElement, ScrollAreaProps>(
|
||||
({ className = "", children, ...props }, ref) => {
|
||||
return (
|
||||
<div ref={ref} className={`relative overflow-hidden ${className}`} {...props}>
|
||||
<div className="h-full w-full overflow-auto">{children}</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
ScrollArea.displayName = "ScrollArea";
|
||||
15
apps/web/src/components/ui/skeleton.tsx
Normal file
15
apps/web/src/components/ui/skeleton.tsx
Normal file
@@ -0,0 +1,15 @@
|
||||
import * as React from "react";
|
||||
|
||||
export type SkeletonProps = React.HTMLAttributes<HTMLDivElement>;
|
||||
|
||||
export const Skeleton = React.forwardRef<HTMLDivElement, SkeletonProps>(
|
||||
({ className = "", ...props }, ref) => (
|
||||
<div
|
||||
ref={ref}
|
||||
className={`animate-pulse rounded-md bg-[rgb(var(--surface-2))] ${className}`}
|
||||
{...props}
|
||||
/>
|
||||
)
|
||||
);
|
||||
|
||||
Skeleton.displayName = "Skeleton";
|
||||
189
apps/web/src/hooks/useMissionControl.ts
Normal file
189
apps/web/src/hooks/useMissionControl.ts
Normal file
@@ -0,0 +1,189 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import type { AgentMessageRole, AgentSessionStatus } from "@mosaic/shared";
|
||||
import { apiGet } from "@/lib/api/client";
|
||||
|
||||
const MISSION_CONTROL_SESSIONS_QUERY_KEY = ["mission-control", "sessions"] as const;
|
||||
const SESSIONS_REFRESH_INTERVAL_MS = 15_000;
|
||||
|
||||
export type MissionControlMessageRole = AgentMessageRole;
|
||||
|
||||
export interface MissionControlSession {
|
||||
id: string;
|
||||
providerId: string;
|
||||
providerType: string;
|
||||
label?: string;
|
||||
status: AgentSessionStatus;
|
||||
parentSessionId?: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface MissionControlSessionsResponse {
|
||||
sessions: MissionControlSession[];
|
||||
}
|
||||
|
||||
export interface MissionControlStreamMessage {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
role: MissionControlMessageRole;
|
||||
content: string;
|
||||
timestamp: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export type MissionControlConnectionStatus = "connecting" | "connected" | "error";
|
||||
|
||||
export interface UseSessionsResult {
|
||||
sessions: MissionControlSession[];
|
||||
loading: boolean;
|
||||
error: Error | null;
|
||||
}
|
||||
|
||||
export interface UseSessionStreamResult {
|
||||
messages: MissionControlStreamMessage[];
|
||||
status: MissionControlConnectionStatus;
|
||||
error: string | null;
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null;
|
||||
}
|
||||
|
||||
function isMessageRole(value: unknown): value is MissionControlMessageRole {
|
||||
return value === "assistant" || value === "system" || value === "tool" || value === "user";
|
||||
}
|
||||
|
||||
function isMissionControlStreamMessage(value: unknown): value is MissionControlStreamMessage {
|
||||
if (!isRecord(value)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const { id, sessionId, role, content, timestamp, metadata } = value;
|
||||
|
||||
if (
|
||||
typeof id !== "string" ||
|
||||
typeof sessionId !== "string" ||
|
||||
!isMessageRole(role) ||
|
||||
typeof content !== "string" ||
|
||||
typeof timestamp !== "string"
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (metadata !== undefined && !isRecord(metadata)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches Mission Control sessions.
|
||||
*/
|
||||
export function useSessions(): UseSessionsResult {
|
||||
const query = useQuery<MissionControlSessionsResponse>({
|
||||
queryKey: MISSION_CONTROL_SESSIONS_QUERY_KEY,
|
||||
queryFn: async (): Promise<MissionControlSessionsResponse> => {
|
||||
return apiGet<MissionControlSessionsResponse>("/api/mission-control/sessions");
|
||||
},
|
||||
refetchInterval: SESSIONS_REFRESH_INTERVAL_MS,
|
||||
});
|
||||
|
||||
return {
|
||||
sessions: query.data?.sessions ?? [],
|
||||
loading: query.isLoading,
|
||||
error: query.error ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Backward-compatible alias for early Mission Control integration.
|
||||
*/
|
||||
export function useMissionControl(): UseSessionsResult {
|
||||
return useSessions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Streams Mission Control session messages over SSE.
|
||||
*/
|
||||
export function useSessionStream(sessionId: string): UseSessionStreamResult {
|
||||
const [messages, setMessages] = useState<MissionControlStreamMessage[]>([]);
|
||||
const [status, setStatus] = useState<MissionControlConnectionStatus>("connecting");
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
const eventSourceRef = useRef<EventSource | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
if (eventSourceRef.current !== null) {
|
||||
eventSourceRef.current.close();
|
||||
eventSourceRef.current = null;
|
||||
}
|
||||
|
||||
setMessages([]);
|
||||
setError(null);
|
||||
|
||||
if (!sessionId) {
|
||||
setStatus("connecting");
|
||||
return;
|
||||
}
|
||||
|
||||
if (typeof EventSource === "undefined") {
|
||||
setStatus("error");
|
||||
setError("Mission Control stream is not supported by this browser.");
|
||||
return;
|
||||
}
|
||||
|
||||
setStatus("connecting");
|
||||
|
||||
const source = new EventSource(
|
||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/stream`
|
||||
);
|
||||
eventSourceRef.current = source;
|
||||
|
||||
source.onopen = (): void => {
|
||||
setStatus("connected");
|
||||
setError(null);
|
||||
};
|
||||
|
||||
source.onmessage = (event: MessageEvent<string>): void => {
|
||||
try {
|
||||
const parsed = JSON.parse(event.data) as unknown;
|
||||
if (!isMissionControlStreamMessage(parsed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
setMessages((previousMessages) => [...previousMessages, parsed]);
|
||||
} catch {
|
||||
// Ignore malformed events from the stream.
|
||||
}
|
||||
};
|
||||
|
||||
source.onerror = (): void => {
|
||||
if (source.readyState === EventSource.CONNECTING) {
|
||||
setStatus("connecting");
|
||||
setError(null);
|
||||
return;
|
||||
}
|
||||
|
||||
setStatus("error");
|
||||
setError("Mission Control stream disconnected.");
|
||||
};
|
||||
|
||||
return (): void => {
|
||||
source.close();
|
||||
if (eventSourceRef.current === source) {
|
||||
eventSourceRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [sessionId]);
|
||||
|
||||
return {
|
||||
messages,
|
||||
status,
|
||||
error,
|
||||
};
|
||||
}
|
||||
78
packages/shared/src/types/agent-provider.types.ts
Normal file
78
packages/shared/src/types/agent-provider.types.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
// Agent message roles
|
||||
export type AgentMessageRole = "user" | "assistant" | "system" | "tool";
|
||||
|
||||
// A single message in an agent conversation
|
||||
export interface AgentMessage {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
role: AgentMessageRole;
|
||||
content: string;
|
||||
timestamp: Date;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
// Session lifecycle status
|
||||
export type AgentSessionStatus = "active" | "paused" | "completed" | "failed" | "idle";
|
||||
|
||||
// An agent session (conversation thread)
|
||||
export interface AgentSession {
|
||||
id: string;
|
||||
providerId: string; // which provider owns this session
|
||||
providerType: string; // "internal" | "openclaw" | etc.
|
||||
label?: string;
|
||||
status: AgentSessionStatus;
|
||||
parentSessionId?: string; // for subagent trees
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
// Result of listing sessions
|
||||
export interface AgentSessionList {
|
||||
sessions: AgentSession[];
|
||||
total: number;
|
||||
cursor?: string;
|
||||
}
|
||||
|
||||
// Result of injecting a message
|
||||
export interface InjectResult {
|
||||
accepted: boolean;
|
||||
messageId?: string;
|
||||
}
|
||||
|
||||
// The IAgentProvider interface — every provider (internal, OpenClaw, future) implements this
|
||||
export interface IAgentProvider {
|
||||
readonly providerId: string;
|
||||
readonly providerType: string;
|
||||
readonly displayName: string;
|
||||
|
||||
// Session management
|
||||
listSessions(cursor?: string, limit?: number): Promise<AgentSessionList>;
|
||||
getSession(sessionId: string): Promise<AgentSession | null>;
|
||||
getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]>;
|
||||
|
||||
// Control operations
|
||||
injectMessage(sessionId: string, content: string): Promise<InjectResult>;
|
||||
pauseSession(sessionId: string): Promise<void>;
|
||||
resumeSession(sessionId: string): Promise<void>;
|
||||
killSession(sessionId: string, force?: boolean): Promise<void>;
|
||||
|
||||
// SSE streaming — returns an AsyncIterable of AgentMessage events
|
||||
streamMessages(sessionId: string): AsyncIterable<AgentMessage>;
|
||||
|
||||
// Health
|
||||
isAvailable(): Promise<boolean>;
|
||||
}
|
||||
|
||||
// Provider configuration stored in DB (AgentProviderConfig model from P0 schema)
|
||||
export interface AgentProviderConfig {
|
||||
id: string;
|
||||
providerId: string;
|
||||
providerType: string;
|
||||
displayName: string;
|
||||
baseUrl?: string;
|
||||
apiToken?: string;
|
||||
enabled: boolean;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}
|
||||
@@ -134,3 +134,6 @@ export * from "./widget.types";
|
||||
|
||||
// Export WebSocket types
|
||||
export * from "./websocket.types";
|
||||
|
||||
// Export agent provider types
|
||||
export * from "./agent-provider.types";
|
||||
|
||||
Reference in New Issue
Block a user