fix: compact cleanup, ingestBatch parallelism, trimToBudget early exit #5

Merged
jason.woltje merged 1 commit from fix/compact-ingest-trim into main 2026-03-06 18:14:56 +00:00
2 changed files with 155 additions and 30 deletions

View File

@@ -236,23 +236,31 @@ export class OpenBrainContextEngine implements ContextEngine {
}): Promise<IngestBatchResult> {
this.assertNotDisposed();
const maxConcurrency = 5;
let ingestedCount = 0;
for (const message of params.messages) {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
const result = await this.ingest(ingestParams);
for (let i = 0; i < params.messages.length; i += maxConcurrency) {
const chunk = params.messages.slice(i, i + maxConcurrency);
const results = await Promise.all(
chunk.map((message) => {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
return this.ingest(ingestParams);
}),
);
if (result.ingested) {
ingestedCount += 1;
for (const result of results) {
if (result.ingested) {
ingestedCount += 1;
}
}
}
@@ -340,21 +348,22 @@ export class OpenBrainContextEngine implements ContextEngine {
};
}
const summarizedThoughts = this.selectSummaryThoughts(recentThoughts);
const summary = this.buildSummary(
params.customInstructions !== undefined
? {
sessionId: params.sessionId,
thoughts: recentThoughts,
thoughts: summarizedThoughts,
customInstructions: params.customInstructions,
}
: {
sessionId: params.sessionId,
thoughts: recentThoughts,
thoughts: summarizedThoughts,
},
);
const summaryTokens = estimateTextTokens(summary);
const tokensBefore = this.estimateTokensForThoughts(recentThoughts);
const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts);
await client.createThought({
content: summary,
@@ -367,6 +376,15 @@ export class OpenBrainContextEngine implements ContextEngine {
},
});
const summaryThoughtIds = Array.from(
new Set(
summarizedThoughts
.map((thought) => thought.id.trim())
.filter((id) => id.length > 0),
),
);
await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId)));
return {
ok: true,
compacted: true,
@@ -600,7 +618,7 @@ export class OpenBrainContextEngine implements ContextEngine {
}
const tokens = estimateTextTokens(serializeContent(message.content));
if (total + tokens > tokenBudget) {
continue;
break;
}
total += tokens;
@@ -630,12 +648,7 @@ export class OpenBrainContextEngine implements ContextEngine {
thoughts: OpenBrainThought[];
customInstructions?: string;
}): string {
const ordered = [...params.thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
const lines = ordered.slice(Math.max(ordered.length - maxLines, 0)).map((thought) => {
const lines = params.thoughts.map((thought) => {
const role = normalizeRole(thought.metadata?.role);
const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180);
return `- ${role}: ${content}`;
@@ -650,6 +663,15 @@ export class OpenBrainContextEngine implements ContextEngine {
return `${header}\n${instruction}${lines.join("\n")}`;
}
/**
 * Picks the thoughts that feed a compaction summary.
 *
 * Works on a copy of the input sorted ascending by `thoughtTimestamp(thought, 0)`
 * and returns at most the last ten entries of that ordering. The caller's array
 * is never mutated.
 *
 * @param thoughts Candidate thoughts in any order.
 * @returns The tail (up to ten items) of the timestamp-ordered copy.
 */
private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] {
  const chronological = thoughts.slice();
  chronological.sort((left, right) => thoughtTimestamp(left, 0) - thoughtTimestamp(right, 0));

  // Keep no more than ten entries, counted from the end of the ordered list.
  const keepCount = chronological.length < 10 ? chronological.length : 10;
  const firstKept = chronological.length - keepCount;
  return chronological.slice(firstKept > 0 ? firstKept : 0);
}
private buildSubagentSeedContent(params: {
parentSessionKey: string;
childSessionKey: string;

View File

@@ -131,6 +131,54 @@ describe("OpenBrainContextEngine", () => {
expect(mockClient.createThought).toHaveBeenCalledTimes(2);
});
// Drives ingestBatch with ten messages against a slow createThought mock and
// checks that the peak number of overlapping createThought calls is five.
it("ingests batches in parallel chunks of five", async () => {
  const mockClient = makeMockClient();

  // Concurrency probe: `active` counts createThought calls that have started
  // but not yet resolved; `peak` is the highest value `active` ever reached.
  const probe = { active: 0, peak: 0, completed: 0 };

  vi.mocked(mockClient.createThought).mockImplementation(async (input: OpenBrainThoughtInput) => {
    probe.active += 1;
    if (probe.active > probe.peak) {
      probe.peak = probe.active;
    }
    // Hold the call open briefly so calls started together overlap.
    await new Promise((resolve) => {
      setTimeout(resolve, 20);
    });
    probe.active -= 1;
    probe.completed += 1;

    return {
      id: `thought-${probe.completed}`,
      content: input.content,
      source: input.source,
      metadata: input.metadata,
      createdAt: new Date().toISOString(),
      updatedAt: undefined,
      score: undefined,
    };
  });

  const engine = new OpenBrainContextEngine(
    { baseUrl: "https://brain.example.com", apiKey: "secret" },
    { createClient: () => mockClient },
  );
  await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });

  const outcome = await engine.ingestBatch({
    sessionId,
    messages: Array.from({ length: 10 }, (_, index) => {
      const position = index + 1;
      return {
        role: index % 2 === 0 ? "user" : "assistant",
        content: `message-${position}`,
        timestamp: position,
      };
    }),
  });

  expect(outcome.ingestedCount).toBe(10);
  expect(probe.peak).toBe(5);
  expect(mockClient.createThought).toHaveBeenCalledTimes(10);
});
it("assembles context from recent + semantic search, deduped and budget-aware", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([
@@ -192,12 +240,19 @@ describe("OpenBrainContextEngine", () => {
]);
});
it("compact archives a summary thought", async () => {
it("compact archives a summary thought and deletes summarized inputs", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([
makeThought("t1", "first message", sessionId, "user", "2026-03-06T12:00:00.000Z"),
makeThought("t2", "second message", sessionId, "assistant", "2026-03-06T12:01:00.000Z"),
]);
vi.mocked(mockClient.listRecent).mockResolvedValue(
Array.from({ length: 12 }, (_, index) => {
return makeThought(
`t${index + 1}`,
`message ${index + 1}`,
sessionId,
index % 2 === 0 ? "user" : "assistant",
`2026-03-06T12:${String(index).padStart(2, "0")}:00.000Z`,
);
}),
);
const engine = new OpenBrainContextEngine(
{
@@ -226,6 +281,54 @@ describe("OpenBrainContextEngine", () => {
}),
}),
);
const deletedIds = vi
.mocked(mockClient.deleteThought)
.mock.calls.map(([id]) => id)
.sort((left, right) => left.localeCompare(right));
expect(deletedIds).toEqual([
"t10",
"t11",
"t12",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
]);
});
// With a token budget of 12 and a 400-character newest thought, assemble() is
// expected to return only that newest thought's content (see the assertion).
it("stops trimming once the newest message exceeds budget", async () => {
  const mockClient = makeMockClient();
  const hugeLatestContent = "z".repeat(400);

  const olderThought = makeThought(
    "t1",
    "small older message",
    sessionId,
    "assistant",
    "2026-03-06T12:00:00.000Z",
  );
  const newestThought = makeThought(
    "t2",
    hugeLatestContent,
    sessionId,
    "assistant",
    "2026-03-06T12:01:00.000Z",
  );
  vi.mocked(mockClient.listRecent).mockResolvedValue([olderThought, newestThought]);

  const engine = new OpenBrainContextEngine(
    { baseUrl: "https://brain.example.com", apiKey: "secret" },
    { createClient: () => mockClient },
  );
  await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });

  const assembled = await engine.assemble({
    sessionId,
    messages: [{ role: "user", content: "query", timestamp: Date.now() }],
    tokenBudget: 12,
  });

  const contents = assembled.messages.map((entry) => String(entry.content));
  expect(contents).toEqual([hugeLatestContent]);
});
it("prepares subagent spawn and rollback deletes seeded context", async () => {