feat(queue): add redis connection module with health check (MQ-002)
This commit is contained in:
@@ -12,5 +12,8 @@
|
||||
"lint": "eslint \"src/**/*.ts\" \"tests/**/*.ts\" \"vitest.config.ts\"",
|
||||
"build": "tsc -p tsconfig.build.json",
|
||||
"test": "vitest run"
|
||||
},
|
||||
"dependencies": {
|
||||
"ioredis": "^5.10.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1 +1,14 @@
|
||||
export const packageVersion = '0.0.1';
|
||||
|
||||
export {
|
||||
assertRedisHealthy,
|
||||
createRedisClient,
|
||||
resolveRedisUrl,
|
||||
runRedisHealthCheck,
|
||||
} from './redis-connection.js';
|
||||
export type {
|
||||
CreateRedisClientOptions,
|
||||
RedisClientConstructor,
|
||||
RedisHealthCheck,
|
||||
RedisPingClient,
|
||||
} from './redis-connection.js';
|
||||
|
||||
95
packages/queue/src/redis-connection.ts
Normal file
95
packages/queue/src/redis-connection.ts
Normal file
@@ -0,0 +1,95 @@
|
||||
import Redis, { type RedisOptions } from 'ioredis';
|
||||
|
||||
const ERR_MISSING_REDIS_URL =
|
||||
'Missing required Valkey/Redis connection URL. Set VALKEY_URL or REDIS_URL.';
|
||||
|
||||
export interface RedisHealthCheck {
|
||||
readonly checkedAt: number;
|
||||
readonly latencyMs: number;
|
||||
readonly ok: boolean;
|
||||
readonly response?: string;
|
||||
readonly error?: string;
|
||||
}
|
||||
|
||||
export interface RedisPingClient {
|
||||
ping(): Promise<string>;
|
||||
}
|
||||
|
||||
export type RedisClientConstructor<TClient> = new (
|
||||
url: string,
|
||||
options?: RedisOptions,
|
||||
) => TClient;
|
||||
|
||||
export interface CreateRedisClientOptions<TClient> {
|
||||
readonly env?: NodeJS.ProcessEnv;
|
||||
readonly redisConstructor?: RedisClientConstructor<TClient>;
|
||||
readonly redisOptions?: RedisOptions;
|
||||
}
|
||||
|
||||
export function resolveRedisUrl(env: NodeJS.ProcessEnv = process.env): string {
|
||||
const resolvedUrl = env.VALKEY_URL ?? env.REDIS_URL;
|
||||
|
||||
if (typeof resolvedUrl !== 'string' || resolvedUrl.trim().length === 0) {
|
||||
throw new Error(ERR_MISSING_REDIS_URL);
|
||||
}
|
||||
|
||||
return resolvedUrl;
|
||||
}
|
||||
|
||||
export function createRedisClient<TClient = Redis>(
|
||||
options: CreateRedisClientOptions<TClient> = {},
|
||||
): TClient {
|
||||
const redisUrl = resolveRedisUrl(options.env);
|
||||
|
||||
const RedisCtor =
|
||||
options.redisConstructor ??
|
||||
(Redis as unknown as RedisClientConstructor<TClient>);
|
||||
|
||||
return new RedisCtor(redisUrl, {
|
||||
maxRetriesPerRequest: null,
|
||||
...options.redisOptions,
|
||||
});
|
||||
}
|
||||
|
||||
export async function runRedisHealthCheck(
|
||||
client: RedisPingClient,
|
||||
): Promise<RedisHealthCheck> {
|
||||
const startedAt = process.hrtime.bigint();
|
||||
|
||||
try {
|
||||
const response = await client.ping();
|
||||
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
|
||||
|
||||
return {
|
||||
checkedAt: Date.now(),
|
||||
latencyMs: elapsedMs,
|
||||
ok: true,
|
||||
response,
|
||||
};
|
||||
} catch (error) {
|
||||
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
|
||||
const message =
|
||||
error instanceof Error ? error.message : 'Unknown redis health check error';
|
||||
|
||||
return {
|
||||
checkedAt: Date.now(),
|
||||
latencyMs: elapsedMs,
|
||||
ok: false,
|
||||
error: message,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export async function assertRedisHealthy(
|
||||
client: RedisPingClient,
|
||||
): Promise<RedisHealthCheck> {
|
||||
const health = await runRedisHealthCheck(client);
|
||||
|
||||
if (!health.ok) {
|
||||
throw new Error(
|
||||
`Redis health check failed after ${health.latencyMs}ms: ${health.error ?? 'unknown error'}`,
|
||||
);
|
||||
}
|
||||
|
||||
return health;
|
||||
}
|
||||
76
packages/queue/tests/redis-connection.test.ts
Normal file
76
packages/queue/tests/redis-connection.test.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import {
|
||||
createRedisClient,
|
||||
resolveRedisUrl,
|
||||
runRedisHealthCheck,
|
||||
} from '../src/redis-connection.js';
|
||||
|
||||
describe('resolveRedisUrl', () => {
|
||||
it('prefers VALKEY_URL when both env vars are present', () => {
|
||||
const url = resolveRedisUrl({
|
||||
VALKEY_URL: 'redis://valkey.local:6379',
|
||||
REDIS_URL: 'redis://redis.local:6379',
|
||||
});
|
||||
|
||||
expect(url).toBe('redis://valkey.local:6379');
|
||||
});
|
||||
|
||||
it('falls back to REDIS_URL when VALKEY_URL is missing', () => {
|
||||
const url = resolveRedisUrl({
|
||||
REDIS_URL: 'redis://redis.local:6379',
|
||||
});
|
||||
|
||||
expect(url).toBe('redis://redis.local:6379');
|
||||
});
|
||||
|
||||
it('throws loudly when no redis environment variable exists', () => {
|
||||
expect(() => resolveRedisUrl({})).toThrowError(
|
||||
/Missing required Valkey\/Redis connection URL/i,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('createRedisClient', () => {
|
||||
it('uses env URL for client creation with no hardcoded defaults', () => {
|
||||
class FakeRedis {
|
||||
public readonly url: string;
|
||||
|
||||
public constructor(url: string) {
|
||||
this.url = url;
|
||||
}
|
||||
}
|
||||
|
||||
const client = createRedisClient({
|
||||
env: {
|
||||
VALKEY_URL: 'redis://queue.local:6379',
|
||||
},
|
||||
redisConstructor: FakeRedis,
|
||||
});
|
||||
|
||||
expect(client.url).toBe('redis://queue.local:6379');
|
||||
});
|
||||
});
|
||||
|
||||
describe('runRedisHealthCheck', () => {
|
||||
it('returns healthy status when ping succeeds', async () => {
|
||||
const health = await runRedisHealthCheck({
|
||||
ping: () => Promise.resolve('PONG'),
|
||||
});
|
||||
|
||||
expect(health.ok).toBe(true);
|
||||
expect(health.response).toBe('PONG');
|
||||
expect(health.latencyMs).toBeTypeOf('number');
|
||||
expect(health.latencyMs).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it('returns unhealthy status when ping fails', async () => {
|
||||
const health = await runRedisHealthCheck({
|
||||
ping: () => Promise.reject(new Error('connection refused')),
|
||||
});
|
||||
|
||||
expect(health.ok).toBe(false);
|
||||
expect(health.error).toMatch(/connection refused/i);
|
||||
expect(health.latencyMs).toBeTypeOf('number');
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user