Files
mosaic/plugins/openclaw-context/src/engine.ts
2026-03-07 00:33:20 +00:00

775 lines
21 KiB
TypeScript

import { OPENBRAIN_CONTEXT_ENGINE_ID, OPENBRAIN_PLUGIN_VERSION } from "./constants.js";
import { OpenBrainConfigError } from "./errors.js";
import type {
AgentMessage,
AssembleResult,
BootstrapResult,
CompactResult,
ContextEngine,
ContextEngineInfo,
IngestBatchResult,
IngestResult,
PluginLogger,
SubagentEndReason,
SubagentSpawnPreparation,
} from "./openclaw-types.js";
import {
OpenBrainClient,
type OpenBrainClientLike,
type OpenBrainSearchInput,
type OpenBrainThought,
type OpenBrainThoughtMetadata,
} from "./openbrain-client.js";
export type OpenBrainContextEngineConfig = {
baseUrl?: string;
apiKey?: string;
recentMessages?: number;
semanticSearchLimit?: number;
source?: string;
subagentRecentMessages?: number;
};
type ResolvedOpenBrainContextEngineConfig = {
baseUrl: string;
apiKey: string;
recentMessages: number;
semanticSearchLimit: number;
source: string;
subagentRecentMessages: number;
};
export type OpenBrainContextEngineDeps = {
createClient?: (config: ResolvedOpenBrainContextEngineConfig) => OpenBrainClientLike;
now?: () => number;
logger?: PluginLogger;
};
type SubagentState = {
parentSessionKey: string;
seedThoughtId?: string;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function parsePositiveInteger(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const rounded = Math.floor(value);
return rounded > 0 ? rounded : fallback;
}
function normalizeRole(role: unknown): string {
if (typeof role !== "string" || role.length === 0) {
return "assistant";
}
if (role === "user" || role === "assistant" || role === "tool" || role === "system") {
return role;
}
return "assistant";
}
function serializeContent(value: unknown): string {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value
.map((part) => serializeContent(part))
.filter((part) => part.length > 0)
.join("\n")
.trim();
}
if (isRecord(value) && typeof value.text === "string") {
return value.text;
}
if (value === undefined || value === null) {
return "";
}
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
function estimateTextTokens(text: string): number {
const normalized = text.trim();
if (normalized.length === 0) {
return 1;
}
return Math.max(1, Math.ceil(normalized.length / 4) + 4);
}
function thoughtTimestamp(thought: OpenBrainThought, fallbackTimestamp: number): number {
const createdAt =
thought.createdAt ??
(typeof thought.created_at === "string" ? thought.created_at : undefined);
if (createdAt === undefined) {
return fallbackTimestamp;
}
const parsed = Date.parse(createdAt);
return Number.isFinite(parsed) ? parsed : fallbackTimestamp;
}
function thoughtFingerprint(thought: OpenBrainThought): string {
const role = typeof thought.metadata?.role === "string" ? thought.metadata.role : "assistant";
return `${role}\n${thought.content}`;
}
function truncateLine(value: string, maxLength: number): string {
if (value.length <= maxLength) {
return value;
}
return `${value.slice(0, maxLength - 3)}...`;
}
export class OpenBrainContextEngine implements ContextEngine {
readonly info: ContextEngineInfo = {
id: OPENBRAIN_CONTEXT_ENGINE_ID,
name: "OpenBrain Context Engine",
version: OPENBRAIN_PLUGIN_VERSION,
ownsCompaction: true,
};
private readonly rawConfig: unknown;
private readonly createClientFn:
| ((config: ResolvedOpenBrainContextEngineConfig) => OpenBrainClientLike)
| undefined;
private readonly now: () => number;
private readonly logger: PluginLogger | undefined;
private config: ResolvedOpenBrainContextEngineConfig | undefined;
private client: OpenBrainClientLike | undefined;
private readonly sessionTurns = new Map<string, number>();
private readonly subagentState = new Map<string, SubagentState>();
private disposed = false;
constructor(rawConfig: unknown, deps?: OpenBrainContextEngineDeps) {
this.rawConfig = rawConfig;
this.createClientFn = deps?.createClient;
this.now = deps?.now ?? (() => Date.now());
this.logger = deps?.logger;
}
async bootstrap(params: { sessionId: string; sessionFile: string }): Promise<BootstrapResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = await client.listRecent({
limit: config.recentMessages,
source,
});
const sessionThoughts = this.filterSessionThoughts(recentThoughts, params.sessionId);
let maxTurn = -1;
for (const thought of sessionThoughts) {
const turn = thought.metadata?.turn;
if (typeof turn === "number" && Number.isFinite(turn) && turn > maxTurn) {
maxTurn = turn;
}
}
this.sessionTurns.set(params.sessionId, maxTurn + 1);
return {
bootstrapped: true,
importedMessages: sessionThoughts.length,
};
}
async ingest(params: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
}): Promise<IngestResult> {
this.assertNotDisposed();
const client = this.getClient();
const content = serializeContent(params.message.content).trim();
if (content.length === 0) {
return { ingested: false };
}
const metadata: OpenBrainThoughtMetadata = {
sessionId: params.sessionId,
turn: this.nextTurn(params.sessionId),
role: normalizeRole(params.message.role),
type: "message",
};
if (params.isHeartbeat === true) {
metadata.isHeartbeat = true;
}
await client.createThought({
content,
source: this.sourceForSession(params.sessionId),
metadata,
});
return { ingested: true };
}
async ingestBatch(params: {
sessionId: string;
messages: AgentMessage[];
isHeartbeat?: boolean;
}): Promise<IngestBatchResult> {
this.assertNotDisposed();
const maxConcurrency = 5;
let ingestedCount = 0;
for (let i = 0; i < params.messages.length; i += maxConcurrency) {
const chunk = params.messages.slice(i, i + maxConcurrency);
const results = await Promise.all(
chunk.map((message) => {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
return this.ingest(ingestParams);
}),
);
for (const result of results) {
if (result.ingested) {
ingestedCount += 1;
}
}
}
return { ingestedCount };
}
async assemble(params: {
sessionId: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.recentMessages,
source,
}),
params.sessionId,
);
const semanticThoughts = await this.searchSemanticThoughts({
client,
source,
config,
sessionId: params.sessionId,
messages: params.messages,
});
const mergedThoughts = this.mergeThoughts(recentThoughts, semanticThoughts);
const mergedMessages =
mergedThoughts.length > 0
? mergedThoughts.map((thought, index) => this.toAgentMessage(thought, index))
: params.messages;
const tokenBudget = params.tokenBudget;
const budgetedMessages =
typeof tokenBudget === "number" && tokenBudget > 0
? this.trimToBudget(mergedMessages, tokenBudget)
: mergedMessages;
return {
messages: budgetedMessages,
estimatedTokens: this.estimateTokensForMessages(budgetedMessages),
};
}
async compact(params: {
sessionId: string;
sessionFile: string;
tokenBudget?: number;
force?: boolean;
currentTokenCount?: number;
compactionTarget?: "budget" | "threshold";
customInstructions?: string;
legacyParams?: Record<string, unknown>;
}): Promise<CompactResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: Math.max(config.recentMessages, config.subagentRecentMessages),
source,
}),
params.sessionId,
);
if (recentThoughts.length === 0) {
return {
ok: true,
compacted: false,
reason: "no-session-context",
result: {
tokensBefore: 0,
tokensAfter: 0,
},
};
}
const summarizedThoughts = this.selectSummaryThoughts(recentThoughts);
const summary = this.buildSummary(
params.customInstructions !== undefined
? {
sessionId: params.sessionId,
thoughts: summarizedThoughts,
customInstructions: params.customInstructions,
}
: {
sessionId: params.sessionId,
thoughts: summarizedThoughts,
},
);
const summaryTokens = estimateTextTokens(summary);
const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts);
await client.createThought({
content: summary,
source,
metadata: {
sessionId: params.sessionId,
turn: this.nextTurn(params.sessionId),
role: "assistant",
type: "summary",
},
});
const summaryThoughtIds = Array.from(
new Set(
summarizedThoughts
.map((thought) => thought.id.trim())
.filter((id) => id.length > 0),
),
);
await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId)));
return {
ok: true,
compacted: true,
reason: "summary-archived",
result: {
summary,
tokensBefore,
tokensAfter: summaryTokens,
},
};
}
async prepareSubagentSpawn(params: {
parentSessionKey: string;
childSessionKey: string;
ttlMs?: number;
}): Promise<SubagentSpawnPreparation | undefined> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const parentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.subagentRecentMessages,
source: this.sourceForSession(params.parentSessionKey),
}),
params.parentSessionKey,
);
const seedContent = this.buildSubagentSeedContent({
parentSessionKey: params.parentSessionKey,
childSessionKey: params.childSessionKey,
thoughts: parentThoughts,
});
const createdThought = await client.createThought({
content: seedContent,
source: this.sourceForSession(params.childSessionKey),
metadata: {
sessionId: params.childSessionKey,
role: "assistant",
type: "summary",
parentSessionId: params.parentSessionKey,
ttlMs: params.ttlMs,
},
});
this.subagentState.set(params.childSessionKey, {
parentSessionKey: params.parentSessionKey,
seedThoughtId: createdThought.id,
});
return {
rollback: async () => {
const state = this.subagentState.get(params.childSessionKey);
this.subagentState.delete(params.childSessionKey);
if (state?.seedThoughtId !== undefined && state.seedThoughtId.length > 0) {
await client.deleteThought(state.seedThoughtId);
}
},
};
}
async onSubagentEnded(params: {
childSessionKey: string;
reason: SubagentEndReason;
}): Promise<void> {
this.assertNotDisposed();
const state = this.subagentState.get(params.childSessionKey);
if (state === undefined) {
return;
}
const client = this.getClient();
const config = this.getConfig();
const childThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.subagentRecentMessages,
source: this.sourceForSession(params.childSessionKey),
}),
params.childSessionKey,
);
const summary = this.buildSubagentResultSummary({
childSessionKey: params.childSessionKey,
reason: params.reason,
thoughts: childThoughts,
});
await client.createThought({
content: summary,
source: this.sourceForSession(state.parentSessionKey),
metadata: {
sessionId: state.parentSessionKey,
turn: this.nextTurn(state.parentSessionKey),
role: "tool",
type: "subagent-result",
childSessionId: params.childSessionKey,
reason: params.reason,
},
});
this.subagentState.delete(params.childSessionKey);
}
async dispose(): Promise<void> {
this.sessionTurns.clear();
this.subagentState.clear();
this.disposed = true;
}
private searchSemanticThoughts(params: {
client: OpenBrainClientLike;
source: string;
config: ResolvedOpenBrainContextEngineConfig;
sessionId: string;
messages: AgentMessage[];
}): Promise<OpenBrainThought[]> {
const query = this.pickSemanticQuery(params.messages);
if (query === undefined || query.length === 0 || params.config.semanticSearchLimit <= 0) {
return Promise.resolve([]);
}
const request: OpenBrainSearchInput = {
query,
limit: params.config.semanticSearchLimit,
source: params.source,
};
return params.client
.search(request)
.then((results) => this.filterSessionThoughts(results, params.sessionId))
.catch((error) => {
this.logger?.warn?.("OpenBrain semantic search failed", error);
return [];
});
}
private pickSemanticQuery(messages: AgentMessage[]): string | undefined {
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
if (normalizeRole(message.role) !== "user") {
continue;
}
const content = serializeContent(message.content).trim();
if (content.length > 0) {
return content;
}
}
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
const content = serializeContent(message.content).trim();
if (content.length > 0) {
return content;
}
}
return undefined;
}
private mergeThoughts(recentThoughts: OpenBrainThought[], semanticThoughts: OpenBrainThought[]): OpenBrainThought[] {
const merged: OpenBrainThought[] = [];
const seenIds = new Set<string>();
const seenFingerprints = new Set<string>();
for (const thought of [...recentThoughts, ...semanticThoughts]) {
const id = thought.id.trim();
const fingerprint = thoughtFingerprint(thought);
if (id.length > 0 && seenIds.has(id)) {
continue;
}
if (seenFingerprints.has(fingerprint)) {
continue;
}
if (id.length > 0) {
seenIds.add(id);
}
seenFingerprints.add(fingerprint);
merged.push(thought);
}
return merged;
}
private filterSessionThoughts(thoughts: OpenBrainThought[], sessionId: string): OpenBrainThought[] {
return thoughts.filter((thought) => {
const thoughtSessionId = thought.metadata?.sessionId;
if (typeof thoughtSessionId === "string" && thoughtSessionId.length > 0) {
return thoughtSessionId === sessionId;
}
return thought.source === this.sourceForSession(sessionId);
});
}
private toAgentMessage(thought: OpenBrainThought, index: number): AgentMessage {
return {
role: normalizeRole(thought.metadata?.role),
content: thought.content,
timestamp: thoughtTimestamp(thought, this.now() + index),
};
}
private trimToBudget(messages: AgentMessage[], tokenBudget: number): AgentMessage[] {
if (messages.length === 0 || tokenBudget <= 0) {
return [];
}
let total = 0;
const budgeted: AgentMessage[] = [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
const tokens = estimateTextTokens(serializeContent(message.content));
if (total + tokens > tokenBudget) {
break;
}
total += tokens;
budgeted.unshift(message);
}
if (budgeted.length === 0) {
const lastMessage = messages[messages.length - 1];
return lastMessage === undefined ? [] : [lastMessage];
}
return budgeted;
}
private estimateTokensForMessages(messages: AgentMessage[]): number {
return messages.reduce((total, message) => {
return total + estimateTextTokens(serializeContent(message.content));
}, 0);
}
private estimateTokensForThoughts(thoughts: OpenBrainThought[]): number {
return thoughts.reduce((total, thought) => total + estimateTextTokens(thought.content), 0);
}
private buildSummary(params: {
sessionId: string;
thoughts: OpenBrainThought[];
customInstructions?: string;
}): string {
const lines = params.thoughts.map((thought) => {
const role = normalizeRole(thought.metadata?.role);
const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180);
return `- ${role}: ${content}`;
});
const header = `Context summary for session ${params.sessionId}`;
const instruction =
params.customInstructions !== undefined && params.customInstructions.trim().length > 0
? `Custom instructions: ${params.customInstructions.trim()}\n`
: "";
return `${header}\n${instruction}${lines.join("\n")}`;
}
private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] {
const ordered = [...thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
return ordered.slice(Math.max(ordered.length - maxLines, 0));
}
private buildSubagentSeedContent(params: {
parentSessionKey: string;
childSessionKey: string;
thoughts: OpenBrainThought[];
}): string {
const lines = params.thoughts.slice(-5).map((thought) => {
const role = normalizeRole(thought.metadata?.role);
return `- ${role}: ${truncateLine(thought.content.replace(/\s+/g, " ").trim(), 160)}`;
});
const contextBlock = lines.length > 0 ? lines.join("\n") : "- (no prior context found)";
return [
`Subagent context seed`,
`Parent session: ${params.parentSessionKey}`,
`Child session: ${params.childSessionKey}`,
contextBlock,
].join("\n");
}
private buildSubagentResultSummary(params: {
childSessionKey: string;
reason: SubagentEndReason;
thoughts: OpenBrainThought[];
}): string {
const lines = params.thoughts.slice(-5).map((thought) => {
const role = normalizeRole(thought.metadata?.role);
return `- ${role}: ${truncateLine(thought.content.replace(/\s+/g, " ").trim(), 160)}`;
});
const contextBlock = lines.length > 0 ? lines.join("\n") : "- (no child messages found)";
return [
`Subagent ended (${params.reason})`,
`Child session: ${params.childSessionKey}`,
contextBlock,
].join("\n");
}
private sourceForSession(sessionId: string): string {
return `${this.getConfig().source}:${sessionId}`;
}
private nextTurn(sessionId: string): number {
const next = this.sessionTurns.get(sessionId) ?? 0;
this.sessionTurns.set(sessionId, next + 1);
return next;
}
private getClient(): OpenBrainClientLike {
if (this.client !== undefined) {
return this.client;
}
const config = this.getConfig();
this.client =
this.createClientFn?.(config) ??
new OpenBrainClient({
baseUrl: config.baseUrl,
apiKey: config.apiKey,
});
return this.client;
}
private getConfig(): ResolvedOpenBrainContextEngineConfig {
if (this.config !== undefined) {
return this.config;
}
const raw = isRecord(this.rawConfig) ? this.rawConfig : {};
const baseUrl = typeof raw.baseUrl === "string" ? raw.baseUrl.trim() : "";
if (baseUrl.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: baseUrl");
}
const apiKey = typeof raw.apiKey === "string" ? raw.apiKey.trim() : "";
if (apiKey.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: apiKey");
}
this.config = {
baseUrl,
apiKey,
recentMessages: parsePositiveInteger(raw.recentMessages, 20),
semanticSearchLimit: parsePositiveInteger(raw.semanticSearchLimit, 10),
source: typeof raw.source === "string" && raw.source.trim().length > 0 ? raw.source.trim() : "openclaw",
subagentRecentMessages: parsePositiveInteger(raw.subagentRecentMessages, 8),
};
return this.config;
}
private assertNotDisposed(): void {
if (this.disposed) {
throw new Error("OpenBrainContextEngine has already been disposed");
}
}
}