import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common"; import Redis from "ioredis"; import { TaskDto, TaskStatus, EnqueueTaskDto, UpdateTaskStatusDto } from "./dto/task.dto"; import { randomUUID } from "crypto"; /** * ValkeyService - Task queue service using Valkey (Redis-compatible) * * Provides task queue operations: * - enqueue(task): Add task to queue * - dequeue(): Get next task from queue * - getStatus(taskId): Get task status and metadata * - updateStatus(taskId, status): Update task status */ @Injectable() export class ValkeyService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(ValkeyService.name); private client!: Redis; private readonly QUEUE_KEY = "mosaic:task:queue"; private readonly TASK_PREFIX = "mosaic:task:"; private readonly TASK_TTL = 86400; // 24 hours in seconds async onModuleInit() { const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379"; this.logger.log(`Connecting to Valkey at ${valkeyUrl}`); this.client = new Redis(valkeyUrl, { maxRetriesPerRequest: 3, retryStrategy: (times: number) => { const delay = Math.min(times * 50, 2000); this.logger.warn( `Valkey connection retry attempt ${times.toString()}, waiting ${delay.toString()}ms` ); return delay; }, reconnectOnError: (err: Error) => { this.logger.error("Valkey connection error:", err.message); return true; }, }); this.client.on("connect", () => { this.logger.log("Valkey connected successfully"); }); this.client.on("error", (err: Error) => { this.logger.error("Valkey client error:", err.message); }); this.client.on("close", () => { this.logger.warn("Valkey connection closed"); }); // Wait for connection try { await this.client.ping(); this.logger.log("Valkey health check passed"); } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error("Valkey health check failed:", errorMessage); throw error; } } async onModuleDestroy(): Promise { this.logger.log("Disconnecting from Valkey"); // Remove all event listeners to prevent memory leaks this.client.removeAllListeners(); await this.client.quit(); } /** * Add a task to the queue * @param task - Task to enqueue * @returns The created task with ID and metadata */ async enqueue(task: EnqueueTaskDto): Promise { const taskId = randomUUID(); const now = new Date(); const taskData: TaskDto = { id: taskId, type: task.type, data: task.data, status: TaskStatus.PENDING, createdAt: now, updatedAt: now, }; // Store task metadata const taskKey = this.getTaskKey(taskId); await this.client.setex(taskKey, this.TASK_TTL, JSON.stringify(taskData)); // Add to queue (RPUSH = add to tail, LPOP = remove from head => FIFO) await this.client.rpush(this.QUEUE_KEY, taskId); this.logger.log(`Task enqueued: ${taskId} (type: ${task.type})`); return taskData; } /** * Get the next task from the queue * @returns The next task or null if queue is empty */ async dequeue(): Promise { // LPOP = remove from head (FIFO) const taskId = await this.client.lpop(this.QUEUE_KEY); if (!taskId) { return null; } const task = await this.getStatus(taskId); if (!task) { this.logger.warn(`Task ${taskId} not found in metadata store`); return null; } // Update status to processing and return the updated task const updatedTask = await this.updateStatus(taskId, { status: TaskStatus.PROCESSING, }); this.logger.log(`Task dequeued: ${taskId} (type: ${task.type})`); return updatedTask; } /** * Get task status and metadata * @param taskId - Task ID * @returns Task data or null if not found */ async getStatus(taskId: string): Promise { const taskKey = this.getTaskKey(taskId); const taskData = await this.client.get(taskKey); if (!taskData) { return null; } try { return JSON.parse(taskData) as TaskDto; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error(`Failed to parse task data for ${taskId}:`, errorMessage); return null; } } /** * Update task status and metadata * @param taskId - Task ID * @param update - Status update data * @returns Updated task or null if not found */ async updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise { const task = await this.getStatus(taskId); if (!task) { this.logger.warn(`Cannot update status for non-existent task: ${taskId}`); return null; } const now = new Date(); const updatedTask: TaskDto = { ...task, status: update.status, updatedAt: now, }; if (update.error) { updatedTask.error = update.error; } if (update.status === TaskStatus.COMPLETED || update.status === TaskStatus.FAILED) { updatedTask.completedAt = now; } if (update.result) { updatedTask.data = { ...task.data, ...update.result }; } const taskKey = this.getTaskKey(taskId); await this.client.setex(taskKey, this.TASK_TTL, JSON.stringify(updatedTask)); this.logger.log(`Task status updated: ${taskId} => ${update.status}`); return updatedTask; } /** * Get queue length * @returns Number of tasks in queue */ async getQueueLength(): Promise { return await this.client.llen(this.QUEUE_KEY); } /** * Clear all tasks from queue (use with caution!) */ async clearQueue(): Promise { await this.client.del(this.QUEUE_KEY); this.logger.warn("Queue cleared"); } /** * Get task key for Redis storage */ private getTaskKey(taskId: string): string { return `${this.TASK_PREFIX}${taskId}`; } /** * Health check - ping Valkey */ async healthCheck(): Promise { try { const result = await this.client.ping(); // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition return result === "PONG"; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error("Valkey health check failed:", errorMessage); return false; } } }