Files
stack/apps/orchestrator/src/budget/budget.service.ts
Jason Woltje 2cb3fe8f5a
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed
fix(#329): Harden BudgetService against security review findings
- Fix CRITICAL: Unbounded memory growth via daily record purging
- Fix CRITICAL: Negative/NaN/Infinity token bypass via input clamping
- Fix HIGH: TOCTOU race via atomic trySpawnAgent() method
- Fix HIGH: Phantom agent leak via Set<string> ID tracking (not counter)
- Fix HIGH: isAgentOverBudget now scoped to today only
- Fix HIGH: Config validation clamps invalid values to safe defaults
- Fix MEDIUM: Wire BudgetModule into AppModule
- Fix MEDIUM: Sanitize agentId in log output to prevent log injection
- Fix MEDIUM: Use Date objects for timezone-safe comparisons
- Fix MEDIUM: Reject empty agentId/taskId in recordUsage
- Add tests for negative tokens, NaN, Infinity, empty IDs, config edge cases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:15:33 -06:00

312 lines
9.5 KiB
TypeScript

/**
* Usage Budget Management Service
*
* Tracks token usage per agent and enforces budget limits.
* Provides real-time usage summaries and budget status checks.
*/
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import type {
UsageBudget,
UsageRecord,
UsageSummary,
AgentUsageSummary,
BudgetStatus,
} from "./budget.types";
import { DEFAULT_BUDGET } from "./budget.types";
/** Sanitize strings for safe log output */
function sanitizeForLog(value: string): string {
return value.replace(/[\n\r\t]/g, "_").slice(0, 128);
}
@Injectable()
export class BudgetService {
private readonly logger = new Logger(BudgetService.name);
private readonly budget: UsageBudget;
private records: UsageRecord[] = [];
private readonly activeAgents = new Set<string>();
private lastPurgeDate = "";
constructor(private readonly configService: ConfigService) {
this.budget = this.loadAndValidateConfig();
this.logger.log(
`BudgetService initialized: daily=${String(this.budget.dailyTokenLimit)} tokens, ` +
`perAgent=${String(this.budget.perAgentTokenLimit)} tokens, ` +
`maxConcurrent=${String(this.budget.maxConcurrentAgents)}`
);
}
/**
* Record token usage for an agent.
* Negative and NaN values are clamped to 0.
*/
recordUsage(agentId: string, taskId: string, inputTokens: number, outputTokens: number): void {
if (!agentId || !taskId) {
this.logger.warn("recordUsage called with empty agentId or taskId — skipping");
return;
}
const safeInput = Math.max(0, Number.isFinite(inputTokens) ? inputTokens : 0);
const safeOutput = Math.max(0, Number.isFinite(outputTokens) ? outputTokens : 0);
this.purgeStaleRecords();
const record: UsageRecord = {
agentId,
taskId,
inputTokens: safeInput,
outputTokens: safeOutput,
timestamp: new Date(),
};
this.records.push(record);
this.logger.debug(
`Usage recorded: agent=${sanitizeForLog(agentId)} input=${String(safeInput)} output=${String(safeOutput)}`
);
}
/**
* Check if an agent can be spawned (concurrency and budget check).
* When allowed, atomically increments the active agent count.
*/
trySpawnAgent(agentId: string): { allowed: boolean; reason?: string } {
if (!agentId) {
return { allowed: false, reason: "Agent ID is required" };
}
if (this.activeAgents.size >= this.budget.maxConcurrentAgents) {
return {
allowed: false,
reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`,
};
}
const dailyUsed = this.getDailyTokensUsed();
if (this.budget.enforceHardLimits && dailyUsed >= this.budget.dailyTokenLimit) {
return {
allowed: false,
reason: `Daily token budget exceeded (${String(dailyUsed)}/${String(this.budget.dailyTokenLimit)})`,
};
}
this.activeAgents.add(agentId);
return { allowed: true };
}
/**
* Check if an agent can be spawned without reserving a slot.
*/
canSpawnAgent(): { allowed: boolean; reason?: string } {
if (this.activeAgents.size >= this.budget.maxConcurrentAgents) {
return {
allowed: false,
reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`,
};
}
const dailyUsed = this.getDailyTokensUsed();
if (this.budget.enforceHardLimits && dailyUsed >= this.budget.dailyTokenLimit) {
return {
allowed: false,
reason: `Daily token budget exceeded (${String(dailyUsed)}/${String(this.budget.dailyTokenLimit)})`,
};
}
return { allowed: true };
}
/**
* Check if an agent has exceeded its per-task budget (today only).
*/
isAgentOverBudget(agentId: string): { overBudget: boolean; totalTokens: number } {
const todayStart = this.getTodayStart();
const agentRecords = this.records.filter(
(r) => r.agentId === agentId && r.timestamp >= todayStart
);
const totalTokens = agentRecords.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);
return {
overBudget: totalTokens >= this.budget.perAgentTokenLimit,
totalTokens,
};
}
/**
* Notify that an agent has started (track by ID).
*/
agentStarted(agentId: string): void {
this.activeAgents.add(agentId);
}
/**
* Notify that an agent has stopped (remove by ID).
*/
agentStopped(agentId: string): void {
this.activeAgents.delete(agentId);
}
/**
* Get comprehensive usage summary
*/
getUsageSummary(): UsageSummary {
this.purgeStaleRecords();
const dailyTokensUsed = this.getDailyTokensUsed();
const dailyUsagePercent =
this.budget.dailyTokenLimit > 0 ? (dailyTokensUsed / this.budget.dailyTokenLimit) * 100 : 0;
return {
dailyTokensUsed,
dailyTokenLimit: this.budget.dailyTokenLimit,
dailyUsagePercent: Math.round(dailyUsagePercent * 100) / 100,
agentUsage: this.getAgentUsageSummaries(),
activeAgents: this.activeAgents.size,
maxConcurrentAgents: this.budget.maxConcurrentAgents,
budgetStatus: this.getBudgetStatus(dailyUsagePercent),
};
}
/**
* Get the configured budget
*/
getBudget(): UsageBudget {
return { ...this.budget };
}
/**
* Load configuration with validation. Clamps invalid values to defaults.
*/
private loadAndValidateConfig(): UsageBudget {
const raw = {
dailyTokenLimit:
this.configService.get<number>("orchestrator.budget.dailyTokenLimit") ??
DEFAULT_BUDGET.dailyTokenLimit,
perAgentTokenLimit:
this.configService.get<number>("orchestrator.budget.perAgentTokenLimit") ??
DEFAULT_BUDGET.perAgentTokenLimit,
maxConcurrentAgents:
this.configService.get<number>("orchestrator.budget.maxConcurrentAgents") ??
DEFAULT_BUDGET.maxConcurrentAgents,
maxTaskDurationMinutes:
this.configService.get<number>("orchestrator.budget.maxTaskDurationMinutes") ??
DEFAULT_BUDGET.maxTaskDurationMinutes,
enforceHardLimits:
this.configService.get<boolean>("orchestrator.budget.enforceHardLimits") ??
DEFAULT_BUDGET.enforceHardLimits,
};
return {
dailyTokenLimit: this.clampPositive(raw.dailyTokenLimit, DEFAULT_BUDGET.dailyTokenLimit),
perAgentTokenLimit: this.clampPositive(
raw.perAgentTokenLimit,
DEFAULT_BUDGET.perAgentTokenLimit
),
maxConcurrentAgents: this.clampPositiveInt(
raw.maxConcurrentAgents,
DEFAULT_BUDGET.maxConcurrentAgents
),
maxTaskDurationMinutes: this.clampPositiveInt(
raw.maxTaskDurationMinutes,
DEFAULT_BUDGET.maxTaskDurationMinutes
),
enforceHardLimits: raw.enforceHardLimits,
};
}
private clampPositive(value: number, fallback: number): number {
return Number.isFinite(value) && value > 0 ? value : fallback;
}
private clampPositiveInt(value: number, fallback: number): number {
return Number.isFinite(value) && value > 0 ? Math.floor(value) : fallback;
}
/**
* Purge records from previous days to prevent unbounded memory growth.
*/
private purgeStaleRecords(): void {
const todayStr = new Date().toISOString().slice(0, 10);
if (this.lastPurgeDate === todayStr) return;
const todayStart = this.getTodayStart();
const before = this.records.length;
this.records = this.records.filter((r) => r.timestamp >= todayStart);
if (before > this.records.length) {
this.logger.log(
`Purged ${String(before - this.records.length)} stale usage records from previous days`
);
}
this.lastPurgeDate = todayStr;
}
/**
* Get total tokens used today using proper Date comparison.
*/
private getDailyTokensUsed(): number {
const todayStart = this.getTodayStart();
return this.records
.filter((r) => r.timestamp >= todayStart)
.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);
}
private getTodayStart(): Date {
const todayStart = new Date();
todayStart.setHours(0, 0, 0, 0);
return todayStart;
}
/**
* Get per-agent usage summaries (today only).
*/
private getAgentUsageSummaries(): AgentUsageSummary[] {
const todayStart = this.getTodayStart();
const todayRecords = this.records.filter((r) => r.timestamp >= todayStart);
const agentMap = new Map<string, { taskId: string; input: number; output: number }>();
for (const record of todayRecords) {
const existing = agentMap.get(record.agentId);
if (existing) {
existing.input += record.inputTokens;
existing.output += record.outputTokens;
} else {
agentMap.set(record.agentId, {
taskId: record.taskId,
input: record.inputTokens,
output: record.outputTokens,
});
}
}
return [...agentMap.entries()].map(([agentId, data]) => {
const totalTokens = data.input + data.output;
const usagePercent =
this.budget.perAgentTokenLimit > 0
? Math.round((totalTokens / this.budget.perAgentTokenLimit) * 10000) / 100
: 0;
return {
agentId,
taskId: data.taskId,
inputTokens: data.input,
outputTokens: data.output,
totalTokens,
usagePercent,
};
});
}
/**
* Determine overall budget status
*/
private getBudgetStatus(dailyUsagePercent: number): BudgetStatus {
if (dailyUsagePercent >= 100) return "exceeded";
if (dailyUsagePercent >= 95) return "at_limit";
if (dailyUsagePercent >= 80) return "approaching_limit";
return "within_budget";
}
}