From 5d66e007103353a6aed56c63f8dc6f3c970fdb77 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Mar 2026 09:23:43 -0600 Subject: [PATCH] feat(api): ContainerLifecycleService for per-user OpenClaw (MS22-P1d) --- apps/api/package.json | 2 + apps/api/src/app.module.ts | 2 + .../container-lifecycle.module.ts | 11 + .../container-lifecycle.service.spec.ts | 593 ++++++++++++++++++ .../container-lifecycle.service.ts | 532 ++++++++++++++++ pnpm-lock.yaml | 7 +- 6 files changed, 1146 insertions(+), 1 deletion(-) create mode 100644 apps/api/src/container-lifecycle/container-lifecycle.module.ts create mode 100644 apps/api/src/container-lifecycle/container-lifecycle.service.spec.ts create mode 100644 apps/api/src/container-lifecycle/container-lifecycle.service.ts diff --git a/apps/api/package.json b/apps/api/package.json index 1e76617..76d3bd5 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -59,6 +59,7 @@ "class-validator": "^0.14.3", "cookie-parser": "^1.4.7", "discord.js": "^14.25.1", + "dockerode": "^4.0.9", "gray-matter": "^4.0.3", "highlight.js": "^11.11.1", "ioredis": "^5.9.2", @@ -88,6 +89,7 @@ "@types/archiver": "^7.0.0", "@types/bcryptjs": "^3.0.0", "@types/cookie-parser": "^1.4.10", + "@types/dockerode": "^3.3.47", "@types/express": "^5.0.1", "@types/highlight.js": "^10.1.0", "@types/node": "^22.13.4", diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index d3bc776..660497c 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -52,6 +52,7 @@ import { ImportModule } from "./import/import.module"; import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module"; import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor"; import { AgentConfigModule } from "./agent-config/agent-config.module"; +import { ContainerLifecycleModule } from "./container-lifecycle/container-lifecycle.module"; @Module({ imports: [ @@ -125,6 +126,7 @@ import { AgentConfigModule } from "./agent-config/agent-config.module"; ImportModule, ConversationArchiveModule, AgentConfigModule, + ContainerLifecycleModule, ], controllers: [AppController, CsrfController], providers: [ diff --git a/apps/api/src/container-lifecycle/container-lifecycle.module.ts b/apps/api/src/container-lifecycle/container-lifecycle.module.ts new file mode 100644 index 0000000..a0e94c3 --- /dev/null +++ b/apps/api/src/container-lifecycle/container-lifecycle.module.ts @@ -0,0 +1,11 @@ +import { Module } from "@nestjs/common"; +import { PrismaModule } from "../prisma/prisma.module"; +import { CryptoModule } from "../crypto/crypto.module"; +import { ContainerLifecycleService } from "./container-lifecycle.service"; + +@Module({ + imports: [PrismaModule, CryptoModule], + providers: [ContainerLifecycleService], + exports: [ContainerLifecycleService], +}) +export class ContainerLifecycleModule {} diff --git a/apps/api/src/container-lifecycle/container-lifecycle.service.spec.ts b/apps/api/src/container-lifecycle/container-lifecycle.service.spec.ts new file mode 100644 index 0000000..c379e31 --- /dev/null +++ b/apps/api/src/container-lifecycle/container-lifecycle.service.spec.ts @@ -0,0 +1,593 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ConfigService } from "@nestjs/config"; +import type { PrismaService } from "../prisma/prisma.service"; +import type { CryptoService } from "../crypto/crypto.service"; + +interface MockUserContainerRecord { + id: string; + userId: string; + containerId: string | null; + containerName: string; + gatewayPort: number | null; + gatewayToken: string; + status: string; + lastActiveAt: Date | null; + idleTimeoutMin: number; + config: Record; + createdAt: Date; + updatedAt: Date; +} + +const dockerMock = vi.hoisted(() => { + interface MockDockerContainerState { + id: string; + name: string; + running: boolean; + port: number; + } + + const containers = new Map(); + const handles = new Map< + string, + { + inspect: ReturnType; + start: ReturnType; + stop: ReturnType; + } + >(); + + const ensureHandle = (id: string) => { + const existing = handles.get(id); + if (existing) { + return existing; + } + + const handle = { + inspect: vi.fn(async () => { + const container = containers.get(id); + if (!container) { + throw { statusCode: 404 }; + } + + return { + Id: container.id, + State: { + Running: container.running, + }, + NetworkSettings: { + Ports: { + "18789/tcp": [{ HostPort: String(container.port) }], + }, + }, + }; + }), + start: vi.fn(async () => { + const container = containers.get(id); + if (!container) { + throw { statusCode: 404 }; + } + container.running = true; + }), + stop: vi.fn(async () => { + const container = containers.get(id); + if (!container) { + throw { statusCode: 404 }; + } + container.running = false; + }), + }; + + handles.set(id, handle); + return handle; + }; + + const listContainers = vi.fn( + async (options?: { all?: boolean; filters?: { name?: string[] } }) => { + const nameFilter = options?.filters?.name?.[0]; + return [...containers.values()] + .filter((container) => (nameFilter ? container.name.includes(nameFilter) : true)) + .map((container) => ({ + Id: container.id, + Names: [`/${container.name}`], + })); + } + ); + + const getContainer = vi.fn((id: string) => ensureHandle(id)); + + const createContainer = vi.fn( + async (options: { + name?: string; + HostConfig?: { PortBindings?: Record> }; + }) => { + const id = `ctr-${containers.size + 1}`; + const name = options.name ?? id; + const hostPort = options.HostConfig?.PortBindings?.["18789/tcp"]?.[0]?.HostPort; + const port = hostPort ? Number.parseInt(hostPort, 10) : 0; + + containers.set(id, { + id, + name, + running: false, + port, + }); + + return ensureHandle(id); + } + ); + + const dockerInstance = { + listContainers, + getContainer, + createContainer, + }; + + const constructorSpy = vi.fn(); + class DockerConstructorMock { + constructor(options?: unknown) { + constructorSpy(options); + return dockerInstance; + } + } + + const registerContainer = (container: MockDockerContainerState) => { + containers.set(container.id, { ...container }); + ensureHandle(container.id); + }; + + const reset = () => { + containers.clear(); + handles.clear(); + constructorSpy.mockClear(); + listContainers.mockClear(); + getContainer.mockClear(); + createContainer.mockClear(); + }; + + return { + DockerConstructorMock, + constructorSpy, + createContainer, + handles, + registerContainer, + reset, + }; +}); + +vi.mock("dockerode", () => ({ + default: dockerMock.DockerConstructorMock, +})); + +import { ContainerLifecycleService } from "./container-lifecycle.service"; + +function createConfigMock(values: Record = {}) { + return { + get: vi.fn((key: string) => values[key]), + }; +} + +function createCryptoMock() { + return { + generateToken: vi.fn(() => "generated-token"), + encrypt: vi.fn((value: string) => `enc:${value}`), + decrypt: vi.fn((value: string) => value.replace(/^enc:/, "")), + isEncrypted: vi.fn((value: string) => value.startsWith("enc:")), + }; +} + +function projectRecord( + record: MockUserContainerRecord, + select?: Record +): Partial { + if (!select) { + return { ...record }; + } + + const projection: Partial = {}; + for (const [field, enabled] of Object.entries(select)) { + if (enabled) { + const key = field as keyof MockUserContainerRecord; + projection[key] = record[key]; + } + } + + return projection; +} + +function createPrismaMock(initialRecords: MockUserContainerRecord[] = []) { + const records = new Map(); + for (const record of initialRecords) { + records.set(record.userId, { ...record }); + } + + const userContainer = { + findUnique: vi.fn( + async (args: { + where: { userId?: string; id?: string }; + select?: Record; + }) => { + let record: MockUserContainerRecord | undefined; + if (args.where.userId) { + record = records.get(args.where.userId); + } else if (args.where.id) { + record = [...records.values()].find((entry) => entry.id === args.where.id); + } + + if (!record) { + return null; + } + + return projectRecord(record, args.select); + } + ), + create: vi.fn( + async (args: { + data: Partial & { + userId: string; + containerName: string; + gatewayToken: string; + }; + }) => { + const now = new Date(); + const next: MockUserContainerRecord = { + id: args.data.id ?? `uc-${records.size + 1}`, + userId: args.data.userId, + containerId: args.data.containerId ?? null, + containerName: args.data.containerName, + gatewayPort: args.data.gatewayPort ?? null, + gatewayToken: args.data.gatewayToken, + status: args.data.status ?? "stopped", + lastActiveAt: args.data.lastActiveAt ?? null, + idleTimeoutMin: args.data.idleTimeoutMin ?? 30, + config: args.data.config ?? {}, + createdAt: now, + updatedAt: now, + }; + + records.set(next.userId, next); + return { ...next }; + } + ), + update: vi.fn( + async (args: { where: { userId: string }; data: Partial }) => { + const record = records.get(args.where.userId); + if (!record) { + throw new Error(`Record ${args.where.userId} not found`); + } + + const updated: MockUserContainerRecord = { + ...record, + ...args.data, + updatedAt: new Date(), + }; + records.set(updated.userId, updated); + return { ...updated }; + } + ), + updateMany: vi.fn( + async (args: { where: { userId: string }; data: Partial }) => { + const record = records.get(args.where.userId); + if (!record) { + return { count: 0 }; + } + + const updated: MockUserContainerRecord = { + ...record, + ...args.data, + updatedAt: new Date(), + }; + records.set(updated.userId, updated); + return { count: 1 }; + } + ), + findMany: vi.fn( + async (args?: { + where?: { + status?: string; + lastActiveAt?: { not: null }; + gatewayPort?: { not: null }; + }; + select?: Record; + }) => { + let rows = [...records.values()]; + + if (args?.where?.status) { + rows = rows.filter((record) => record.status === args.where?.status); + } + + if (args?.where?.lastActiveAt?.not === null) { + rows = rows.filter((record) => record.lastActiveAt !== null); + } + + if (args?.where?.gatewayPort?.not === null) { + rows = rows.filter((record) => record.gatewayPort !== null); + } + + return rows.map((record) => projectRecord(record, args?.select)); + } + ), + }; + + return { + prisma: { + userContainer, + }, + records, + }; +} + +function createRecord(overrides: Partial): MockUserContainerRecord { + const now = new Date(); + return { + id: overrides.id ?? "uc-default", + userId: overrides.userId ?? "user-default", + containerId: overrides.containerId ?? null, + containerName: overrides.containerName ?? "mosaic-user-user-default", + gatewayPort: overrides.gatewayPort ?? null, + gatewayToken: overrides.gatewayToken ?? "enc:token-default", + status: overrides.status ?? "stopped", + lastActiveAt: overrides.lastActiveAt ?? null, + idleTimeoutMin: overrides.idleTimeoutMin ?? 30, + config: overrides.config ?? {}, + createdAt: overrides.createdAt ?? now, + updatedAt: overrides.updatedAt ?? now, + }; +} + +describe("ContainerLifecycleService", () => { + beforeEach(() => { + dockerMock.reset(); + }); + + it("ensureRunning creates container when none exists", async () => { + const { prisma, records } = createPrismaMock(); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + const result = await service.ensureRunning("user-1"); + + expect(result).toEqual({ + url: "http://mosaic-user-user-1:19000", + token: "generated-token", + }); + + const updatedRecord = records.get("user-1"); + expect(updatedRecord?.status).toBe("running"); + expect(updatedRecord?.containerId).toBe("ctr-1"); + expect(updatedRecord?.gatewayPort).toBe(19000); + expect(updatedRecord?.gatewayToken).toBe("enc:generated-token"); + + expect(dockerMock.createContainer).toHaveBeenCalledTimes(1); + const [createCall] = dockerMock.createContainer.mock.calls[0] as [ + { + name: string; + Image: string; + Env: string[]; + HostConfig: { Binds: string[]; NetworkMode: string }; + }, + ]; + expect(createCall.name).toBe("mosaic-user-user-1"); + expect(createCall.Image).toBe("alpine/openclaw:latest"); + expect(createCall.HostConfig.Binds).toEqual(["mosaic-user-user-1-state:/home/node/.openclaw"]); + expect(createCall.HostConfig.NetworkMode).toBe("mosaic-internal"); + expect(createCall.Env).toContain("AGENT_TOKEN=generated-token"); + }); + + it("ensureRunning starts existing stopped container", async () => { + const { prisma, records } = createPrismaMock([ + createRecord({ + id: "uc-1", + userId: "user-2", + containerId: "ctr-stopped", + containerName: "mosaic-user-user-2", + gatewayToken: "enc:existing-token", + status: "stopped", + }), + ]); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + dockerMock.registerContainer({ + id: "ctr-stopped", + name: "mosaic-user-user-2", + running: false, + port: 19042, + }); + + const result = await service.ensureRunning("user-2"); + + expect(result).toEqual({ + url: "http://mosaic-user-user-2:19042", + token: "existing-token", + }); + + const handle = dockerMock.handles.get("ctr-stopped"); + expect(handle?.start).toHaveBeenCalledTimes(1); + expect(records.get("user-2")?.status).toBe("running"); + expect(records.get("user-2")?.gatewayPort).toBe(19042); + }); + + it("ensureRunning returns existing running container", async () => { + const { prisma } = createPrismaMock([ + createRecord({ + id: "uc-2", + userId: "user-3", + containerId: "ctr-running", + containerName: "mosaic-user-user-3", + gatewayPort: 19043, + gatewayToken: "enc:running-token", + status: "running", + }), + ]); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + dockerMock.registerContainer({ + id: "ctr-running", + name: "mosaic-user-user-3", + running: true, + port: 19043, + }); + + const result = await service.ensureRunning("user-3"); + + expect(result).toEqual({ + url: "http://mosaic-user-user-3:19043", + token: "running-token", + }); + + expect(dockerMock.createContainer).not.toHaveBeenCalled(); + const handle = dockerMock.handles.get("ctr-running"); + expect(handle?.start).not.toHaveBeenCalled(); + }); + + it("stop gracefully stops container and updates DB", async () => { + const { prisma, records } = createPrismaMock([ + createRecord({ + id: "uc-stop", + userId: "user-stop", + containerId: "ctr-stop", + containerName: "mosaic-user-user-stop", + gatewayPort: 19044, + status: "running", + }), + ]); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + dockerMock.registerContainer({ + id: "ctr-stop", + name: "mosaic-user-user-stop", + running: true, + port: 19044, + }); + + await service.stop("user-stop"); + + const handle = dockerMock.handles.get("ctr-stop"); + expect(handle?.stop).toHaveBeenCalledWith({ t: 10 }); + + const updatedRecord = records.get("user-stop"); + expect(updatedRecord?.status).toBe("stopped"); + expect(updatedRecord?.containerId).toBeNull(); + expect(updatedRecord?.gatewayPort).toBeNull(); + }); + + it("reapIdle stops only containers past their idle timeout", async () => { + const now = Date.now(); + const { prisma, records } = createPrismaMock([ + createRecord({ + id: "uc-old", + userId: "user-old", + containerId: "ctr-old", + containerName: "mosaic-user-user-old", + gatewayPort: 19045, + status: "running", + lastActiveAt: new Date(now - 60 * 60 * 1000), + idleTimeoutMin: 30, + }), + createRecord({ + id: "uc-fresh", + userId: "user-fresh", + containerId: "ctr-fresh", + containerName: "mosaic-user-user-fresh", + gatewayPort: 19046, + status: "running", + lastActiveAt: new Date(now - 5 * 60 * 1000), + idleTimeoutMin: 30, + }), + ]); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + dockerMock.registerContainer({ + id: "ctr-old", + name: "mosaic-user-user-old", + running: true, + port: 19045, + }); + dockerMock.registerContainer({ + id: "ctr-fresh", + name: "mosaic-user-user-fresh", + running: true, + port: 19046, + }); + + const result = await service.reapIdle(); + + expect(result).toEqual({ + stopped: ["user-old"], + }); + + expect(records.get("user-old")?.status).toBe("stopped"); + expect(records.get("user-fresh")?.status).toBe("running"); + + const oldHandle = dockerMock.handles.get("ctr-old"); + const freshHandle = dockerMock.handles.get("ctr-fresh"); + expect(oldHandle?.stop).toHaveBeenCalledTimes(1); + expect(freshHandle?.stop).not.toHaveBeenCalled(); + }); + + it("touch updates lastActiveAt", async () => { + const { prisma, records } = createPrismaMock([ + createRecord({ + id: "uc-touch", + userId: "user-touch", + containerName: "mosaic-user-user-touch", + lastActiveAt: null, + }), + ]); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + await service.touch("user-touch"); + + const updatedRecord = records.get("user-touch"); + expect(updatedRecord?.lastActiveAt).toBeInstanceOf(Date); + }); + + it("getStatus returns null for unknown user", async () => { + const { prisma } = createPrismaMock(); + const crypto = createCryptoMock(); + const config = createConfigMock(); + const service = new ContainerLifecycleService( + prisma as unknown as PrismaService, + crypto as unknown as CryptoService, + config as unknown as ConfigService + ); + + const status = await service.getStatus("missing-user"); + + expect(status).toBeNull(); + }); +}); diff --git a/apps/api/src/container-lifecycle/container-lifecycle.service.ts b/apps/api/src/container-lifecycle/container-lifecycle.service.ts new file mode 100644 index 0000000..4fdf7b1 --- /dev/null +++ b/apps/api/src/container-lifecycle/container-lifecycle.service.ts @@ -0,0 +1,532 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import Docker from "dockerode"; +import { PrismaService } from "../prisma/prisma.service"; +import { CryptoService } from "../crypto/crypto.service"; + +const DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock"; +const DEFAULT_DOCKER_TCP_PORT = 2375; +const DEFAULT_OPENCLAW_IMAGE = "alpine/openclaw:latest"; +const DEFAULT_OPENCLAW_NETWORK = "mosaic-internal"; +const DEFAULT_OPENCLAW_PORT_RANGE_START = 19000; +const DEFAULT_MOSAIC_API_URL = "http://mosaic-api:3000/api"; +const OPENCLAW_GATEWAY_PORT_KEY = "18789/tcp"; +const OPENCLAW_STATE_PATH = "/home/node/.openclaw"; +const CONTAINER_STOP_TIMEOUT_SECONDS = 10; + +interface ContainerHandle { + inspect(): Promise; + start(): Promise; + stop(options?: { t?: number }): Promise; +} + +interface DockerInspect { + Id?: string; + State?: { + Running?: boolean; + Health?: { + Status?: string; + }; + }; + NetworkSettings?: { + Ports?: Record; + }; + HostConfig?: { + PortBindings?: Record; + }; +} + +interface UserContainerRecord { + id: string; + userId: string; + containerId: string | null; + containerName: string; + gatewayPort: number | null; + gatewayToken: string; + status: string; + lastActiveAt: Date | null; + idleTimeoutMin: number; +} + +interface ContainerLookup { + containerId: string | null; + containerName: string; +} + +@Injectable() +export class ContainerLifecycleService { + private readonly logger = new Logger(ContainerLifecycleService.name); + private readonly docker: Docker; + + constructor( + private readonly prisma: PrismaService, + private readonly crypto: CryptoService, + private readonly config: ConfigService + ) { + const dockerHost = this.config.get("DOCKER_HOST"); + this.docker = this.createDockerClient(dockerHost); + } + + // Ensure a user's container is running. Creates if needed, starts if stopped. + // Returns the container's internal URL and gateway token. + async ensureRunning(userId: string): Promise<{ url: string; token: string }> { + const containerRecord = await this.getOrCreateContainerRecord(userId); + const token = this.getGatewayToken(containerRecord.gatewayToken); + const existingContainer = await this.resolveContainer(containerRecord); + + let container: ContainerHandle; + if (existingContainer) { + container = existingContainer; + const inspect = await container.inspect(); + if (!inspect.State?.Running) { + await container.start(); + } + } else { + const port = await this.findAvailableGatewayPort(); + container = await this.createContainer(containerRecord, token, port); + await container.start(); + } + + const inspect = await container.inspect(); + const containerId = inspect.Id; + if (!containerId) { + throw new Error( + `Docker inspect did not return container ID for ${containerRecord.containerName}` + ); + } + + const gatewayPort = this.extractGatewayPort(inspect); + if (!gatewayPort) { + throw new Error(`Could not determine gateway port for ${containerRecord.containerName}`); + } + + const now = new Date(); + await this.prisma.userContainer.update({ + where: { userId }, + data: { + containerId, + gatewayPort, + status: "running", + lastActiveAt: now, + }, + }); + + return { + url: `http://${containerRecord.containerName}:${String(gatewayPort)}`, + token, + }; + } + + // Stop a user's container + async stop(userId: string): Promise { + const containerRecord = await this.prisma.userContainer.findUnique({ + where: { userId }, + }); + + if (!containerRecord) { + return; + } + + const container = await this.resolveContainer(containerRecord); + if (container) { + try { + await container.stop({ t: CONTAINER_STOP_TIMEOUT_SECONDS }); + } catch (error) { + if (!this.isDockerNotFound(error) && !this.isAlreadyStopped(error)) { + throw error; + } + } + } + + await this.prisma.userContainer.update({ + where: { userId }, + data: { + status: "stopped", + containerId: null, + gatewayPort: null, + }, + }); + } + + // Stop idle containers (called by cron/scheduler) + async reapIdle(): Promise<{ stopped: string[] }> { + const now = Date.now(); + const runningContainers = await this.prisma.userContainer.findMany({ + where: { + status: "running", + lastActiveAt: { not: null }, + }, + select: { + userId: true, + lastActiveAt: true, + idleTimeoutMin: true, + }, + }); + + const stopped: string[] = []; + for (const container of runningContainers) { + const lastActiveAt = container.lastActiveAt; + if (!lastActiveAt) { + continue; + } + + const idleLimitMs = container.idleTimeoutMin * 60 * 1000; + if (now - lastActiveAt.getTime() < idleLimitMs) { + continue; + } + + try { + await this.stop(container.userId); + stopped.push(container.userId); + } catch (error) { + this.logger.warn( + `Failed to stop idle container for user ${container.userId}: ${this.getErrorMessage(error)}` + ); + } + } + + return { stopped }; + } + + // Health check all running containers + async healthCheckAll(): Promise<{ userId: string; healthy: boolean; error?: string }[]> { + const runningContainers = await this.prisma.userContainer.findMany({ + where: { + status: "running", + }, + select: { + userId: true, + containerId: true, + containerName: true, + }, + }); + + const results: { userId: string; healthy: boolean; error?: string }[] = []; + for (const containerRecord of runningContainers) { + const container = await this.resolveContainer(containerRecord); + if (!container) { + results.push({ + userId: containerRecord.userId, + healthy: false, + error: "Container not found", + }); + continue; + } + + try { + const inspect = await container.inspect(); + const isRunning = inspect.State?.Running === true; + const healthState = inspect.State?.Health?.Status; + const healthy = isRunning && healthState !== "unhealthy"; + + if (healthy) { + results.push({ + userId: containerRecord.userId, + healthy: true, + }); + continue; + } + + results.push({ + userId: containerRecord.userId, + healthy: false, + error: + healthState === "unhealthy" ? "Container healthcheck failed" : "Container not running", + }); + } catch (error) { + results.push({ + userId: containerRecord.userId, + healthy: false, + error: this.getErrorMessage(error), + }); + } + } + + return results; + } + + // Restart a container with fresh config (for config updates) + async restart(userId: string): Promise { + await this.stop(userId); + await this.ensureRunning(userId); + } + + // Update lastActiveAt timestamp (called on each chat request) + async touch(userId: string): Promise { + await this.prisma.userContainer.updateMany({ + where: { userId }, + data: { + lastActiveAt: new Date(), + }, + }); + } + + // Get container status for a user + async getStatus( + userId: string + ): Promise<{ status: string; port?: number; lastActive?: Date } | null> { + const container = await this.prisma.userContainer.findUnique({ + where: { userId }, + select: { + status: true, + gatewayPort: true, + lastActiveAt: true, + }, + }); + + if (!container) { + return null; + } + + const status: { status: string; port?: number; lastActive?: Date } = { + status: container.status, + }; + + if (container.gatewayPort !== null) { + status.port = container.gatewayPort; + } + + if (container.lastActiveAt !== null) { + status.lastActive = container.lastActiveAt; + } + + return status; + } + + private createDockerClient(dockerHost?: string): Docker { + if (!dockerHost || dockerHost.trim().length === 0) { + return new Docker({ socketPath: DEFAULT_DOCKER_SOCKET_PATH }); + } + + if (dockerHost.startsWith("unix://")) { + return new Docker({ socketPath: dockerHost.slice("unix://".length) }); + } + + if (dockerHost.startsWith("tcp://")) { + const parsed = new URL(dockerHost.replace("tcp://", "http://")); + return new Docker({ + host: parsed.hostname, + port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT), + protocol: "http", + }); + } + + if (dockerHost.startsWith("http://") || dockerHost.startsWith("https://")) { + const parsed = new URL(dockerHost); + const protocol = parsed.protocol.replace(":", ""); + return new Docker({ + host: parsed.hostname, + port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT), + protocol: protocol === "https" ? "https" : "http", + }); + } + + return new Docker({ socketPath: dockerHost }); + } + + private async getOrCreateContainerRecord(userId: string): Promise { + const existingContainer = await this.prisma.userContainer.findUnique({ + where: { userId }, + }); + + if (existingContainer) { + return existingContainer; + } + + const token = this.crypto.generateToken(); + const containerName = this.getContainerName(userId); + return this.prisma.userContainer.create({ + data: { + userId, + containerName, + gatewayToken: this.crypto.encrypt(token), + status: "stopped", + }, + }); + } + + private getContainerName(userId: string): string { + return `mosaic-user-${userId}`; + } + + private getVolumeName(userId: string): string { + return `mosaic-user-${userId}-state`; + } + + private getOpenClawImage(): string { + return this.config.get("OPENCLAW_IMAGE") ?? DEFAULT_OPENCLAW_IMAGE; + } + + private getOpenClawNetwork(): string { + return this.config.get("OPENCLAW_NETWORK") ?? DEFAULT_OPENCLAW_NETWORK; + } + + private getMosaicApiUrl(): string { + return this.config.get("MOSAIC_API_URL") ?? DEFAULT_MOSAIC_API_URL; + } + + private getPortRangeStart(): number { + return this.parseInteger( + this.config.get("OPENCLAW_PORT_RANGE_START"), + DEFAULT_OPENCLAW_PORT_RANGE_START + ); + } + + private async resolveContainer(record: ContainerLookup): Promise { + if (record.containerId) { + const byId = this.docker.getContainer(record.containerId) as unknown as ContainerHandle; + if (await this.containerExists(byId)) { + return byId; + } + } + + const byName = await this.findContainerByName(record.containerName); + if (byName) { + return byName; + } + + return null; + } + + private async findContainerByName(containerName: string): Promise { + const containers = await this.docker.listContainers({ + all: true, + filters: { + name: [containerName], + }, + }); + + const match = containers.find((container) => { + const names = container.Names; + return names.some((name) => name === `/${containerName}` || name.includes(containerName)); + }); + + if (!match?.Id) { + return null; + } + + return this.docker.getContainer(match.Id) as unknown as ContainerHandle; + } + + private async containerExists(container: ContainerHandle): Promise { + try { + await container.inspect(); + return true; + } catch (error) { + if (this.isDockerNotFound(error)) { + return false; + } + + throw error; + } + } + + private async createContainer( + containerRecord: UserContainerRecord, + token: string, + gatewayPort: number + ): Promise { + const container = await this.docker.createContainer({ + name: containerRecord.containerName, + Image: this.getOpenClawImage(), + Env: [ + `MOSAIC_API_URL=${this.getMosaicApiUrl()}`, + `AGENT_TOKEN=${token}`, + `AGENT_ID=${containerRecord.id}`, + ], + ExposedPorts: { + [OPENCLAW_GATEWAY_PORT_KEY]: {}, + }, + HostConfig: { + Binds: [`${this.getVolumeName(containerRecord.userId)}:${OPENCLAW_STATE_PATH}`], + PortBindings: { + [OPENCLAW_GATEWAY_PORT_KEY]: [{ HostPort: String(gatewayPort) }], + }, + NetworkMode: this.getOpenClawNetwork(), + }, + }); + + return container as unknown as ContainerHandle; + } + + private extractGatewayPort(inspect: DockerInspect): number | null { + const networkPort = inspect.NetworkSettings?.Ports?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort; + if (networkPort) { + return this.parseInteger(networkPort, 0) || null; + } + + const hostPort = inspect.HostConfig?.PortBindings?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort; + if (hostPort) { + return this.parseInteger(hostPort, 0) || null; + } + + return null; + } + + private async findAvailableGatewayPort(): Promise { + const usedPorts = await this.prisma.userContainer.findMany({ + where: { + gatewayPort: { not: null }, + }, + select: { + gatewayPort: true, + }, + }); + + const takenPorts = new Set(); + for (const entry of usedPorts) { + if (entry.gatewayPort !== null) { + takenPorts.add(entry.gatewayPort); + } + } + + let candidate = this.getPortRangeStart(); + while (takenPorts.has(candidate)) { + candidate += 1; + } + + return candidate; + } + + private getGatewayToken(storedToken: string): string { + if (this.crypto.isEncrypted(storedToken)) { + return this.crypto.decrypt(storedToken); + } + + return storedToken; + } + + private parseInteger(value: string | undefined, fallback: number): number { + if (!value) { + return fallback; + } + + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) ? parsed : fallback; + } + + private isDockerNotFound(error: unknown): boolean { + return this.getDockerStatusCode(error) === 404; + } + + private isAlreadyStopped(error: unknown): boolean { + return this.getDockerStatusCode(error) === 304; + } + + private getDockerStatusCode(error: unknown): number | null { + if (typeof error !== "object" || error === null || !("statusCode" in error)) { + return null; + } + + const statusCode = error.statusCode; + return typeof statusCode === "number" ? statusCode : null; + } + + private getErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return "Unknown error"; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2325434..32afd3b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -171,6 +171,9 @@ importers: discord.js: specifier: ^14.25.1 version: 14.25.1 + dockerode: + specifier: ^4.0.9 + version: 4.0.9 gray-matter: specifier: ^4.0.3 version: 4.0.3 @@ -253,6 +256,9 @@ importers: '@types/cookie-parser': specifier: ^1.4.10 version: 1.4.10(@types/express@5.0.6) + '@types/dockerode': + specifier: ^3.3.47 + version: 3.3.47 '@types/express': specifier: ^5.0.1 version: 5.0.6 @@ -1604,7 +1610,6 @@ packages: '@mosaicstack/telemetry-client@0.1.1': resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz} - engines: {node: '>=18'} '@mrleebo/prisma-ast@0.13.1': resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==} -- 2.49.1