Files
stack/apps/api/src/valkey/valkey.service.ts
Jason Woltje 22446acd8a fix(CQ-API-4): Remove Redis event listeners in onModuleDestroy
Add removeAllListeners() call before quit() to prevent memory leaks
from lingering event listeners on the Redis client.

Also update test mock to include removeAllListeners method.

Refs #339

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 19:16:37 -06:00

230 lines
6.4 KiB
TypeScript

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import Redis from "ioredis";
import { TaskDto, TaskStatus, EnqueueTaskDto, UpdateTaskStatusDto } from "./dto/task.dto";
import { randomUUID } from "crypto";
/**
 * ValkeyService - Task queue service using Valkey (Redis-compatible)
 *
 * State kept in Valkey:
 * - a list at `mosaic:task:queue` holding task IDs in FIFO order
 *   (RPUSH on enqueue, LPOP on dequeue);
 * - one JSON document per task at `mosaic:task:<id>`, written with a
 *   24 hour TTL that is renewed on every status update.
 *
 * Provides task queue operations:
 * - enqueue(task): Add task to queue
 * - dequeue(): Get next task from queue
 * - getStatus(taskId): Get task status and metadata
 * - updateStatus(taskId, status): Update task status
 */
@Injectable()
export class ValkeyService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(ValkeyService.name);
  // Assigned in onModuleInit(); every other method assumes that hook has run.
  private client!: Redis;
  private readonly QUEUE_KEY = "mosaic:task:queue";
  private readonly TASK_PREFIX = "mosaic:task:";
  private readonly TASK_TTL = 86400; // 24 hours in seconds

  /**
   * Create the Valkey client, attach logging listeners and verify the
   * connection with a PING.
   *
   * The connection URL is read from `VALKEY_URL`, falling back to
   * `redis://localhost:6379`.
   *
   * @throws Re-throws the PING failure after tearing the client down, so a
   *         failed start-up does not leave a client retrying in the background.
   */
  async onModuleInit(): Promise<void> {
    const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379";
    // The URL may embed credentials (redis://user:password@host), so never log it raw.
    this.logger.log(`Connecting to Valkey at ${this.redactUrl(valkeyUrl)}`);

    this.client = new Redis(valkeyUrl, {
      maxRetriesPerRequest: 3,
      // Linear back-off: 50ms per attempt, capped at 2 seconds.
      retryStrategy: (times: number) => {
        const delay = Math.min(times * 50, 2000);
        this.logger.warn(
          `Valkey connection retry attempt ${times.toString()}, waiting ${delay.toString()}ms`
        );
        return delay;
      },
      // ioredis calls this hook for error *replies* to commands; socket-level
      // failures are handled by retryStrategy. Reconnecting on every reply
      // would drop the connection on ordinary command errors, so only
      // READONLY (the case documented by ioredis) forces a reconnect.
      reconnectOnError: (err: Error) => {
        this.logger.error("Valkey connection error:", err.message);
        return err.message.includes("READONLY");
      },
    });

    this.client.on("connect", () => {
      this.logger.log("Valkey connected successfully");
    });
    this.client.on("error", (err: Error) => {
      this.logger.error("Valkey client error:", err.message);
    });
    this.client.on("close", () => {
      this.logger.warn("Valkey connection closed");
    });

    // Wait for connection
    try {
      await this.client.ping();
      this.logger.log("Valkey health check passed");
    } catch (error) {
      this.logger.error("Valkey health check failed:", this.describeError(error));
      // Release the client before propagating: otherwise it keeps its
      // listeners and continues to retry although start-up has failed.
      try {
        this.client.removeAllListeners();
        this.client.disconnect();
      } catch (cleanupError) {
        // Best effort only - the original failure is what the caller must see.
        this.logger.warn(
          `Valkey client cleanup after failed health check did not complete: ${this.describeError(cleanupError)}`
        );
      }
      throw error;
    }
  }

  /**
   * Detach all listeners from the client and close the connection with QUIT.
   */
  async onModuleDestroy(): Promise<void> {
    this.logger.log("Disconnecting from Valkey");
    // Remove all event listeners to prevent memory leaks
    this.client.removeAllListeners();
    await this.client.quit();
  }

  /**
   * Add a task to the queue
   * @param task - Task to enqueue
   * @returns The created task with ID and metadata
   */
  async enqueue(task: EnqueueTaskDto): Promise<TaskDto> {
    const taskId = randomUUID();
    const now = new Date();
    const taskData: TaskDto = {
      id: taskId,
      type: task.type,
      data: task.data,
      status: TaskStatus.PENDING,
      createdAt: now,
      updatedAt: now,
    };

    // Store task metadata first, so an ID is never visible in the queue
    // before its metadata document has been written.
    const taskKey = this.getTaskKey(taskId);
    await this.client.setex(taskKey, this.TASK_TTL, JSON.stringify(taskData));

    // Add to queue (RPUSH = add to tail, LPOP = remove from head => FIFO)
    await this.client.rpush(this.QUEUE_KEY, taskId);

    this.logger.log(`Task enqueued: ${taskId} (type: ${task.type})`);
    return taskData;
  }

  /**
   * Get the next task from the queue and mark it as PROCESSING.
   *
   * @returns The updated task, or null when the queue is empty. null is also
   *          returned when the popped ID has no readable metadata; that ID
   *          has already been removed from the queue at that point.
   */
  async dequeue(): Promise<TaskDto | null> {
    // LPOP = remove from head (FIFO)
    const taskId = await this.client.lpop(this.QUEUE_KEY);
    if (!taskId) {
      return null;
    }

    const task = await this.getStatus(taskId);
    if (!task) {
      this.logger.warn(`Task ${taskId} not found in metadata store`);
      return null;
    }

    // Update status to processing and return the updated task
    const updatedTask = await this.updateStatus(taskId, {
      status: TaskStatus.PROCESSING,
    });
    if (!updatedTask) {
      // The metadata disappeared between the two reads; updateStatus() has
      // already logged a warning, so do not report a successful dequeue.
      return null;
    }

    this.logger.log(`Task dequeued: ${taskId} (type: ${task.type})`);
    return updatedTask;
  }

  /**
   * Get task status and metadata
   *
   * NOTE(review): the stored document is produced by JSON.stringify, so Date
   * fields come back as ISO strings and the cast below does not revive them -
   * confirm that TaskDto consumers tolerate this.
   *
   * @param taskId - Task ID
   * @returns Task data, or null if the key is missing or its value is not valid JSON
   */
  async getStatus(taskId: string): Promise<TaskDto | null> {
    const taskKey = this.getTaskKey(taskId);
    const taskData = await this.client.get(taskKey);
    if (!taskData) {
      return null;
    }

    try {
      return JSON.parse(taskData) as TaskDto;
    } catch (error) {
      this.logger.error(`Failed to parse task data for ${taskId}:`, this.describeError(error));
      return null;
    }
  }

  /**
   * Update task status and metadata
   *
   * Reads the stored task, applies the update and writes it back with a
   * fresh TTL. `completedAt` is stamped when the new status is COMPLETED or
   * FAILED; a provided `result` is shallow-merged into the task's `data`.
   *
   * @param taskId - Task ID
   * @param update - Status update data
   * @returns Updated task or null if not found
   */
  async updateStatus(taskId: string, update: UpdateTaskStatusDto): Promise<TaskDto | null> {
    const task = await this.getStatus(taskId);
    if (!task) {
      this.logger.warn(`Cannot update status for non-existent task: ${taskId}`);
      return null;
    }

    const now = new Date();
    const updatedTask: TaskDto = {
      ...task,
      status: update.status,
      updatedAt: now,
    };

    if (update.error) {
      updatedTask.error = update.error;
    }
    if (update.status === TaskStatus.COMPLETED || update.status === TaskStatus.FAILED) {
      updatedTask.completedAt = now;
    }
    if (update.result) {
      updatedTask.data = { ...task.data, ...update.result };
    }

    const taskKey = this.getTaskKey(taskId);
    await this.client.setex(taskKey, this.TASK_TTL, JSON.stringify(updatedTask));

    this.logger.log(`Task status updated: ${taskId} => ${update.status}`);
    return updatedTask;
  }

  /**
   * Get queue length
   * @returns Number of task IDs currently in the queue list
   */
  async getQueueLength(): Promise<number> {
    return await this.client.llen(this.QUEUE_KEY);
  }

  /**
   * Clear all tasks from queue (use with caution!)
   *
   * Only the queue list is deleted; per-task metadata keys are left in place
   * until their TTL expires.
   */
  async clearQueue(): Promise<void> {
    await this.client.del(this.QUEUE_KEY);
    this.logger.warn("Queue cleared");
  }

  /**
   * Get task key for Redis storage
   */
  private getTaskKey(taskId: string): string {
    return `${this.TASK_PREFIX}${taskId}`;
  }

  /**
   * Health check - ping Valkey
   * @returns true when the server answers PONG, false on any other reply or error
   */
  async healthCheck(): Promise<boolean> {
    try {
      const result = await this.client.ping();
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      return result === "PONG";
    } catch (error) {
      this.logger.error("Valkey health check failed:", this.describeError(error));
      return false;
    }
  }

  /**
   * Turn a caught value into a printable message.
   */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Mask the password of a connection URL so it can be logged safely.
   * URLs without a password are returned unchanged.
   */
  private redactUrl(rawUrl: string): string {
    try {
      const parsed = new URL(rawUrl);
      if (parsed.password === "") {
        return rawUrl;
      }
      parsed.password = "***";
      return parsed.toString();
    } catch {
      // Not parseable as a URL: mask anything that looks like a userinfo section.
      return rawUrl.replace(/\/\/[^@/]*@/, "//***@");
    }
  }
}