fix: compact cleanup, ingestBatch parallelism, trimToBudget early exit #5

Merged
jason.woltje merged 1 commits from fix/compact-ingest-trim into main 2026-03-06 18:14:56 +00:00
2 changed files with 155 additions and 30 deletions

View File

@@ -236,23 +236,31 @@ export class OpenBrainContextEngine implements ContextEngine {
}): Promise<IngestBatchResult> { }): Promise<IngestBatchResult> {
this.assertNotDisposed(); this.assertNotDisposed();
const maxConcurrency = 5;
let ingestedCount = 0; let ingestedCount = 0;
for (const message of params.messages) { for (let i = 0; i < params.messages.length; i += maxConcurrency) {
const ingestParams: { const chunk = params.messages.slice(i, i + maxConcurrency);
sessionId: string; const results = await Promise.all(
message: AgentMessage; chunk.map((message) => {
isHeartbeat?: boolean; const ingestParams: {
} = { sessionId: string;
sessionId: params.sessionId, message: AgentMessage;
message, isHeartbeat?: boolean;
}; } = {
if (params.isHeartbeat !== undefined) { sessionId: params.sessionId,
ingestParams.isHeartbeat = params.isHeartbeat; message,
} };
const result = await this.ingest(ingestParams); if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
return this.ingest(ingestParams);
}),
);
if (result.ingested) { for (const result of results) {
ingestedCount += 1; if (result.ingested) {
ingestedCount += 1;
}
} }
} }
@@ -340,21 +348,22 @@ export class OpenBrainContextEngine implements ContextEngine {
}; };
} }
const summarizedThoughts = this.selectSummaryThoughts(recentThoughts);
const summary = this.buildSummary( const summary = this.buildSummary(
params.customInstructions !== undefined params.customInstructions !== undefined
? { ? {
sessionId: params.sessionId, sessionId: params.sessionId,
thoughts: recentThoughts, thoughts: summarizedThoughts,
customInstructions: params.customInstructions, customInstructions: params.customInstructions,
} }
: { : {
sessionId: params.sessionId, sessionId: params.sessionId,
thoughts: recentThoughts, thoughts: summarizedThoughts,
}, },
); );
const summaryTokens = estimateTextTokens(summary); const summaryTokens = estimateTextTokens(summary);
const tokensBefore = this.estimateTokensForThoughts(recentThoughts); const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts);
await client.createThought({ await client.createThought({
content: summary, content: summary,
@@ -367,6 +376,15 @@ export class OpenBrainContextEngine implements ContextEngine {
}, },
}); });
const summaryThoughtIds = Array.from(
new Set(
summarizedThoughts
.map((thought) => thought.id.trim())
.filter((id) => id.length > 0),
),
);
await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId)));
return { return {
ok: true, ok: true,
compacted: true, compacted: true,
@@ -600,7 +618,7 @@ export class OpenBrainContextEngine implements ContextEngine {
} }
const tokens = estimateTextTokens(serializeContent(message.content)); const tokens = estimateTextTokens(serializeContent(message.content));
if (total + tokens > tokenBudget) { if (total + tokens > tokenBudget) {
continue; break;
} }
total += tokens; total += tokens;
@@ -630,12 +648,7 @@ export class OpenBrainContextEngine implements ContextEngine {
thoughts: OpenBrainThought[]; thoughts: OpenBrainThought[];
customInstructions?: string; customInstructions?: string;
}): string { }): string {
const ordered = [...params.thoughts].sort((a, b) => { const lines = params.thoughts.map((thought) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
const lines = ordered.slice(Math.max(ordered.length - maxLines, 0)).map((thought) => {
const role = normalizeRole(thought.metadata?.role); const role = normalizeRole(thought.metadata?.role);
const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180); const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180);
return `- ${role}: ${content}`; return `- ${role}: ${content}`;
@@ -650,6 +663,15 @@ export class OpenBrainContextEngine implements ContextEngine {
return `${header}\n${instruction}${lines.join("\n")}`; return `${header}\n${instruction}${lines.join("\n")}`;
} }
private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] {
const ordered = [...thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
return ordered.slice(Math.max(ordered.length - maxLines, 0));
}
private buildSubagentSeedContent(params: { private buildSubagentSeedContent(params: {
parentSessionKey: string; parentSessionKey: string;
childSessionKey: string; childSessionKey: string;

View File

@@ -131,6 +131,54 @@ describe("OpenBrainContextEngine", () => {
expect(mockClient.createThought).toHaveBeenCalledTimes(2); expect(mockClient.createThought).toHaveBeenCalledTimes(2);
}); });
it("ingests batches in parallel chunks of five", async () => {
const mockClient = makeMockClient();
let inFlight = 0;
let maxInFlight = 0;
let createdCount = 0;
vi.mocked(mockClient.createThought).mockImplementation(async (input: OpenBrainThoughtInput) => {
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await new Promise((resolve) => {
setTimeout(resolve, 20);
});
inFlight -= 1;
createdCount += 1;
return {
id: `thought-${createdCount}`,
content: input.content,
source: input.source,
metadata: input.metadata,
createdAt: new Date().toISOString(),
updatedAt: undefined,
score: undefined,
};
});
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.ingestBatch({
sessionId,
messages: Array.from({ length: 10 }, (_, index) => ({
role: index % 2 === 0 ? "user" : "assistant",
content: `message-${index + 1}`,
timestamp: index + 1,
})),
});
expect(result.ingestedCount).toBe(10);
expect(maxInFlight).toBe(5);
expect(mockClient.createThought).toHaveBeenCalledTimes(10);
});
it("assembles context from recent + semantic search, deduped and budget-aware", async () => { it("assembles context from recent + semantic search, deduped and budget-aware", async () => {
const mockClient = makeMockClient(); const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([ vi.mocked(mockClient.listRecent).mockResolvedValue([
@@ -192,12 +240,19 @@ describe("OpenBrainContextEngine", () => {
]); ]);
}); });
it("compact archives a summary thought", async () => { it("compact archives a summary thought and deletes summarized inputs", async () => {
const mockClient = makeMockClient(); const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([ vi.mocked(mockClient.listRecent).mockResolvedValue(
makeThought("t1", "first message", sessionId, "user", "2026-03-06T12:00:00.000Z"), Array.from({ length: 12 }, (_, index) => {
makeThought("t2", "second message", sessionId, "assistant", "2026-03-06T12:01:00.000Z"), return makeThought(
]); `t${index + 1}`,
`message ${index + 1}`,
sessionId,
index % 2 === 0 ? "user" : "assistant",
`2026-03-06T12:${String(index).padStart(2, "0")}:00.000Z`,
);
}),
);
const engine = new OpenBrainContextEngine( const engine = new OpenBrainContextEngine(
{ {
@@ -226,6 +281,54 @@ describe("OpenBrainContextEngine", () => {
}), }),
}), }),
); );
const deletedIds = vi
.mocked(mockClient.deleteThought)
.mock.calls.map(([id]) => id)
.sort((left, right) => left.localeCompare(right));
expect(deletedIds).toEqual([
"t10",
"t11",
"t12",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
]);
});
it("stops trimming once the newest message exceeds budget", async () => {
const mockClient = makeMockClient();
const oversizedNewest = "z".repeat(400);
vi.mocked(mockClient.listRecent).mockResolvedValue([
makeThought("t1", "small older message", sessionId, "assistant", "2026-03-06T12:00:00.000Z"),
makeThought("t2", oversizedNewest, sessionId, "assistant", "2026-03-06T12:01:00.000Z"),
]);
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.assemble({
sessionId,
messages: [
{
role: "user",
content: "query",
timestamp: Date.now(),
},
],
tokenBudget: 12,
});
expect(result.messages.map((message) => String(message.content))).toEqual([oversizedNewest]);
}); });
it("prepares subagent spawn and rollback deletes seeded context", async () => { it("prepares subagent spawn and rollback deletes seeded context", async () => {