From 206976a4c79c033c3417d27738ab204c556b95f3 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 12:13:58 -0600 Subject: [PATCH] fix: clean compacted thoughts and batch ingest in chunks --- src/engine.ts | 72 +++++++++++++++++---------- tests/engine.test.ts | 113 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 155 insertions(+), 30 deletions(-) diff --git a/src/engine.ts b/src/engine.ts index 08a1c07..1dc7faf 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -236,23 +236,31 @@ export class OpenBrainContextEngine implements ContextEngine { }): Promise { this.assertNotDisposed(); + const maxConcurrency = 5; let ingestedCount = 0; - for (const message of params.messages) { - const ingestParams: { - sessionId: string; - message: AgentMessage; - isHeartbeat?: boolean; - } = { - sessionId: params.sessionId, - message, - }; - if (params.isHeartbeat !== undefined) { - ingestParams.isHeartbeat = params.isHeartbeat; - } - const result = await this.ingest(ingestParams); + for (let i = 0; i < params.messages.length; i += maxConcurrency) { + const chunk = params.messages.slice(i, i + maxConcurrency); + const results = await Promise.all( + chunk.map((message) => { + const ingestParams: { + sessionId: string; + message: AgentMessage; + isHeartbeat?: boolean; + } = { + sessionId: params.sessionId, + message, + }; + if (params.isHeartbeat !== undefined) { + ingestParams.isHeartbeat = params.isHeartbeat; + } + return this.ingest(ingestParams); + }), + ); - if (result.ingested) { - ingestedCount += 1; + for (const result of results) { + if (result.ingested) { + ingestedCount += 1; + } } } @@ -340,21 +348,22 @@ export class OpenBrainContextEngine implements ContextEngine { }; } + const summarizedThoughts = this.selectSummaryThoughts(recentThoughts); const summary = this.buildSummary( params.customInstructions !== undefined ? { sessionId: params.sessionId, - thoughts: recentThoughts, + thoughts: summarizedThoughts, customInstructions: params.customInstructions, } : { sessionId: params.sessionId, - thoughts: recentThoughts, + thoughts: summarizedThoughts, }, ); const summaryTokens = estimateTextTokens(summary); - const tokensBefore = this.estimateTokensForThoughts(recentThoughts); + const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts); await client.createThought({ content: summary, @@ -367,6 +376,15 @@ export class OpenBrainContextEngine implements ContextEngine { }, }); + const summaryThoughtIds = Array.from( + new Set( + summarizedThoughts + .map((thought) => thought.id.trim()) + .filter((id) => id.length > 0), + ), + ); + await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId))); + return { ok: true, compacted: true, @@ -600,7 +618,7 @@ export class OpenBrainContextEngine implements ContextEngine { } const tokens = estimateTextTokens(serializeContent(message.content)); if (total + tokens > tokenBudget) { - continue; + break; } total += tokens; @@ -630,12 +648,7 @@ export class OpenBrainContextEngine implements ContextEngine { thoughts: OpenBrainThought[]; customInstructions?: string; }): string { - const ordered = [...params.thoughts].sort((a, b) => { - return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0); - }); - - const maxLines = Math.min(ordered.length, 10); - const lines = ordered.slice(Math.max(ordered.length - maxLines, 0)).map((thought) => { + const lines = params.thoughts.map((thought) => { const role = normalizeRole(thought.metadata?.role); const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180); return `- ${role}: ${content}`; @@ -650,6 +663,15 @@ export class OpenBrainContextEngine implements ContextEngine { return `${header}\n${instruction}${lines.join("\n")}`; } + private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] { + const ordered = [...thoughts].sort((a, b) => { + return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0); + }); + + const maxLines = Math.min(ordered.length, 10); + return ordered.slice(Math.max(ordered.length - maxLines, 0)); + } + private buildSubagentSeedContent(params: { parentSessionKey: string; childSessionKey: string; diff --git a/tests/engine.test.ts b/tests/engine.test.ts index 6327921..bdbd195 100644 --- a/tests/engine.test.ts +++ b/tests/engine.test.ts @@ -131,6 +131,54 @@ describe("OpenBrainContextEngine", () => { expect(mockClient.createThought).toHaveBeenCalledTimes(2); }); + it("ingests batches in parallel chunks of five", async () => { + const mockClient = makeMockClient(); + let inFlight = 0; + let maxInFlight = 0; + let createdCount = 0; + + vi.mocked(mockClient.createThought).mockImplementation(async (input: OpenBrainThoughtInput) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => { + setTimeout(resolve, 20); + }); + inFlight -= 1; + createdCount += 1; + return { + id: `thought-${createdCount}`, + content: input.content, + source: input.source, + metadata: input.metadata, + createdAt: new Date().toISOString(), + updatedAt: undefined, + score: undefined, + }; + }); + + const engine = new OpenBrainContextEngine( + { + baseUrl: "https://brain.example.com", + apiKey: "secret", + }, + { createClient: () => mockClient }, + ); + + await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" }); + const result = await engine.ingestBatch({ + sessionId, + messages: Array.from({ length: 10 }, (_, index) => ({ + role: index % 2 === 0 ? "user" : "assistant", + content: `message-${index + 1}`, + timestamp: index + 1, + })), + }); + + expect(result.ingestedCount).toBe(10); + expect(maxInFlight).toBe(5); + expect(mockClient.createThought).toHaveBeenCalledTimes(10); + }); + it("assembles context from recent + semantic search, deduped and budget-aware", async () => { const mockClient = makeMockClient(); vi.mocked(mockClient.listRecent).mockResolvedValue([ @@ -192,12 +240,19 @@ describe("OpenBrainContextEngine", () => { ]); }); - it("compact archives a summary thought", async () => { + it("compact archives a summary thought and deletes summarized inputs", async () => { const mockClient = makeMockClient(); - vi.mocked(mockClient.listRecent).mockResolvedValue([ - makeThought("t1", "first message", sessionId, "user", "2026-03-06T12:00:00.000Z"), - makeThought("t2", "second message", sessionId, "assistant", "2026-03-06T12:01:00.000Z"), - ]); + vi.mocked(mockClient.listRecent).mockResolvedValue( + Array.from({ length: 12 }, (_, index) => { + return makeThought( + `t${index + 1}`, + `message ${index + 1}`, + sessionId, + index % 2 === 0 ? "user" : "assistant", + `2026-03-06T12:${String(index).padStart(2, "0")}:00.000Z`, + ); + }), + ); const engine = new OpenBrainContextEngine( { @@ -226,6 +281,54 @@ describe("OpenBrainContextEngine", () => { }), }), ); + const deletedIds = vi + .mocked(mockClient.deleteThought) + .mock.calls.map(([id]) => id) + .sort((left, right) => left.localeCompare(right)); + expect(deletedIds).toEqual([ + "t10", + "t11", + "t12", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + ]); + }); + + it("stops trimming once the newest message exceeds budget", async () => { + const mockClient = makeMockClient(); + const oversizedNewest = "z".repeat(400); + vi.mocked(mockClient.listRecent).mockResolvedValue([ + makeThought("t1", "small older message", sessionId, "assistant", "2026-03-06T12:00:00.000Z"), + makeThought("t2", oversizedNewest, sessionId, "assistant", "2026-03-06T12:01:00.000Z"), + ]); + + const engine = new OpenBrainContextEngine( + { + baseUrl: "https://brain.example.com", + apiKey: "secret", + }, + { createClient: () => mockClient }, + ); + + await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" }); + const result = await engine.assemble({ + sessionId, + messages: [ + { + role: "user", + content: "query", + timestamp: Date.now(), + }, + ], + tokenBudget: 12, + }); + + expect(result.messages.map((message) => String(message.content))).toEqual([oversizedNewest]); }); it("prepares subagent spawn and rollback deletes seeded context", async () => {