# Valkey Task Queue Module

This module provides Redis-compatible task queue functionality using Valkey (a Redis fork) for the Mosaic Stack application.

## Overview

The `ValkeyModule` is a global NestJS module that provides task queue operations with a simple FIFO (First-In-First-Out) queue implementation. It uses ioredis for Redis compatibility and is automatically available throughout the application.

## Features

- ✅ **FIFO Queue**: Tasks are processed in the order they are enqueued
- ✅ **Task Status Tracking**: Monitor task lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
- ✅ **Metadata Storage**: Store and retrieve task data with 24-hour TTL
- ✅ **Health Monitoring**: Built-in health check for Valkey connectivity
- ✅ **Type Safety**: Fully typed DTOs with validation
- ✅ **Global Module**: No need to import in every module

## Architecture

### Components

1. **ValkeyModule** (`valkey.module.ts`)
   - Global module that provides `ValkeyService`
   - Auto-registered in `app.module.ts`

2. **ValkeyService** (`valkey.service.ts`)
   - Core service with queue operations
   - Lifecycle hooks for connection management
   - Methods: `enqueue()`, `dequeue()`, `getStatus()`, `updateStatus()`

3. **DTOs** (`dto/task.dto.ts`)
   - `TaskDto`: Complete task representation
   - `EnqueueTaskDto`: Input for creating tasks
   - `UpdateTaskStatusDto`: Input for status updates
   - `TaskStatus`: Enum of task states

## Configuration

### Environment Variables

Add to `.env`:

```bash
VALKEY_URL=redis://localhost:6379
```

### Docker Compose

The Valkey service is already configured in `docker-compose.yml`:

```yaml
valkey:
  image: valkey/valkey:8-alpine
  container_name: mosaic-valkey
  ports:
    - "6379:6379"
  volumes:
    - valkey_data:/data
```

Start Valkey:

```bash
docker compose up -d valkey
```

## Usage

### 1. Inject the Service

```typescript
import { Injectable } from "@nestjs/common";
import { ValkeyService } from "./valkey/valkey.service";

@Injectable()
export class MyService {
  constructor(private readonly valkeyService: ValkeyService) {}
}
```

### 2. Enqueue a Task

```typescript
const task = await this.valkeyService.enqueue({
  type: "send-email",
  data: {
    to: "user@example.com",
    subject: "Welcome!",
    body: "Hello, welcome to Mosaic Stack",
  },
});

console.log(task.id); // UUID
console.log(task.status); // 'pending'
```

### 3. Dequeue and Process

```typescript
// Import path assumed from the component layout described above
import { TaskStatus } from "./valkey/dto/task.dto";

// Worker picks up next task
const task = await this.valkeyService.dequeue();

if (task) {
  console.log(task.status); // 'processing'

  try {
    // Do work... (sendEmail is application-specific)
    await sendEmail(task.data);

    // Mark as completed
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.COMPLETED,
      result: { sentAt: new Date().toISOString() },
    });
  } catch (error) {
    // Mark as failed; `error` is `unknown` under strict TypeScript, so narrow it first
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.FAILED,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
```

### 4. Check Task Status

```typescript
const status = await this.valkeyService.getStatus(taskId);

if (status) {
  console.log(status.status); // 'completed' | 'failed' | 'processing' | 'pending'
  console.log(status.data); // Task metadata
  console.log(status.error); // Error message if failed
}
```

### 5. Queue Management

```typescript
// Get queue length
const length = await this.valkeyService.getQueueLength();
console.log(`${length} tasks in queue`);

// Health check
const healthy = await this.valkeyService.healthCheck();
console.log(`Valkey is ${healthy ? "healthy" : "down"}`);

// Clear queue (use with caution!)
await this.valkeyService.clearQueue();
```

## Task Lifecycle

```
PENDING → PROCESSING → COMPLETED
                     ↘ FAILED
```

1. **PENDING**: Task is enqueued and waiting to be processed
2. **PROCESSING**: Task has been dequeued and is being worked on
3. **COMPLETED**: Task finished successfully
4. **FAILED**: Task encountered an error

## Data Storage

- **Queue**: Redis list at key `mosaic:task:queue`
- **Task Metadata**: Redis strings at `mosaic:task:{taskId}`
- **TTL**: Tasks expire after 24 hours (configurable via `TASK_TTL`)

## Examples

### Background Job Processing

```typescript
import { Injectable, OnApplicationBootstrap } from "@nestjs/common";
import { ValkeyService } from "./valkey/valkey.service";
import { TaskDto, TaskStatus } from "./valkey/dto/task.dto";

@Injectable()
export class EmailWorker implements OnApplicationBootstrap {
  constructor(private readonly valkeyService: ValkeyService) {}

  // Start polling after all modules have initialized rather than from the constructor
  onApplicationBootstrap() {
    void this.startWorker();
  }

  private async startWorker() {
    while (true) {
      const task = await this.valkeyService.dequeue();

      if (task) {
        await this.processTask(task);
      } else {
        // No tasks, wait 5 seconds
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }
  }

  // sendEmail() and generateReport() are application-specific and not shown here
  private async processTask(task: TaskDto) {
    try {
      switch (task.type) {
        case "send-email":
          await this.sendEmail(task.data);
          break;
        case "generate-report":
          await this.generateReport(task.data);
          break;
        default:
          // Fail unknown task types instead of marking them completed
          throw new Error(`Unknown task type: ${task.type}`);
      }

      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.COMPLETED,
      });
    } catch (error) {
      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.FAILED,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
}
```

### Scheduled Tasks with Cron

```typescript
import { Injectable } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import { ValkeyService } from "./valkey/valkey.service";

@Injectable()
export class ScheduledTasks {
  constructor(private readonly valkeyService: ValkeyService) {}

  @Cron("0 0 * * *") // Daily at midnight
  async dailyReport() {
    await this.valkeyService.enqueue({
      type: "daily-report",
      data: { date: new Date().toISOString() },
    });
  }
}
```

## Testing

The module includes comprehensive tests with an in-memory Redis mock:

```bash
pnpm test valkey.service.spec.ts
```

Tests cover:

- ✅ Connection and initialization
- ✅ Enqueue operations
- ✅ Dequeue FIFO behavior
- ✅ Status tracking and updates
- ✅ Queue management
- ✅ Complete task lifecycle
- ✅ Concurrent task handling

## API Reference

### ValkeyService Methods

#### `enqueue(task: EnqueueTaskDto): Promise<TaskDto>`

Add a task to the queue.

**Parameters:**

- `task.type` (string): Task type identifier
- `task.data` (object): Task metadata

**Returns:** Created task with ID and status

---

#### `dequeue(): Promise<TaskDto | null>`

Get the next task from the queue (FIFO).

**Returns:** Next task with status updated to PROCESSING, or null if the queue is empty

---

#### `getStatus(taskId: string): Promise<TaskDto | null>`

Retrieve task status and metadata.

**Parameters:**

- `taskId` (string): Task UUID

**Returns:** Task data or null if not found

---

#### `updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise<TaskDto | null>`

Update task status and optionally add results or errors.

**Parameters:**

- `taskId` (string): Task UUID
- `update.status` (TaskStatus): New status
- `update.error` (string, optional): Error message for failed tasks
- `update.result` (object, optional): Result data to merge

**Returns:** Updated task or null if not found

---

#### `getQueueLength(): Promise<number>`

Get the number of tasks in the queue.

**Returns:** Queue length

---

#### `clearQueue(): Promise<void>`

Remove all tasks from the queue (metadata remains until TTL).

---

#### `healthCheck(): Promise<boolean>`

Verify Valkey connectivity.

**Returns:** true if connected, false otherwise

## Migration Notes

If upgrading from BullMQ or another queue system:

1. Task IDs are UUIDs (not incremental)
2. No built-in retry mechanism (implement in worker)
3. No job priorities (strict FIFO)
4. Tasks expire after 24 hours

For advanced features like retries, priorities, or scheduled jobs, consider wrapping this service or using BullMQ alongside it.

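
Since the migration notes leave retries to the worker, here is a minimal sketch of one way to add them. It is not part of `ValkeyService`: `withRetry`, `backoffDelayMs`, `RetryOptions`, `maxAttempts`, and `baseDelayMs` are hypothetical names chosen for illustration, and exponential backoff is only one possible policy.

```typescript
// Hypothetical helper, not part of ValkeyService.
interface RetryOptions {
  maxAttempts: number; // total attempts including the first; assumed to be >= 1
  baseDelayMs: number; // delay before the second attempt; doubles after each failure
}

// Delay to wait after the given failed attempt (1-based): base, 2x base, 4x base, ...
function backoffDelayMs(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (attempt - 1);
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Runs `handler` until it succeeds or `maxAttempts` is reached,
// then rethrows the last error so the caller can mark the task as FAILED.
async function withRetry<T>(
  handler: () => Promise<T>,
  options: RetryOptions = { maxAttempts: 3, baseDelayMs: 500 },
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await handler();
    } catch (error) {
      lastError = error;
      if (attempt < options.maxAttempts) {
        await sleep(backoffDelayMs(attempt, options.baseDelayMs));
      }
    }
  }

  throw lastError;
}
```

In a worker, the handler call could be wrapped as `await withRetry(() => this.sendEmail(task.data))` inside the existing `try` block, so the task is only marked `FAILED` after every attempt has failed. The task stays in `PROCESSING` while retries are in progress.
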
## Troubleshooting

### Connection Issues

```typescript
// Check Valkey connectivity
const healthy = await this.valkeyService.healthCheck();
if (!healthy) {
  console.error("Valkey is not responding");
}
```

### Queue Stuck

```bash
# Check queue length
docker exec -it mosaic-valkey valkey-cli LLEN mosaic:task:queue

# Inspect tasks
docker exec -it mosaic-valkey valkey-cli KEYS "mosaic:task:*"

# Clear stuck queue
docker exec -it mosaic-valkey valkey-cli DEL mosaic:task:queue
```

### Debug Logging

The service logs all operations at `info` level. Check application logs for:

- Task enqueue/dequeue operations
- Status updates
- Connection events

## Future Enhancements

Potential improvements for consideration:

- [ ] Task priorities (weighted queues)
- [ ] Retry mechanism with exponential backoff
- [ ] Delayed/scheduled tasks
- [ ] Task progress tracking
- [ ] Queue metrics and monitoring
- [ ] Multi-queue support
- [ ] Dead letter queue for failed tasks

## License

Part of the Mosaic Stack project.