fix(#329): Harden BudgetService against security review findings
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed

- Fix CRITICAL: Unbounded memory growth via daily record purging
- Fix CRITICAL: Negative/NaN/Infinity token bypass via input clamping
- Fix HIGH: TOCTOU race via atomic trySpawnAgent() method
- Fix HIGH: Phantom agent leak via Set<string> ID tracking (not counter)
- Fix HIGH: isAgentOverBudget now scoped to today only
- Fix HIGH: Config validation clamps invalid values to safe defaults
- Fix MEDIUM: Wire BudgetModule into AppModule
- Fix MEDIUM: Sanitize agentId in log output to prevent log injection
- Fix MEDIUM: Use Date objects for timezone-safe comparisons
- Fix MEDIUM: Reject empty agentId/taskId in recordUsage
- Add tests for negative tokens, NaN, Infinity, empty IDs, config edge cases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jason Woltje
2026-02-05 13:15:33 -06:00
parent 22dc964503
commit 2cb3fe8f5a
4 changed files with 291 additions and 63 deletions

View File

@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
import { HealthModule } from "./api/health/health.module"; import { HealthModule } from "./api/health/health.module";
import { AgentsModule } from "./api/agents/agents.module"; import { AgentsModule } from "./api/agents/agents.module";
import { CoordinatorModule } from "./coordinator/coordinator.module"; import { CoordinatorModule } from "./coordinator/coordinator.module";
import { BudgetModule } from "./budget/budget.module";
import { orchestratorConfig } from "./config/orchestrator.config"; import { orchestratorConfig } from "./config/orchestrator.config";
@Module({ @Module({
@@ -21,6 +22,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
HealthModule, HealthModule,
AgentsModule, AgentsModule,
CoordinatorModule, CoordinatorModule,
BudgetModule,
], ],
}) })
export class AppModule {} export class AppModule {}

View File

@@ -2,7 +2,7 @@
* BudgetService Unit Tests * BudgetService Unit Tests
* *
* Tests usage budget tracking, enforcement, and reporting. * Tests usage budget tracking, enforcement, and reporting.
* Covers issue #329 (ORCH-135) * Covers issue #329 (ORCH-135) including security hardening.
*/ */
import { describe, it, expect, beforeEach, vi } from "vitest"; import { describe, it, expect, beforeEach, vi } from "vitest";
import { BudgetService } from "./budget.service"; import { BudgetService } from "./budget.service";
@@ -12,7 +12,7 @@ describe("BudgetService", () => {
let service: BudgetService; let service: BudgetService;
const mockConfigService = { const mockConfigService = {
get: vi.fn((_key: string, defaultValue?: unknown) => defaultValue), get: vi.fn((_key: string, _defaultValue?: unknown) => undefined),
}; };
beforeEach(() => { beforeEach(() => {
@@ -53,6 +53,46 @@ describe("BudgetService", () => {
expect(budget.maxTaskDurationMinutes).toBe(60); expect(budget.maxTaskDurationMinutes).toBe(60);
expect(budget.enforceHardLimits).toBe(true); expect(budget.enforceHardLimits).toBe(true);
}); });
it("should clamp negative config values to defaults", () => {
const badConfig = {
get: vi.fn((key: string) => {
const config: Record<string, unknown> = {
"orchestrator.budget.dailyTokenLimit": -100,
"orchestrator.budget.perAgentTokenLimit": -50,
"orchestrator.budget.maxConcurrentAgents": -1,
"orchestrator.budget.maxTaskDurationMinutes": 0,
};
return config[key];
}),
};
const badService = new BudgetService(badConfig as unknown as ConfigService);
const budget = badService.getBudget();
expect(budget.dailyTokenLimit).toBe(10_000_000);
expect(budget.perAgentTokenLimit).toBe(2_000_000);
expect(budget.maxConcurrentAgents).toBe(10);
expect(budget.maxTaskDurationMinutes).toBe(120);
});
it("should clamp NaN config values to defaults", () => {
const nanConfig = {
get: vi.fn((key: string) => {
const config: Record<string, unknown> = {
"orchestrator.budget.dailyTokenLimit": NaN,
"orchestrator.budget.maxConcurrentAgents": Infinity,
};
return config[key];
}),
};
const nanService = new BudgetService(nanConfig as unknown as ConfigService);
const budget = nanService.getBudget();
expect(budget.dailyTokenLimit).toBe(10_000_000);
expect(budget.maxConcurrentAgents).toBe(10);
});
}); });
describe("recordUsage", () => { describe("recordUsage", () => {
@@ -84,6 +124,42 @@ describe("BudgetService", () => {
expect(agent1?.totalTokens).toBe(1500); expect(agent1?.totalTokens).toBe(1500);
expect(agent2?.totalTokens).toBe(4500); expect(agent2?.totalTokens).toBe(4500);
}); });
it("should clamp negative token values to 0", () => {
service.recordUsage("agent-1", "task-1", -500, -200);
const summary = service.getUsageSummary();
expect(summary.dailyTokensUsed).toBe(0);
});
it("should clamp NaN token values to 0", () => {
service.recordUsage("agent-1", "task-1", NaN, NaN);
const summary = service.getUsageSummary();
expect(summary.dailyTokensUsed).toBe(0);
});
it("should clamp Infinity token values to 0", () => {
service.recordUsage("agent-1", "task-1", Infinity, -Infinity);
const summary = service.getUsageSummary();
expect(summary.dailyTokensUsed).toBe(0);
});
it("should skip recording when agentId is empty", () => {
service.recordUsage("", "task-1", 1000, 500);
const summary = service.getUsageSummary();
expect(summary.dailyTokensUsed).toBe(0);
expect(summary.agentUsage).toHaveLength(0);
});
it("should skip recording when taskId is empty", () => {
service.recordUsage("agent-1", "", 1000, 500);
const summary = service.getUsageSummary();
expect(summary.dailyTokensUsed).toBe(0);
});
}); });
describe("canSpawnAgent", () => { describe("canSpawnAgent", () => {
@@ -94,7 +170,7 @@ describe("BudgetService", () => {
it("should block spawning when at max concurrent agents", () => { it("should block spawning when at max concurrent agents", () => {
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
service.agentStarted(); service.agentStarted(`agent-${String(i)}`);
} }
const result = service.canSpawnAgent(); const result = service.canSpawnAgent();
@@ -104,12 +180,12 @@ describe("BudgetService", () => {
it("should allow spawning after agent stops", () => { it("should allow spawning after agent stops", () => {
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
service.agentStarted(); service.agentStarted(`agent-${String(i)}`);
} }
expect(service.canSpawnAgent().allowed).toBe(false); expect(service.canSpawnAgent().allowed).toBe(false);
service.agentStopped(); service.agentStopped("agent-0");
expect(service.canSpawnAgent().allowed).toBe(true); expect(service.canSpawnAgent().allowed).toBe(true);
}); });
@@ -141,6 +217,43 @@ describe("BudgetService", () => {
}); });
}); });
describe("trySpawnAgent", () => {
it("should atomically check and reserve slot", () => {
const result = service.trySpawnAgent("agent-1");
expect(result.allowed).toBe(true);
const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(1);
});
it("should block when at max concurrent and not reserve slot", () => {
for (let i = 0; i < 10; i++) {
service.trySpawnAgent(`agent-${String(i)}`);
}
const result = service.trySpawnAgent("agent-11");
expect(result.allowed).toBe(false);
expect(result.reason).toContain("Maximum concurrent agents");
const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(10);
});
it("should reject empty agent ID", () => {
const result = service.trySpawnAgent("");
expect(result.allowed).toBe(false);
expect(result.reason).toContain("Agent ID is required");
});
it("should not double-count same agent ID", () => {
service.trySpawnAgent("agent-1");
service.trySpawnAgent("agent-1");
const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(1);
});
});
describe("isAgentOverBudget", () => { describe("isAgentOverBudget", () => {
it("should return false when agent is within budget", () => { it("should return false when agent is within budget", () => {
service.recordUsage("agent-1", "task-1", 100_000, 50_000); service.recordUsage("agent-1", "task-1", 100_000, 50_000);
@@ -166,31 +279,38 @@ describe("BudgetService", () => {
}); });
describe("agentStarted / agentStopped", () => { describe("agentStarted / agentStopped", () => {
it("should track active agent count", () => { it("should track active agent count by ID", () => {
service.agentStarted(); service.agentStarted("agent-1");
service.agentStarted(); service.agentStarted("agent-2");
service.agentStarted(); service.agentStarted("agent-3");
const summary = service.getUsageSummary(); const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(3); expect(summary.activeAgents).toBe(3);
}); });
it("should decrement active count on stop", () => { it("should decrement active count on stop", () => {
service.agentStarted(); service.agentStarted("agent-1");
service.agentStarted(); service.agentStarted("agent-2");
service.agentStopped(); service.agentStopped("agent-1");
const summary = service.getUsageSummary(); const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(1); expect(summary.activeAgents).toBe(1);
}); });
it("should not go below zero", () => { it("should handle stopping non-existent agent gracefully", () => {
service.agentStopped(); service.agentStopped("non-existent");
service.agentStopped();
const summary = service.getUsageSummary(); const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(0); expect(summary.activeAgents).toBe(0);
}); });
it("should not double-count same agent ID on start", () => {
service.agentStarted("agent-1");
service.agentStarted("agent-1");
const summary = service.getUsageSummary();
expect(summary.activeAgents).toBe(1);
});
}); });
describe("getUsageSummary", () => { describe("getUsageSummary", () => {
@@ -290,7 +410,7 @@ describe("BudgetService", () => {
const budget2 = service.getBudget(); const budget2 = service.getBudget();
expect(budget1).toEqual(budget2); expect(budget1).toEqual(budget2);
expect(budget1).not.toBe(budget2); // Different reference expect(budget1).not.toBe(budget2);
}); });
}); });
}); });

View File

@@ -15,31 +15,21 @@ import type {
} from "./budget.types"; } from "./budget.types";
import { DEFAULT_BUDGET } from "./budget.types"; import { DEFAULT_BUDGET } from "./budget.types";
/** Sanitize strings for safe log output */
function sanitizeForLog(value: string): string {
return value.replace(/[\n\r\t]/g, "_").slice(0, 128);
}
@Injectable() @Injectable()
export class BudgetService { export class BudgetService {
private readonly logger = new Logger(BudgetService.name); private readonly logger = new Logger(BudgetService.name);
private readonly budget: UsageBudget; private readonly budget: UsageBudget;
private readonly records: UsageRecord[] = []; private records: UsageRecord[] = [];
private readonly activeAgentCount = { value: 0 }; private readonly activeAgents = new Set<string>();
private lastPurgeDate = "";
constructor(private readonly configService: ConfigService) { constructor(private readonly configService: ConfigService) {
this.budget = { this.budget = this.loadAndValidateConfig();
dailyTokenLimit:
this.configService.get<number>("orchestrator.budget.dailyTokenLimit") ??
DEFAULT_BUDGET.dailyTokenLimit,
perAgentTokenLimit:
this.configService.get<number>("orchestrator.budget.perAgentTokenLimit") ??
DEFAULT_BUDGET.perAgentTokenLimit,
maxConcurrentAgents:
this.configService.get<number>("orchestrator.budget.maxConcurrentAgents") ??
DEFAULT_BUDGET.maxConcurrentAgents,
maxTaskDurationMinutes:
this.configService.get<number>("orchestrator.budget.maxTaskDurationMinutes") ??
DEFAULT_BUDGET.maxTaskDurationMinutes,
enforceHardLimits:
this.configService.get<boolean>("orchestrator.budget.enforceHardLimits") ??
DEFAULT_BUDGET.enforceHardLimits,
};
this.logger.log( this.logger.log(
`BudgetService initialized: daily=${String(this.budget.dailyTokenLimit)} tokens, ` + `BudgetService initialized: daily=${String(this.budget.dailyTokenLimit)} tokens, ` +
@@ -49,29 +39,68 @@ export class BudgetService {
} }
/** /**
* Record token usage for an agent * Record token usage for an agent.
* Negative and NaN values are clamped to 0.
*/ */
recordUsage(agentId: string, taskId: string, inputTokens: number, outputTokens: number): void { recordUsage(agentId: string, taskId: string, inputTokens: number, outputTokens: number): void {
if (!agentId || !taskId) {
this.logger.warn("recordUsage called with empty agentId or taskId — skipping");
return;
}
const safeInput = Math.max(0, Number.isFinite(inputTokens) ? inputTokens : 0);
const safeOutput = Math.max(0, Number.isFinite(outputTokens) ? outputTokens : 0);
this.purgeStaleRecords();
const record: UsageRecord = { const record: UsageRecord = {
agentId, agentId,
taskId, taskId,
inputTokens, inputTokens: safeInput,
outputTokens, outputTokens: safeOutput,
timestamp: new Date().toISOString(), timestamp: new Date(),
}; };
this.records.push(record); this.records.push(record);
this.logger.debug( this.logger.debug(
`Usage recorded: agent=${agentId} input=${String(inputTokens)} output=${String(outputTokens)}` `Usage recorded: agent=${sanitizeForLog(agentId)} input=${String(safeInput)} output=${String(safeOutput)}`
); );
} }
/** /**
* Check if an agent can be spawned (concurrency and budget check) * Check if an agent can be spawned (concurrency and budget check).
* When allowed, atomically increments the active agent count.
*/
trySpawnAgent(agentId: string): { allowed: boolean; reason?: string } {
if (!agentId) {
return { allowed: false, reason: "Agent ID is required" };
}
if (this.activeAgents.size >= this.budget.maxConcurrentAgents) {
return {
allowed: false,
reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`,
};
}
const dailyUsed = this.getDailyTokensUsed();
if (this.budget.enforceHardLimits && dailyUsed >= this.budget.dailyTokenLimit) {
return {
allowed: false,
reason: `Daily token budget exceeded (${String(dailyUsed)}/${String(this.budget.dailyTokenLimit)})`,
};
}
this.activeAgents.add(agentId);
return { allowed: true };
}
/**
* Check if an agent can be spawned without reserving a slot.
*/ */
canSpawnAgent(): { allowed: boolean; reason?: string } { canSpawnAgent(): { allowed: boolean; reason?: string } {
if (this.activeAgentCount.value >= this.budget.maxConcurrentAgents) { if (this.activeAgents.size >= this.budget.maxConcurrentAgents) {
return { return {
allowed: false, allowed: false,
reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`, reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`,
@@ -90,10 +119,13 @@ export class BudgetService {
} }
/** /**
* Check if an agent has exceeded its per-task budget * Check if an agent has exceeded its per-task budget (today only).
*/ */
isAgentOverBudget(agentId: string): { overBudget: boolean; totalTokens: number } { isAgentOverBudget(agentId: string): { overBudget: boolean; totalTokens: number } {
const agentRecords = this.records.filter((r) => r.agentId === agentId); const todayStart = this.getTodayStart();
const agentRecords = this.records.filter(
(r) => r.agentId === agentId && r.timestamp >= todayStart
);
const totalTokens = agentRecords.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0); const totalTokens = agentRecords.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);
return { return {
@@ -103,23 +135,24 @@ export class BudgetService {
} }
/** /**
* Notify that an agent has started (increment active count) * Notify that an agent has started (track by ID).
*/ */
agentStarted(): void { agentStarted(agentId: string): void {
this.activeAgentCount.value++; this.activeAgents.add(agentId);
} }
/** /**
* Notify that an agent has stopped (decrement active count) * Notify that an agent has stopped (remove by ID).
*/ */
agentStopped(): void { agentStopped(agentId: string): void {
this.activeAgentCount.value = Math.max(0, this.activeAgentCount.value - 1); this.activeAgents.delete(agentId);
} }
/** /**
* Get comprehensive usage summary * Get comprehensive usage summary
*/ */
getUsageSummary(): UsageSummary { getUsageSummary(): UsageSummary {
this.purgeStaleRecords();
const dailyTokensUsed = this.getDailyTokensUsed(); const dailyTokensUsed = this.getDailyTokensUsed();
const dailyUsagePercent = const dailyUsagePercent =
this.budget.dailyTokenLimit > 0 ? (dailyTokensUsed / this.budget.dailyTokenLimit) * 100 : 0; this.budget.dailyTokenLimit > 0 ? (dailyTokensUsed / this.budget.dailyTokenLimit) * 100 : 0;
@@ -129,7 +162,7 @@ export class BudgetService {
dailyTokenLimit: this.budget.dailyTokenLimit, dailyTokenLimit: this.budget.dailyTokenLimit,
dailyUsagePercent: Math.round(dailyUsagePercent * 100) / 100, dailyUsagePercent: Math.round(dailyUsagePercent * 100) / 100,
agentUsage: this.getAgentUsageSummaries(), agentUsage: this.getAgentUsageSummaries(),
activeAgents: this.activeAgentCount.value, activeAgents: this.activeAgents.size,
maxConcurrentAgents: this.budget.maxConcurrentAgents, maxConcurrentAgents: this.budget.maxConcurrentAgents,
budgetStatus: this.getBudgetStatus(dailyUsagePercent), budgetStatus: this.getBudgetStatus(dailyUsagePercent),
}; };
@@ -143,25 +176,98 @@ export class BudgetService {
} }
/** /**
* Get total tokens used today * Load configuration with validation. Clamps invalid values to defaults.
*/ */
private getDailyTokensUsed(): number { private loadAndValidateConfig(): UsageBudget {
const todayStart = new Date(); const raw = {
todayStart.setHours(0, 0, 0, 0); dailyTokenLimit:
const todayIso = todayStart.toISOString(); this.configService.get<number>("orchestrator.budget.dailyTokenLimit") ??
DEFAULT_BUDGET.dailyTokenLimit,
perAgentTokenLimit:
this.configService.get<number>("orchestrator.budget.perAgentTokenLimit") ??
DEFAULT_BUDGET.perAgentTokenLimit,
maxConcurrentAgents:
this.configService.get<number>("orchestrator.budget.maxConcurrentAgents") ??
DEFAULT_BUDGET.maxConcurrentAgents,
maxTaskDurationMinutes:
this.configService.get<number>("orchestrator.budget.maxTaskDurationMinutes") ??
DEFAULT_BUDGET.maxTaskDurationMinutes,
enforceHardLimits:
this.configService.get<boolean>("orchestrator.budget.enforceHardLimits") ??
DEFAULT_BUDGET.enforceHardLimits,
};
return this.records return {
.filter((r) => r.timestamp >= todayIso) dailyTokenLimit: this.clampPositive(raw.dailyTokenLimit, DEFAULT_BUDGET.dailyTokenLimit),
.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0); perAgentTokenLimit: this.clampPositive(
raw.perAgentTokenLimit,
DEFAULT_BUDGET.perAgentTokenLimit
),
maxConcurrentAgents: this.clampPositiveInt(
raw.maxConcurrentAgents,
DEFAULT_BUDGET.maxConcurrentAgents
),
maxTaskDurationMinutes: this.clampPositiveInt(
raw.maxTaskDurationMinutes,
DEFAULT_BUDGET.maxTaskDurationMinutes
),
enforceHardLimits: raw.enforceHardLimits,
};
}
private clampPositive(value: number, fallback: number): number {
return Number.isFinite(value) && value > 0 ? value : fallback;
}
private clampPositiveInt(value: number, fallback: number): number {
return Number.isFinite(value) && value > 0 ? Math.floor(value) : fallback;
} }
/** /**
* Get per-agent usage summaries * Purge records from previous days to prevent unbounded memory growth.
*/
private purgeStaleRecords(): void {
const todayStr = new Date().toISOString().slice(0, 10);
if (this.lastPurgeDate === todayStr) return;
const todayStart = this.getTodayStart();
const before = this.records.length;
this.records = this.records.filter((r) => r.timestamp >= todayStart);
if (before > this.records.length) {
this.logger.log(
`Purged ${String(before - this.records.length)} stale usage records from previous days`
);
}
this.lastPurgeDate = todayStr;
}
/**
* Get total tokens used today using proper Date comparison.
*/
private getDailyTokensUsed(): number {
const todayStart = this.getTodayStart();
return this.records
.filter((r) => r.timestamp >= todayStart)
.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);
}
private getTodayStart(): Date {
const todayStart = new Date();
todayStart.setHours(0, 0, 0, 0);
return todayStart;
}
/**
* Get per-agent usage summaries (today only).
*/ */
private getAgentUsageSummaries(): AgentUsageSummary[] { private getAgentUsageSummaries(): AgentUsageSummary[] {
const todayStart = this.getTodayStart();
const todayRecords = this.records.filter((r) => r.timestamp >= todayStart);
const agentMap = new Map<string, { taskId: string; input: number; output: number }>(); const agentMap = new Map<string, { taskId: string; input: number; output: number }>();
for (const record of this.records) { for (const record of todayRecords) {
const existing = agentMap.get(record.agentId); const existing = agentMap.get(record.agentId);
if (existing) { if (existing) {
existing.input += record.inputTokens; existing.input += record.inputTokens;
@@ -175,7 +281,7 @@ export class BudgetService {
} }
} }
return Array.from(agentMap.entries()).map(([agentId, data]) => { return [...agentMap.entries()].map(([agentId, data]) => {
const totalTokens = data.input + data.output; const totalTokens = data.input + data.output;
const usagePercent = const usagePercent =
this.budget.perAgentTokenLimit > 0 this.budget.perAgentTokenLimit > 0

View File

@@ -28,7 +28,7 @@ export interface UsageRecord {
/** Number of output tokens used */ /** Number of output tokens used */
outputTokens: number; outputTokens: number;
/** Timestamp of usage */ /** Timestamp of usage */
timestamp: string; timestamp: Date;
} }
export interface UsageSummary { export interface UsageSummary {