From c640d2239473ceab5c7f91c77ec1212e6f4f99fb Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 28 Feb 2026 21:01:03 -0600 Subject: [PATCH 1/2] chore(orchestrator): add MS22 Phase 0 tasks to TASKS.md --- docs/TASKS.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/TASKS.md b/docs/TASKS.md index 03f45c7..ba9d35f 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -52,3 +52,22 @@ | **Total** | **31** | **15** | **~371K** | **~175K** | Remaining estimate: ~143K tokens (Codex budget). + +## MS22 — Fleet Evolution (Phase 0: Knowledge Layer) + +| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | +| --------------- | ----------- | ------------ | ------------------------------------------------------------ | -------- | ----- | ------------------------------ | --------------------------------------------------------- | ------------- | ------------ | ---------- | ------------ | -------- | ---- | --------------------------------------------- | +| MS22-PLAN-001 | done | p0-knowledge | PRD + mission bootstrap + TASKS.md | TASKS:P0 | stack | feat/ms22-knowledge-schema | — | MS22-DB-001 | orchestrator | 2026-02-28 | 2026-02-28 | 10K | 8K | PRD-MS22.md, mission fleet-evolution-20260228 | +| MS22-DB-001 | done | p0-knowledge | Findings module (pgvector, CRUD, similarity search) | TASKS:P0 | api | feat/ms22-findings | MS22-PLAN-001 | — | codex | 2026-02-28 | 2026-02-28 | 20K | ~22K | PR #585 merged, CI green | +| MS22-API-001 | done | p0-knowledge | Findings API endpoints | TASKS:P0 | api | feat/ms22-findings | MS22-DB-001 | — | codex | 2026-02-28 | 2026-02-28 | — | — | Combined with DB-001 | +| MS22-DB-002 | done | p0-knowledge | AgentMemory module (key/value store, upsert) | TASKS:P0 | api | feat/ms22-agent-memory | MS22-DB-001 | — | codex | 2026-02-28 | 2026-02-28 | 15K | ~16K | PR #586 merged, CI green | +| MS22-API-002 | done | p0-knowledge | AgentMemory API endpoints | TASKS:P0 | api | feat/ms22-agent-memory | MS22-DB-002 | — | codex | 2026-02-28 | 2026-02-28 | — | — | Combined with DB-002 | +| MS22-DB-004 | done | p0-knowledge | ConversationArchive module (pgvector, ingest, search) | TASKS:P0 | api | feat/ms22-conversation-archive | MS22-DB-001 | — | codex | 2026-02-28 | 2026-02-28 | 20K | ~18K | PR #587 merged, CI green | +| MS22-API-004 | done | p0-knowledge | ConversationArchive API endpoints | TASKS:P0 | api | feat/ms22-conversation-archive | MS22-DB-004 | — | codex | 2026-02-28 | 2026-02-28 | — | — | Combined with DB-004 | +| MS22-API-005 | done | p0-knowledge | EmbeddingService (reuse existing KnowledgeModule) | TASKS:P0 | api | — | — | — | orchestrator | 2026-02-28 | 2026-02-28 | 0 | 0 | Already existed; no work needed | +| MS22-DB-003 | not-started | p0-knowledge | Task model: add assigned_agent field + migration | TASKS:P0 | api | feat/ms22-task-agent | MS22-DB-001 | MS22-API-003 | — | — | — | 8K | — | Small schema + migration only | +| MS22-API-003 | not-started | p0-knowledge | Task API: expose assigned_agent in CRUD | TASKS:P0 | api | feat/ms22-task-agent | MS22-DB-003 | MS22-TEST-001 | — | — | — | 8K | — | Extend existing TaskModule | +| MS22-TEST-001 | not-started | p0-knowledge | Integration tests: Findings + AgentMemory + ConvArchive | TASKS:P0 | api | test/ms22-integration | MS22-API-001,MS22-API-002,MS22-API-004 | MS22-VER-P0 | — | — | — | 20K | — | E2E with live postgres | +| MS22-SKILL-001 | not-started | p0-knowledge | OpenClaw mosaic skill (agents read/write findings/memory) | TASKS:P0 | stack | feat/ms22-openclaw-skill | MS22-API-001,MS22-API-002 | MS22-VER-P0 | — | — | — | 15K | — | Skill in ~/.agents/skills/mosaic/ | +| MS22-INGEST-001 | not-started | p0-knowledge | Session log ingestion pipeline (OpenClaw logs → ConvArchive) | TASKS:P0 | stack | feat/ms22-ingest | MS22-API-004 | MS22-VER-P0 | — | — | — | 20K | — | Script to batch-ingest existing logs | +| MS22-VER-P0 | not-started | p0-knowledge | Phase 0 verification: all modules deployed + smoke tested | TASKS:P0 | stack | — | MS22-TEST-001,MS22-SKILL-001,MS22-INGEST-001,MS22-API-003 | — | — | — | — | 5K | — | | -- 2.49.1 From 7c7a821b0fcb83c76ae3621cefc8211336cd9fc4 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 28 Feb 2026 21:18:08 -0600 Subject: [PATCH 2/2] feat: add OpenClaw session log ingestion script (MS22-INGEST-001) --- package.json | 3 +- scripts/ingest-openclaw-sessions.ts | 621 ++++++++++++++++++++++++++++ 2 files changed, 623 insertions(+), 1 deletion(-) create mode 100644 scripts/ingest-openclaw-sessions.ts diff --git a/package.json b/package.json index 0a16e68..1b6a785 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,8 @@ "docker:ps": "docker compose ps", "docker:build": "docker compose build", "docker:restart": "docker compose restart", - "prepare": "husky || true" + "prepare": "husky || true", + "ingest:sessions": "tsx scripts/ingest-openclaw-sessions.ts" }, "devDependencies": { "@typescript-eslint/eslint-plugin": "^8.26.0", diff --git a/scripts/ingest-openclaw-sessions.ts b/scripts/ingest-openclaw-sessions.ts new file mode 100644 index 0000000..2d7c93d --- /dev/null +++ b/scripts/ingest-openclaw-sessions.ts @@ -0,0 +1,621 @@ +import { createReadStream, constants as fsConstants } from "node:fs"; +import { access, readdir, stat } from "node:fs/promises"; +import { homedir } from "node:os"; +import * as path from "node:path"; +import * as process from "node:process"; +import { createInterface } from "node:readline"; + +const DEFAULT_ENDPOINT = "https://mosaic-api.woltje.com/conversation-archive/ingest"; + +type IngestRole = "user" | "assistant"; + +interface IngestMessage { + role: IngestRole; + content: string; + timestamp?: string; +} + +interface IngestPayload { + sessionId: string; + workspaceId: string; + title: string; + messages: IngestMessage[]; + agentId?: string; +} + +interface CliOptions { + workspaceId: string; + agentId?: string; + since?: Date; + sessionsDir?: string; + endpoint: string; +} + +interface ParsedSession { + sessionId: string; + title: string; + messages: IngestMessage[]; + startedAt?: string; + endedAt?: string; + parseErrors: number; + inferredAgentId?: string; +} + +interface SendResult { + ok: boolean; + status: number; + body: string; +} + +interface IngestSummary { + discovered: number; + processed: number; + ingested: number; + skippedSince: number; + skippedEmpty: number; + skippedDuplicate: number; + failed: number; +} + +function printUsage(): void { + console.log( + [ + "Usage:", + " pnpm ingest:sessions --workspace-id [--agent-id ] [--since ] [--sessions-dir ] [--endpoint ]", + "", + "Required:", + " --workspace-id Target Mosaic workspace ID", + "", + "Optional:", + " --agent-id Agent ID to include in each ingest payload", + " --since Skip sessions before this date/time (ISO8601 or YYYY-MM-DD)", + " --sessions-dir Override session directory path", + ` --endpoint Ingest endpoint (default: ${DEFAULT_ENDPOINT})`, + ].join("\n") + ); +} + +function expandHomePath(inputPath: string): string { + if (inputPath === "~") { + return homedir(); + } + if (inputPath.startsWith("~/")) { + return path.join(homedir(), inputPath.slice(2)); + } + return inputPath; +} + +function parseSinceDate(rawDate: string): Date { + const parsed = new Date(rawDate); + if (Number.isNaN(parsed.getTime())) { + throw new Error(`Invalid --since date: "${rawDate}". Use ISO8601 or YYYY-MM-DD.`); + } + return parsed; +} + +function parseCliArgs(args: string[]): CliOptions { + let workspaceId: string | null = null; + let agentId: string | undefined; + let since: Date | undefined; + let sessionsDir: string | undefined; + let endpoint = DEFAULT_ENDPOINT; + + for (let index = 0; index < args.length; index += 1) { + const arg = args[index]; + + if (arg === "--help" || arg === "-h") { + printUsage(); + process.exit(0); + } + + if (arg.startsWith("--workspace-id=")) { + workspaceId = arg.slice("--workspace-id=".length); + continue; + } + if (arg === "--workspace-id") { + const value = args[index + 1]; + if (!value) { + throw new Error("Missing value for --workspace-id"); + } + workspaceId = value; + index += 1; + continue; + } + + if (arg.startsWith("--agent-id=")) { + agentId = arg.slice("--agent-id=".length); + continue; + } + if (arg === "--agent-id") { + const value = args[index + 1]; + if (!value) { + throw new Error("Missing value for --agent-id"); + } + agentId = value; + index += 1; + continue; + } + + if (arg.startsWith("--since=")) { + since = parseSinceDate(arg.slice("--since=".length)); + continue; + } + if (arg === "--since") { + const value = args[index + 1]; + if (!value) { + throw new Error("Missing value for --since"); + } + since = parseSinceDate(value); + index += 1; + continue; + } + + if (arg.startsWith("--sessions-dir=")) { + sessionsDir = arg.slice("--sessions-dir=".length); + continue; + } + if (arg === "--sessions-dir") { + const value = args[index + 1]; + if (!value) { + throw new Error("Missing value for --sessions-dir"); + } + sessionsDir = value; + index += 1; + continue; + } + + if (arg.startsWith("--endpoint=")) { + endpoint = arg.slice("--endpoint=".length); + continue; + } + if (arg === "--endpoint") { + const value = args[index + 1]; + if (!value) { + throw new Error("Missing value for --endpoint"); + } + endpoint = value; + index += 1; + continue; + } + + throw new Error(`Unknown flag: ${arg}`); + } + + if (!workspaceId || workspaceId.trim().length === 0) { + throw new Error("--workspace-id is required"); + } + + return { + workspaceId: workspaceId.trim(), + agentId: agentId?.trim(), + since, + sessionsDir: sessionsDir ? path.resolve(expandHomePath(sessionsDir)) : undefined, + endpoint, + }; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null; +} + +function asString(value: unknown): string | null { + return typeof value === "string" ? value : null; +} + +function normalizeIsoTimestamp(value: unknown): string | null { + if (typeof value === "string") { + const parsed = new Date(value); + if (!Number.isNaN(parsed.getTime())) { + return parsed.toISOString(); + } + return null; + } + + if (typeof value === "number" && Number.isFinite(value)) { + const millis = value >= 1_000_000_000_000 ? value : value * 1000; + const parsed = new Date(millis); + if (!Number.isNaN(parsed.getTime())) { + return parsed.toISOString(); + } + } + + return null; +} + +function truncate(value: string, maxLength: number): string { + if (value.length <= maxLength) { + return value; + } + return `${value.slice(0, maxLength - 3)}...`; +} + +function deriveTitle(content: string, fallbackSessionId: string): string { + const firstLine = content + .split(/\r?\n/u) + .map((line) => line.trim()) + .find((line) => line.length > 0); + + if (!firstLine) { + return `OpenClaw session ${fallbackSessionId}`; + } + + const normalized = firstLine.replace(/\s+/gu, " ").trim(); + return truncate(normalized, 140); +} + +function extractTextContent(content: unknown): string { + if (typeof content === "string") { + return content.trim(); + } + + if (Array.isArray(content)) { + const parts: string[] = []; + for (const item of content) { + if (typeof item === "string") { + const trimmed = item.trim(); + if (trimmed.length > 0) { + parts.push(trimmed); + } + continue; + } + if (!isRecord(item)) { + continue; + } + const itemType = asString(item.type); + if (itemType !== null && itemType !== "text") { + continue; + } + const textValue = asString(item.text); + if (textValue && textValue.trim().length > 0) { + parts.push(textValue.trim()); + } + } + return parts.join("\n\n").trim(); + } + + if (isRecord(content)) { + const textValue = asString(content.text); + if (textValue) { + return textValue.trim(); + } + } + + return ""; +} + +function inferAgentIdFromPath(filePath: string): string | null { + const pathParts = filePath.split(path.sep); + const agentsIndex = pathParts.lastIndexOf("agents"); + if (agentsIndex < 0) { + return null; + } + const candidate = pathParts[agentsIndex + 1]; + return candidate && candidate.trim().length > 0 ? candidate : null; +} + +async function parseSessionFile(filePath: string): Promise { + const fallbackSessionId = path.basename(filePath, path.extname(filePath)); + const inferredAgentId = inferAgentIdFromPath(filePath) ?? undefined; + + let sessionId = fallbackSessionId; + let title: string | null = null; + let startedAt: string | undefined; + let endedAt: string | undefined; + let parseErrors = 0; + const messages: IngestMessage[] = []; + + const readStream = createReadStream(filePath, { encoding: "utf8" }); + const lineReader = createInterface({ + input: readStream, + crlfDelay: Number.POSITIVE_INFINITY, + }); + + for await (const rawLine of lineReader) { + const line = rawLine.trim(); + if (line.length === 0) { + continue; + } + + let parsedLine: unknown; + try { + parsedLine = JSON.parse(line) as unknown; + } catch { + parseErrors += 1; + continue; + } + + if (!isRecord(parsedLine)) { + parseErrors += 1; + continue; + } + + const eventType = asString(parsedLine.type); + if (eventType === "session") { + const rawSessionId = asString(parsedLine.id); + if (rawSessionId && rawSessionId.trim().length > 0) { + sessionId = rawSessionId; + } + + const sessionTimestamp = normalizeIsoTimestamp(parsedLine.timestamp); + if (sessionTimestamp) { + startedAt ??= sessionTimestamp; + } + continue; + } + + if (eventType !== "message") { + continue; + } + + const messageRecord = parsedLine.message; + if (!isRecord(messageRecord)) { + continue; + } + + const role = asString(messageRecord.role); + if (role !== "user" && role !== "assistant") { + continue; + } + + const content = extractTextContent(messageRecord.content); + if (content.length === 0) { + continue; + } + + const timestamp = + normalizeIsoTimestamp(messageRecord.timestamp) ?? normalizeIsoTimestamp(parsedLine.timestamp); + + const message: IngestMessage = { + role, + content, + timestamp: timestamp ?? undefined, + }; + messages.push(message); + + if (!title && role === "user") { + title = deriveTitle(content, sessionId); + } + if (!startedAt && timestamp) { + startedAt = timestamp; + } + if (timestamp) { + endedAt = timestamp; + } + } + + return { + sessionId, + title: title ?? `OpenClaw session ${sessionId}`, + messages, + startedAt, + endedAt, + parseErrors, + inferredAgentId, + }; +} + +async function pathExists(candidatePath: string): Promise { + try { + await access(candidatePath, fsConstants.F_OK); + return true; + } catch { + return false; + } +} + +async function discoverSessionDirectories(overrideDir?: string): Promise { + if (overrideDir) { + if (!(await pathExists(overrideDir))) { + throw new Error(`Provided --sessions-dir does not exist: ${overrideDir}`); + } + return [overrideDir]; + } + + const defaultDir = path.join(homedir(), ".openclaw", "sessions"); + if (await pathExists(defaultDir)) { + return [defaultDir]; + } + + const agentsRoot = path.join(homedir(), ".openclaw", "agents"); + if (!(await pathExists(agentsRoot))) { + return []; + } + + const agentEntries = await readdir(agentsRoot, { withFileTypes: true }); + const directories: string[] = []; + for (const entry of agentEntries) { + if (!entry.isDirectory()) { + continue; + } + const sessionsDir = path.join(agentsRoot, entry.name, "sessions"); + if (await pathExists(sessionsDir)) { + directories.push(sessionsDir); + } + } + + return directories.sort((left, right) => left.localeCompare(right)); +} + +async function discoverSessionFiles(overrideDir?: string): Promise { + const directories = await discoverSessionDirectories(overrideDir); + const files: string[] = []; + + for (const directory of directories) { + const entries = await readdir(directory, { withFileTypes: true }); + for (const entry of entries) { + if (!entry.isFile() || !entry.name.endsWith(".jsonl")) { + continue; + } + files.push(path.join(directory, entry.name)); + } + } + + return files.sort((left, right) => left.localeCompare(right)); +} + +async function resolveSessionTimestamp(session: ParsedSession, filePath: string): Promise { + const sessionTimestamp = session.startedAt ?? session.endedAt; + if (sessionTimestamp) { + const parsed = new Date(sessionTimestamp); + if (!Number.isNaN(parsed.getTime())) { + return parsed; + } + } + + const fileStat = await stat(filePath); + return fileStat.mtime; +} + +function buildPayload( + options: CliOptions, + session: ParsedSession, + fallbackAgentId: string | undefined +): IngestPayload { + const payload: IngestPayload = { + sessionId: session.sessionId, + workspaceId: options.workspaceId, + title: session.title, + messages: session.messages, + }; + + const selectedAgentId = options.agentId ?? fallbackAgentId; + if (selectedAgentId && selectedAgentId.trim().length > 0) { + payload.agentId = selectedAgentId.trim(); + } + + return payload; +} + +async function sendIngestRequest( + endpoint: string, + token: string, + payload: IngestPayload +): Promise { + const response = await fetch(endpoint, { + method: "POST", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify(payload), + }); + + const body = await response.text(); + return { + ok: response.ok, + status: response.status, + body, + }; +} + +function summarizeFailureBody(body: string): string { + const compact = body.replace(/\s+/gu, " ").trim(); + if (compact.length === 0) { + return "(empty response body)"; + } + return truncate(compact, 220); +} + +async function main(): Promise { + const options = parseCliArgs(process.argv.slice(2)); + const token = process.env.MOSAIC_API_TOKEN; + if (!token || token.trim().length === 0) { + throw new Error("MOSAIC_API_TOKEN environment variable is required."); + } + + const sessionFiles = await discoverSessionFiles(options.sessionsDir); + if (sessionFiles.length === 0) { + console.log("No OpenClaw session files found."); + return; + } + + console.log(`Discovered ${sessionFiles.length} session file(s).`); + if (options.since) { + console.log(`Applying --since filter at ${options.since.toISOString()}.`); + } + + const summary: IngestSummary = { + discovered: sessionFiles.length, + processed: 0, + ingested: 0, + skippedSince: 0, + skippedEmpty: 0, + skippedDuplicate: 0, + failed: 0, + }; + + for (const [index, filePath] of sessionFiles.entries()) { + const position = `${index + 1}/${sessionFiles.length}`; + const parsedSession = await parseSessionFile(filePath); + summary.processed += 1; + + if (parsedSession.messages.length === 0) { + summary.skippedEmpty += 1; + console.log( + `[${position}] Skipped ${parsedSession.sessionId}: no user/assistant text messages.` + ); + continue; + } + + const sessionDate = await resolveSessionTimestamp(parsedSession, filePath); + if (options.since && sessionDate.getTime() < options.since.getTime()) { + summary.skippedSince += 1; + console.log( + `[${position}] Skipped ${parsedSession.sessionId}: session is before --since (${sessionDate.toISOString()}).` + ); + continue; + } + + const payload = buildPayload(options, parsedSession, parsedSession.inferredAgentId); + let result: SendResult; + + try { + result = await sendIngestRequest(options.endpoint, token, payload); + } catch (error) { + summary.failed += 1; + const message = error instanceof Error ? error.message : String(error); + console.error(`[${position}] Failed ${parsedSession.sessionId}: request error: ${message}`); + continue; + } + + if (result.ok) { + summary.ingested += 1; + const note = + parsedSession.parseErrors > 0 ? ` (parse warnings: ${parsedSession.parseErrors})` : ""; + console.log( + `[${position}] Ingested ${parsedSession.sessionId} (${parsedSession.messages.length} messages)${note}.` + ); + continue; + } + + if (result.status === 409) { + summary.skippedDuplicate += 1; + console.log(`[${position}] Skipped ${parsedSession.sessionId}: already exists (409).`); + continue; + } + + summary.failed += 1; + console.error( + `[${position}] Failed ${parsedSession.sessionId}: HTTP ${result.status} ${summarizeFailureBody(result.body)}` + ); + } + + console.log("\nIngestion summary:"); + console.log(` Discovered: ${summary.discovered}`); + console.log(` Processed: ${summary.processed}`); + console.log(` Ingested: ${summary.ingested}`); + console.log(` Skipped (--since): ${summary.skippedSince}`); + console.log(` Skipped (empty): ${summary.skippedEmpty}`); + console.log(` Skipped (duplicate): ${summary.skippedDuplicate}`); + console.log(` Failed: ${summary.failed}`); + + if (summary.failed > 0) { + process.exit(1); + } +} + +main().catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + console.error(`Fatal error: ${message}`); + process.exit(1); +}); -- 2.49.1