Valkey Task Queue Module
This module provides Redis-compatible task queue functionality for the Mosaic Stack application, backed by Valkey (a Redis fork).
Overview
The ValkeyModule is a global NestJS module that provides task queue operations with a simple FIFO (First-In-First-Out) queue implementation. It uses ioredis for Redis compatibility and is automatically available throughout the application.
Features
- ✅ FIFO Queue: Tasks are processed in the order they are enqueued
- ✅ Task Status Tracking: Monitor task lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
- ✅ Metadata Storage: Store and retrieve task data with 24-hour TTL
- ✅ Health Monitoring: Built-in health check for Valkey connectivity
- ✅ Type Safety: Fully typed DTOs with validation
- ✅ Global Module: No need to import in every module
Architecture
Components
- ValkeyModule (valkey.module.ts)
  - Global module that provides ValkeyService
  - Auto-registered in app.module.ts
- ValkeyService (valkey.service.ts)
  - Core service with queue operations
  - Lifecycle hooks for connection management
  - Methods: enqueue(), dequeue(), getStatus(), updateStatus()
- DTOs (dto/task.dto.ts)
  - TaskDto: Complete task representation
  - EnqueueTaskDto: Input for creating tasks
  - UpdateTaskStatusDto: Input for status updates
  - TaskStatus: Enum of task states
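For orientation, the DTOs can be pictured roughly as the types below. This is a sketch inferred from the usage examples in this README, not a copy of dto/task.dto.ts: the exact field names, optionality, and any validation decorators are assumptions, and the real definitions may differ.

```typescript
// Hypothetical shapes inferred from this README; see dto/task.dto.ts for the real ones.
enum TaskStatus {
  PENDING = "pending",
  PROCESSING = "processing",
  COMPLETED = "completed",
  FAILED = "failed",
}

// Input for creating a task.
interface EnqueueTaskDto {
  type: string;
  data: Record<string, unknown>;
}

// Input for a status update; error and result are optional.
interface UpdateTaskStatusDto {
  status: TaskStatus;
  error?: string;
  result?: Record<string, unknown>;
}

// Complete task representation as returned by the service.
interface TaskDto extends EnqueueTaskDto {
  id: string; // UUID
  status: TaskStatus;
  error?: string; // assumption: only set for failed tasks
}

const exampleTask: TaskDto = {
  id: "00000000-0000-4000-8000-000000000000",
  type: "send-email",
  data: { to: "user@example.com" },
  status: TaskStatus.PENDING,
};

const exampleUpdate: UpdateTaskStatusDto = {
  status: TaskStatus.FAILED,
  error: "SMTP connection refused",
};
```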
Configuration
Environment Variables
Add to .env:
VALKEY_URL=redis://localhost:6379
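As an illustration of how a value like this can be interpreted, the sketch below parses a connection URL into host and port and falls back to the local default when the variable is unset. `resolveValkeyEndpoint` is a hypothetical helper, not part of the module, and the module's own handling of VALKEY_URL may differ.

```typescript
// Hypothetical helper: turns a VALKEY_URL-style string into host and port.
interface ValkeyEndpoint {
  host: string;
  port: number;
}

const DEFAULT_VALKEY_URL = "redis://localhost:6379";

function resolveValkeyEndpoint(raw: string | undefined): ValkeyEndpoint {
  // Treat undefined and blank values the same way: use the local default.
  const url = new URL(raw?.trim() || DEFAULT_VALKEY_URL);
  if (url.protocol !== "redis:" && url.protocol !== "rediss:") {
    throw new Error(`Unsupported protocol in VALKEY_URL: ${url.protocol}`);
  }
  return {
    host: url.hostname,
    // URL.port is an empty string when the URL omits the port.
    port: url.port === "" ? 6379 : Number(url.port),
  };
}

const localEndpoint = resolveValkeyEndpoint(undefined);
```

In an application this would typically be called with `process.env.VALKEY_URL`.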
Docker Compose
Valkey service is already configured in docker-compose.yml:
valkey:
  image: valkey/valkey:8-alpine
  container_name: mosaic-valkey
  ports:
    - "6379:6379"
  volumes:
    - valkey_data:/data
Start Valkey:
docker compose up -d valkey
Usage
1. Inject the Service
import { Injectable } from "@nestjs/common";
import { ValkeyService } from "./valkey/valkey.service";

@Injectable()
export class MyService {
  constructor(private readonly valkeyService: ValkeyService) {}
}
2. Enqueue a Task
const task = await this.valkeyService.enqueue({
  type: "send-email",
  data: {
    to: "user@example.com",
    subject: "Welcome!",
    body: "Hello, welcome to Mosaic Stack",
  },
});

console.log(task.id); // UUID
console.log(task.status); // 'pending'
3. Dequeue and Process
// TaskStatus comes from the DTO file (path assumed relative to the caller)
import { TaskStatus } from "./valkey/dto/task.dto";

// Worker picks up next task
const task = await this.valkeyService.dequeue();
if (task) {
  console.log(task.status); // 'processing'
  try {
    // Do work...
    await sendEmail(task.data);

    // Mark as completed
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.COMPLETED,
      result: { sentAt: new Date().toISOString() },
    });
  } catch (error) {
    // Mark as failed; `error` is `unknown` under strict TypeScript, so narrow it first
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.FAILED,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
4. Check Task Status
const status = await this.valkeyService.getStatus(taskId);
if (status) {
  console.log(status.status); // 'completed' | 'failed' | 'processing' | 'pending'
  console.log(status.data); // Task metadata
  console.log(status.error); // Error message if failed
}
5. Queue Management
// Get queue length
const length = await this.valkeyService.getQueueLength();
console.log(`${length} tasks in queue`);
// Health check
const healthy = await this.valkeyService.healthCheck();
console.log(`Valkey is ${healthy ? "healthy" : "down"}`);
// Clear queue (use with caution!)
await this.valkeyService.clearQueue();
Task Lifecycle
PENDING → PROCESSING → COMPLETED
                     ↘ FAILED
- PENDING: Task is enqueued and waiting to be processed
- PROCESSING: Task has been dequeued and is being worked on
- COMPLETED: Task finished successfully
- FAILED: Task encountered an error
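The lifecycle above can be expressed as a small transition table. The sketch below is one way a caller might guard status updates before sending them. Whether ValkeyService itself rejects invalid transitions is not stated in this README, so `canTransition` is a hypothetical helper and the enum is redeclared locally only to keep the example self-contained.

```typescript
// Local copy of the status values used throughout this README.
enum TaskStatus {
  PENDING = "pending",
  PROCESSING = "processing",
  COMPLETED = "completed",
  FAILED = "failed",
}

// Allowed moves, taken directly from the lifecycle diagram.
// COMPLETED and FAILED are terminal states.
const ALLOWED_TRANSITIONS: Record<TaskStatus, readonly TaskStatus[]> = {
  [TaskStatus.PENDING]: [TaskStatus.PROCESSING],
  [TaskStatus.PROCESSING]: [TaskStatus.COMPLETED, TaskStatus.FAILED],
  [TaskStatus.COMPLETED]: [],
  [TaskStatus.FAILED]: [],
};

// Hypothetical guard: true only if the diagram contains an arrow from `from` to `to`.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

const pendingToProcessing = canTransition(TaskStatus.PENDING, TaskStatus.PROCESSING);
const pendingToCompleted = canTransition(TaskStatus.PENDING, TaskStatus.COMPLETED);
```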
Data Storage
- Queue: Redis list at key mosaic:task:queue
- Task Metadata: Redis strings at mosaic:task:{taskId}
- TTL: Tasks expire after 24 hours (configurable via TASK_TTL)
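To make the key layout concrete, the sketch below builds the keys and lists the Redis commands that one enqueue could translate to. It rests on assumptions this README does not confirm: that the list holds task IDs rather than full payloads, that metadata is stored as JSON, and that the service uses RPUSH and SET with EX. `taskKey` and `enqueueCommands` are hypothetical names.

```typescript
const QUEUE_KEY = "mosaic:task:queue";
const TASK_TTL_SECONDS = 24 * 60 * 60; // 24 hours

// Builds the metadata key for a task ID.
function taskKey(taskId: string): string {
  return `mosaic:task:${taskId}`;
}

// Illustrative only: the commands a single enqueue might issue under the assumptions above.
function enqueueCommands(taskId: string, payload: object): string[][] {
  return [
    // Store metadata as a string with a 24-hour expiry.
    ["SET", taskKey(taskId), JSON.stringify(payload), "EX", String(TASK_TTL_SECONDS)],
    // Append to the tail; a worker popping from the head (LPOP) then sees FIFO order.
    ["RPUSH", QUEUE_KEY, taskId],
  ];
}

const commands = enqueueCommands("abc", { type: "send-email" });
```

Note that the queue key shares the mosaic:task: prefix with the metadata keys, so a pattern such as mosaic:task:* matches the queue list as well as the task strings.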
Examples
Background Job Processing
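import { Injectable, OnApplicationBootstrap } from "@nestjs/common";
import { ValkeyService } from "./valkey/valkey.service";
import { TaskDto, TaskStatus } from "./valkey/dto/task.dto";

@Injectable()
export class EmailWorker implements OnApplicationBootstrap {
  constructor(private readonly valkeyService: ValkeyService) {}

  // Start polling after the application has bootstrapped instead of in the
  // constructor, so the loop does not run before all providers are initialized.
  onApplicationBootstrap() {
    void this.startWorker();
  }

  private async startWorker() {
    while (true) {
      const task = await this.valkeyService.dequeue();
      if (task) {
        await this.processTask(task);
      } else {
        // No tasks, wait 5 seconds
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }
  }

  // sendEmail() and generateReport() are application-specific and omitted here.
  private async processTask(task: TaskDto) {
    try {
      switch (task.type) {
        case "send-email":
          await this.sendEmail(task.data);
          break;
        case "generate-report":
          await this.generateReport(task.data);
          break;
        default:
          // Fail unknown types instead of silently marking them completed
          throw new Error(`Unknown task type: ${task.type}`);
      }
      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.COMPLETED,
      });
    } catch (error) {
      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.FAILED,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
}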
Scheduled Tasks with Cron
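import { Injectable } from "@nestjs/common";
import { Cron } from "@nestjs/schedule";
import { ValkeyService } from "./valkey/valkey.service";

@Injectable()
export class ScheduledTasks {
  constructor(private readonly valkeyService: ValkeyService) {}

  @Cron("0 0 * * *") // Daily at midnight
  async dailyReport() {
    await this.valkeyService.enqueue({
      type: "daily-report",
      data: { date: new Date().toISOString() },
    });
  }
}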
Testing
The module includes comprehensive tests with an in-memory Redis mock:
pnpm test valkey.service.spec.ts
Tests cover:
- ✅ Connection and initialization
- ✅ Enqueue operations
- ✅ Dequeue FIFO behavior
- ✅ Status tracking and updates
- ✅ Queue management
- ✅ Complete task lifecycle
- ✅ Concurrent task handling
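To give a sense of what an in-memory mock for the queue list involves, here is a minimal sketch. It is not the mock shipped with the module: the class name is hypothetical, only the list commands are modelled, and the methods are synchronous for brevity, whereas the equivalent ioredis commands return promises.

```typescript
// Hypothetical in-memory stand-in for the list commands a FIFO queue relies on.
class InMemoryListStore {
  private readonly lists = new Map<string, string[]>();

  // Append to the tail and return the new length, like RPUSH.
  rpush(key: string, value: string): number {
    const list = this.lists.get(key) ?? [];
    list.push(value);
    this.lists.set(key, list);
    return list.length;
  }

  // Remove and return the head, or null when the list is empty, like LPOP.
  lpop(key: string): string | null {
    const list = this.lists.get(key);
    if (!list || list.length === 0) return null;
    const value = list.shift() ?? null;
    if (list.length === 0) this.lists.delete(key); // empty lists do not exist in Redis
    return value;
  }

  // Length of the list, 0 for a missing key, like LLEN.
  llen(key: string): number {
    return this.lists.get(key)?.length ?? 0;
  }

  // Delete the key and report how many keys were removed, like DEL.
  del(key: string): number {
    return this.lists.delete(key) ? 1 : 0;
  }
}

const store = new InMemoryListStore();
store.rpush("mosaic:task:queue", "task-1");
store.rpush("mosaic:task:queue", "task-2");
const firstOut = store.lpop("mosaic:task:queue"); // the first value pushed comes out first
```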
API Reference
ValkeyService Methods
enqueue(task: EnqueueTaskDto): Promise<TaskDto>
Add a task to the queue.
Parameters:
- task.type (string): Task type identifier
- task.data (object): Task metadata
Returns: Created task with ID and status
dequeue(): Promise<TaskDto | null>
Get the next task from the queue (FIFO).
Returns: Next task with status updated to PROCESSING, or null if queue is empty
getStatus(taskId: string): Promise<TaskDto | null>
Retrieve task status and metadata.
Parameters:
- taskId (string): Task UUID
Returns: Task data or null if not found
updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise<TaskDto | null>
Update task status and optionally add results or errors.
Parameters:
- taskId (string): Task UUID
- update.status (TaskStatus): New status
- update.error (string, optional): Error message for failed tasks
- update.result (object, optional): Result data to merge
Returns: Updated task or null if not found
getQueueLength(): Promise<number>
Get the number of tasks in queue.
Returns: Queue length
clearQueue(): Promise<void>
Remove all tasks from queue (metadata remains until TTL).
healthCheck(): Promise<boolean>
Verify Valkey connectivity.
Returns: true if connected, false otherwise
Migration Notes
If upgrading from BullMQ or another queue system:
- Task IDs are UUIDs (not incremental)
- No built-in retry mechanism (implement in worker)
- No job priorities (strict FIFO)
- Tasks expire after 24 hours
For advanced features like retries, priorities, or scheduled jobs, consider wrapping this service or using BullMQ alongside it.
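Since retries are left to the worker, one possible approach is to wrap the task handler in a retry helper with exponential backoff, as sketched below. `backoffDelayMs` and `withRetry` are hypothetical helpers that are not part of ValkeyService, and the default attempt count and delays are arbitrary.

```typescript
// Delay before the next attempt: baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Runs `operation` up to `maxAttempts` times, sleeping between failed attempts.
// Rethrows the last error once all attempts are exhausted.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // No need to sleep after the final failed attempt.
      if (attempt < maxAttempts - 1) {
        const delay = backoffDelayMs(attempt, baseMs);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}
```

In a worker, the handler call could become `await withRetry(() => this.sendEmail(task.data))`, with the task marked FAILED only if the helper finally throws.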
Troubleshooting
Connection Issues
// Check Valkey connectivity
const healthy = await this.valkeyService.healthCheck();
if (!healthy) {
console.error("Valkey is not responding");
}
Queue Stuck
# Check queue length
docker exec -it mosaic-valkey valkey-cli LLEN mosaic:task:queue
# Inspect tasks (KEYS walks the entire keyspace and blocks the server; avoid on large production datasets)
docker exec -it mosaic-valkey valkey-cli KEYS "mosaic:task:*"
# Clear stuck queue
docker exec -it mosaic-valkey valkey-cli DEL mosaic:task:queue
Debug Logging
The service logs all operations at info level. Check application logs for:
- Task enqueue/dequeue operations
- Status updates
- Connection events
Future Enhancements
Potential improvements for consideration:
- Task priorities (weighted queues)
- Retry mechanism with exponential backoff
- Delayed/scheduled tasks
- Task progress tracking
- Queue metrics and monitoring
- Multi-queue support
- Dead letter queue for failed tasks
License
Part of the Mosaic Stack project.