# Queue Module

BullMQ-based task queue with priority ordering and retry logic.

## Overview

The Queue module provides a task queuing system for the orchestrator service using BullMQ and Valkey (Redis-compatible). It supports priority-based task ordering, exponential backoff retry logic, and real-time queue monitoring.

## Features

- **Priority-based ordering** (1-10): Higher-priority tasks are processed first
- **Retry logic**: Exponential backoff on failures
- **Queue monitoring**: Real-time statistics (pending, active, completed, failed, delayed)
- **Queue control**: Pause/resume processing
- **Event pub/sub**: Task lifecycle events published to Valkey
- **Task removal**: Remove tasks from queue

## Usage

### Adding Tasks

```typescript
import { Injectable } from "@nestjs/common";
import { QueueService } from "./queue/queue.service";

@Injectable()
export class MyService {
  constructor(private readonly queueService: QueueService) {}

  async createTask() {
    const context = {
      repository: "my-org/my-repo",
      branch: "main",
      workItems: ["task-1", "task-2"],
    };

    // Add task with default options (priority 5, maxRetries 3)
    await this.queueService.addTask("task-123", context);

    // Add high-priority task with custom retries
    await this.queueService.addTask("urgent-task", context, {
      priority: 10, // Highest priority
      maxRetries: 5,
    });

    // Add delayed task (5 second delay)
    await this.queueService.addTask("delayed-task", context, {
      delay: 5000,
    });
  }
}
```

### Monitoring Queue

```typescript
import { QueueService } from "./queue/queue.service";

// Takes the service as a parameter; inside a class, use this.queueService instead
async function monitorQueue(queueService: QueueService) {
  const stats = await queueService.getStats();
  console.log(stats);
  // {
  //   pending: 5,
  //   active: 2,
  //   completed: 10,
  //   failed: 1,
  //   delayed: 0
  // }
}
```

### Queue Control

```typescript
// Inside a class that injects QueueService (see Adding Tasks)

// Pause queue processing
await this.queueService.pause();

// Resume queue processing
await this.queueService.resume();

// Remove task from queue
await this.queueService.removeTask("task-123");
```

## Configuration

Configure via environment variables:

```bash
# Valkey connection
ORCHESTRATOR_VALKEY_HOST=localhost
ORCHESTRATOR_VALKEY_PORT=6379
ORCHESTRATOR_VALKEY_PASSWORD=secret

# Queue configuration
ORCHESTRATOR_QUEUE_NAME=orchestrator-tasks
ORCHESTRATOR_QUEUE_MAX_RETRIES=3
ORCHESTRATOR_QUEUE_BASE_DELAY=1000  # 1 second
ORCHESTRATOR_QUEUE_MAX_DELAY=60000  # 1 minute
ORCHESTRATOR_QUEUE_CONCURRENCY=5    # 5 concurrent workers
```

## Priority

Priority range: 1-10

- **10**: Highest priority (processed first)
- **5**: Default priority
- **1**: Lowest priority (processed last)

Internally, priorities are inverted for BullMQ (which uses lower numbers for higher priority).

## Retry Logic

Failed tasks are automatically retried with exponential backoff. With the default `baseDelay` of 1000 ms:

- **Attempt 1**: Wait 2 seconds (baseDelay \* 2^1)
- **Attempt 2**: Wait 4 seconds (baseDelay \* 2^2)
- **Attempt 3**: Wait 8 seconds (baseDelay \* 2^3)
- **Attempt 4+**: Delay keeps doubling until it reaches `maxDelay` (default 60 seconds)

Configure retry behavior:

- `maxRetries`: Number of retry attempts (default: 3)
- `baseDelay`: Base delay in milliseconds (default: 1000)
- `maxDelay`: Maximum delay cap in milliseconds (default: 60000)

## Events

The queue publishes events to Valkey pub/sub:

- `task.queued`: Task added to queue
- `task.processing`: Task started processing
- `task.retry`: Task retrying after failure
- `task.completed`: Task completed successfully
- `task.failed`: Task failed permanently

Subscribe to events:

```typescript
await valkeyService.subscribeToEvents((event) => {
  if (event.type === "task.completed") {
    console.log("Task completed:", event.data.taskId);
  }
});
```

## Architecture

```
┌─────────────┐
│ QueueService│
└──────┬──────┘
       │
       ├──────────> BullMQ Queue (adds tasks)
       │
       ├──────────> BullMQ Worker (processes tasks)
       │
       └──────────> ValkeyService (state + events)
```

### Components

1. **QueueService**: Main service for queue operations
2. **BullMQ Queue**: Task queue with priority and retry
3. **BullMQ Worker**: Processes tasks from queue
4. **ValkeyService**: State management and pub/sub

## Types

### QueuedTask

```typescript
interface QueuedTask {
  taskId: string;
  priority: number; // 1-10
  retries: number;
  maxRetries: number;
  context: TaskContext;
}
```

### AddTaskOptions

```typescript
interface AddTaskOptions {
  priority?: number; // 1-10, default 5
  maxRetries?: number; // default 3
  delay?: number; // delay in milliseconds
}
```

### QueueStats

```typescript
interface QueueStats {
  pending: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}
```

## Error Handling

Validation errors:

- `Priority must be between 1 and 10`: Invalid priority value
- `maxRetries must be non-negative`: Negative retry count

Task processing errors:

- Automatically retried up to `maxRetries`
- Published as `task.failed` event after final failure
- Error details stored in Valkey state

## Testing

### Unit Tests

```bash
pnpm test queue.service.spec.ts
```

Covers pure functions (`calculateBackoffDelay`, configuration).

### Integration Tests

Integration tests require a running Valkey instance:

```bash
# Start Valkey
docker run -p 6379:6379 valkey/valkey:latest

# Run integration tests
pnpm test queue.integration.spec.ts
```

## Dependencies

- `bullmq`: Task queue
- `ioredis`: Redis/Valkey client (via ValkeyService)
- `@nestjs/common`: NestJS dependency injection
- `@nestjs/config`: Configuration management

## Related

- `ValkeyModule`: State management and pub/sub
- `ORCH-107`: Valkey client implementation
- `ORCH-109`: Agent lifecycle management (uses queue)
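
## Example: Priority Mapping and Backoff

The priority inversion and the retry schedule described under Priority and Retry Logic can be illustrated with a small standalone sketch. The helper names `toBullPriority` and `backoffDelay` are hypothetical, and the inversion formula (`11 - priority`) and the integer check are assumptions made for illustration; the module's actual `calculateBackoffDelay` and priority mapping may differ in signature and detail.

```typescript
// Hypothetical helpers for illustration only; not the module's actual code.

const MIN_PRIORITY = 1;
const MAX_PRIORITY = 10;

/**
 * Maps a module priority (10 = most urgent) to a BullMQ-style priority
 * (lower number = more urgent). The exact inversion is an assumption.
 */
function toBullPriority(priority: number): number {
  // The integer check is an assumption; the documented rule is the 1-10 range.
  if (!Number.isInteger(priority) || priority < MIN_PRIORITY || priority > MAX_PRIORITY) {
    throw new Error("Priority must be between 1 and 10");
  }
  // Assumed inversion: 10 -> 1, 5 -> 6, 1 -> 10
  return MAX_PRIORITY + MIN_PRIORITY - priority;
}

/**
 * Delay in milliseconds before retry number `attempt` (1-based):
 * baseDelay * 2^attempt, capped at maxDelay.
 */
function backoffDelay(attempt: number, baseDelay = 1000, maxDelay = 60000): number {
  return Math.min(baseDelay * 2 ** attempt, maxDelay);
}

const delays = [1, 2, 3, 4, 5, 6].map((attempt) => backoffDelay(attempt));
console.log(delays); // [2000, 4000, 8000, 16000, 32000, 60000]
console.log(toBullPriority(10), toBullPriority(5), toBullPriority(1)); // 1 6 10
```

With the default settings the cap only takes effect at the sixth attempt (64000 ms is reduced to 60000 ms), so a task with the default `maxRetries` of 3 never reaches `maxDelay`.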