From 9cc82e7fcf716c72f7f8ad19d08ab3a74280ee5f Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 7 Mar 2026 19:15:59 +0000 Subject: [PATCH] feat(orchestrator): MS23-P1-003 AgentProviderRegistry (#720) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- .../agents/agent-provider.registry.spec.ts | 137 ++++++++++++++++++ .../src/api/agents/agent-provider.registry.ts | 57 ++++++++ .../src/api/agents/agents.module.ts | 4 +- 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts create mode 100644 apps/orchestrator/src/api/agents/agent-provider.registry.ts diff --git a/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts b/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts new file mode 100644 index 0000000..0241c6a --- /dev/null +++ b/apps/orchestrator/src/api/agents/agent-provider.registry.spec.ts @@ -0,0 +1,137 @@ +import { Logger } from "@nestjs/common"; +import type { + AgentMessage, + AgentSession, + AgentSessionList, + IAgentProvider, + InjectResult, +} from "@mosaic/shared"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { AgentProviderRegistry } from "./agent-provider.registry"; +import { InternalAgentProvider } from "./internal-agent.provider"; + +type MockProvider = IAgentProvider & { + listSessions: ReturnType; +}; + +const emptyMessageStream = async function* (): AsyncIterable { + return; +}; + +const createProvider = (providerId: string, sessions: AgentSession[] = []): MockProvider => { + return { + providerId, + providerType: providerId, + displayName: providerId, + listSessions: vi.fn().mockResolvedValue({ + sessions, + total: sessions.length, + } as AgentSessionList), + getSession: vi.fn().mockResolvedValue(null), + getMessages: vi.fn().mockResolvedValue([]), + injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult), + pauseSession: vi.fn().mockResolvedValue(undefined), + resumeSession: vi.fn().mockResolvedValue(undefined), + killSession: vi.fn().mockResolvedValue(undefined), + streamMessages: vi.fn().mockReturnValue(emptyMessageStream()), + isAvailable: vi.fn().mockResolvedValue(true), + }; +}; + +describe("AgentProviderRegistry", () => { + let registry: AgentProviderRegistry; + let internalProvider: MockProvider; + + beforeEach(() => { + internalProvider = createProvider("internal"); + registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("registers InternalAgentProvider on module init", () => { + registry.onModuleInit(); + + expect(registry.getProvider("internal")).toBe(internalProvider); + }); + + it("registers providers and returns null for unknown provider ids", () => { + const externalProvider = createProvider("openclaw"); + + registry.registerProvider(externalProvider); + + expect(registry.getProvider("openclaw")).toBe(externalProvider); + expect(registry.getProvider("missing")).toBeNull(); + }); + + it("aggregates and sorts sessions from all providers", async () => { + const internalSessions: AgentSession[] = [ + { + id: "session-older", + providerId: "internal", + providerType: "internal", + status: "active", + createdAt: new Date("2026-03-07T10:00:00.000Z"), + updatedAt: new Date("2026-03-07T10:10:00.000Z"), + }, + ]; + + const externalSessions: AgentSession[] = [ + { + id: "session-newer", + providerId: "openclaw", + providerType: "external", + status: "paused", + createdAt: new Date("2026-03-07T09:00:00.000Z"), + updatedAt: new Date("2026-03-07T10:20:00.000Z"), + }, + ]; + + internalProvider.listSessions.mockResolvedValue({ + sessions: internalSessions, + total: internalSessions.length, + } as AgentSessionList); + + const externalProvider = createProvider("openclaw", externalSessions); + registry.onModuleInit(); + registry.registerProvider(externalProvider); + + const result = await registry.listAllSessions(); + + expect(result.map((session) => session.id)).toEqual(["session-newer", "session-older"]); + expect(internalProvider.listSessions).toHaveBeenCalledTimes(1); + expect(externalProvider.listSessions).toHaveBeenCalledTimes(1); + }); + + it("skips provider failures and logs warning", async () => { + const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined); + + const healthyProvider = createProvider("healthy", [ + { + id: "session-1", + providerId: "healthy", + providerType: "external", + status: "active", + createdAt: new Date("2026-03-07T11:00:00.000Z"), + updatedAt: new Date("2026-03-07T11:00:00.000Z"), + }, + ]); + + const failingProvider = createProvider("failing"); + failingProvider.listSessions.mockRejectedValue(new Error("provider offline")); + + registry.onModuleInit(); + registry.registerProvider(healthyProvider); + registry.registerProvider(failingProvider); + + const result = await registry.listAllSessions(); + + expect(result).toHaveLength(1); + expect(result[0]?.id).toBe("session-1"); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("Failed to list sessions for provider failing") + ); + }); +}); diff --git a/apps/orchestrator/src/api/agents/agent-provider.registry.ts b/apps/orchestrator/src/api/agents/agent-provider.registry.ts new file mode 100644 index 0000000..526a65e --- /dev/null +++ b/apps/orchestrator/src/api/agents/agent-provider.registry.ts @@ -0,0 +1,57 @@ +import { Injectable, Logger, OnModuleInit } from "@nestjs/common"; +import type { AgentSession, IAgentProvider } from "@mosaic/shared"; +import { InternalAgentProvider } from "./internal-agent.provider"; + +@Injectable() +export class AgentProviderRegistry implements OnModuleInit { + private readonly logger = new Logger(AgentProviderRegistry.name); + private readonly providers = new Map(); + + constructor(private readonly internalProvider: InternalAgentProvider) {} + + onModuleInit(): void { + this.registerProvider(this.internalProvider); + } + + registerProvider(provider: IAgentProvider): void { + const existingProvider = this.providers.get(provider.providerId); + if (existingProvider !== undefined) { + this.logger.warn(`Replacing existing provider registration for ${provider.providerId}`); + } + + this.providers.set(provider.providerId, provider); + } + + getProvider(providerId: string): IAgentProvider | null { + return this.providers.get(providerId) ?? null; + } + + async listAllSessions(): Promise { + const providers = [...this.providers.values()]; + const sessionsByProvider = await Promise.all( + providers.map(async (provider) => { + try { + const { sessions } = await provider.listSessions(); + return sessions; + } catch (error) { + this.logger.warn( + `Failed to list sessions for provider ${provider.providerId}: ${this.toErrorMessage(error)}` + ); + return []; + } + }) + ); + + return sessionsByProvider + .flat() + .sort((left, right) => right.updatedAt.getTime() - left.updatedAt.getTime()); + } + + private toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return String(error); + } +} diff --git a/apps/orchestrator/src/api/agents/agents.module.ts b/apps/orchestrator/src/api/agents/agents.module.ts index 16b6857..ffee5a4 100644 --- a/apps/orchestrator/src/api/agents/agents.module.ts +++ b/apps/orchestrator/src/api/agents/agents.module.ts @@ -11,6 +11,7 @@ import { AgentMessagesService } from "./agent-messages.service"; import { AgentControlService } from "./agent-control.service"; import { AgentTreeService } from "./agent-tree.service"; import { InternalAgentProvider } from "./internal-agent.provider"; +import { AgentProviderRegistry } from "./agent-provider.registry"; @Module({ imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule], @@ -22,7 +23,8 @@ import { InternalAgentProvider } from "./internal-agent.provider"; AgentControlService, AgentTreeService, InternalAgentProvider, + AgentProviderRegistry, ], - exports: [InternalAgentProvider], + exports: [InternalAgentProvider, AgentProviderRegistry], }) export class AgentsModule {}