Valkey Task Queue Module

This module provides a Redis-compatible task queue for the Mosaic Stack application, built on Valkey (an open-source fork of Redis).

Overview

The ValkeyModule is a global NestJS module that provides a simple FIFO (first-in, first-out) task queue. It connects to Valkey through the ioredis client, and because the module is global, ValkeyService can be injected anywhere in the application without importing the module again.

Features

  • FIFO Queue: Tasks are processed in the order they are enqueued
  • Task Status Tracking: Monitor task lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
  • Metadata Storage: Store and retrieve task data with 24-hour TTL
  • Health Monitoring: Built-in health check for Valkey connectivity
  • Type Safety: Fully typed DTOs with validation
  • Global Module: No need to import in every module

Architecture

Components

  1. ValkeyModule (valkey.module.ts)

    • Global module that provides ValkeyService
    • Auto-registered in app.module.ts
  2. ValkeyService (valkey.service.ts)

    • Core service with queue operations
    • Lifecycle hooks for connection management
    • Methods: enqueue(), dequeue(), getStatus(), updateStatus(), getQueueLength(), clearQueue(), healthCheck()
  3. DTOs (dto/task.dto.ts)

    • TaskDto: Complete task representation
    • EnqueueTaskDto: Input for creating tasks
    • UpdateTaskStatusDto: Input for status updates
    • TaskStatus: Enum of task states
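
The DTO file itself is not reproduced in this README. As a rough sketch, the shapes implied by the usage examples might look like the following. The field names and status values come from those examples; the use of plain interfaces (the real DTOs include validation) and the optionality of error and result are assumptions.

```typescript
// Sketch only: inferred from this README's examples, not copied from dto/task.dto.ts.
enum TaskStatus {
  PENDING = 'pending',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
}

// Input accepted by enqueue()
interface EnqueueTaskDto {
  type: string;
  data: Record<string, unknown>;
}

// Input accepted by updateStatus()
interface UpdateTaskStatusDto {
  status: TaskStatus;
  error?: string;
  result?: Record<string, unknown>;
}

// Full task as returned by enqueue(), dequeue(), and getStatus()
interface TaskDto extends EnqueueTaskDto {
  id: string;
  status: TaskStatus;
  error?: string;
  result?: Record<string, unknown>;
}
```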

Configuration

Environment Variables

Add to .env:

VALKEY_URL=redis://localhost:6379
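
How the service reads this variable is not shown here. A minimal sketch of resolving it might look like this, assuming a fallback to the local default above; resolveValkeyUrl and DEFAULT_VALKEY_URL are hypothetical names, not part of the module.

```typescript
// Hypothetical helper: the real service may read VALKEY_URL differently.
const DEFAULT_VALKEY_URL = 'redis://localhost:6379'; // assumed fallback, mirrors the .env example

function resolveValkeyUrl(env: Record<string, string | undefined>): string {
  // Treat a missing or blank value as "not set".
  const url = env['VALKEY_URL']?.trim() || DEFAULT_VALKEY_URL;

  // Valkey speaks the Redis protocol, so the URL keeps the redis:// scheme
  // (or rediss:// for TLS connections).
  if (!/^rediss?:\/\//.test(url)) {
    throw new Error(`VALKEY_URL must start with redis:// or rediss://, got: ${url}`);
  }
  return url;
}
```

The resulting string can be passed straight to the ioredis constructor, which accepts a connection URL.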

Docker Compose

Valkey service is already configured in docker-compose.yml:

valkey:
  image: valkey/valkey:8-alpine
  container_name: mosaic-valkey
  ports:
    - "6379:6379"
  volumes:
    - valkey_data:/data

Start Valkey:

docker compose up -d valkey

Usage

1. Inject the Service

import { Injectable } from '@nestjs/common';
import { ValkeyService } from './valkey/valkey.service';

@Injectable()
export class MyService {
  constructor(private readonly valkeyService: ValkeyService) {}
}

2. Enqueue a Task

const task = await this.valkeyService.enqueue({
  type: 'send-email',
  data: {
    to: 'user@example.com',
    subject: 'Welcome!',
    body: 'Hello, welcome to Mosaic Stack',
  },
});

console.log(task.id); // UUID
console.log(task.status); // 'pending'

3. Dequeue and Process

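// Import path follows dto/task.dto.ts as listed under Components
import { TaskStatus } from './valkey/dto/task.dto';

// Worker picks up next task
const task = await this.valkeyService.dequeue();

if (task) {
  console.log(task.status); // 'processing'

  try {
    // Do work...
    await sendEmail(task.data);

    // Mark as completed
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.COMPLETED,
      result: { sentAt: new Date().toISOString() },
    });
  } catch (error) {
    // Mark as failed. A caught value is `unknown` under strict TypeScript,
    // so narrow it before reading `.message`.
    await this.valkeyService.updateStatus(task.id, {
      status: TaskStatus.FAILED,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}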

4. Check Task Status

const status = await this.valkeyService.getStatus(taskId);

if (status) {
  console.log(status.status); // 'completed' | 'failed' | 'processing' | 'pending'
  console.log(status.data);   // Task metadata
  console.log(status.error);  // Error message if failed
}

5. Queue Management

// Get queue length
const length = await this.valkeyService.getQueueLength();
console.log(`${length} tasks in queue`);

// Health check
const healthy = await this.valkeyService.healthCheck();
console.log(`Valkey is ${healthy ? 'healthy' : 'down'}`);

// Clear queue (use with caution!)
await this.valkeyService.clearQueue();

Task Lifecycle

PENDING → PROCESSING → COMPLETED
                    ↘ FAILED
  1. PENDING: Task is enqueued and waiting to be processed
  2. PROCESSING: Task has been dequeued and is being worked on
  3. COMPLETED: Task finished successfully
  4. FAILED: Task encountered an error
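
The lifecycle above can be written down as a transition table. This README does not say whether updateStatus() rejects invalid transitions, so the following is a sketch of a guard a caller might add, with hypothetical names throughout.

```typescript
type Status = 'pending' | 'processing' | 'completed' | 'failed';

// Allowed moves, taken from the lifecycle diagram; terminal states have no successors.
const ALLOWED_TRANSITIONS: Record<Status, readonly Status[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed'],
  completed: [],
  failed: [],
};

// Returns true only for moves that appear in the diagram.
function canTransition(from: Status, to: Status): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```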

Data Storage

  • Queue: Redis list at key mosaic:task:queue
  • Task Metadata: Redis strings at mosaic:task:{taskId}
  • TTL: Tasks expire after 24 hours (configurable via TASK_TTL)
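
To make the key layout and expiry concrete, here is a small in-memory sketch. The key names and the 24-hour figure come from the list above; ExpiringStore is a hypothetical stand-in for Valkey string keys, and whether the service writes metadata with SETEX or with SET plus an expiry option is an assumption.

```typescript
const QUEUE_KEY = 'mosaic:task:queue';
const TASK_TTL_SECONDS = 24 * 60 * 60; // 24 hours = 86400 seconds

// Metadata key for a single task
function taskKey(taskId: string): string {
  return `mosaic:task:${taskId}`;
}

// Tiny stand-in for string keys with expiry. The clock is injected so that
// expiry can be demonstrated without waiting 24 hours.
class ExpiringStore {
  private readonly entries = new Map<string, { value: string; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number) {
    this.now = now;
  }

  // Same idea as SETEX: store a value that disappears after ttlSeconds.
  setex(key: string, ttlSeconds: number, value: string): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }

  // Returns null for missing or expired keys, as GET does.
  get(key: string): string | null {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAt <= this.now()) {
      this.entries.delete(key);
      return null;
    }
    return entry.value;
  }
}
```

Note that only the metadata keys expire. A task ID left in the queue list for more than 24 hours would outlive its metadata, so a worker should be prepared for a missing record.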

Examples

Background Job Processing

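import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ValkeyService } from './valkey/valkey.service';
import { TaskDto, TaskStatus } from './valkey/dto/task.dto';

@Injectable()
export class EmailWorker implements OnModuleInit, OnModuleDestroy {
  private running = false;

  constructor(private readonly valkeyService: ValkeyService) {}

  // Start polling from a lifecycle hook rather than the constructor,
  // so the loop only runs once dependencies are initialized.
  onModuleInit() {
    this.running = true;
    void this.startWorker();
  }

  // Let the loop exit on application shutdown.
  onModuleDestroy() {
    this.running = false;
  }

  private async startWorker() {
    while (this.running) {
      const task = await this.valkeyService.dequeue();

      if (task) {
        await this.processTask(task);
      } else {
        // No tasks, wait 5 seconds
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }

  private async processTask(task: TaskDto) {
    try {
      switch (task.type) {
        case 'send-email':
          await this.sendEmail(task.data);
          break;
        case 'generate-report':
          await this.generateReport(task.data);
          break;
        default:
          // Fail unknown types instead of silently marking them completed
          throw new Error(`Unknown task type: ${task.type}`);
      }

      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.COMPLETED,
      });
    } catch (error) {
      // A caught value is `unknown` under strict TypeScript, so narrow it first
      await this.valkeyService.updateStatus(task.id, {
        status: TaskStatus.FAILED,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  // sendEmail() and generateReport() are application-specific and omitted here.
}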

Scheduled Tasks with Cron

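import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { ValkeyService } from './valkey/valkey.service';

// Requires ScheduleModule.forRoot() from @nestjs/schedule to be imported in the app module
@Injectable()
export class ScheduledTasks {
  constructor(private readonly valkeyService: ValkeyService) {}

  @Cron('0 0 * * *') // Daily at midnight
  async dailyReport() {
    await this.valkeyService.enqueue({
      type: 'daily-report',
      data: { date: new Date().toISOString() },
    });
  }
}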

Testing

The module includes comprehensive tests with an in-memory Redis mock:

pnpm test valkey.service.spec.ts

Tests cover:

  • Connection and initialization
  • Enqueue operations
  • Dequeue FIFO behavior
  • Status tracking and updates
  • Queue management
  • Complete task lifecycle
  • Concurrent task handling
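
The spec file and its mock are not reproduced here. For orientation, a minimal in-memory double for the list commands a FIFO queue needs might look like the sketch below. RPUSH, LPOP, LLEN, and DEL are real Redis/Valkey commands, but which ones the service actually calls is an assumption, and the methods are synchronous for brevity whereas ioredis returns promises.

```typescript
// Hypothetical test double; not the mock used in valkey.service.spec.ts.
class InMemoryLists {
  private readonly lists = new Map<string, string[]>();

  // RPUSH: append to the tail and return the new length.
  rpush(key: string, value: string): number {
    const list = this.lists.get(key) ?? [];
    list.push(value);
    this.lists.set(key, list);
    return list.length;
  }

  // LPOP: remove and return the head, or null when the list is empty or missing.
  lpop(key: string): string | null {
    const list = this.lists.get(key);
    if (!list || list.length === 0) return null;
    const head = list.shift() ?? null;
    if (list.length === 0) this.lists.delete(key); // empty lists are removed, as in Redis
    return head;
  }

  // LLEN: a missing key counts as an empty list.
  llen(key: string): number {
    return this.lists.get(key)?.length ?? 0;
  }

  // DEL: return the number of keys removed.
  del(key: string): number {
    return this.lists.delete(key) ? 1 : 0;
  }
}
```

Pushing at the tail and popping at the head is what gives FIFO order; the mirror image (LPUSH with RPOP) would behave the same way.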

API Reference

ValkeyService Methods

enqueue(task: EnqueueTaskDto): Promise<TaskDto>

Add a task to the queue.

Parameters:

  • task.type (string): Task type identifier
  • task.data (object): Task metadata

Returns: Created task with ID and status
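
As an illustration of what a created task might look like on its way into storage, the sketch below builds the record and round-trips it through a string. Task metadata is stored as Redis strings, so some serialization is needed; JSON is an assumption, and createTask, serializeTask, and parseTask are hypothetical names. The ID is passed in here, whereas the service generates a UUID itself.

```typescript
interface NewTask {
  type: string;
  data: Record<string, unknown>;
}

interface StoredTask extends NewTask {
  id: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
}

// Build the record for a freshly enqueued task; new tasks always start as 'pending'.
function createTask(input: NewTask, id: string): StoredTask {
  return { id, type: input.type, data: input.data, status: 'pending' };
}

// Redis strings hold text, so the record is serialized before storage (JSON assumed).
function serializeTask(task: StoredTask): string {
  return JSON.stringify(task);
}

// Inverse of serializeTask; no validation is performed in this sketch.
function parseTask(raw: string): StoredTask {
  return JSON.parse(raw) as StoredTask;
}
```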


dequeue(): Promise<TaskDto | null>

Get the next task from the queue (FIFO).

Returns: Next task with status updated to PROCESSING, or null if queue is empty


getStatus(taskId: string): Promise<TaskDto | null>

Retrieve task status and metadata.

Parameters:

  • taskId (string): Task UUID

Returns: Task data or null if not found


updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise<TaskDto | null>

Update task status and optionally add results or errors.

Parameters:

  • taskId (string): Task UUID
  • update.status (TaskStatus): New status
  • update.error (string, optional): Error message for failed tasks
  • update.result (object, optional): Result data to merge

Returns: Updated task or null if not found
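
The merge behaviour can be pictured as a pure function over the stored task. This is a sketch under assumptions: applyStatusUpdate is a hypothetical name, and merging update.result into any existing result (rather than into data) is a guess at what "merge" means here.

```typescript
type Status = 'pending' | 'processing' | 'completed' | 'failed';

interface Task {
  id: string;
  type: string;
  data: Record<string, unknown>;
  status: Status;
  error?: string;
  result?: Record<string, unknown>;
}

interface StatusUpdate {
  status: Status;
  error?: string;
  result?: Record<string, unknown>;
}

// Returns a new task object; the input task is left untouched.
function applyStatusUpdate(task: Task, update: StatusUpdate): Task {
  const next: Task = { ...task, status: update.status };

  // Set optional fields only when provided, so absent values never
  // appear as explicit `undefined` properties.
  if (update.error !== undefined) {
    next.error = update.error;
  }
  if (update.result !== undefined) {
    next.result = { ...task.result, ...update.result }; // shallow merge
  }
  return next;
}
```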


getQueueLength(): Promise<number>

Get the number of tasks in queue.

Returns: Queue length


clearQueue(): Promise<void>

Remove all tasks from the queue. Task metadata is not deleted; it remains until its TTL expires.


healthCheck(): Promise<boolean>

Verify Valkey connectivity.

Returns: true if connected, false otherwise
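
A boolean health check of this kind is commonly a PING wrapped in a try/catch. The sketch below shows that pattern; it is an assumption about the approach, not the service's actual code. The ping argument stands in for the client call (ioredis exposes it as redis.ping(), which resolves to 'PONG').

```typescript
// Hypothetical sketch: resolves to true only when the server answers PONG.
async function healthCheck(ping: () => Promise<string>): Promise<boolean> {
  try {
    return (await ping()) === 'PONG';
  } catch {
    // Connection refused, timeout, and similar failures all count as unhealthy.
    return false;
  }
}
```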

Migration Notes

If upgrading from BullMQ or another queue system:

  1. Task IDs are UUIDs (not incremental)
  2. No built-in retry mechanism (implement in worker)
  3. No job priorities (strict FIFO)
  4. Tasks expire after 24 hours

For advanced features like retries, priorities, or scheduled jobs, consider wrapping this service or using BullMQ alongside it.
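
Since retries are left to the worker, one possible shape is a small wrapper with exponential backoff. This is a sketch, not part of the module: withRetry, backoffDelayMs, and their default values are all hypothetical.

```typescript
// Delay before the retry that follows failed attempt `attempt` (1-based):
// base, 2x base, 4x base, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 60_000): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Run `work` up to maxAttempts times. The last error is rethrown so the
// caller can still mark the task as FAILED. `sleep` is injectable for tests.
async function withRetry<T>(
  work: () => Promise<T>,
  maxAttempts = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await work();
    } catch (error) {
      lastError = error;
      // No point sleeping after the final attempt.
      if (attempt < maxAttempts) {
        await sleep(backoffDelayMs(attempt));
      }
    }
  }
  throw lastError;
}
```

In a worker, the task handler would be wrapped rather than called directly, for example withRetry(() => this.sendEmail(task.data)), with the surrounding try/catch left in place to record the final failure.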

Troubleshooting

Connection Issues

// Check Valkey connectivity
const healthy = await this.valkeyService.healthCheck();
if (!healthy) {
  console.error('Valkey is not responding');
}

Queue Stuck

# Check queue length
docker exec -it mosaic-valkey valkey-cli LLEN mosaic:task:queue

# Inspect tasks (--scan iterates incrementally; KEYS can block the server on large keyspaces)
docker exec -it mosaic-valkey valkey-cli --scan --pattern "mosaic:task:*"

# Clear stuck queue
docker exec -it mosaic-valkey valkey-cli DEL mosaic:task:queue

Debug Logging

The service logs all operations at info level. Check application logs for:

  • Task enqueue/dequeue operations
  • Status updates
  • Connection events

Future Enhancements

Potential improvements for consideration:

  • Task priorities (weighted queues)
  • Retry mechanism with exponential backoff
  • Delayed/scheduled tasks
  • Task progress tracking
  • Queue metrics and monitoring
  • Multi-queue support
  • Dead letter queue for failed tasks

License

Part of the Mosaic Stack project.