import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import Docker from "dockerode";
import { PrismaService } from "../prisma/prisma.service";
import { CryptoService } from "../crypto/crypto.service";

const DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock";
const DEFAULT_DOCKER_TCP_PORT = 2375;
const DEFAULT_OPENCLAW_IMAGE = "alpine/openclaw:latest";
const DEFAULT_OPENCLAW_NETWORK = "mosaic-internal";
const DEFAULT_OPENCLAW_PORT_RANGE_START = 19000;
const DEFAULT_MOSAIC_API_URL = "http://mosaic-api:3000/api";
const OPENCLAW_GATEWAY_PORT_KEY = "18789/tcp";
const OPENCLAW_STATE_PATH = "/home/node/.openclaw";
const CONTAINER_STOP_TIMEOUT_SECONDS = 10;
// Highest valid TCP port; upper bound for the gateway port scan.
const MAX_TCP_PORT = 65535;

/** Port map as it appears in `docker inspect` output (container port key -> host bindings). */
type DockerPortMap = Record<string, { HostPort?: string }[] | null | undefined>;

/** The subset of a Docker container handle that this service relies on. */
interface ContainerHandle {
  inspect(): Promise<DockerInspect>;
  start(): Promise<void>;
  stop(options?: { t?: number }): Promise<void>;
}

/** The subset of `docker inspect` output that this service reads. */
interface DockerInspect {
  Id?: string;
  State?: {
    Running?: boolean;
    Health?: {
      Status?: string;
    };
  };
  NetworkSettings?: {
    Ports?: DockerPortMap;
  };
  HostConfig?: {
    PortBindings?: DockerPortMap;
  };
}

/** Shape of a `userContainer` row as consumed by this service. */
interface UserContainerRecord {
  id: string;
  userId: string;
  containerId: string | null;
  containerName: string;
  gatewayPort: number | null;
  gatewayToken: string;
  status: string;
  lastActiveAt: Date | null;
  idleTimeoutMin: number;
}

/** Minimal information needed to locate a container in Docker. */
interface ContainerLookup {
  containerId: string | null;
  containerName: string;
}

/** Outcome of a health probe for one user's container. */
interface ContainerHealth {
  userId: string;
  healthy: boolean;
  error?: string;
}

/**
 * Creates, starts, stops and health-checks one Docker container per user and
 * mirrors the container state into the `userContainer` table.
 */
@Injectable()
export class ContainerLifecycleService {
  private readonly logger = new Logger(ContainerLifecycleService.name);
  private readonly docker: Docker;

  constructor(
    private readonly prisma: PrismaService,
    private readonly crypto: CryptoService,
    private readonly config: ConfigService
  ) {
    const dockerHost = this.config.get<string>("DOCKER_HOST");
    this.docker = this.createDockerClient(dockerHost);
  }

  /**
   * Ensure a user's container is running. Creates it if needed, starts it if stopped.
   *
   * @param userId - Owner of the container.
   * @returns The container's internal URL and gateway token.
   * @throws Error when Docker's inspect output lacks a container ID or a gateway port.
   */
  async ensureRunning(userId: string): Promise<{ url: string; token: string }> {
    const containerRecord = await this.getOrCreateContainerRecord(userId);
    const token = this.getGatewayToken(containerRecord.gatewayToken);
    const existingContainer = await this.resolveContainer(containerRecord);

    let container: ContainerHandle;
    if (existingContainer) {
      container = existingContainer;
      const current = await container.inspect();
      if (!current.State?.Running) {
        await container.start();
      }
    } else {
      const port = await this.findAvailableGatewayPort();
      container = await this.createContainer(containerRecord, token, port);
      await container.start();
    }

    // Inspect again after the (possible) start so the port mapping reflects the live container.
    const inspect = await container.inspect();
    const containerId = inspect.Id;
    if (!containerId) {
      throw new Error(
        `Docker inspect did not return container ID for ${containerRecord.containerName}`
      );
    }

    const gatewayPort = this.extractGatewayPort(inspect);
    if (!gatewayPort) {
      throw new Error(`Could not determine gateway port for ${containerRecord.containerName}`);
    }

    await this.prisma.userContainer.update({
      where: { userId },
      data: {
        containerId,
        gatewayPort,
        status: "running",
        lastActiveAt: new Date(),
      },
    });

    // NOTE(review): this URL pairs the container's network name with the
    // host-published port. If callers reach the container over the Docker
    // network, the container-side port from OPENCLAW_GATEWAY_PORT_KEY may be
    // the one that is actually listening - confirm against the deployment.
    return {
      url: `http://${containerRecord.containerName}:${String(gatewayPort)}`,
      token,
    };
  }

  /**
   * Stop a user's container and mark its record as stopped.
   *
   * Does nothing when the user has no container record. A container that is
   * already gone (404) or already stopped (304) is treated as success.
   *
   * @param userId - Owner of the container.
   */
  async stop(userId: string): Promise<void> {
    const containerRecord = await this.prisma.userContainer.findUnique({
      where: { userId },
    });
    if (!containerRecord) {
      return;
    }

    const container = await this.resolveContainer(containerRecord);
    if (container) {
      try {
        await container.stop({ t: CONTAINER_STOP_TIMEOUT_SECONDS });
      } catch (error) {
        if (!this.isDockerNotFound(error) && !this.isAlreadyStopped(error)) {
          throw error;
        }
      }
    }

    await this.prisma.userContainer.update({
      where: { userId },
      data: {
        status: "stopped",
        containerId: null,
        gatewayPort: null,
      },
    });
  }

  /**
   * Stop idle containers (called by cron/scheduler).
   *
   * A container is idle once `idleTimeoutMin` minutes have elapsed since its
   * `lastActiveAt`. Failures to stop an individual container are logged and
   * do not abort the sweep.
   *
   * @returns The user IDs whose containers were stopped.
   */
  async reapIdle(): Promise<{ stopped: string[] }> {
    const now = Date.now();
    const runningContainers = await this.prisma.userContainer.findMany({
      where: {
        status: "running",
        lastActiveAt: { not: null },
      },
      select: {
        userId: true,
        lastActiveAt: true,
        idleTimeoutMin: true,
      },
    });

    const stopped: string[] = [];
    for (const container of runningContainers) {
      const lastActiveAt = container.lastActiveAt;
      if (!lastActiveAt) {
        continue;
      }

      const idleLimitMs = container.idleTimeoutMin * 60 * 1000;
      if (now - lastActiveAt.getTime() < idleLimitMs) {
        continue;
      }

      try {
        await this.stop(container.userId);
        stopped.push(container.userId);
      } catch (error) {
        this.logger.warn(
          `Failed to stop idle container for user ${container.userId}: ${this.getErrorMessage(error)}`
        );
      }
    }

    return { stopped };
  }

  /**
   * Health check all containers whose record says they are running.
   *
   * Every record yields exactly one result; Docker errors for one container
   * are reported in that container's result instead of failing the whole call.
   */
  async healthCheckAll(): Promise<{ userId: string; healthy: boolean; error?: string }[]> {
    const runningContainers = await this.prisma.userContainer.findMany({
      where: {
        status: "running",
      },
      select: {
        userId: true,
        containerId: true,
        containerName: true,
      },
    });

    const results: ContainerHealth[] = [];
    for (const containerRecord of runningContainers) {
      results.push(await this.checkContainerHealth(containerRecord));
    }
    return results;
  }

  /**
   * Restart a container with fresh config (for config updates).
   *
   * @param userId - Owner of the container.
   */
  async restart(userId: string): Promise<void> {
    await this.stop(userId);
    await this.ensureRunning(userId);
  }

  /**
   * Update the lastActiveAt timestamp (called on each chat request).
   *
   * Uses `updateMany`, so a user without a container record is a no-op.
   *
   * @param userId - Owner of the container.
   */
  async touch(userId: string): Promise<void> {
    await this.prisma.userContainer.updateMany({
      where: { userId },
      data: {
        lastActiveAt: new Date(),
      },
    });
  }

  /**
   * Get the recorded container status for a user.
   *
   * @param userId - Owner of the container.
   * @returns The status, plus port and last-active time when recorded, or
   *   `null` when the user has no container record.
   */
  async getStatus(
    userId: string
  ): Promise<{ status: string; port?: number; lastActive?: Date } | null> {
    const container = await this.prisma.userContainer.findUnique({
      where: { userId },
      select: {
        status: true,
        gatewayPort: true,
        lastActiveAt: true,
      },
    });
    if (!container) {
      return null;
    }

    const status: { status: string; port?: number; lastActive?: Date } = {
      status: container.status,
    };
    if (container.gatewayPort !== null) {
      status.port = container.gatewayPort;
    }
    if (container.lastActiveAt !== null) {
      status.lastActive = container.lastActiveAt;
    }
    return status;
  }

  /** Probe a single container, converting every failure into an unhealthy result. */
  private async checkContainerHealth(
    record: ContainerLookup & { userId: string }
  ): Promise<ContainerHealth> {
    const { userId } = record;
    try {
      // The lookup is inside the try so a Docker failure stays scoped to this container.
      const container = await this.resolveContainer(record);
      if (!container) {
        return { userId, healthy: false, error: "Container not found" };
      }

      const inspect = await container.inspect();
      const isRunning = inspect.State?.Running === true;
      const healthState = inspect.State?.Health?.Status;
      if (isRunning && healthState !== "unhealthy") {
        return { userId, healthy: true };
      }

      return {
        userId,
        healthy: false,
        error: healthState === "unhealthy" ? "Container healthcheck failed" : "Container not running",
      };
    } catch (error) {
      return { userId, healthy: false, error: this.getErrorMessage(error) };
    }
  }

  /**
   * Build a Docker client from a DOCKER_HOST-style value.
   *
   * Accepts `unix://`, `tcp://`, `http://` and `https://` forms; a missing or
   * blank value selects the default socket, and any other value is used as a
   * socket path. Surrounding whitespace is ignored.
   */
  private createDockerClient(dockerHost?: string): Docker {
    const host = dockerHost?.trim();
    if (!host) {
      return new Docker({ socketPath: DEFAULT_DOCKER_SOCKET_PATH });
    }

    if (host.startsWith("unix://")) {
      return new Docker({ socketPath: host.slice("unix://".length) });
    }

    if (host.startsWith("tcp://")) {
      // URL cannot parse the tcp scheme into host/port parts, so swap in http for parsing.
      const parsed = new URL(host.replace("tcp://", "http://"));
      return new Docker({
        host: parsed.hostname,
        port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT),
        protocol: "http",
      });
    }

    if (host.startsWith("http://") || host.startsWith("https://")) {
      const parsed = new URL(host);
      const protocol = parsed.protocol.replace(":", "");
      return new Docker({
        host: parsed.hostname,
        port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT),
        protocol: protocol === "https" ? "https" : "http",
      });
    }

    return new Docker({ socketPath: host });
  }

  /** Load the user's container record, creating a stopped one with a fresh encrypted token if absent. */
  private async getOrCreateContainerRecord(userId: string): Promise<UserContainerRecord> {
    const existingContainer = await this.prisma.userContainer.findUnique({
      where: { userId },
    });
    if (existingContainer) {
      return existingContainer;
    }

    const token = this.crypto.generateToken();
    const containerName = this.getContainerName(userId);
    return this.prisma.userContainer.create({
      data: {
        userId,
        containerName,
        gatewayToken: this.crypto.encrypt(token),
        status: "stopped",
      },
    });
  }

  private getContainerName(userId: string): string {
    return `mosaic-user-${userId}`;
  }

  private getVolumeName(userId: string): string {
    return `mosaic-user-${userId}-state`;
  }

  private getOpenClawImage(): string {
    return this.config.get<string>("OPENCLAW_IMAGE") ?? DEFAULT_OPENCLAW_IMAGE;
  }

  private getOpenClawNetwork(): string {
    return this.config.get<string>("OPENCLAW_NETWORK") ?? DEFAULT_OPENCLAW_NETWORK;
  }

  private getMosaicApiUrl(): string {
    return this.config.get<string>("MOSAIC_API_URL") ?? DEFAULT_MOSAIC_API_URL;
  }

  private getPortRangeStart(): number {
    return this.parseInteger(
      this.config.get<string>("OPENCLAW_PORT_RANGE_START"),
      DEFAULT_OPENCLAW_PORT_RANGE_START
    );
  }

  /** Locate a container by its recorded ID, falling back to its name; `null` when neither exists. */
  private async resolveContainer(record: ContainerLookup): Promise<ContainerHandle | null> {
    if (record.containerId) {
      const byId = this.docker.getContainer(record.containerId) as unknown as ContainerHandle;
      if (await this.containerExists(byId)) {
        return byId;
      }
    }

    return this.findContainerByName(record.containerName);
  }

  /**
   * Find a container (running or not) whose name is exactly `containerName`.
   *
   * The list filter is only used to narrow the candidates; names are compared
   * exactly so that one user's name being a prefix of another's (for example
   * `mosaic-user-1` and `mosaic-user-10`) cannot select the wrong container.
   */
  private async findContainerByName(containerName: string): Promise<ContainerHandle | null> {
    const containers = await this.docker.listContainers({
      all: true,
      filters: {
        name: [containerName],
      },
    });

    const match = containers.find((container) =>
      container.Names.some((name) => name === `/${containerName}` || name === containerName)
    );
    if (!match?.Id) {
      return null;
    }

    return this.docker.getContainer(match.Id) as unknown as ContainerHandle;
  }

  /** Whether Docker knows the container; only a 404 counts as "missing", other errors propagate. */
  private async containerExists(container: ContainerHandle): Promise<boolean> {
    try {
      await container.inspect();
      return true;
    } catch (error) {
      if (this.isDockerNotFound(error)) {
        return false;
      }
      throw error;
    }
  }

  /** Create (but do not start) the user's container with its state volume and published gateway port. */
  private async createContainer(
    containerRecord: UserContainerRecord,
    token: string,
    gatewayPort: number
  ): Promise<ContainerHandle> {
    const container = await this.docker.createContainer({
      name: containerRecord.containerName,
      Image: this.getOpenClawImage(),
      Env: [
        `MOSAIC_API_URL=${this.getMosaicApiUrl()}`,
        `AGENT_TOKEN=${token}`,
        `AGENT_ID=${containerRecord.id}`,
      ],
      ExposedPorts: {
        [OPENCLAW_GATEWAY_PORT_KEY]: {},
      },
      HostConfig: {
        Binds: [`${this.getVolumeName(containerRecord.userId)}:${OPENCLAW_STATE_PATH}`],
        PortBindings: {
          [OPENCLAW_GATEWAY_PORT_KEY]: [{ HostPort: String(gatewayPort) }],
        },
        NetworkMode: this.getOpenClawNetwork(),
      },
    });

    return container as unknown as ContainerHandle;
  }

  /**
   * Read the host port bound to the gateway port from inspect output.
   *
   * Prefers the live `NetworkSettings.Ports` mapping and falls back to the
   * configured `HostConfig.PortBindings`. Returns `null` when neither yields
   * a non-zero integer.
   */
  private extractGatewayPort(inspect: DockerInspect): number | null {
    const networkPort = inspect.NetworkSettings?.Ports?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort;
    if (networkPort) {
      return this.parseInteger(networkPort, 0) || null;
    }

    const hostPort = inspect.HostConfig?.PortBindings?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort;
    if (hostPort) {
      return this.parseInteger(hostPort, 0) || null;
    }

    return null;
  }

  /**
   * Pick the lowest port at or above the configured range start that no
   * container record currently holds.
   *
   * NOTE(review): only ports recorded in the database count as taken, and
   * `stop()` clears the recorded port while leaving the stopped container (and
   * its port binding) in place - confirm a cleared port cannot be handed to a
   * second container.
   *
   * @throws Error when every port up to 65535 is taken.
   */
  private async findAvailableGatewayPort(): Promise<number> {
    const usedPorts = await this.prisma.userContainer.findMany({
      where: {
        gatewayPort: { not: null },
      },
      select: {
        gatewayPort: true,
      },
    });

    const takenPorts = new Set<number>();
    for (const entry of usedPorts) {
      if (entry.gatewayPort !== null) {
        takenPorts.add(entry.gatewayPort);
      }
    }

    const rangeStart = this.getPortRangeStart();
    for (let candidate = rangeStart; candidate <= MAX_TCP_PORT; candidate += 1) {
      if (!takenPorts.has(candidate)) {
        return candidate;
      }
    }

    throw new Error(
      `No free gateway port available between ${String(rangeStart)} and ${String(MAX_TCP_PORT)}`
    );
  }

  /** Return the plaintext gateway token, decrypting the stored value when it is encrypted. */
  private getGatewayToken(storedToken: string): string {
    if (this.crypto.isEncrypted(storedToken)) {
      return this.crypto.decrypt(storedToken);
    }
    return storedToken;
  }

  /** Parse a base-10 integer, returning `fallback` for missing, empty or non-numeric input. */
  private parseInteger(value: string | undefined, fallback: number): number {
    if (!value) {
      return fallback;
    }

    const parsed = Number.parseInt(value, 10);
    return Number.isFinite(parsed) ? parsed : fallback;
  }

  private isDockerNotFound(error: unknown): boolean {
    return this.getDockerStatusCode(error) === 404;
  }

  private isAlreadyStopped(error: unknown): boolean {
    return this.getDockerStatusCode(error) === 304;
  }

  /** Extract a numeric `statusCode` property from a thrown value, or `null` when there is none. */
  private getDockerStatusCode(error: unknown): number | null {
    if (typeof error !== "object" || error === null || !("statusCode" in error)) {
      return null;
    }

    const statusCode = error.statusCode;
    return typeof statusCode === "number" ? statusCode : null;
  }

  private getErrorMessage(error: unknown): string {
    if (error instanceof Error) {
      return error.message;
    }
    return "Unknown error";
  }
}