Merge pull request 'feat(#329): Add usage budget management and cost governance' (#336) from feature/329-usage-budget into develop
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Reviewed-on: #336
This commit was merged in pull request #336.
This commit is contained in:
@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
|
||||
import { HealthModule } from "./api/health/health.module";
|
||||
import { AgentsModule } from "./api/agents/agents.module";
|
||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||
import { BudgetModule } from "./budget/budget.module";
|
||||
import { orchestratorConfig } from "./config/orchestrator.config";
|
||||
|
||||
@Module({
|
||||
@@ -21,6 +22,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
||||
HealthModule,
|
||||
AgentsModule,
|
||||
CoordinatorModule,
|
||||
BudgetModule,
|
||||
],
|
||||
})
|
||||
export class AppModule {}
|
||||
|
||||
10
apps/orchestrator/src/budget/budget.module.ts
Normal file
10
apps/orchestrator/src/budget/budget.module.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { BudgetService } from "./budget.service";
|
||||
|
||||
/**
 * NestJS module for usage budget management.
 *
 * Registers BudgetService as a provider and exports it so that other
 * modules importing BudgetModule can inject it. ConfigModule is imported
 * because BudgetService takes a ConfigService in its constructor.
 */
@Module({
  imports: [ConfigModule],
  providers: [BudgetService],
  exports: [BudgetService],
})
export class BudgetModule {}
|
||||
416
apps/orchestrator/src/budget/budget.service.spec.ts
Normal file
416
apps/orchestrator/src/budget/budget.service.spec.ts
Normal file
@@ -0,0 +1,416 @@
|
||||
/**
|
||||
* BudgetService Unit Tests
|
||||
*
|
||||
* Tests usage budget tracking, enforcement, and reporting.
|
||||
* Covers issue #329 (ORCH-135) including security hardening.
|
||||
*/
|
||||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import { BudgetService } from "./budget.service";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
|
||||
describe("BudgetService", () => {
  /**
   * Build a ConfigService stub that returns the supplied values by key and
   * `undefined` for every other key, which makes BudgetService fall back to
   * its defaults for anything not listed.
   */
  const createConfigService = (values: Record<string, unknown> = {}): ConfigService =>
    ({ get: vi.fn((key: string) => values[key]) }) as unknown as ConfigService;

  /** Create a BudgetService backed by a stub config holding `values`. */
  const createService = (values: Record<string, unknown> = {}): BudgetService =>
    new BudgetService(createConfigService(values));

  // Default-configured service, rebuilt before every test.
  let service: BudgetService;

  beforeEach(() => {
    vi.clearAllMocks();
    service = createService();
  });

  describe("initialization", () => {
    it("should initialize with default budget values", () => {
      const budget = service.getBudget();

      expect(budget.dailyTokenLimit).toBe(10_000_000);
      expect(budget.perAgentTokenLimit).toBe(2_000_000);
      expect(budget.maxConcurrentAgents).toBe(10);
      expect(budget.maxTaskDurationMinutes).toBe(120);
      expect(budget.enforceHardLimits).toBe(false);
    });

    it("should use config values when provided", () => {
      const customService = createService({
        "orchestrator.budget.dailyTokenLimit": 5_000_000,
        "orchestrator.budget.perAgentTokenLimit": 1_000_000,
        "orchestrator.budget.maxConcurrentAgents": 5,
        "orchestrator.budget.maxTaskDurationMinutes": 60,
        "orchestrator.budget.enforceHardLimits": true,
      });
      const budget = customService.getBudget();

      expect(budget.dailyTokenLimit).toBe(5_000_000);
      expect(budget.perAgentTokenLimit).toBe(1_000_000);
      expect(budget.maxConcurrentAgents).toBe(5);
      expect(budget.maxTaskDurationMinutes).toBe(60);
      expect(budget.enforceHardLimits).toBe(true);
    });

    it("should clamp negative config values to defaults", () => {
      const badService = createService({
        "orchestrator.budget.dailyTokenLimit": -100,
        "orchestrator.budget.perAgentTokenLimit": -50,
        "orchestrator.budget.maxConcurrentAgents": -1,
        "orchestrator.budget.maxTaskDurationMinutes": 0,
      });
      const budget = badService.getBudget();

      expect(budget.dailyTokenLimit).toBe(10_000_000);
      expect(budget.perAgentTokenLimit).toBe(2_000_000);
      expect(budget.maxConcurrentAgents).toBe(10);
      expect(budget.maxTaskDurationMinutes).toBe(120);
    });

    it("should clamp NaN config values to defaults", () => {
      const nanService = createService({
        "orchestrator.budget.dailyTokenLimit": NaN,
        "orchestrator.budget.maxConcurrentAgents": Infinity,
      });
      const budget = nanService.getBudget();

      expect(budget.dailyTokenLimit).toBe(10_000_000);
      expect(budget.maxConcurrentAgents).toBe(10);
    });
  });

  describe("recordUsage", () => {
    it("should record token usage", () => {
      service.recordUsage("agent-1", "task-1", 1000, 500);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(1500);
    });

    it("should accumulate usage across multiple records", () => {
      service.recordUsage("agent-1", "task-1", 1000, 500);
      service.recordUsage("agent-1", "task-1", 2000, 1000);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(4500);
    });

    it("should track usage per agent", () => {
      service.recordUsage("agent-1", "task-1", 1000, 500);
      service.recordUsage("agent-2", "task-2", 3000, 1500);

      const summary = service.getUsageSummary();
      expect(summary.agentUsage).toHaveLength(2);

      const agent1 = summary.agentUsage.find((a) => a.agentId === "agent-1");
      const agent2 = summary.agentUsage.find((a) => a.agentId === "agent-2");

      expect(agent1?.totalTokens).toBe(1500);
      expect(agent2?.totalTokens).toBe(4500);
    });

    it("should clamp negative token values to 0", () => {
      service.recordUsage("agent-1", "task-1", -500, -200);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(0);
    });

    it("should clamp NaN token values to 0", () => {
      service.recordUsage("agent-1", "task-1", NaN, NaN);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(0);
    });

    it("should clamp Infinity token values to 0", () => {
      service.recordUsage("agent-1", "task-1", Infinity, -Infinity);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(0);
    });

    it("should skip recording when agentId is empty", () => {
      service.recordUsage("", "task-1", 1000, 500);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(0);
      expect(summary.agentUsage).toHaveLength(0);
    });

    it("should skip recording when taskId is empty", () => {
      service.recordUsage("agent-1", "", 1000, 500);

      const summary = service.getUsageSummary();
      expect(summary.dailyTokensUsed).toBe(0);
    });
  });

  describe("canSpawnAgent", () => {
    it("should allow spawning when under limits", () => {
      const result = service.canSpawnAgent();
      expect(result.allowed).toBe(true);
    });

    it("should block spawning when at max concurrent agents", () => {
      // Default maxConcurrentAgents is 10, so ten starts fill every slot.
      for (let i = 0; i < 10; i++) {
        service.agentStarted(`agent-${String(i)}`);
      }

      const result = service.canSpawnAgent();
      expect(result.allowed).toBe(false);
      expect(result.reason).toContain("Maximum concurrent agents");
    });

    it("should allow spawning after agent stops", () => {
      for (let i = 0; i < 10; i++) {
        service.agentStarted(`agent-${String(i)}`);
      }

      expect(service.canSpawnAgent().allowed).toBe(false);

      service.agentStopped("agent-0");

      expect(service.canSpawnAgent().allowed).toBe(true);
    });

    it("should block spawning when daily budget exceeded with hard limits", () => {
      const strictService = createService({
        "orchestrator.budget.dailyTokenLimit": 1000,
        "orchestrator.budget.enforceHardLimits": true,
      });
      strictService.recordUsage("agent-1", "task-1", 800, 300);

      const result = strictService.canSpawnAgent();
      expect(result.allowed).toBe(false);
      expect(result.reason).toContain("Daily token budget exceeded");
    });

    it("should allow spawning when over budget without hard limits", () => {
      service.recordUsage("agent-1", "task-1", 5_000_000, 5_000_000);

      const result = service.canSpawnAgent();
      expect(result.allowed).toBe(true);
    });
  });

  describe("trySpawnAgent", () => {
    it("should atomically check and reserve slot", () => {
      const result = service.trySpawnAgent("agent-1");
      expect(result.allowed).toBe(true);

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(1);
    });

    it("should block when at max concurrent and not reserve slot", () => {
      for (let i = 0; i < 10; i++) {
        service.trySpawnAgent(`agent-${String(i)}`);
      }

      const result = service.trySpawnAgent("agent-11");
      expect(result.allowed).toBe(false);
      expect(result.reason).toContain("Maximum concurrent agents");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(10);
    });

    it("should reject empty agent ID", () => {
      const result = service.trySpawnAgent("");
      expect(result.allowed).toBe(false);
      expect(result.reason).toContain("Agent ID is required");
    });

    it("should not double-count same agent ID", () => {
      service.trySpawnAgent("agent-1");
      service.trySpawnAgent("agent-1");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(1);
    });
  });

  describe("isAgentOverBudget", () => {
    it("should return false when agent is within budget", () => {
      service.recordUsage("agent-1", "task-1", 100_000, 50_000);

      const result = service.isAgentOverBudget("agent-1");
      expect(result.overBudget).toBe(false);
      expect(result.totalTokens).toBe(150_000);
    });

    it("should return true when agent exceeds per-agent limit", () => {
      service.recordUsage("agent-1", "task-1", 1_000_000, 1_000_000);

      const result = service.isAgentOverBudget("agent-1");
      expect(result.overBudget).toBe(true);
      expect(result.totalTokens).toBe(2_000_000);
    });

    it("should return false for unknown agent", () => {
      const result = service.isAgentOverBudget("non-existent");
      expect(result.overBudget).toBe(false);
      expect(result.totalTokens).toBe(0);
    });
  });

  describe("agentStarted / agentStopped", () => {
    it("should track active agent count by ID", () => {
      service.agentStarted("agent-1");
      service.agentStarted("agent-2");
      service.agentStarted("agent-3");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(3);
    });

    it("should decrement active count on stop", () => {
      service.agentStarted("agent-1");
      service.agentStarted("agent-2");
      service.agentStopped("agent-1");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(1);
    });

    it("should handle stopping non-existent agent gracefully", () => {
      service.agentStopped("non-existent");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(0);
    });

    it("should not double-count same agent ID on start", () => {
      service.agentStarted("agent-1");
      service.agentStarted("agent-1");

      const summary = service.getUsageSummary();
      expect(summary.activeAgents).toBe(1);
    });
  });

  describe("getUsageSummary", () => {
    // A small daily limit keeps the percentage arithmetic in the tests below obvious.
    const smallDailyLimit = { "orchestrator.budget.dailyTokenLimit": 10_000 };

    it("should return complete summary with no usage", () => {
      const summary = service.getUsageSummary();

      expect(summary.dailyTokensUsed).toBe(0);
      expect(summary.dailyTokenLimit).toBe(10_000_000);
      expect(summary.dailyUsagePercent).toBe(0);
      expect(summary.agentUsage).toHaveLength(0);
      expect(summary.activeAgents).toBe(0);
      expect(summary.maxConcurrentAgents).toBe(10);
      expect(summary.budgetStatus).toBe("within_budget");
    });

    it("should calculate usage percentage correctly", () => {
      const customService = createService(smallDailyLimit);
      customService.recordUsage("agent-1", "task-1", 5000, 0);

      const summary = customService.getUsageSummary();
      expect(summary.dailyUsagePercent).toBe(50);
    });

    it("should report 'approaching_limit' at 80%", () => {
      const customService = createService(smallDailyLimit);
      customService.recordUsage("agent-1", "task-1", 8500, 0);

      const summary = customService.getUsageSummary();
      expect(summary.budgetStatus).toBe("approaching_limit");
    });

    it("should report 'at_limit' at 95%", () => {
      const customService = createService(smallDailyLimit);
      customService.recordUsage("agent-1", "task-1", 9600, 0);

      const summary = customService.getUsageSummary();
      expect(summary.budgetStatus).toBe("at_limit");
    });

    it("should report 'exceeded' over 100%", () => {
      const customService = createService(smallDailyLimit);
      customService.recordUsage("agent-1", "task-1", 11_000, 0);

      const summary = customService.getUsageSummary();
      expect(summary.budgetStatus).toBe("exceeded");
    });

    it("should calculate per-agent usage percentage", () => {
      service.recordUsage("agent-1", "task-1", 500_000, 500_000);

      const summary = service.getUsageSummary();
      const agent = summary.agentUsage.find((a) => a.agentId === "agent-1");

      expect(agent?.usagePercent).toBe(50);
    });
  });

  describe("getBudget", () => {
    it("should return a copy of the budget", () => {
      const budget1 = service.getBudget();
      const budget2 = service.getBudget();

      expect(budget1).toEqual(budget2);
      expect(budget1).not.toBe(budget2);
    });
  });
});
|
||||
311
apps/orchestrator/src/budget/budget.service.ts
Normal file
311
apps/orchestrator/src/budget/budget.service.ts
Normal file
@@ -0,0 +1,311 @@
|
||||
/**
|
||||
* Usage Budget Management Service
|
||||
*
|
||||
* Tracks token usage per agent and enforces budget limits.
|
||||
* Provides real-time usage summaries and budget status checks.
|
||||
*/
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import type {
|
||||
UsageBudget,
|
||||
UsageRecord,
|
||||
UsageSummary,
|
||||
AgentUsageSummary,
|
||||
BudgetStatus,
|
||||
} from "./budget.types";
|
||||
import { DEFAULT_BUDGET } from "./budget.types";
|
||||
|
||||
/**
 * Sanitize a string for safe log output.
 *
 * Every control character is replaced with "_" so that a caller-supplied
 * value cannot forge extra log lines or smuggle terminal escape sequences
 * into the log: C0 controls (including newline, carriage return, tab and
 * ESC), DEL, C1 controls, and the Unicode line/paragraph separators.
 * The result is then truncated to at most 128 characters.
 *
 * @param value - Untrusted text to embed in a log message.
 * @returns The sanitized, length-limited text.
 */
function sanitizeForLog(value: string): string {
  return value.replace(/[\u0000-\u001f\u007f-\u009f\u2028\u2029]/g, "_").slice(0, 128);
}
|
||||
|
||||
/**
 * In-memory usage budget tracker.
 *
 * Keeps one UsageRecord per recordUsage() call for the current local day,
 * tracks which agent IDs currently hold a concurrency slot, and answers
 * spawn / over-budget questions against the configured UsageBudget.
 * All state lives in this instance; nothing is persisted.
 */
@Injectable()
export class BudgetService {
  private readonly logger = new Logger(BudgetService.name);
  /** Validated budget, fixed at construction time. */
  private readonly budget: UsageBudget;
  /** Usage records; entries from previous days are dropped by purgeStaleRecords(). */
  private records: UsageRecord[] = [];
  /** IDs of agents currently counted against maxConcurrentAgents. */
  private readonly activeAgents = new Set<string>();
  /** Key of the local day for which the last purge ran ("" until the first purge). */
  private lastPurgeDate = "";

  constructor(private readonly configService: ConfigService) {
    this.budget = this.loadAndValidateConfig();

    this.logger.log(
      `BudgetService initialized: daily=${String(this.budget.dailyTokenLimit)} tokens, ` +
        `perAgent=${String(this.budget.perAgentTokenLimit)} tokens, ` +
        `maxConcurrent=${String(this.budget.maxConcurrentAgents)}`
    );
  }

  /**
   * Record token usage for an agent.
   *
   * Calls with an empty agentId or taskId are logged and ignored.
   * Negative, NaN and infinite token counts are clamped to 0.
   *
   * @param agentId - Agent that consumed the tokens.
   * @param taskId - Task the agent was working on.
   * @param inputTokens - Input tokens consumed.
   * @param outputTokens - Output tokens produced.
   */
  recordUsage(agentId: string, taskId: string, inputTokens: number, outputTokens: number): void {
    if (!agentId || !taskId) {
      this.logger.warn("recordUsage called with empty agentId or taskId — skipping");
      return;
    }

    const safeInput = Math.max(0, Number.isFinite(inputTokens) ? inputTokens : 0);
    const safeOutput = Math.max(0, Number.isFinite(outputTokens) ? outputTokens : 0);

    this.purgeStaleRecords();

    this.records.push({
      agentId,
      taskId,
      inputTokens: safeInput,
      outputTokens: safeOutput,
      timestamp: new Date(),
    });

    this.logger.debug(
      `Usage recorded: agent=${sanitizeForLog(agentId)} input=${String(safeInput)} output=${String(safeOutput)}`
    );
  }

  /**
   * Check whether an agent may be spawned and, if so, reserve its slot.
   *
   * Applies the same concurrency and daily-budget checks as canSpawnAgent().
   * The check and the reservation happen in one synchronous call, and
   * reserving an ID that is already active does not consume a second slot.
   *
   * @param agentId - ID of the agent to reserve a slot for; must be non-empty.
   * @returns `allowed: true` when the slot was reserved, otherwise
   *   `allowed: false` with a human-readable `reason`.
   */
  trySpawnAgent(agentId: string): { allowed: boolean; reason?: string } {
    if (!agentId) {
      return { allowed: false, reason: "Agent ID is required" };
    }

    const decision = this.canSpawnAgent();
    if (decision.allowed) {
      this.activeAgents.add(agentId);
    }
    return decision;
  }

  /**
   * Check if an agent can be spawned without reserving a slot.
   *
   * Spawning is refused when the number of active agents has reached
   * maxConcurrentAgents, or when enforceHardLimits is on and today's token
   * usage has reached dailyTokenLimit.
   *
   * @returns `allowed: true`, or `allowed: false` with a `reason`.
   */
  canSpawnAgent(): { allowed: boolean; reason?: string } {
    if (this.activeAgents.size >= this.budget.maxConcurrentAgents) {
      return {
        allowed: false,
        reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`,
      };
    }

    const dailyUsed = this.getDailyTokensUsed();
    if (this.budget.enforceHardLimits && dailyUsed >= this.budget.dailyTokenLimit) {
      return {
        allowed: false,
        reason: `Daily token budget exceeded (${String(dailyUsed)}/${String(this.budget.dailyTokenLimit)})`,
      };
    }

    return { allowed: true };
  }

  /**
   * Check if an agent has reached its per-agent token limit.
   *
   * The total is the sum of input and output tokens over all of the agent's
   * records from today, across every task it worked on.
   *
   * @param agentId - Agent to inspect; unknown agents report 0 tokens.
   * @returns Whether the total has reached perAgentTokenLimit, and the total.
   */
  isAgentOverBudget(agentId: string): { overBudget: boolean; totalTokens: number } {
    const totalTokens = this.getTodayRecords()
      .filter((r) => r.agentId === agentId)
      .reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);

    return {
      overBudget: totalTokens >= this.budget.perAgentTokenLimit,
      totalTokens,
    };
  }

  /**
   * Notify that an agent has started (track by ID).
   *
   * This is a notification, not a check: it does not enforce
   * maxConcurrentAgents. Empty IDs are logged and ignored, matching
   * trySpawnAgent(), so they cannot occupy a slot.
   */
  agentStarted(agentId: string): void {
    if (!agentId) {
      this.logger.warn("agentStarted called with empty agentId — skipping");
      return;
    }
    this.activeAgents.add(agentId);
  }

  /**
   * Notify that an agent has stopped (remove by ID).
   * Stopping an ID that is not active is a no-op.
   */
  agentStopped(agentId: string): void {
    this.activeAgents.delete(agentId);
  }

  /**
   * Get a usage summary for today: totals, per-agent breakdown, active
   * agent count and overall budget status.
   *
   * dailyUsagePercent is rounded to two decimals for reporting; the budget
   * status is derived from the unrounded percentage.
   */
  getUsageSummary(): UsageSummary {
    this.purgeStaleRecords();

    const dailyTokensUsed = this.getDailyTokensUsed();
    const dailyUsagePercent =
      this.budget.dailyTokenLimit > 0 ? (dailyTokensUsed / this.budget.dailyTokenLimit) * 100 : 0;

    return {
      dailyTokensUsed,
      dailyTokenLimit: this.budget.dailyTokenLimit,
      dailyUsagePercent: Math.round(dailyUsagePercent * 100) / 100,
      agentUsage: this.getAgentUsageSummaries(),
      activeAgents: this.activeAgents.size,
      maxConcurrentAgents: this.budget.maxConcurrentAgents,
      budgetStatus: this.getBudgetStatus(dailyUsagePercent),
    };
  }

  /**
   * Get the configured budget.
   *
   * @returns A shallow copy, so callers cannot mutate the service's budget.
   */
  getBudget(): UsageBudget {
    return { ...this.budget };
  }

  /**
   * Load configuration with validation. Invalid values fall back to defaults.
   *
   * Numeric limits must be finite numbers greater than 0; the two count-like
   * limits are additionally floored to integers. enforceHardLimits must be a
   * boolean or the string "true"/"false".
   */
  private loadAndValidateConfig(): UsageBudget {
    const read = (key: keyof UsageBudget): unknown =>
      this.configService.get<unknown>(`orchestrator.budget.${key}`);

    return {
      dailyTokenLimit: this.clampPositive(read("dailyTokenLimit"), DEFAULT_BUDGET.dailyTokenLimit),
      perAgentTokenLimit: this.clampPositive(
        read("perAgentTokenLimit"),
        DEFAULT_BUDGET.perAgentTokenLimit
      ),
      maxConcurrentAgents: this.clampPositiveInt(
        read("maxConcurrentAgents"),
        DEFAULT_BUDGET.maxConcurrentAgents
      ),
      maxTaskDurationMinutes: this.clampPositiveInt(
        read("maxTaskDurationMinutes"),
        DEFAULT_BUDGET.maxTaskDurationMinutes
      ),
      enforceHardLimits: this.coerceBoolean(
        read("enforceHardLimits"),
        DEFAULT_BUDGET.enforceHardLimits
      ),
    };
  }

  /** Return `value` when it is a finite number greater than 0, otherwise `fallback`. */
  private clampPositive(value: unknown, fallback: number): number {
    return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : fallback;
  }

  /** Like clampPositive(), but floors an accepted value to an integer. */
  private clampPositiveInt(value: unknown, fallback: number): number {
    return typeof value === "number" && Number.isFinite(value) && value > 0
      ? Math.floor(value)
      : fallback;
  }

  /**
   * Interpret a config value as a boolean.
   *
   * Passing a raw value through would treat any non-empty string, including
   * "false", as enabled. Only real booleans and the strings "true"/"false"
   * (case-insensitive, surrounding whitespace ignored) are accepted;
   * anything else yields `fallback`.
   */
  private coerceBoolean(value: unknown, fallback: boolean): boolean {
    if (typeof value === "boolean") return value;
    if (typeof value === "string") {
      const normalized = value.trim().toLowerCase();
      if (normalized === "true") return true;
      if (normalized === "false") return false;
    }
    return fallback;
  }

  /**
   * Purge records from previous days to prevent unbounded memory growth.
   *
   * Runs at most once per local day. The day key is derived from the same
   * local midnight used to filter records, so the purge fires when the local
   * day rolls over rather than at UTC midnight.
   */
  private purgeStaleRecords(): void {
    const todayStart = this.getTodayStart();
    const dayKey = String(todayStart.getTime());
    if (this.lastPurgeDate === dayKey) return;

    const before = this.records.length;
    this.records = this.records.filter((r) => r.timestamp >= todayStart);

    if (before > this.records.length) {
      this.logger.log(
        `Purged ${String(before - this.records.length)} stale usage records from previous days`
      );
    }
    this.lastPurgeDate = dayKey;
  }

  /** Get total tokens (input + output) used today across all agents. */
  private getDailyTokensUsed(): number {
    return this.getTodayRecords().reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0);
  }

  /** Records whose timestamp is at or after today's local midnight. */
  private getTodayRecords(): UsageRecord[] {
    const todayStart = this.getTodayStart();
    return this.records.filter((r) => r.timestamp >= todayStart);
  }

  /** Midnight at the start of the current day, in the process's local time zone. */
  private getTodayStart(): Date {
    const todayStart = new Date();
    todayStart.setHours(0, 0, 0, 0);
    return todayStart;
  }

  /**
   * Get per-agent usage summaries (today only).
   *
   * Each agent appears once; its taskId is taken from the agent's first
   * record of the day, while token counts are summed over all its records.
   */
  private getAgentUsageSummaries(): AgentUsageSummary[] {
    const agentMap = new Map<string, { taskId: string; input: number; output: number }>();

    for (const record of this.getTodayRecords()) {
      const existing = agentMap.get(record.agentId);
      if (existing) {
        existing.input += record.inputTokens;
        existing.output += record.outputTokens;
      } else {
        agentMap.set(record.agentId, {
          taskId: record.taskId,
          input: record.inputTokens,
          output: record.outputTokens,
        });
      }
    }

    return [...agentMap.entries()].map(([agentId, data]) => {
      const totalTokens = data.input + data.output;
      // Percentage of the per-agent limit, rounded to two decimals.
      const usagePercent =
        this.budget.perAgentTokenLimit > 0
          ? Math.round((totalTokens / this.budget.perAgentTokenLimit) * 10000) / 100
          : 0;

      return {
        agentId,
        taskId: data.taskId,
        inputTokens: data.input,
        outputTokens: data.output,
        totalTokens,
        usagePercent,
      };
    });
  }

  /**
   * Determine overall budget status from the daily usage percentage:
   * >= 100 "exceeded", >= 95 "at_limit", >= 80 "approaching_limit",
   * otherwise "within_budget".
   */
  private getBudgetStatus(dailyUsagePercent: number): BudgetStatus {
    if (dailyUsagePercent >= 100) return "exceeded";
    if (dailyUsagePercent >= 95) return "at_limit";
    if (dailyUsagePercent >= 80) return "approaching_limit";
    return "within_budget";
  }
}
|
||||
69
apps/orchestrator/src/budget/budget.types.ts
Normal file
69
apps/orchestrator/src/budget/budget.types.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Usage Budget Management types
|
||||
*
|
||||
* Defines types for tracking and enforcing agent usage budgets
|
||||
* including token limits, cost caps, and time-based constraints.
|
||||
*/
|
||||
|
||||
/** Configured limits that BudgetService enforces and reports against. */
export interface UsageBudget {
  /** Daily token limit (input + output) across all agents */
  dailyTokenLimit: number;
  /** Token limit for a single agent, measured against its total usage for the current day */
  perAgentTokenLimit: number;
  /** Maximum concurrent agents */
  maxConcurrentAgents: number;
  /** Maximum task duration in minutes */
  maxTaskDurationMinutes: number;
  /** When true, new agent spawns are refused once the daily token limit has been reached */
  enforceHardLimits: boolean;
}
|
||||
|
||||
/** A single token-usage entry, as stored for one recordUsage() call. */
export interface UsageRecord {
  /** Agent that consumed tokens */
  agentId: string;
  /** Task being worked on */
  taskId: string;
  /** Number of input tokens used */
  inputTokens: number;
  /** Number of output tokens used */
  outputTokens: number;
  /** Time at which the usage was recorded */
  timestamp: Date;
}
|
||||
|
||||
/** Snapshot of today's usage as returned by BudgetService.getUsageSummary(). */
export interface UsageSummary {
  /** Total tokens (input + output) used today */
  dailyTokensUsed: number;
  /** Daily token limit */
  dailyTokenLimit: number;
  /** Percentage of the daily limit used, rounded to two decimals */
  dailyUsagePercent: number;
  /** Per-agent usage breakdown, one entry per agent with usage today */
  agentUsage: AgentUsageSummary[];
  /** Number of currently active agents */
  activeAgents: number;
  /** Maximum concurrent agents allowed */
  maxConcurrentAgents: number;
  /** Overall status derived from the daily usage percentage */
  budgetStatus: BudgetStatus;
}
|
||||
|
||||
/** Today's aggregated token usage for one agent. */
export interface AgentUsageSummary {
  /** Agent the totals belong to */
  agentId: string;
  /** Task ID from the agent's first usage record of the day */
  taskId: string;
  /** Sum of input tokens over the agent's records today */
  inputTokens: number;
  /** Sum of output tokens over the agent's records today */
  outputTokens: number;
  /** inputTokens + outputTokens */
  totalTokens: number;
  /** Percentage of per-agent limit used */
  usagePercent: number;
}
|
||||
|
||||
/**
 * Overall daily budget status, derived from the percentage of the daily
 * token limit used: "exceeded" at 100% or more, "at_limit" at 95% or more,
 * "approaching_limit" at 80% or more, otherwise "within_budget".
 */
export type BudgetStatus = "within_budget" | "approaching_limit" | "at_limit" | "exceeded";
|
||||
|
||||
/**
 * Budget used for any setting that is missing from configuration or fails
 * validation. Hard limits are off by default.
 */
export const DEFAULT_BUDGET: UsageBudget = {
  dailyTokenLimit: 10_000_000,
  perAgentTokenLimit: 2_000_000,
  maxConcurrentAgents: 10,
  maxTaskDurationMinutes: 120,
  enforceHardLimits: false,
};
|
||||
Reference in New Issue
Block a user