fix: compact cleanup, ingestBatch parallelism, trimToBudget early exit (#5)

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #5.
This commit is contained in:
2026-03-06 18:14:55 +00:00
committed by jason.woltje
parent ddeeee8a44
commit 0be2181e33
2 changed files with 155 additions and 30 deletions

View File

@@ -236,23 +236,31 @@ export class OpenBrainContextEngine implements ContextEngine {
}): Promise<IngestBatchResult> {
this.assertNotDisposed();
const maxConcurrency = 5;
let ingestedCount = 0;
for (const message of params.messages) {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
const result = await this.ingest(ingestParams);
for (let i = 0; i < params.messages.length; i += maxConcurrency) {
const chunk = params.messages.slice(i, i + maxConcurrency);
const results = await Promise.all(
chunk.map((message) => {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
return this.ingest(ingestParams);
}),
);
if (result.ingested) {
ingestedCount += 1;
for (const result of results) {
if (result.ingested) {
ingestedCount += 1;
}
}
}
@@ -340,21 +348,22 @@ export class OpenBrainContextEngine implements ContextEngine {
};
}
const summarizedThoughts = this.selectSummaryThoughts(recentThoughts);
const summary = this.buildSummary(
params.customInstructions !== undefined
? {
sessionId: params.sessionId,
thoughts: recentThoughts,
thoughts: summarizedThoughts,
customInstructions: params.customInstructions,
}
: {
sessionId: params.sessionId,
thoughts: recentThoughts,
thoughts: summarizedThoughts,
},
);
const summaryTokens = estimateTextTokens(summary);
const tokensBefore = this.estimateTokensForThoughts(recentThoughts);
const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts);
await client.createThought({
content: summary,
@@ -367,6 +376,15 @@ export class OpenBrainContextEngine implements ContextEngine {
},
});
const summaryThoughtIds = Array.from(
new Set(
summarizedThoughts
.map((thought) => thought.id.trim())
.filter((id) => id.length > 0),
),
);
await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId)));
return {
ok: true,
compacted: true,
@@ -600,7 +618,7 @@ export class OpenBrainContextEngine implements ContextEngine {
}
const tokens = estimateTextTokens(serializeContent(message.content));
if (total + tokens > tokenBudget) {
continue;
break;
}
total += tokens;
@@ -630,12 +648,7 @@ export class OpenBrainContextEngine implements ContextEngine {
thoughts: OpenBrainThought[];
customInstructions?: string;
}): string {
const ordered = [...params.thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
const lines = ordered.slice(Math.max(ordered.length - maxLines, 0)).map((thought) => {
const lines = params.thoughts.map((thought) => {
const role = normalizeRole(thought.metadata?.role);
const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180);
return `- ${role}: ${content}`;
@@ -650,6 +663,15 @@ export class OpenBrainContextEngine implements ContextEngine {
return `${header}\n${instruction}${lines.join("\n")}`;
}
private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] {
const ordered = [...thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
return ordered.slice(Math.max(ordered.length - maxLines, 0));
}
private buildSubagentSeedContent(params: {
parentSessionKey: string;
childSessionKey: string;