Files
stack/apps/api/src/container-lifecycle/container-lifecycle.service.ts
Jason Woltje c25e753f35
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
feat(api): ContainerLifecycleService (MS22-P1d) (#610)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 15:24:42 +00:00

533 lines
14 KiB
TypeScript

import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import Docker from "dockerode";
import { PrismaService } from "../prisma/prisma.service";
import { CryptoService } from "../crypto/crypto.service";
// Docker socket used when the DOCKER_HOST config value is unset or blank.
const DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock";
// Port assumed for tcp:// and http(s):// DOCKER_HOST values that carry no explicit port.
const DEFAULT_DOCKER_TCP_PORT = 2375;
// Fallback for the OPENCLAW_IMAGE config key (image used for new user containers).
const DEFAULT_OPENCLAW_IMAGE = "alpine/openclaw:latest";
// Fallback for the OPENCLAW_NETWORK config key (used as the container's NetworkMode).
const DEFAULT_OPENCLAW_NETWORK = "mosaic-internal";
// Fallback for OPENCLAW_PORT_RANGE_START: the first host port tried when allocating a gateway port.
const DEFAULT_OPENCLAW_PORT_RANGE_START = 19000;
// Fallback for MOSAIC_API_URL, which is passed to each container through its environment.
const DEFAULT_MOSAIC_API_URL = "http://mosaic-api:3000/api";
// Container-side port/protocol key that is exposed and bound to the allocated host port.
const OPENCLAW_GATEWAY_PORT_KEY = "18789/tcp";
// Path inside the container where the per-user state volume is mounted.
const OPENCLAW_STATE_PATH = "/home/node/.openclaw";
// Value passed as the `t` option of container.stop().
const CONTAINER_STOP_TIMEOUT_SECONDS = 10;
// Minimal view of a Docker container object: only the calls this service makes.
// dockerode containers are cast to this shape (`as unknown as ContainerHandle`).
interface ContainerHandle {
inspect(): Promise<DockerInspect>;
start(): Promise<void>;
// `t` is supplied from CONTAINER_STOP_TIMEOUT_SECONDS.
stop(options?: { t?: number }): Promise<void>;
}
// Subset of the container inspect result that this service reads.
// Every field is optional, so callers must null-check before use.
interface DockerInspect {
Id?: string;
State?: {
Running?: boolean;
// Health.Status is compared against "unhealthy" by healthCheckAll().
Health?: {
Status?: string;
};
};
// Preferred source for the gateway host port (see extractGatewayPort).
NetworkSettings?: {
Ports?: Record<string, { HostPort?: string }[] | null>;
};
// Fallback source for the gateway host port when NetworkSettings has none.
HostConfig?: {
PortBindings?: Record<string, { HostPort?: string }[] | null>;
};
}
// Shape of a userContainer row as consumed by this service.
interface UserContainerRecord {
id: string;
userId: string;
// Docker container ID recorded by ensureRunning(); cleared (null) by stop().
containerId: string | null;
containerName: string;
// Host port recorded by ensureRunning(); cleared (null) by stop().
gatewayPort: number | null;
// Stored token; getGatewayToken() decrypts it when crypto.isEncrypted() says so.
gatewayToken: string;
// This service writes "running" and "stopped".
status: string;
// Updated by ensureRunning() and touch(); read by reapIdle().
lastActiveAt: Date | null;
// Idle limit in minutes, applied by reapIdle().
idleTimeoutMin: number;
}
// The two fields resolveContainer() needs: it tries containerId first (when set),
// then falls back to a lookup by containerName.
interface ContainerLookup {
containerId: string | null;
containerName: string;
}
/**
 * Manages one Docker container per user: creates, starts, stops, restarts,
 * health-checks and idle-reaps containers, and mirrors their state
 * (container ID, gateway port, status, last activity) in the userContainer table.
 */
@Injectable()
export class ContainerLifecycleService {
private readonly logger = new Logger(ContainerLifecycleService.name);
// Docker client built once in the constructor from the DOCKER_HOST config value.
private readonly docker: Docker;
constructor(
private readonly prisma: PrismaService,
private readonly crypto: CryptoService,
private readonly config: ConfigService
) {
// DOCKER_HOST is optional; createDockerClient() falls back to the default socket.
const dockerHost = this.config.get<string>("DOCKER_HOST");
this.docker = this.createDockerClient(dockerHost);
}
/**
 * Ensure a user's container is running: create it if it does not exist and
 * start it if it is stopped.
 *
 * On success the userContainer row is updated with the container ID, the
 * gateway port, status "running" and a fresh lastActiveAt.
 *
 * @param userId - Owner of the container.
 * @returns The container's internal URL and its gateway token (decrypted when
 *   it is stored encrypted).
 * @throws Error when the inspect result carries no container ID or no gateway
 *   port. Docker and Prisma errors propagate unchanged.
 */
async ensureRunning(userId: string): Promise<{ url: string; token: string }> {
  const containerRecord = await this.getOrCreateContainerRecord(userId);
  const token = this.getGatewayToken(containerRecord.gatewayToken);

  let container = await this.resolveContainer(containerRecord);
  if (container) {
    const current = await container.inspect();
    if (!current.State?.Running) {
      try {
        await container.start();
      } catch (error) {
        // Another caller may have started the container between the inspect
        // above and this start; Docker answers that with 304. stop() already
        // tolerates the mirror-image case, so treat it as success here too.
        if (this.getDockerStatusCode(error) !== 304) {
          throw error;
        }
      }
    }
  } else {
    const port = await this.findAvailableGatewayPort();
    container = await this.createContainer(containerRecord, token, port);
    await container.start();
  }

  // Re-inspect after start so the ID and port bindings reflect the live container.
  const inspect = await container.inspect();
  const containerId = inspect.Id;
  if (!containerId) {
    throw new Error(
      `Docker inspect did not return container ID for ${containerRecord.containerName}`
    );
  }

  const gatewayPort = this.extractGatewayPort(inspect);
  if (!gatewayPort) {
    throw new Error(`Could not determine gateway port for ${containerRecord.containerName}`);
  }

  await this.prisma.userContainer.update({
    where: { userId },
    data: {
      containerId,
      gatewayPort,
      status: "running",
      lastActiveAt: new Date(),
    },
  });

  // NOTE(review): gatewayPort is the HOST port published for
  // OPENCLAW_GATEWAY_PORT_KEY, yet the URL addresses the container by name.
  // Confirm that callers can actually reach the gateway at name:hostPort.
  return {
    url: `http://${containerRecord.containerName}:${String(gatewayPort)}`,
    token,
  };
}
/**
 * Stop a user's container and mark its record as stopped.
 *
 * Does nothing when the user has no container record. Docker "not found" and
 * "already stopped" responses are treated as success; any other Docker error
 * propagates and leaves the record untouched.
 *
 * NOTE(review): the container is stopped, not removed, and createContainer()
 * fixes its HostPort at creation. Clearing gatewayPort here lets
 * findAvailableGatewayPort() hand the same port to another container — confirm
 * whether the port should stay reserved while the stopped container exists.
 *
 * @param userId - Owner of the container to stop.
 */
async stop(userId: string): Promise<void> {
  const record = await this.prisma.userContainer.findUnique({ where: { userId } });
  if (!record) {
    return;
  }

  const handle = await this.resolveContainer(record);
  if (handle) {
    try {
      await handle.stop({ t: CONTAINER_STOP_TIMEOUT_SECONDS });
    } catch (error) {
      const alreadyGone = this.isDockerNotFound(error) || this.isAlreadyStopped(error);
      if (!alreadyGone) {
        throw error;
      }
    }
  }

  await this.prisma.userContainer.update({
    where: { userId },
    data: { status: "stopped", containerId: null, gatewayPort: null },
  });
}
/**
 * Stop every container marked "running" whose last activity is at least its
 * own idleTimeoutMin minutes in the past. Meant to be called by a cron/scheduler.
 *
 * A failure to stop one container is logged as a warning and does not abort
 * the sweep.
 *
 * @returns The user IDs whose containers were stopped.
 */
async reapIdle(): Promise<{ stopped: string[] }> {
  const sweepStartedAt = Date.now();
  const candidates = await this.prisma.userContainer.findMany({
    where: { status: "running", lastActiveAt: { not: null } },
    select: { userId: true, lastActiveAt: true, idleTimeoutMin: true },
  });

  const stopped: string[] = [];
  for (const { userId, lastActiveAt, idleTimeoutMin } of candidates) {
    // The query already filters out null timestamps; this guard narrows the type.
    if (!lastActiveAt) {
      continue;
    }

    const idleLimitMs = idleTimeoutMin * 60 * 1000;
    const idleForMs = sweepStartedAt - lastActiveAt.getTime();
    if (idleForMs < idleLimitMs) {
      continue;
    }

    try {
      await this.stop(userId);
      stopped.push(userId);
    } catch (error) {
      this.logger.warn(
        `Failed to stop idle container for user ${userId}: ${this.getErrorMessage(error)}`
      );
    }
  }

  return { stopped };
}
/**
 * Health-check every container whose record says "running".
 *
 * A container is healthy when Docker reports it as running and its health
 * status is not "unhealthy". Containers without a health status count as
 * healthy as long as they run.
 *
 * Any Docker error for one container — during lookup or inspect — is reported
 * as an unhealthy entry for that user, so one failure cannot abort the sweep.
 *
 * @returns One entry per running record; `error` is set only when unhealthy.
 */
async healthCheckAll(): Promise<{ userId: string; healthy: boolean; error?: string }[]> {
  const runningContainers = await this.prisma.userContainer.findMany({
    where: { status: "running" },
    select: { userId: true, containerId: true, containerName: true },
  });

  const results: { userId: string; healthy: boolean; error?: string }[] = [];
  for (const containerRecord of runningContainers) {
    const { userId } = containerRecord;
    try {
      // The lookup lives inside the try: it talks to Docker and can fail just
      // like inspect() can.
      const container = await this.resolveContainer(containerRecord);
      if (!container) {
        results.push({ userId, healthy: false, error: "Container not found" });
        continue;
      }

      const inspect = await container.inspect();
      const isRunning = inspect.State?.Running === true;
      const healthState = inspect.State?.Health?.Status;
      if (isRunning && healthState !== "unhealthy") {
        results.push({ userId, healthy: true });
        continue;
      }

      results.push({
        userId,
        healthy: false,
        error:
          healthState === "unhealthy" ? "Container healthcheck failed" : "Container not running",
      });
    } catch (error) {
      results.push({ userId, healthy: false, error: this.getErrorMessage(error) });
    }
  }

  return results;
}
// Restart a user's container: stop it, then bring it back up via ensureRunning().
// NOTE(review): stop() only stops the container (it is not removed) and
// ensureRunning() starts an existing container as-is, so settings fixed at
// creation time (env, port bindings) do not appear to be refreshed by this
// path — confirm whether "fresh config" requires recreating the container.
async restart(userId: string): Promise<void> {
await this.stop(userId);
await this.ensureRunning(userId);
}
/**
 * Record activity for a user's container by setting lastActiveAt to now.
 * Called on each chat request. Uses updateMany, so a user without a container
 * record is a no-op rather than an error.
 *
 * @param userId - Owner of the container that was just used.
 */
async touch(userId: string): Promise<void> {
  const lastActiveAt = new Date();
  await this.prisma.userContainer.updateMany({
    where: { userId },
    data: { lastActiveAt },
  });
}
/**
 * Report the stored state of a user's container (from the database only;
 * Docker is not queried).
 *
 * @param userId - Owner of the container.
 * @returns null when the user has no container record; otherwise the status,
 *   plus `port` and `lastActive` only when those columns are non-null.
 */
async getStatus(
  userId: string
): Promise<{ status: string; port?: number; lastActive?: Date } | null> {
  const row = await this.prisma.userContainer.findUnique({
    where: { userId },
    select: { status: true, gatewayPort: true, lastActiveAt: true },
  });
  if (!row) {
    return null;
  }

  const { status, gatewayPort, lastActiveAt } = row;
  return {
    status,
    // Optional keys are omitted entirely (not set to undefined) when the column is null.
    ...(gatewayPort !== null ? { port: gatewayPort } : {}),
    ...(lastActiveAt !== null ? { lastActive: lastActiveAt } : {}),
  };
}
/**
 * Build a Docker client from a DOCKER_HOST-style value.
 *
 * Accepted forms (surrounding whitespace is ignored):
 * - unset/blank: the default Unix socket
 * - `unix://<path>`: that socket path
 * - `tcp://host[:port]`: plain HTTP to host
 * - `http://host[:port]` or `https://host[:port]`: that protocol
 * - anything else: treated as a socket path
 *
 * A missing port falls back to DEFAULT_DOCKER_TCP_PORT.
 *
 * @param dockerHost - Raw config value, possibly undefined.
 * @throws TypeError when a tcp/http(s) value is not a parseable URL.
 */
private createDockerClient(dockerHost?: string): Docker {
  // Trim once so the blank check and the scheme checks see the same string;
  // previously " tcp://host" passed the blank check but was then treated as a
  // socket path.
  const host = dockerHost?.trim() ?? "";
  if (host.length === 0) {
    return new Docker({ socketPath: DEFAULT_DOCKER_SOCKET_PATH });
  }

  if (host.startsWith("unix://")) {
    return new Docker({ socketPath: host.slice("unix://".length) });
  }

  if (host.startsWith("tcp://")) {
    // URL cannot give host/port for the tcp scheme, so parse it as http.
    const parsed = new URL(host.replace("tcp://", "http://"));
    return new Docker({
      host: parsed.hostname,
      port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT),
      protocol: "http",
    });
  }

  if (host.startsWith("http://") || host.startsWith("https://")) {
    const parsed = new URL(host);
    const protocol = parsed.protocol.replace(":", "");
    return new Docker({
      host: parsed.hostname,
      port: this.parseInteger(parsed.port, DEFAULT_DOCKER_TCP_PORT),
      protocol: protocol === "https" ? "https" : "http",
    });
  }

  return new Docker({ socketPath: host });
}
/**
 * Fetch the user's container record, creating it on first use.
 *
 * A new record starts as "stopped" with a freshly generated gateway token,
 * which is stored encrypted.
 *
 * @param userId - Owner of the record.
 * @returns The existing or newly created record.
 */
private async getOrCreateContainerRecord(userId: string): Promise<UserContainerRecord> {
  const found = await this.prisma.userContainer.findUnique({ where: { userId } });
  if (found) {
    return found;
  }

  const freshToken = this.crypto.generateToken();
  const containerName = this.getContainerName(userId);
  const gatewayToken = this.crypto.encrypt(freshToken);
  return this.prisma.userContainer.create({
    data: { userId, containerName, gatewayToken, status: "stopped" },
  });
}
// Docker container name for a user; stored on the record as containerName
// when the record is first created.
private getContainerName(userId: string): string {
return `mosaic-user-${userId}`;
}
// Name of the per-user volume that createContainer() mounts at OPENCLAW_STATE_PATH.
private getVolumeName(userId: string): string {
return `mosaic-user-${userId}-state`;
}
/** Image for new user containers: OPENCLAW_IMAGE, or the built-in default when unset. */
private getOpenClawImage(): string {
  const configured = this.config.get<string>("OPENCLAW_IMAGE");
  return configured ?? DEFAULT_OPENCLAW_IMAGE;
}
/** Network mode for new user containers: OPENCLAW_NETWORK, or the built-in default when unset. */
private getOpenClawNetwork(): string {
  const configured = this.config.get<string>("OPENCLAW_NETWORK");
  return configured ?? DEFAULT_OPENCLAW_NETWORK;
}
/** API URL passed to containers: MOSAIC_API_URL, or the built-in default when unset. */
private getMosaicApiUrl(): string {
  const configured = this.config.get<string>("MOSAIC_API_URL");
  return configured ?? DEFAULT_MOSAIC_API_URL;
}
/**
 * First host port to try when allocating a gateway port: OPENCLAW_PORT_RANGE_START
 * parsed as an integer, or the built-in default when unset or unparseable.
 */
private getPortRangeStart(): number {
  const configured = this.config.get<string>("OPENCLAW_PORT_RANGE_START");
  return this.parseInteger(configured, DEFAULT_OPENCLAW_PORT_RANGE_START);
}
/**
 * Locate the Docker container behind a record.
 *
 * Tries the stored container ID first (when present and still known to
 * Docker), then falls back to a lookup by container name.
 *
 * @param record - Stored ID (may be null) and name of the container.
 * @returns A handle to the container, or null when Docker has neither.
 */
private async resolveContainer(record: ContainerLookup): Promise<ContainerHandle | null> {
  const { containerId, containerName } = record;

  if (containerId) {
    const candidate = this.docker.getContainer(containerId) as unknown as ContainerHandle;
    const stillExists = await this.containerExists(candidate);
    if (stillExists) {
      return candidate;
    }
  }

  return (await this.findContainerByName(containerName)) ?? null;
}
/**
 * Find a container (running or not) by its exact name.
 *
 * The list request's `name` filter only pre-filters candidates and can return
 * containers whose names merely contain the requested one, so the result is
 * narrowed to an exact match here. A substring match would let
 * "mosaic-user-ab" resolve to the container of "mosaic-user-abc".
 *
 * @param containerName - Name without the leading slash.
 * @returns A handle to the matching container, or null when none matches.
 */
private async findContainerByName(containerName: string): Promise<ContainerHandle | null> {
  const containers = await this.docker.listContainers({
    all: true,
    filters: {
      name: [containerName],
    },
  });

  // Names in the list response carry a leading slash.
  const expectedName = `/${containerName}`;
  const match = containers.find((container) => container.Names.includes(expectedName));
  if (!match?.Id) {
    return null;
  }

  return this.docker.getContainer(match.Id) as unknown as ContainerHandle;
}
/**
 * Check whether Docker still knows a container, by inspecting it.
 *
 * @param container - Handle to probe.
 * @returns true when inspect succeeds, false when Docker answers "not found".
 * @throws Any other inspect error, unchanged.
 */
private async containerExists(container: ContainerHandle): Promise<boolean> {
  try {
    await container.inspect();
  } catch (error) {
    if (!this.isDockerNotFound(error)) {
      throw error;
    }
    return false;
  }
  return true;
}
/**
 * Create (but do not start) the Docker container for a user.
 *
 * The container gets the API URL, gateway token and record ID through its
 * environment, the per-user state volume mounted at OPENCLAW_STATE_PATH, and
 * its gateway port bound to the given host port.
 *
 * @param containerRecord - Record supplying the container name, ID and user ID.
 * @param token - Plain-text gateway token passed as AGENT_TOKEN.
 * @param gatewayPort - Host port to bind the gateway port to.
 * @returns A handle to the created container.
 */
private async createContainer(
  containerRecord: UserContainerRecord,
  token: string,
  gatewayPort: number
): Promise<ContainerHandle> {
  const image = this.getOpenClawImage();
  const env = [
    `MOSAIC_API_URL=${this.getMosaicApiUrl()}`,
    `AGENT_TOKEN=${token}`,
    `AGENT_ID=${containerRecord.id}`,
  ];
  const stateBind = `${this.getVolumeName(containerRecord.userId)}:${OPENCLAW_STATE_PATH}`;
  const networkMode = this.getOpenClawNetwork();

  const created = await this.docker.createContainer({
    name: containerRecord.containerName,
    Image: image,
    Env: env,
    ExposedPorts: { [OPENCLAW_GATEWAY_PORT_KEY]: {} },
    HostConfig: {
      Binds: [stateBind],
      PortBindings: {
        [OPENCLAW_GATEWAY_PORT_KEY]: [{ HostPort: String(gatewayPort) }],
      },
      NetworkMode: networkMode,
    },
  });

  return created as unknown as ContainerHandle;
}
/**
 * Read the host port bound to the gateway port from an inspect result.
 *
 * Prefers NetworkSettings.Ports and falls back to HostConfig.PortBindings.
 * Only the first binding of the chosen source is considered.
 *
 * @param inspect - Container inspect result.
 * @returns The host port, or null when none is present or it does not parse
 *   to a non-zero integer.
 */
private extractGatewayPort(inspect: DockerInspect): number | null {
  // `||` is deliberate: an empty HostPort string must fall through to the
  // HostConfig source, just like a missing one.
  const rawPort =
    inspect.NetworkSettings?.Ports?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort ||
    inspect.HostConfig?.PortBindings?.[OPENCLAW_GATEWAY_PORT_KEY]?.[0]?.HostPort;
  if (!rawPort) {
    return null;
  }

  // parseInteger falls back to 0 on garbage; 0 is then mapped to null.
  return this.parseInteger(rawPort, 0) || null;
}
/**
 * Pick the lowest host port, starting at the configured range start, that no
 * userContainer row currently records as its gatewayPort.
 *
 * Only the database is consulted; ports held by other processes on the host
 * are not detected here.
 *
 * @returns A port in [range start, 65535] not recorded in the database.
 * @throws Error when every port from the range start up to 65535 is taken
 *   (or the configured range start itself exceeds 65535).
 */
private async findAvailableGatewayPort(): Promise<number> {
  // Highest valid TCP port; without this bound the scan could return a
  // number that can never be bound.
  const maxPort = 65535;

  const usedPorts = await this.prisma.userContainer.findMany({
    where: {
      gatewayPort: { not: null },
    },
    select: {
      gatewayPort: true,
    },
  });

  const takenPorts = new Set<number>();
  for (const entry of usedPorts) {
    if (entry.gatewayPort !== null) {
      takenPorts.add(entry.gatewayPort);
    }
  }

  const rangeStart = this.getPortRangeStart();
  for (let candidate = rangeStart; candidate <= maxPort; candidate += 1) {
    if (!takenPorts.has(candidate)) {
      return candidate;
    }
  }

  throw new Error(
    `No available gateway port between ${String(rangeStart)} and ${String(maxPort)}`
  );
}
/**
 * Turn a stored gateway token into its plain-text form.
 *
 * @param storedToken - Token as stored on the record.
 * @returns The decrypted token when the crypto service recognises the value
 *   as encrypted; otherwise the value unchanged.
 */
private getGatewayToken(storedToken: string): string {
  return this.crypto.isEncrypted(storedToken) ? this.crypto.decrypt(storedToken) : storedToken;
}
/**
 * Parse a base-10 integer from an optional string.
 *
 * @param value - Text to parse; may be undefined or empty.
 * @param fallback - Returned when value is missing/empty or does not start
 *   with a parseable integer.
 * @returns The parsed integer, or the fallback.
 */
private parseInteger(value: string | undefined, fallback: number): number {
  if (value === undefined || value === "") {
    return fallback;
  }

  const candidate = Number.parseInt(value, 10);
  // parseInt yields NaN on unparseable input; isFinite rejects it.
  return Number.isFinite(candidate) ? candidate : fallback;
}
/** True when the error carries Docker status code 404 (the container is not known to Docker). */
private isDockerNotFound(error: unknown): boolean {
  const statusCode = this.getDockerStatusCode(error);
  return statusCode === 404;
}
/** True when the error carries Docker status code 304, which stop() treats as "already stopped". */
private isAlreadyStopped(error: unknown): boolean {
  const statusCode = this.getDockerStatusCode(error);
  return statusCode === 304;
}
/**
 * Extract the numeric `statusCode` property from a thrown value, if it has one.
 *
 * @param error - Anything caught from a Docker call.
 * @returns The status code, or null when the value is not an object or its
 *   statusCode is absent or not a number.
 */
private getDockerStatusCode(error: unknown): number | null {
  if (typeof error === "object" && error !== null && "statusCode" in error) {
    const { statusCode } = error;
    if (typeof statusCode === "number") {
      return statusCode;
    }
  }
  return null;
}
/**
 * Produce a loggable message from a thrown value.
 *
 * @param error - Anything caught.
 * @returns The Error's message, or "Unknown error" for non-Error values.
 */
private getErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : "Unknown error";
}
}