import { createReadStream, constants as fsConstants } from "node:fs";
import { access, readdir, stat } from "node:fs/promises";
import { homedir } from "node:os";
import * as path from "node:path";
import * as process from "node:process";
import { createInterface } from "node:readline";

/** Ingest endpoint used when `--endpoint` is not given. */
const DEFAULT_ENDPOINT = "https://mosaic-api.woltje.com/conversation-archive/ingest";

/** Roles forwarded to the ingest API; messages with any other role are dropped while parsing. */
type IngestRole = "user" | "assistant";

/** One chat message as sent to the ingest endpoint. */
interface IngestMessage {
  role: IngestRole;
  content: string;
  /** ISO-8601 timestamp, present only when the source line carried a parseable one. */
  timestamp?: string;
}

/** JSON body POSTed to the ingest endpoint for a single session. */
interface IngestPayload {
  sessionId: string;
  workspaceId: string;
  title: string;
  messages: IngestMessage[];
  agentId?: string;
}

/** Validated command-line options. */
interface CliOptions {
  workspaceId: string;
  agentId?: string;
  since?: Date;
  /** Absolute path, already `~`-expanded and resolved. */
  sessionsDir?: string;
  endpoint: string;
}

/** Result of reading one `.jsonl` session file. */
interface ParsedSession {
  sessionId: string;
  title: string;
  messages: IngestMessage[];
  startedAt?: string;
  endedAt?: string;
  /** Count of non-empty lines that were not valid JSON objects. */
  parseErrors: number;
  /** Path segment following the last `agents` directory in the file path, if any. */
  inferredAgentId?: string;
}

/** Raw outcome of one ingest HTTP request. */
interface SendResult {
  ok: boolean;
  status: number;
  body: string;
}

/** Counters printed at the end of a run. */
interface IngestSummary {
  discovered: number;
  processed: number;
  ingested: number;
  skippedSince: number;
  skippedEmpty: number;
  skippedDuplicate: number;
  failed: number;
}

/** Flags that take exactly one value, written as `--flag value` or `--flag=value`. */
const VALUE_FLAGS = ["--workspace-id", "--agent-id", "--since", "--sessions-dir", "--endpoint"] as const;

type ValueFlag = (typeof VALUE_FLAGS)[number];

/** Prints CLI usage to stdout. */
function printUsage(): void {
  console.log(
    [
      "Usage:",
      "  pnpm ingest:sessions --workspace-id <id> [--agent-id <id>] [--since <date>] [--sessions-dir <path>] [--endpoint <url>]",
      "",
      "Required:",
      "  --workspace-id <id>    Target Mosaic workspace ID",
      "",
      "Optional:",
      "  --agent-id <id>        Agent ID to include in each ingest payload",
      "  --since <date>         Skip sessions before this date/time (ISO8601 or YYYY-MM-DD)",
      "  --sessions-dir <path>  Override session directory path",
      `  --endpoint <url>       Ingest endpoint (default: ${DEFAULT_ENDPOINT})`,
      "  -h, --help             Show this help",
    ].join("\n")
  );
}

/** Expands a leading `~` (alone, or followed by a path separator) to the user's home directory. */
function expandHomePath(inputPath: string): string {
  if (inputPath === "~") {
    return homedir();
  }
  // Also accept the platform separator so `~\sessions` works on Windows.
  if (inputPath.startsWith("~/") || inputPath.startsWith(`~${path.sep}`)) {
    return path.join(homedir(), inputPath.slice(2));
  }
  return inputPath;
}

/**
 * Parses a `--since` value with `new Date`.
 * Note: per ECMAScript, a date-only form (`YYYY-MM-DD`) is interpreted as UTC midnight.
 *
 * @throws Error when the value is not a parseable date.
 */
function parseSinceDate(rawDate: string): Date {
  const parsed = new Date(rawDate);
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`Invalid --since date: "${rawDate}". Use ISO8601 or YYYY-MM-DD.`);
  }
  return parsed;
}

/** Type guard for the flags listed in {@link VALUE_FLAGS}. */
function isValueFlag(candidate: string): candidate is ValueFlag {
  return (VALUE_FLAGS as readonly string[]).includes(candidate);
}

/**
 * Validates an `--endpoint` value as an absolute http(s) URL, failing fast instead of
 * letting every ingest request fail later.
 *
 * @returns The endpoint exactly as given (not normalized).
 * @throws Error when the value is not an http(s) URL.
 */
function parseEndpoint(rawEndpoint: string): string {
  let url: URL;
  try {
    url = new URL(rawEndpoint);
  } catch {
    throw new Error(`Invalid --endpoint URL: "${rawEndpoint}"`);
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") {
    throw new Error(`--endpoint must be an http(s) URL: "${rawEndpoint}"`);
  }
  return rawEndpoint;
}

/**
 * Parses command-line arguments. `--help`/`-h` prints usage and exits with code 0.
 * When a flag is repeated, the last occurrence wins.
 *
 * @throws Error on unknown flags, missing/empty values, a missing workspace ID,
 *   an unparseable `--since`, or a non-http(s) `--endpoint`.
 */
function parseCliArgs(args: string[]): CliOptions {
  const values = new Map<ValueFlag, string>();

  for (let index = 0; index < args.length; index += 1) {
    const arg = args[index];
    if (arg === undefined) {
      continue;
    }
    if (arg === "--help" || arg === "-h") {
      printUsage();
      process.exit(0);
    }

    const equalsAt = arg.indexOf("=");
    const flag = equalsAt >= 0 ? arg.slice(0, equalsAt) : arg;
    if (!isValueFlag(flag)) {
      throw new Error(`Unknown flag: ${arg}`);
    }

    let value: string | undefined;
    if (equalsAt >= 0) {
      value = arg.slice(equalsAt + 1);
    } else {
      const next = args[index + 1];
      // A following flag is not a value: `--agent-id --since x` must not set the agent ID to "--since".
      if (next !== undefined && !next.startsWith("--")) {
        value = next;
        index += 1;
      }
    }
    // Reject empty values uniformly, whether written as `--flag ""` or `--flag=`.
    if (value === undefined || value.length === 0) {
      throw new Error(`Missing value for ${flag}`);
    }
    values.set(flag, value);
  }

  const workspaceId = values.get("--workspace-id")?.trim();
  if (!workspaceId) {
    throw new Error("--workspace-id is required");
  }

  // A whitespace-only agent ID becomes undefined so the agent inferred from the path still applies.
  const agentId = values.get("--agent-id")?.trim();
  const rawSince = values.get("--since");
  const rawSessionsDir = values.get("--sessions-dir");
  const rawEndpoint = values.get("--endpoint");

  return {
    workspaceId,
    agentId: agentId === "" ? undefined : agentId,
    since: rawSince === undefined ? undefined : parseSinceDate(rawSince),
    sessionsDir:
      rawSessionsDir === undefined ? undefined : path.resolve(expandHomePath(rawSessionsDir)),
    endpoint: rawEndpoint === undefined ? DEFAULT_ENDPOINT : parseEndpoint(rawEndpoint),
  };
}

/** True for any non-null object (arrays included). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/** Returns the value when it is a string, otherwise null. */
function asString(value: unknown): string | null {
  return typeof value === "string" ? value : null;
}

/**
 * Converts a string date or a numeric epoch to an ISO-8601 string.
 * Numbers >= 1e12 are treated as milliseconds, smaller numbers as seconds.
 *
 * @returns The ISO string, or null when the value is not a valid date.
 */
function normalizeIsoTimestamp(value: unknown): string | null {
  if (typeof value === "string") {
    const parsed = new Date(value);
    return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
  }
  if (typeof value === "number" && Number.isFinite(value)) {
    const millis = value >= 1_000_000_000_000 ? value : value * 1000;
    const parsed = new Date(millis);
    // Out-of-range epochs yield an invalid Date; toISOString would throw on it.
    return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
  }
  return null;
}

/**
 * Shortens a string to at most `maxLength` UTF-16 code units, ending in "..." when cut.
 * Never splits a surrogate pair, so no lone surrogate is left behind.
 */
function truncate(value: string, maxLength: number): string {
  if (value.length <= maxLength) {
    return value;
  }
  const ellipsis = "...";
  if (maxLength <= ellipsis.length) {
    return ellipsis.slice(0, Math.max(0, maxLength));
  }
  let cut = maxLength - ellipsis.length;
  const lastKept = value.charCodeAt(cut - 1);
  // Ending on a high surrogate would orphan it; drop it so the pair stays whole or absent.
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) {
    cut -= 1;
  }
  return `${value.slice(0, cut)}${ellipsis}`;
}

/**
 * Builds a session title from the first non-blank line of a message, with whitespace
 * collapsed and the result truncated to 140 characters.
 */
function deriveTitle(content: string, fallbackSessionId: string): string {
  const firstLine = content
    .split(/\r?\n/u)
    .map((line) => line.trim())
    .find((line) => line.length > 0);

  if (!firstLine) {
    return `OpenClaw session ${fallbackSessionId}`;
  }
  return truncate(firstLine.replace(/\s+/gu, " ").trim(), 140);
}

/**
 * Extracts plain text from a message `content` field.
 * Accepts a string; an array of strings and/or `{ type?, text }` parts; or a single
 * `{ text }` object. Array parts with a `type` other than "text" are skipped.
 * Array parts are joined with a blank line.
 *
 * @returns The trimmed text, or "" when none is found.
 */
function extractTextContent(content: unknown): string {
  if (typeof content === "string") {
    return content.trim();
  }

  if (Array.isArray(content)) {
    const parts: string[] = [];
    for (const item of content) {
      if (typeof item === "string") {
        const trimmed = item.trim();
        if (trimmed.length > 0) {
          parts.push(trimmed);
        }
        continue;
      }
      if (!isRecord(item)) {
        continue;
      }
      // Untyped parts count as text; explicitly typed non-text parts are ignored.
      const itemType = asString(item.type);
      if (itemType !== null && itemType !== "text") {
        continue;
      }
      const textValue = asString(item.text)?.trim();
      if (textValue) {
        parts.push(textValue);
      }
    }
    return parts.join("\n\n").trim();
  }

  if (isRecord(content)) {
    return asString(content.text)?.trim() ?? "";
  }
  return "";
}

/** Returns the path segment right after the last `agents` directory, or null if there is none. */
function inferAgentIdFromPath(filePath: string): string | null {
  const pathParts = filePath.split(path.sep);
  const agentsIndex = pathParts.lastIndexOf("agents");
  if (agentsIndex < 0) {
    return null;
  }
  const candidate = pathParts[agentsIndex + 1];
  return candidate && candidate.trim().length > 0 ? candidate : null;
}

/**
 * Reads a JSONL session file line by line.
 * - A `type: "session"` line can supply the session id and start time.
 * - A `type: "message"` line with a user/assistant role and non-empty text becomes an
 *   {@link IngestMessage}.
 * - Malformed lines are counted in `parseErrors` rather than aborting the parse.
 *
 * @throws Errors surfaced by the file stream (e.g. the file cannot be opened).
 */
async function parseSessionFile(filePath: string): Promise<ParsedSession> {
  const fallbackSessionId = path.basename(filePath, path.extname(filePath));
  const inferredAgentId = inferAgentIdFromPath(filePath) ?? undefined;

  let sessionId = fallbackSessionId;
  let title: string | null = null;
  let startedAt: string | undefined;
  let endedAt: string | undefined;
  let parseErrors = 0;
  const messages: IngestMessage[] = [];

  const readStream = createReadStream(filePath, { encoding: "utf8" });
  const lineReader = createInterface({
    input: readStream,
    crlfDelay: Number.POSITIVE_INFINITY,
  });

  try {
    for await (const rawLine of lineReader) {
      const line = rawLine.trim();
      if (line.length === 0) {
        continue;
      }

      let parsedLine: unknown;
      try {
        parsedLine = JSON.parse(line);
      } catch {
        parseErrors += 1;
        continue;
      }
      if (!isRecord(parsedLine)) {
        parseErrors += 1;
        continue;
      }

      const eventType = asString(parsedLine.type);
      if (eventType === "session") {
        const rawSessionId = asString(parsedLine.id);
        if (rawSessionId && rawSessionId.trim().length > 0) {
          sessionId = rawSessionId;
        }
        const sessionTimestamp = normalizeIsoTimestamp(parsedLine.timestamp);
        if (sessionTimestamp) {
          startedAt ??= sessionTimestamp;
        }
        continue;
      }
      if (eventType !== "message") {
        continue;
      }

      const messageRecord = parsedLine.message;
      if (!isRecord(messageRecord)) {
        continue;
      }
      const role = asString(messageRecord.role);
      if (role !== "user" && role !== "assistant") {
        continue;
      }
      const content = extractTextContent(messageRecord.content);
      if (content.length === 0) {
        continue;
      }

      // Prefer the message's own timestamp, falling back to the envelope's.
      const timestamp =
        normalizeIsoTimestamp(messageRecord.timestamp) ??
        normalizeIsoTimestamp(parsedLine.timestamp);

      messages.push({ role, content, timestamp: timestamp ?? undefined });

      if (!title && role === "user") {
        title = deriveTitle(content, sessionId);
      }
      if (timestamp) {
        startedAt ??= timestamp;
        endedAt = timestamp;
      }
    }
  } finally {
    // Release the file descriptor even if iteration ended early or threw.
    readStream.destroy();
  }

  return {
    sessionId,
    title: title ?? `OpenClaw session ${sessionId}`,
    messages,
    startedAt,
    endedAt,
    parseErrors,
    inferredAgentId,
  };
}

/** True when the path exists (any file type). */
async function pathExists(candidatePath: string): Promise<boolean> {
  try {
    await access(candidatePath, fsConstants.F_OK);
    return true;
  } catch {
    return false;
  }
}

/** True when the path exists and is a directory. */
async function isDirectory(candidatePath: string): Promise<boolean> {
  try {
    return (await stat(candidatePath)).isDirectory();
  } catch {
    return false;
  }
}

/**
 * Chooses which directories to scan for session files:
 * 1. the `--sessions-dir` override, if given (it must be an existing directory);
 * 2. otherwise `~/.openclaw/sessions`, if it exists;
 * 3. otherwise every `~/.openclaw/agents/<name>/sessions` that exists, sorted.
 *
 * @throws Error when the override is missing or not a directory.
 */
async function discoverSessionDirectories(overrideDir?: string): Promise<string[]> {
  if (overrideDir) {
    if (!(await pathExists(overrideDir))) {
      throw new Error(`Provided --sessions-dir does not exist: ${overrideDir}`);
    }
    if (!(await isDirectory(overrideDir))) {
      throw new Error(`Provided --sessions-dir is not a directory: ${overrideDir}`);
    }
    return [overrideDir];
  }

  const defaultDir = path.join(homedir(), ".openclaw", "sessions");
  if (await pathExists(defaultDir)) {
    return [defaultDir];
  }

  const agentsRoot = path.join(homedir(), ".openclaw", "agents");
  if (!(await pathExists(agentsRoot))) {
    return [];
  }

  const agentEntries = await readdir(agentsRoot, { withFileTypes: true });
  const directories: string[] = [];
  for (const entry of agentEntries) {
    if (!entry.isDirectory()) {
      continue;
    }
    const sessionsDir = path.join(agentsRoot, entry.name, "sessions");
    if (await pathExists(sessionsDir)) {
      directories.push(sessionsDir);
    }
  }
  return directories.sort((left, right) => left.localeCompare(right));
}

/** Lists the `.jsonl` regular files in the discovered session directories, sorted by path. */
async function discoverSessionFiles(overrideDir?: string): Promise<string[]> {
  const directories = await discoverSessionDirectories(overrideDir);
  const files: string[] = [];
  for (const directory of directories) {
    const entries = await readdir(directory, { withFileTypes: true });
    for (const entry of entries) {
      if (entry.isFile() && entry.name.endsWith(".jsonl")) {
        files.push(path.join(directory, entry.name));
      }
    }
  }
  return files.sort((left, right) => left.localeCompare(right));
}

/**
 * Date used for the `--since` filter: the session's start time if known, then its end
 * time, then the file's modification time.
 */
async function resolveSessionTimestamp(session: ParsedSession, filePath: string): Promise<Date> {
  const sessionTimestamp = session.startedAt ?? session.endedAt;
  if (sessionTimestamp) {
    const parsed = new Date(sessionTimestamp);
    if (!Number.isNaN(parsed.getTime())) {
      return parsed;
    }
  }
  const fileStat = await stat(filePath);
  return fileStat.mtime;
}

/**
 * Builds the ingest request body. An explicit `--agent-id` takes precedence over
 * `fallbackAgentId`; blank agent IDs are omitted.
 */
function buildPayload(
  options: CliOptions,
  session: ParsedSession,
  fallbackAgentId: string | undefined
): IngestPayload {
  const payload: IngestPayload = {
    sessionId: session.sessionId,
    workspaceId: options.workspaceId,
    title: session.title,
    messages: session.messages,
  };
  const selectedAgentId = (options.agentId ?? fallbackAgentId)?.trim();
  if (selectedAgentId) {
    payload.agentId = selectedAgentId;
  }
  return payload;
}

/**
 * POSTs one payload with a bearer token.
 *
 * @returns Status and raw body text; non-2xx responses are returned, not thrown.
 * @throws Network-level errors from `fetch`.
 */
async function sendIngestRequest(
  endpoint: string,
  token: string,
  payload: IngestPayload
): Promise<SendResult> {
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(payload),
  });
  const body = await response.text();
  return { ok: response.ok, status: response.status, body };
}

/** Collapses whitespace in an error body and truncates it to 220 characters for logging. */
function summarizeFailureBody(body: string): string {
  const compact = body.replace(/\s+/gu, " ").trim();
  return compact.length === 0 ? "(empty response body)" : truncate(compact, 220);
}

/** Renders an unknown thrown value as a message string. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Parses, filters and ingests one session file, logging the outcome and updating
 * `summary`. Per-file failures are counted rather than thrown, so one bad file does
 * not abort the run.
 */
async function ingestSessionFile(
  filePath: string,
  position: string,
  options: CliOptions,
  token: string,
  summary: IngestSummary
): Promise<void> {
  summary.processed += 1;

  let parsedSession: ParsedSession;
  try {
    parsedSession = await parseSessionFile(filePath);
  } catch (error) {
    summary.failed += 1;
    console.error(`[${position}] Failed ${filePath}: read error: ${describeError(error)}`);
    return;
  }

  if (parsedSession.messages.length === 0) {
    summary.skippedEmpty += 1;
    console.log(`[${position}] Skipped ${parsedSession.sessionId}: no user/assistant text messages.`);
    return;
  }

  let sessionDate: Date;
  try {
    sessionDate = await resolveSessionTimestamp(parsedSession, filePath);
  } catch (error) {
    summary.failed += 1;
    console.error(`[${position}] Failed ${parsedSession.sessionId}: stat error: ${describeError(error)}`);
    return;
  }

  if (options.since && sessionDate.getTime() < options.since.getTime()) {
    summary.skippedSince += 1;
    console.log(
      `[${position}] Skipped ${parsedSession.sessionId}: session is before --since (${sessionDate.toISOString()}).`
    );
    return;
  }

  const payload = buildPayload(options, parsedSession, parsedSession.inferredAgentId);

  let result: SendResult;
  try {
    result = await sendIngestRequest(options.endpoint, token, payload);
  } catch (error) {
    summary.failed += 1;
    console.error(`[${position}] Failed ${parsedSession.sessionId}: request error: ${describeError(error)}`);
    return;
  }

  if (result.ok) {
    summary.ingested += 1;
    const note =
      parsedSession.parseErrors > 0 ? ` (parse warnings: ${parsedSession.parseErrors})` : "";
    console.log(
      `[${position}] Ingested ${parsedSession.sessionId} (${parsedSession.messages.length} messages)${note}.`
    );
    return;
  }

  // 409 Conflict: the session is already archived, so it is not a failure.
  if (result.status === 409) {
    summary.skippedDuplicate += 1;
    console.log(`[${position}] Skipped ${parsedSession.sessionId}: already exists (409).`);
    return;
  }

  summary.failed += 1;
  console.error(
    `[${position}] Failed ${parsedSession.sessionId}: HTTP ${result.status} ${summarizeFailureBody(result.body)}`
  );
}

/** Prints the end-of-run counters. */
function printSummary(summary: IngestSummary): void {
  console.log("\nIngestion summary:");
  console.log(` Discovered: ${summary.discovered}`);
  console.log(` Processed: ${summary.processed}`);
  console.log(` Ingested: ${summary.ingested}`);
  console.log(` Skipped (--since): ${summary.skippedSince}`);
  console.log(` Skipped (empty): ${summary.skippedEmpty}`);
  console.log(` Skipped (duplicate): ${summary.skippedDuplicate}`);
  console.log(` Failed: ${summary.failed}`);
}

/**
 * CLI entry point. Reads MOSAIC_API_TOKEN from the environment and ingests each
 * discovered session sequentially. Exits with code 1 if any session failed.
 */
async function main(): Promise<void> {
  const options = parseCliArgs(process.argv.slice(2));

  // The emptiness check ignores surrounding whitespace, so the header must too;
  // otherwise a leading space would produce "Bearer  <token>".
  const token = process.env.MOSAIC_API_TOKEN?.trim();
  if (!token) {
    throw new Error("MOSAIC_API_TOKEN environment variable is required.");
  }

  const sessionFiles = await discoverSessionFiles(options.sessionsDir);
  if (sessionFiles.length === 0) {
    console.log("No OpenClaw session files found.");
    return;
  }

  console.log(`Discovered ${sessionFiles.length} session file(s).`);
  if (options.since) {
    console.log(`Applying --since filter at ${options.since.toISOString()}.`);
  }

  const summary: IngestSummary = {
    discovered: sessionFiles.length,
    processed: 0,
    ingested: 0,
    skippedSince: 0,
    skippedEmpty: 0,
    skippedDuplicate: 0,
    failed: 0,
  };

  for (const [index, filePath] of sessionFiles.entries()) {
    const position = `${index + 1}/${sessionFiles.length}`;
    await ingestSessionFile(filePath, position, options, token, summary);
  }

  printSummary(summary);

  if (summary.failed > 0) {
    process.exit(1);
  }
}

main().catch((error: unknown) => {
  console.error(`Fatal error: ${describeError(error)}`);
  process.exit(1);
});