import { Injectable, OnModuleDestroy, OnModuleInit, Optional, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Queue, Worker, Job } from "bullmq";
import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../spawner/agent-lifecycle.service";
import type { TaskContext } from "../valkey/types";
import type {
  QueuedTask,
  QueueStats,
  AddTaskOptions,
  RetryConfig,
  TaskProcessingResult,
} from "./types";

/**
 * Queue service for managing the task queue with priority and retry logic.
 *
 * Wraps a BullMQ `Queue`/`Worker` pair that share one queue name and one
 * connection, and mirrors every queue transition (queued, executing, retry,
 * failed, completed) into `ValkeyService` as a status update and/or event.
 * The spawner and lifecycle services are optional; when they are absent only
 * the queue-level state is maintained.
 */
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(QueueService.name);
  private queue!: Queue;
  private worker!: Worker;
  private readonly queueName: string;
  private readonly retryConfig: RetryConfig;

  constructor(
    private readonly valkeyService: ValkeyService,
    private readonly configService: ConfigService,
    @Optional() private readonly spawnerService?: AgentSpawnerService,
    @Optional() private readonly lifecycleService?: AgentLifecycleService
  ) {
    this.queueName = this.configService.get<string>(
      "orchestrator.queue.name",
      "orchestrator-tasks"
    );

    this.retryConfig = {
      maxRetries: this.configService.get<number>("orchestrator.queue.maxRetries", 3),
      baseDelay: this.configService.get<number>("orchestrator.queue.baseDelay", 1000),
      maxDelay: this.configService.get<number>("orchestrator.queue.maxDelay", 60000),
    };
  }

  /**
   * Creates the BullMQ queue and worker and attaches the worker listeners.
   */
  onModuleInit(): void {
    // Initialize BullMQ with Valkey connection
    const connection = {
      host: this.configService.get<string>("orchestrator.valkey.host", "localhost"),
      port: this.configService.get<number>("orchestrator.valkey.port", 6379),
      password: this.configService.get<string>("orchestrator.valkey.password"),
    };

    // Read retention config
    const completedRetentionAge = this.configService.get<number>(
      "orchestrator.queue.completedRetentionAgeSeconds",
      3600
    );
    const completedRetentionCount = this.configService.get<number>(
      "orchestrator.queue.completedRetentionCount",
      100
    );
    const failedRetentionAge = this.configService.get<number>(
      "orchestrator.queue.failedRetentionAgeSeconds",
      86400
    );
    const failedRetentionCount = this.configService.get<number>(
      "orchestrator.queue.failedRetentionCount",
      1000
    );

    // Create queue
    this.queue = new Queue(this.queueName, {
      connection,
      defaultJobOptions: {
        removeOnComplete: {
          age: completedRetentionAge,
          count: completedRetentionCount,
        },
        removeOnFail: {
          age: failedRetentionAge,
          count: failedRetentionCount,
        },
      },
    });

    // Create worker
    this.worker = new Worker(
      this.queueName,
      async (job: Job<QueuedTask>) => this.processTask(job),
      {
        connection,
        concurrency: this.configService.get<number>("orchestrator.queue.concurrency", 5),
        settings: {
          // Jobs are enqueued with `backoff: { type: "custom" }`; that type is
          // resolved through this strategy, so it must be registered here for
          // retries to be scheduled with the configured exponential delay.
          backoffStrategy: (attemptsMade: number): number =>
            this.calculateBackoffDelay(
              attemptsMade,
              this.retryConfig.baseDelay,
              this.retryConfig.maxDelay
            ),
        },
      }
    );

    // Connection-level worker errors are logged instead of surfacing as
    // unhandled "error" events.
    this.worker.on("error", (err: Error) => {
      this.logger.error(`Queue worker error: ${err.message}`);
    });

    // NOTE(review): BullMQ appears to emit "failed" for every failed attempt,
    // including attempts that will be retried — confirm that running the
    // terminal failure path between retries is intended.
    this.worker.on("failed", (job: Job<QueuedTask> | undefined, err: Error) => {
      if (!job) {
        return;
      }
      const { taskId } = job.data;
      // Fire-and-forget, but never let a bookkeeping error become an
      // unhandled rejection.
      this.handleTaskFailure(taskId, err).catch((handlerError: unknown) => {
        this.logger.error(
          `Failed to record failure of task ${taskId}: ${this.describeError(handlerError)}`
        );
      });
    });

    this.worker.on("completed", (job: Job<QueuedTask>) => {
      const { taskId } = job.data;
      this.handleTaskCompletion(taskId).catch((handlerError: unknown) => {
        this.logger.error(
          `Failed to record completion of task ${taskId}: ${this.describeError(handlerError)}`
        );
      });
    });
  }

  /**
   * Closes the worker first, then the queue.
   */
  async onModuleDestroy(): Promise<void> {
    await this.worker.close();
    await this.queue.close();
  }

  /**
   * Add task to queue.
   *
   * @param taskId - Task identifier; also used as the BullMQ job name.
   * @param context - Task context stored with the job and used to create the
   *   task state when none exists yet.
   * @param options - Optional priority (1-10, default 5), maxRetries (default
   *   from retry config) and delay in the unit BullMQ expects (default 0).
   * @throws Error when priority is outside 1-10 or maxRetries is negative.
   */
  async addTask(taskId: string, context: TaskContext, options?: AddTaskOptions): Promise<void> {
    // Validate options
    const priority = options?.priority ?? 5;
    const maxRetries = options?.maxRetries ?? this.retryConfig.maxRetries;
    const delay = options?.delay ?? 0;

    if (priority < 1 || priority > 10) {
      throw new Error("Priority must be between 1 and 10");
    }

    if (maxRetries < 0) {
      throw new Error("maxRetries must be non-negative");
    }

    const queuedTask: QueuedTask = {
      taskId,
      priority,
      retries: 0,
      maxRetries,
      context,
    };

    // Ensure task state exists before queue lifecycle updates. The methods are
    // probed at runtime so a ValkeyService without them is tolerated.
    const getTaskState = (this.valkeyService as Partial<ValkeyService>).getTaskState;
    const createTask = (this.valkeyService as Partial<ValkeyService>).createTask;
    if (typeof getTaskState === "function" && typeof createTask === "function") {
      const existingTask = await getTaskState.call(this.valkeyService, taskId);
      if (!existingTask) {
        await createTask.call(this.valkeyService, taskId, context);
      }
    }

    // Add to BullMQ queue
    await this.queue.add(taskId, queuedTask, {
      priority: 10 - priority + 1, // BullMQ: lower number = higher priority, so invert
      attempts: maxRetries + 1, // +1 for initial attempt
      backoff: {
        type: "custom",
      },
      delay,
    });

    // Update task state in Valkey
    // NOTE(review): this runs after the job is already enqueued, so a worker
    // may write "executing" first — confirm whether the ordering matters.
    await this.valkeyService.updateTaskStatus(taskId, "pending");

    // Publish event
    await this.valkeyService.publishEvent({
      type: "task.queued",
      timestamp: new Date().toISOString(),
      taskId,
      data: { priority },
    });
  }

  /**
   * Get queue statistics.
   *
   * @returns Job counts per state; BullMQ's "waiting" count is reported as
   *   `pending`, and any missing count is reported as 0.
   */
  async getStats(): Promise<QueueStats> {
    const counts = await this.queue.getJobCounts(
      "waiting",
      "active",
      "completed",
      "failed",
      "delayed"
    );

    return {
      pending: counts.waiting ?? 0,
      active: counts.active ?? 0,
      completed: counts.completed ?? 0,
      failed: counts.failed ?? 0,
      delayed: counts.delayed ?? 0,
    };
  }

  /**
   * Calculate exponential backoff delay.
   *
   * @param attemptNumber - Exponent applied to the base of 2.
   * @param baseDelay - Delay multiplied by `2 ** attemptNumber`.
   * @param maxDelay - Upper bound for the returned delay.
   * @returns `min(baseDelay * 2 ** attemptNumber, maxDelay)`.
   */
  calculateBackoffDelay(attemptNumber: number, baseDelay: number, maxDelay: number): number {
    const delay = baseDelay * 2 ** attemptNumber;
    return Math.min(delay, maxDelay);
  }

  /**
   * Pause queue processing.
   */
  async pause(): Promise<void> {
    await this.queue.pause();
  }

  /**
   * Resume queue processing.
   */
  async resume(): Promise<void> {
    await this.queue.resume();
  }

  /**
   * Remove task from queue.
   *
   * `addTask` passes the task id as the job *name* and does not set a job id,
   * so a lookup by id alone does not find those jobs. The id lookup is kept
   * for compatibility; when it misses, every job whose name equals the task id
   * is removed instead.
   *
   * @param taskId - Identifier of the task whose job(s) should be removed.
   */
  async removeTask(taskId: string): Promise<void> {
    const directHit = await this.queue.getJob(taskId);
    if (directHit) {
      await directHit.remove();
      return;
    }

    const allJobs = await this.queue.getJobs();
    for (const job of allJobs) {
      // Entries can be missing if a job disappears while the list is read.
      if (job?.name === taskId) {
        await job.remove();
      }
    }
  }

  /**
   * Process task (called by worker).
   *
   * Marks the owning agent session (if any) as running, sets the task status
   * to "executing" and publishes a `task.executing` event. On error, publishes
   * a `task.retry` event when retries remain and always rethrows the original
   * error so BullMQ can handle the failed attempt.
   *
   * @param job - BullMQ job carrying the queued task.
   * @returns A success result with the attempt number and, when known, the agent id.
   */
  private async processTask(job: Job<QueuedTask>): Promise<TaskProcessingResult> {
    const { taskId } = job.data;

    try {
      const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
      const agentId = session?.agentId;

      if (agentId) {
        if (this.lifecycleService) {
          await this.lifecycleService.transitionToRunning(agentId);
        }
        this.spawnerService?.setSessionState(agentId, "running");
      }

      // Update task state to executing
      await this.valkeyService.updateTaskStatus(taskId, "executing", agentId);

      // Publish event
      await this.valkeyService.publishEvent({
        type: "task.executing",
        timestamp: new Date().toISOString(),
        taskId,
        agentId,
        data: {
          attempt: job.attemptsMade + 1,
          dispatchedByQueue: true,
        },
      });

      return {
        success: true,
        metadata: {
          attempt: job.attemptsMade + 1,
          ...(agentId ? { agentId } : {}),
        },
      };
    } catch (error) {
      // Handle retry logic
      const shouldRetry = job.attemptsMade < job.data.maxRetries;

      if (shouldRetry) {
        const nextAttempt = job.attemptsMade + 1;

        // Calculate backoff delay for next retry
        const delay = this.calculateBackoffDelay(
          nextAttempt,
          this.retryConfig.baseDelay,
          this.retryConfig.maxDelay
        );

        // Retry bookkeeping is best-effort: a failure here must not replace
        // the original processing error that is rethrown below.
        try {
          await job.updateData({
            ...job.data,
            retries: nextAttempt,
          });

          await this.valkeyService.publishEvent({
            type: "task.retry",
            timestamp: new Date().toISOString(),
            taskId,
            data: {
              attempt: nextAttempt,
              nextDelay: delay,
            },
          });
        } catch (bookkeepingError: unknown) {
          this.logger.warn(
            `Failed to record retry of task ${taskId}: ${this.describeError(bookkeepingError)}`
          );
        }
      }

      throw error;
    }
  }

  /**
   * Handle task failure.
   *
   * Marks the agent session (if any) as failed, sets the task status to
   * "failed" with the error message and publishes a `task.failed` event.
   */
  private async handleTaskFailure(taskId: string, error: Error): Promise<void> {
    const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
    if (session) {
      this.spawnerService?.setSessionState(session.agentId, "failed", error.message, new Date());
      if (this.lifecycleService) {
        await this.lifecycleService.transitionToFailed(session.agentId, error.message);
      }
    }

    await this.valkeyService.updateTaskStatus(taskId, "failed", undefined, error.message);

    await this.valkeyService.publishEvent({
      type: "task.failed",
      timestamp: new Date().toISOString(),
      taskId,
      error: error.message,
    });
  }

  /**
   * Handle task completion.
   *
   * Marks the agent session (if any) as completed, or logs a warning when no
   * session exists, then sets the task status to "completed" and publishes a
   * `task.completed` event.
   */
  private async handleTaskCompletion(taskId: string): Promise<void> {
    const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
    if (session) {
      this.spawnerService?.setSessionState(session.agentId, "completed", undefined, new Date());
      if (this.lifecycleService) {
        await this.lifecycleService.transitionToCompleted(session.agentId);
      }
    } else {
      this.logger.warn(
        `Queue completed task ${taskId} but no session was found; using queue-only completion state`
      );
    }

    await this.valkeyService.updateTaskStatus(taskId, "completed");

    await this.valkeyService.publishEvent({
      type: "task.completed",
      timestamp: new Date().toISOString(),
      taskId,
      ...(session ? { agentId: session.agentId } : {}),
    });
  }

  /** Renders an unknown thrown value as a log-friendly message. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}