diff --git a/apps/orchestrator/src/app.module.ts b/apps/orchestrator/src/app.module.ts index 20eb134..55b7e24 100644 --- a/apps/orchestrator/src/app.module.ts +++ b/apps/orchestrator/src/app.module.ts @@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq"; import { HealthModule } from "./api/health/health.module"; import { AgentsModule } from "./api/agents/agents.module"; import { CoordinatorModule } from "./coordinator/coordinator.module"; +import { BudgetModule } from "./budget/budget.module"; import { orchestratorConfig } from "./config/orchestrator.config"; @Module({ @@ -21,6 +22,7 @@ import { orchestratorConfig } from "./config/orchestrator.config"; HealthModule, AgentsModule, CoordinatorModule, + BudgetModule, ], }) export class AppModule {} diff --git a/apps/orchestrator/src/budget/budget.service.spec.ts b/apps/orchestrator/src/budget/budget.service.spec.ts index 40b1f2d..da742c0 100644 --- a/apps/orchestrator/src/budget/budget.service.spec.ts +++ b/apps/orchestrator/src/budget/budget.service.spec.ts @@ -2,7 +2,7 @@ * BudgetService Unit Tests * * Tests usage budget tracking, enforcement, and reporting. - * Covers issue #329 (ORCH-135) + * Covers issue #329 (ORCH-135) including security hardening. */ import { describe, it, expect, beforeEach, vi } from "vitest"; import { BudgetService } from "./budget.service"; @@ -12,7 +12,7 @@ describe("BudgetService", () => { let service: BudgetService; const mockConfigService = { - get: vi.fn((_key: string, defaultValue?: unknown) => defaultValue), + get: vi.fn((_key: string, _defaultValue?: unknown) => undefined), }; beforeEach(() => { @@ -53,6 +53,46 @@ describe("BudgetService", () => { expect(budget.maxTaskDurationMinutes).toBe(60); expect(budget.enforceHardLimits).toBe(true); }); + + it("should clamp negative config values to defaults", () => { + const badConfig = { + get: vi.fn((key: string) => { + const config: Record = { + "orchestrator.budget.dailyTokenLimit": -100, + "orchestrator.budget.perAgentTokenLimit": -50, + "orchestrator.budget.maxConcurrentAgents": -1, + "orchestrator.budget.maxTaskDurationMinutes": 0, + }; + return config[key]; + }), + }; + + const badService = new BudgetService(badConfig as unknown as ConfigService); + const budget = badService.getBudget(); + + expect(budget.dailyTokenLimit).toBe(10_000_000); + expect(budget.perAgentTokenLimit).toBe(2_000_000); + expect(budget.maxConcurrentAgents).toBe(10); + expect(budget.maxTaskDurationMinutes).toBe(120); + }); + + it("should clamp NaN config values to defaults", () => { + const nanConfig = { + get: vi.fn((key: string) => { + const config: Record = { + "orchestrator.budget.dailyTokenLimit": NaN, + "orchestrator.budget.maxConcurrentAgents": Infinity, + }; + return config[key]; + }), + }; + + const nanService = new BudgetService(nanConfig as unknown as ConfigService); + const budget = nanService.getBudget(); + + expect(budget.dailyTokenLimit).toBe(10_000_000); + expect(budget.maxConcurrentAgents).toBe(10); + }); }); describe("recordUsage", () => { @@ -84,6 +124,42 @@ describe("BudgetService", () => { expect(agent1?.totalTokens).toBe(1500); expect(agent2?.totalTokens).toBe(4500); }); + + it("should clamp negative token values to 0", () => { + service.recordUsage("agent-1", "task-1", -500, -200); + + const summary = service.getUsageSummary(); + expect(summary.dailyTokensUsed).toBe(0); + }); + + it("should clamp NaN token values to 0", () => { + service.recordUsage("agent-1", "task-1", NaN, NaN); + + const summary = service.getUsageSummary(); + expect(summary.dailyTokensUsed).toBe(0); + }); + + it("should clamp Infinity token values to 0", () => { + service.recordUsage("agent-1", "task-1", Infinity, -Infinity); + + const summary = service.getUsageSummary(); + expect(summary.dailyTokensUsed).toBe(0); + }); + + it("should skip recording when agentId is empty", () => { + service.recordUsage("", "task-1", 1000, 500); + + const summary = service.getUsageSummary(); + expect(summary.dailyTokensUsed).toBe(0); + expect(summary.agentUsage).toHaveLength(0); + }); + + it("should skip recording when taskId is empty", () => { + service.recordUsage("agent-1", "", 1000, 500); + + const summary = service.getUsageSummary(); + expect(summary.dailyTokensUsed).toBe(0); + }); }); describe("canSpawnAgent", () => { @@ -94,7 +170,7 @@ describe("BudgetService", () => { it("should block spawning when at max concurrent agents", () => { for (let i = 0; i < 10; i++) { - service.agentStarted(); + service.agentStarted(`agent-${String(i)}`); } const result = service.canSpawnAgent(); @@ -104,12 +180,12 @@ describe("BudgetService", () => { it("should allow spawning after agent stops", () => { for (let i = 0; i < 10; i++) { - service.agentStarted(); + service.agentStarted(`agent-${String(i)}`); } expect(service.canSpawnAgent().allowed).toBe(false); - service.agentStopped(); + service.agentStopped("agent-0"); expect(service.canSpawnAgent().allowed).toBe(true); }); @@ -141,6 +217,43 @@ describe("BudgetService", () => { }); }); + describe("trySpawnAgent", () => { + it("should atomically check and reserve slot", () => { + const result = service.trySpawnAgent("agent-1"); + expect(result.allowed).toBe(true); + + const summary = service.getUsageSummary(); + expect(summary.activeAgents).toBe(1); + }); + + it("should block when at max concurrent and not reserve slot", () => { + for (let i = 0; i < 10; i++) { + service.trySpawnAgent(`agent-${String(i)}`); + } + + const result = service.trySpawnAgent("agent-11"); + expect(result.allowed).toBe(false); + expect(result.reason).toContain("Maximum concurrent agents"); + + const summary = service.getUsageSummary(); + expect(summary.activeAgents).toBe(10); + }); + + it("should reject empty agent ID", () => { + const result = service.trySpawnAgent(""); + expect(result.allowed).toBe(false); + expect(result.reason).toContain("Agent ID is required"); + }); + + it("should not double-count same agent ID", () => { + service.trySpawnAgent("agent-1"); + service.trySpawnAgent("agent-1"); + + const summary = service.getUsageSummary(); + expect(summary.activeAgents).toBe(1); + }); + }); + describe("isAgentOverBudget", () => { it("should return false when agent is within budget", () => { service.recordUsage("agent-1", "task-1", 100_000, 50_000); @@ -166,31 +279,38 @@ describe("BudgetService", () => { }); describe("agentStarted / agentStopped", () => { - it("should track active agent count", () => { - service.agentStarted(); - service.agentStarted(); - service.agentStarted(); + it("should track active agent count by ID", () => { + service.agentStarted("agent-1"); + service.agentStarted("agent-2"); + service.agentStarted("agent-3"); const summary = service.getUsageSummary(); expect(summary.activeAgents).toBe(3); }); it("should decrement active count on stop", () => { - service.agentStarted(); - service.agentStarted(); - service.agentStopped(); + service.agentStarted("agent-1"); + service.agentStarted("agent-2"); + service.agentStopped("agent-1"); const summary = service.getUsageSummary(); expect(summary.activeAgents).toBe(1); }); - it("should not go below zero", () => { - service.agentStopped(); - service.agentStopped(); + it("should handle stopping non-existent agent gracefully", () => { + service.agentStopped("non-existent"); const summary = service.getUsageSummary(); expect(summary.activeAgents).toBe(0); }); + + it("should not double-count same agent ID on start", () => { + service.agentStarted("agent-1"); + service.agentStarted("agent-1"); + + const summary = service.getUsageSummary(); + expect(summary.activeAgents).toBe(1); + }); }); describe("getUsageSummary", () => { @@ -290,7 +410,7 @@ describe("BudgetService", () => { const budget2 = service.getBudget(); expect(budget1).toEqual(budget2); - expect(budget1).not.toBe(budget2); // Different reference + expect(budget1).not.toBe(budget2); }); }); }); diff --git a/apps/orchestrator/src/budget/budget.service.ts b/apps/orchestrator/src/budget/budget.service.ts index 4bd16e6..7c8e82e 100644 --- a/apps/orchestrator/src/budget/budget.service.ts +++ b/apps/orchestrator/src/budget/budget.service.ts @@ -15,31 +15,21 @@ import type { } from "./budget.types"; import { DEFAULT_BUDGET } from "./budget.types"; +/** Sanitize strings for safe log output */ +function sanitizeForLog(value: string): string { + return value.replace(/[\n\r\t]/g, "_").slice(0, 128); +} + @Injectable() export class BudgetService { private readonly logger = new Logger(BudgetService.name); private readonly budget: UsageBudget; - private readonly records: UsageRecord[] = []; - private readonly activeAgentCount = { value: 0 }; + private records: UsageRecord[] = []; + private readonly activeAgents = new Set(); + private lastPurgeDate = ""; constructor(private readonly configService: ConfigService) { - this.budget = { - dailyTokenLimit: - this.configService.get("orchestrator.budget.dailyTokenLimit") ?? - DEFAULT_BUDGET.dailyTokenLimit, - perAgentTokenLimit: - this.configService.get("orchestrator.budget.perAgentTokenLimit") ?? - DEFAULT_BUDGET.perAgentTokenLimit, - maxConcurrentAgents: - this.configService.get("orchestrator.budget.maxConcurrentAgents") ?? - DEFAULT_BUDGET.maxConcurrentAgents, - maxTaskDurationMinutes: - this.configService.get("orchestrator.budget.maxTaskDurationMinutes") ?? - DEFAULT_BUDGET.maxTaskDurationMinutes, - enforceHardLimits: - this.configService.get("orchestrator.budget.enforceHardLimits") ?? - DEFAULT_BUDGET.enforceHardLimits, - }; + this.budget = this.loadAndValidateConfig(); this.logger.log( `BudgetService initialized: daily=${String(this.budget.dailyTokenLimit)} tokens, ` + @@ -49,29 +39,68 @@ export class BudgetService { } /** - * Record token usage for an agent + * Record token usage for an agent. + * Negative and NaN values are clamped to 0. */ recordUsage(agentId: string, taskId: string, inputTokens: number, outputTokens: number): void { + if (!agentId || !taskId) { + this.logger.warn("recordUsage called with empty agentId or taskId — skipping"); + return; + } + + const safeInput = Math.max(0, Number.isFinite(inputTokens) ? inputTokens : 0); + const safeOutput = Math.max(0, Number.isFinite(outputTokens) ? outputTokens : 0); + + this.purgeStaleRecords(); + const record: UsageRecord = { agentId, taskId, - inputTokens, - outputTokens, - timestamp: new Date().toISOString(), + inputTokens: safeInput, + outputTokens: safeOutput, + timestamp: new Date(), }; this.records.push(record); this.logger.debug( - `Usage recorded: agent=${agentId} input=${String(inputTokens)} output=${String(outputTokens)}` + `Usage recorded: agent=${sanitizeForLog(agentId)} input=${String(safeInput)} output=${String(safeOutput)}` ); } /** - * Check if an agent can be spawned (concurrency and budget check) + * Check if an agent can be spawned (concurrency and budget check). + * When allowed, atomically increments the active agent count. + */ + trySpawnAgent(agentId: string): { allowed: boolean; reason?: string } { + if (!agentId) { + return { allowed: false, reason: "Agent ID is required" }; + } + + if (this.activeAgents.size >= this.budget.maxConcurrentAgents) { + return { + allowed: false, + reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`, + }; + } + + const dailyUsed = this.getDailyTokensUsed(); + if (this.budget.enforceHardLimits && dailyUsed >= this.budget.dailyTokenLimit) { + return { + allowed: false, + reason: `Daily token budget exceeded (${String(dailyUsed)}/${String(this.budget.dailyTokenLimit)})`, + }; + } + + this.activeAgents.add(agentId); + return { allowed: true }; + } + + /** + * Check if an agent can be spawned without reserving a slot. */ canSpawnAgent(): { allowed: boolean; reason?: string } { - if (this.activeAgentCount.value >= this.budget.maxConcurrentAgents) { + if (this.activeAgents.size >= this.budget.maxConcurrentAgents) { return { allowed: false, reason: `Maximum concurrent agents reached (${String(this.budget.maxConcurrentAgents)})`, @@ -90,10 +119,13 @@ export class BudgetService { } /** - * Check if an agent has exceeded its per-task budget + * Check if an agent has exceeded its per-task budget (today only). */ isAgentOverBudget(agentId: string): { overBudget: boolean; totalTokens: number } { - const agentRecords = this.records.filter((r) => r.agentId === agentId); + const todayStart = this.getTodayStart(); + const agentRecords = this.records.filter( + (r) => r.agentId === agentId && r.timestamp >= todayStart + ); const totalTokens = agentRecords.reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0); return { @@ -103,23 +135,24 @@ export class BudgetService { } /** - * Notify that an agent has started (increment active count) + * Notify that an agent has started (track by ID). */ - agentStarted(): void { - this.activeAgentCount.value++; + agentStarted(agentId: string): void { + this.activeAgents.add(agentId); } /** - * Notify that an agent has stopped (decrement active count) + * Notify that an agent has stopped (remove by ID). */ - agentStopped(): void { - this.activeAgentCount.value = Math.max(0, this.activeAgentCount.value - 1); + agentStopped(agentId: string): void { + this.activeAgents.delete(agentId); } /** * Get comprehensive usage summary */ getUsageSummary(): UsageSummary { + this.purgeStaleRecords(); const dailyTokensUsed = this.getDailyTokensUsed(); const dailyUsagePercent = this.budget.dailyTokenLimit > 0 ? (dailyTokensUsed / this.budget.dailyTokenLimit) * 100 : 0; @@ -129,7 +162,7 @@ export class BudgetService { dailyTokenLimit: this.budget.dailyTokenLimit, dailyUsagePercent: Math.round(dailyUsagePercent * 100) / 100, agentUsage: this.getAgentUsageSummaries(), - activeAgents: this.activeAgentCount.value, + activeAgents: this.activeAgents.size, maxConcurrentAgents: this.budget.maxConcurrentAgents, budgetStatus: this.getBudgetStatus(dailyUsagePercent), }; @@ -143,25 +176,98 @@ export class BudgetService { } /** - * Get total tokens used today + * Load configuration with validation. Clamps invalid values to defaults. */ - private getDailyTokensUsed(): number { - const todayStart = new Date(); - todayStart.setHours(0, 0, 0, 0); - const todayIso = todayStart.toISOString(); + private loadAndValidateConfig(): UsageBudget { + const raw = { + dailyTokenLimit: + this.configService.get("orchestrator.budget.dailyTokenLimit") ?? + DEFAULT_BUDGET.dailyTokenLimit, + perAgentTokenLimit: + this.configService.get("orchestrator.budget.perAgentTokenLimit") ?? + DEFAULT_BUDGET.perAgentTokenLimit, + maxConcurrentAgents: + this.configService.get("orchestrator.budget.maxConcurrentAgents") ?? + DEFAULT_BUDGET.maxConcurrentAgents, + maxTaskDurationMinutes: + this.configService.get("orchestrator.budget.maxTaskDurationMinutes") ?? + DEFAULT_BUDGET.maxTaskDurationMinutes, + enforceHardLimits: + this.configService.get("orchestrator.budget.enforceHardLimits") ?? + DEFAULT_BUDGET.enforceHardLimits, + }; - return this.records - .filter((r) => r.timestamp >= todayIso) - .reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0); + return { + dailyTokenLimit: this.clampPositive(raw.dailyTokenLimit, DEFAULT_BUDGET.dailyTokenLimit), + perAgentTokenLimit: this.clampPositive( + raw.perAgentTokenLimit, + DEFAULT_BUDGET.perAgentTokenLimit + ), + maxConcurrentAgents: this.clampPositiveInt( + raw.maxConcurrentAgents, + DEFAULT_BUDGET.maxConcurrentAgents + ), + maxTaskDurationMinutes: this.clampPositiveInt( + raw.maxTaskDurationMinutes, + DEFAULT_BUDGET.maxTaskDurationMinutes + ), + enforceHardLimits: raw.enforceHardLimits, + }; + } + + private clampPositive(value: number, fallback: number): number { + return Number.isFinite(value) && value > 0 ? value : fallback; + } + + private clampPositiveInt(value: number, fallback: number): number { + return Number.isFinite(value) && value > 0 ? Math.floor(value) : fallback; } /** - * Get per-agent usage summaries + * Purge records from previous days to prevent unbounded memory growth. + */ + private purgeStaleRecords(): void { + const todayStr = new Date().toISOString().slice(0, 10); + if (this.lastPurgeDate === todayStr) return; + + const todayStart = this.getTodayStart(); + const before = this.records.length; + this.records = this.records.filter((r) => r.timestamp >= todayStart); + + if (before > this.records.length) { + this.logger.log( + `Purged ${String(before - this.records.length)} stale usage records from previous days` + ); + } + this.lastPurgeDate = todayStr; + } + + /** + * Get total tokens used today using proper Date comparison. + */ + private getDailyTokensUsed(): number { + const todayStart = this.getTodayStart(); + + return this.records + .filter((r) => r.timestamp >= todayStart) + .reduce((sum, r) => sum + r.inputTokens + r.outputTokens, 0); + } + + private getTodayStart(): Date { + const todayStart = new Date(); + todayStart.setHours(0, 0, 0, 0); + return todayStart; + } + + /** + * Get per-agent usage summaries (today only). */ private getAgentUsageSummaries(): AgentUsageSummary[] { + const todayStart = this.getTodayStart(); + const todayRecords = this.records.filter((r) => r.timestamp >= todayStart); const agentMap = new Map(); - for (const record of this.records) { + for (const record of todayRecords) { const existing = agentMap.get(record.agentId); if (existing) { existing.input += record.inputTokens; @@ -175,7 +281,7 @@ export class BudgetService { } } - return Array.from(agentMap.entries()).map(([agentId, data]) => { + return [...agentMap.entries()].map(([agentId, data]) => { const totalTokens = data.input + data.output; const usagePercent = this.budget.perAgentTokenLimit > 0 diff --git a/apps/orchestrator/src/budget/budget.types.ts b/apps/orchestrator/src/budget/budget.types.ts index 44678f6..86a0c57 100644 --- a/apps/orchestrator/src/budget/budget.types.ts +++ b/apps/orchestrator/src/budget/budget.types.ts @@ -28,7 +28,7 @@ export interface UsageRecord { /** Number of output tokens used */ outputTokens: number; /** Timestamp of usage */ - timestamp: string; + timestamp: Date; } export interface UsageSummary {