From 56787fabf14dd7e6257539edf9f0d87d10fccb57 Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Thu, 25 Jun 2026 17:17:24 +0000 Subject: [PATCH] fix(gateway): disable Redis consumers on local tier (#689) --- .../src/admin/admin-health.controller.ts | 15 ++++- .../src/commands/command-executor.service.ts | 23 ++++--- apps/gateway/src/commands/commands.module.ts | 20 ++++-- apps/gateway/src/gc/gc.module.ts | 20 ++++-- apps/gateway/src/gc/session-gc.service.ts | 45 +++++++++---- apps/gateway/src/log/cron.service.ts | 18 +++-- .../preferences/system-override.service.ts | 66 +++++++++++++++++-- apps/gateway/src/queue/queue.service.spec.ts | 35 ++++++++++ apps/gateway/src/queue/queue.service.ts | 64 ++++++++++++++++-- 9 files changed, 256 insertions(+), 50 deletions(-) create mode 100644 apps/gateway/src/queue/queue.service.spec.ts diff --git a/apps/gateway/src/admin/admin-health.controller.ts b/apps/gateway/src/admin/admin-health.controller.ts index 2331143..99fd93f 100644 --- a/apps/gateway/src/admin/admin-health.controller.ts +++ b/apps/gateway/src/admin/admin-health.controller.ts @@ -1,9 +1,11 @@ -import { Controller, Get, Inject, UseGuards } from '@nestjs/common'; +import { Controller, Get, Inject, Optional, UseGuards } from '@nestjs/common'; import { sql, type Db } from '@mosaicstack/db'; import { createQueue } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; import { DB } from '../database/database.module.js'; import { AgentService } from '../agent/agent.service.js'; import { ProviderService } from '../agent/provider.service.js'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { AdminGuard } from './admin.guard.js'; import type { HealthStatusDto, ServiceStatusDto } from './admin.dto.js'; @@ -14,6 +16,9 @@ export class AdminHealthController { @Inject(DB) private readonly db: Db, @Inject(AgentService) private readonly agentService: AgentService, @Inject(ProviderService) private readonly providerService: ProviderService, + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, ) {} @Get() @@ -55,6 +60,14 @@ export class AdminHealthController { } private async checkCache(): Promise { + // On Local tier there is no Redis. The cache is intentionally absent, which + // is a healthy state for this tier — report 'ok' rather than opening a new + // ioredis connection on every admin health check (which would spam + // ECONNREFUSED and create/destroy a connection per request). latencyMs 0 + // signals "no cache backend to measure" for this tier. + if (this.mosaicConfig?.queue?.type === 'local') { + return { status: 'ok', latencyMs: 0 }; + } const start = Date.now(); const handle = createQueue(); try { diff --git a/apps/gateway/src/commands/command-executor.service.ts b/apps/gateway/src/commands/command-executor.service.ts index 678f0c1..b8c1ce1 100644 --- a/apps/gateway/src/commands/command-executor.service.ts +++ b/apps/gateway/src/commands/command-executor.service.ts @@ -21,7 +21,10 @@ export class CommandExecutorService { @Inject(AgentService) private readonly agentService: AgentService, @Inject(SystemOverrideService) private readonly systemOverride: SystemOverrideService, @Inject(SessionGCService) private readonly sessionGC: SessionGCService, - @Inject(COMMANDS_REDIS) private readonly redis: QueueHandle['redis'], + // On Local tier COMMANDS_REDIS is null — provider login caching is skipped. + @Optional() + @Inject(COMMANDS_REDIS) + private readonly redis: QueueHandle['redis'] | null, @Inject(BRAIN) private readonly brain: Brain, @Optional() @Inject(forwardRef(() => ReloadService)) @@ -403,14 +406,16 @@ export class CommandExecutorService { }; } const pollToken = crypto.randomUUID(); - const key = `mosaic:auth:poll:${pollToken}`; - // Store pending state in Valkey (TTL 5 minutes) - await this.redis.set( - key, - JSON.stringify({ status: 'pending', provider: providerName, userId }), - 'EX', - 300, - ); + const pollKey = `mosaic:auth:poll:${pollToken}`; + if (this.redis) { + // Store pending state in Valkey (TTL 5 minutes) + await this.redis.set( + pollKey, + JSON.stringify({ status: 'pending', provider: providerName, userId }), + 'EX', + 300, + ); + } // In production this would construct an OAuth URL const loginUrl = `${process.env['MOSAIC_BASE_URL'] ?? 'http://localhost:3000'}/auth/provider/${providerName}?token=${pollToken}`; return { diff --git a/apps/gateway/src/commands/commands.module.ts b/apps/gateway/src/commands/commands.module.ts index 1c3a82c..1d38faa 100644 --- a/apps/gateway/src/commands/commands.module.ts +++ b/apps/gateway/src/commands/commands.module.ts @@ -1,5 +1,7 @@ -import { forwardRef, Inject, Module, type OnApplicationShutdown } from '@nestjs/common'; +import { forwardRef, Inject, Module, Optional, type OnApplicationShutdown } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { ChatModule } from '../chat/chat.module.js'; import { GCModule } from '../gc/gc.module.js'; import { ReloadModule } from '../reload/reload.module.js'; @@ -14,13 +16,17 @@ const COMMANDS_QUEUE_HANDLE = 'COMMANDS_QUEUE_HANDLE'; providers: [ { provide: COMMANDS_QUEUE_HANDLE, - useFactory: (): QueueHandle => { + useFactory: (config: MosaicConfig | null): QueueHandle | null => { + // On Local tier there is no Redis — skip the ioredis connection. + // CommandExecutorService falls back to no-cache for /provider login on local. + if (config?.queue?.type === 'local') return null; return createQueue(); }, + inject: [MOSAIC_CONFIG], }, { provide: COMMANDS_REDIS, - useFactory: (handle: QueueHandle) => handle.redis, + useFactory: (handle: QueueHandle | null) => handle?.redis ?? null, inject: [COMMANDS_QUEUE_HANDLE], }, CommandRegistryService, @@ -29,9 +35,13 @@ const COMMANDS_QUEUE_HANDLE = 'COMMANDS_QUEUE_HANDLE'; exports: [CommandRegistryService, CommandExecutorService], }) export class CommandsModule implements OnApplicationShutdown { - constructor(@Inject(COMMANDS_QUEUE_HANDLE) private readonly handle: QueueHandle) {} + constructor( + @Optional() + @Inject(COMMANDS_QUEUE_HANDLE) + private readonly handle: QueueHandle | null, + ) {} async onApplicationShutdown(): Promise { - await this.handle.close().catch(() => {}); + await this.handle?.close().catch(() => {}); } } diff --git a/apps/gateway/src/gc/gc.module.ts b/apps/gateway/src/gc/gc.module.ts index 1f426d1..bacc058 100644 --- a/apps/gateway/src/gc/gc.module.ts +++ b/apps/gateway/src/gc/gc.module.ts @@ -1,5 +1,7 @@ -import { Module, type OnApplicationShutdown, Inject } from '@nestjs/common'; +import { Module, type OnApplicationShutdown, Inject, Optional } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { SessionGCService } from './session-gc.service.js'; import { REDIS } from './gc.tokens.js'; @@ -9,13 +11,17 @@ const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE'; providers: [ { provide: GC_QUEUE_HANDLE, - useFactory: (): QueueHandle => { + useFactory: (config: MosaicConfig | null): QueueHandle | null => { + // On Local tier there is no Redis — skip the ioredis connection entirely. + // The Valkey GC sweep is a no-op on Local (no session keys stored there). + if (config?.queue?.type === 'local') return null; return createQueue(); }, + inject: [MOSAIC_CONFIG], }, { provide: REDIS, - useFactory: (handle: QueueHandle) => handle.redis, + useFactory: (handle: QueueHandle | null) => handle?.redis ?? null, inject: [GC_QUEUE_HANDLE], }, SessionGCService, @@ -23,9 +29,13 @@ const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE'; exports: [SessionGCService], }) export class GCModule implements OnApplicationShutdown { - constructor(@Inject(GC_QUEUE_HANDLE) private readonly handle: QueueHandle) {} + constructor( + @Optional() + @Inject(GC_QUEUE_HANDLE) + private readonly handle: QueueHandle | null, + ) {} async onApplicationShutdown(): Promise { - await this.handle.close().catch(() => {}); + await this.handle?.close().catch(() => {}); } } diff --git a/apps/gateway/src/gc/session-gc.service.ts b/apps/gateway/src/gc/session-gc.service.ts index 18d1e39..00282c1 100644 --- a/apps/gateway/src/gc/session-gc.service.ts +++ b/apps/gateway/src/gc/session-gc.service.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Logger, type OnModuleInit } from '@nestjs/common'; +import { Inject, Injectable, Logger, Optional, type OnModuleInit } from '@nestjs/common'; import type { QueueHandle } from '@mosaicstack/queue'; import type { LogService } from '@mosaicstack/log'; import { LOG_SERVICE } from '../log/log.tokens.js'; @@ -32,11 +32,21 @@ export class SessionGCService implements OnModuleInit { private readonly logger = new Logger(SessionGCService.name); constructor( - @Inject(REDIS) private readonly redis: QueueHandle['redis'], + // On Local tier there is no Redis — the GC module provides null for this token. + // NOTE: if a future feature stores Redis-backed state on Local tier, this guard + // would silently skip GC for those keys. Revisit when that happens. + @Optional() + @Inject(REDIS) + private readonly redis: QueueHandle['redis'] | null, @Inject(LOG_SERVICE) private readonly logService: LogService, ) {} onModuleInit(): void { + if (!this.redis) { + // Local tier: no Valkey — skip cold-start GC entirely (correct no-op). + this.logger.log('SessionGCService: Valkey GC skipped on local tier (no Redis configured)'); + return; + } // Fire-and-forget: run full GC asynchronously so it does not block the // NestJS bootstrap chain. Cold-start GC typically takes 100–500 ms // depending on Valkey key count; deferring it removes that latency from @@ -60,8 +70,10 @@ export class SessionGCService implements OnModuleInit { * Scan Valkey for all keys matching a pattern using SCAN (non-blocking). * KEYS is avoided because it blocks the Valkey event loop for the full scan * duration, which can cause latency spikes under production key volumes. + * Returns empty array when Redis is not available (Local tier). */ private async scanKeys(pattern: string): Promise { + if (!this.redis) return []; const collected: string[] = []; let cursor = '0'; do { @@ -78,12 +90,14 @@ export class SessionGCService implements OnModuleInit { async collect(sessionId: string): Promise { const result: GCResult = { sessionId, cleaned: {} }; - // 1. Valkey: delete all session-scoped keys - const pattern = `mosaic:session:${sessionId}:*`; - const valkeyKeys = await this.scanKeys(pattern); - if (valkeyKeys.length > 0) { - await this.redis.del(...valkeyKeys); - result.cleaned.valkeyKeys = valkeyKeys.length; + // 1. Valkey: delete all session-scoped keys (skipped on Local tier) + if (this.redis) { + const pattern = `mosaic:session:${sessionId}:*`; + const valkeyKeys = await this.scanKeys(pattern); + if (valkeyKeys.length > 0) { + await this.redis.del(...valkeyKeys); + result.cleaned.valkeyKeys = valkeyKeys.length; + } } // 2. PG: demote hot-tier agent_logs for this session to warm @@ -106,6 +120,7 @@ export class SessionGCService implements OnModuleInit { const cleaned: GCResult[] = []; // 1. Find all session-scoped Valkey keys (non-blocking SCAN) + // Returns empty on Local tier — no Valkey session keys exist there. const allSessionKeys = await this.scanKeys('mosaic:session:*'); // Extract unique session IDs from keys @@ -136,11 +151,15 @@ export class SessionGCService implements OnModuleInit { */ async fullCollect(): Promise { const start = Date.now(); + let valkeyKeysCount = 0; - // 1. Valkey: delete ALL session-scoped keys (non-blocking SCAN) - const sessionKeys = await this.scanKeys('mosaic:session:*'); - if (sessionKeys.length > 0) { - await this.redis.del(...sessionKeys); + if (this.redis) { + // 1. Valkey: delete ALL session-scoped keys (non-blocking SCAN) + const sessionKeys = await this.scanKeys('mosaic:session:*'); + if (sessionKeys.length > 0) { + await this.redis.del(...sessionKeys); + } + valkeyKeysCount = sessionKeys.length; } // 2. NOTE: channel keys are NOT collected on cold start @@ -154,7 +173,7 @@ export class SessionGCService implements OnModuleInit { const jobsPurged = 0; return { - valkeyKeys: sessionKeys.length, + valkeyKeys: valkeyKeysCount, logsDemoted, jobsPurged, tempFilesRemoved: 0, diff --git a/apps/gateway/src/log/cron.service.ts b/apps/gateway/src/log/cron.service.ts index aa9b82f..4b5ccb8 100644 --- a/apps/gateway/src/log/cron.service.ts +++ b/apps/gateway/src/log/cron.service.ts @@ -19,7 +19,7 @@ import type { MosaicJobData } from '../queue/queue.service.js'; @Injectable() export class CronService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CronService.name); - private readonly registeredWorkers: Worker[] = []; + private readonly registeredWorkers: Array> = []; constructor( @Inject(SummarizationService) private readonly summarization: SummarizationService, @@ -28,6 +28,16 @@ export class CronService implements OnModuleInit, OnModuleDestroy { ) {} async onModuleInit(): Promise { + // On Local tier BullMQ is disabled — skip all job scheduling. + // NOTE: this means summarization, tier management, and Valkey GC jobs do not + // run on Local installs. For a single-user local install this is acceptable. + // If periodic background work is needed on Local in the future, add a + // setInterval-based scheduler here. + if (!this.queueService.isEnabled()) { + this.logger.log('CronService: BullMQ disabled on local tier — no jobs will be scheduled'); + return; + } + const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am @@ -42,7 +52,7 @@ export class CronService implements OnModuleInit, OnModuleDestroy { const summarizationWorker = this.queueService.registerWorker(QUEUE_SUMMARIZATION, async () => { await this.summarization.runSummarization(); }); - this.registeredWorkers.push(summarizationWorker); + if (summarizationWorker) this.registeredWorkers.push(summarizationWorker); // M6-005: Tier management repeatable job await this.queueService.addRepeatableJob( @@ -54,14 +64,14 @@ export class CronService implements OnModuleInit, OnModuleDestroy { const tierWorker = this.queueService.registerWorker(QUEUE_TIER_MANAGEMENT, async () => { await this.summarization.runTierManagement(); }); - this.registeredWorkers.push(tierWorker); + if (tierWorker) this.registeredWorkers.push(tierWorker); // M6-004: GC repeatable job await this.queueService.addRepeatableJob(QUEUE_GC, 'session-gc', {}, gcSchedule); const gcWorker = this.queueService.registerWorker(QUEUE_GC, async () => { await this.sessionGC.sweepOrphans(); }); - this.registeredWorkers.push(gcWorker); + if (gcWorker) this.registeredWorkers.push(gcWorker); this.logger.log( `BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`, diff --git a/apps/gateway/src/preferences/system-override.service.ts b/apps/gateway/src/preferences/system-override.service.ts index 5fa48da..e35ed63 100644 --- a/apps/gateway/src/preferences/system-override.service.ts +++ b/apps/gateway/src/preferences/system-override.service.ts @@ -1,5 +1,7 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger, Optional, type OnApplicationShutdown } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; const SESSION_SYSTEM_KEY = (sessionId: string) => `mosaic:session:${sessionId}:system`; const SESSION_SYSTEM_FRAGMENTS_KEY = (sessionId: string) => @@ -11,16 +13,54 @@ interface OverrideFragment { addedAt: number; } -@Injectable() -export class SystemOverrideService { - private readonly logger = new Logger(SystemOverrideService.name); - private readonly handle: QueueHandle; +interface LocalOverrideEntry { + condensed: string; + fragments: OverrideFragment[]; +} - constructor() { - this.handle = createQueue(); +@Injectable() +export class SystemOverrideService implements OnApplicationShutdown { + private readonly logger = new Logger(SystemOverrideService.name); + private readonly handle: QueueHandle | null; + /** + * In-memory fallback used on Local tier (no Redis). + * NOTE: state is ephemeral — lost on restart. For Local single-user installs + * this is acceptable; system overrides are re-applied at the next session. + * This is a deliberate behavior change from the Redis-backed 7-day TTL. + */ + private readonly localStore = new Map(); + + constructor( + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, + ) { + if (this.mosaicConfig?.queue?.type === 'local') { + this.handle = null; + } else { + this.handle = createQueue(); + } + } + + async onApplicationShutdown(): Promise { + // On non-local tiers the constructor opens an ioredis connection; close it + // on graceful shutdown to avoid leaking the handle (local tier is null). + await this.handle?.close().catch(() => {}); } async set(sessionId: string, override: string): Promise { + if (!this.handle) { + // Local tier: in-memory path + const entry = this.localStore.get(sessionId) ?? { condensed: '', fragments: [] }; + entry.fragments.push({ text: override, addedAt: Date.now() }); + entry.condensed = await this.condenseOverrides(entry.fragments.map((f) => f.text)); + this.localStore.set(sessionId, entry); + this.logger.debug( + `Set system override for session ${sessionId} (local, ${entry.fragments.length} fragment(s))`, + ); + return; + } + // Load existing fragments const existing = await this.handle.redis.get(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId)); const fragments: OverrideFragment[] = existing @@ -50,10 +90,17 @@ export class SystemOverrideService { } async get(sessionId: string): Promise { + if (!this.handle) { + return this.localStore.get(sessionId)?.condensed ?? null; + } return this.handle.redis.get(SESSION_SYSTEM_KEY(sessionId)); } async renew(sessionId: string): Promise { + if (!this.handle) { + // Local tier: no TTL to renew; entry persists until restart + return; + } const pipeline = this.handle.redis.pipeline(); pipeline.expire(SESSION_SYSTEM_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS); pipeline.expire(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS); @@ -61,6 +108,11 @@ export class SystemOverrideService { } async clear(sessionId: string): Promise { + if (!this.handle) { + this.localStore.delete(sessionId); + this.logger.debug(`Cleared system override for session ${sessionId} (local)`); + return; + } await this.handle.redis.del( SESSION_SYSTEM_KEY(sessionId), SESSION_SYSTEM_FRAGMENTS_KEY(sessionId), diff --git a/apps/gateway/src/queue/queue.service.spec.ts b/apps/gateway/src/queue/queue.service.spec.ts new file mode 100644 index 0000000..85f1e64 --- /dev/null +++ b/apps/gateway/src/queue/queue.service.spec.ts @@ -0,0 +1,35 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { QueueService } from './queue.service.js'; + +const localConfig = { + queue: { type: 'local' }, +} as MosaicConfig; + +describe('QueueService local tier', () => { + it('disables BullMQ and treats queue operations as local no-ops', async () => { + const service = new QueueService(null, localConfig); + + expect(service.isEnabled()).toBe(false); + expect(service.getQueue('mosaic-test')).toBeNull(); + expect(service.registerWorker('mosaic-test', vi.fn())).toBeNull(); + + await expect( + service.addRepeatableJob('mosaic-test', 'local-noop', {}, '* * * * *'), + ).resolves.toBeUndefined(); + await expect(service.getHealthStatus()).resolves.toEqual({ queues: {}, healthy: true }); + await expect(service.listJobs()).resolves.toEqual([]); + await expect(service.retryJob('mosaic-test__1')).resolves.toEqual({ + ok: false, + message: 'BullMQ is disabled on local tier.', + }); + await expect(service.pauseQueue('mosaic-test')).resolves.toEqual({ + ok: false, + message: 'BullMQ is disabled on local tier.', + }); + await expect(service.resumeQueue('mosaic-test')).resolves.toEqual({ + ok: false, + message: 'BullMQ is disabled on local tier.', + }); + }); +}); diff --git a/apps/gateway/src/queue/queue.service.ts b/apps/gateway/src/queue/queue.service.ts index a50a773..6e84340 100644 --- a/apps/gateway/src/queue/queue.service.ts +++ b/apps/gateway/src/queue/queue.service.ts @@ -8,7 +8,9 @@ import { } from '@nestjs/common'; import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq'; import type { LogService } from '@mosaicstack/log'; +import type { MosaicConfig } from '@mosaicstack/config'; import { LOG_SERVICE } from '../log/log.tokens.js'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import type { JobDto, JobStatus } from './queue-admin.dto.js'; // --------------------------------------------------------------------------- @@ -108,21 +110,42 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { private readonly connection: ConnectionOptions; private readonly queues = new Map>(); private readonly workers = new Map>(); + /** False on Local tier — BullMQ/Redis operations become no-ops. */ + private readonly enabled: boolean; constructor( @Optional() @Inject(LOG_SERVICE) private readonly logService: LogService | null, + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, ) { - this.connection = getConnection(); + this.enabled = this.mosaicConfig?.queue?.type !== 'local'; + this.connection = this.enabled + ? getConnection() + : ({ host: '127.0.0.1', port: 6380 } as ConnectionOptions); + } + + /** Returns true when BullMQ/Redis is active (Standalone and Federated tiers). */ + isEnabled(): boolean { + return this.enabled; } onModuleInit(): void { - this.logger.log('QueueService initialised (BullMQ)'); + if (this.enabled) { + this.logger.log('QueueService initialised (BullMQ)'); + } else { + this.logger.log( + 'QueueService: BullMQ disabled for local tier — no Redis connections will be opened', + ); + } } async onModuleDestroy(): Promise { - await this.closeAll(); + if (this.enabled) { + await this.closeAll(); + } } // ------------------------------------------------------------------------- @@ -131,8 +154,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Get or create a BullMQ Queue for the given queue name. + * Returns null on Local tier where BullMQ is disabled. */ - getQueue(name: string): Queue { + getQueue(name: string): Queue | null { + if (!this.enabled) return null; let queue = this.queues.get(name) as Queue | undefined; if (!queue) { queue = new Queue(name, { connection: this.connection }); @@ -144,6 +169,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Add a BullMQ repeatable job (cron-style). * Uses `jobId` as a deterministic key so duplicate registrations are idempotent. + * No-op on Local tier. */ async addRepeatableJob( queueName: string, @@ -151,7 +177,13 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { data: T, cronExpression: string, ): Promise { - const queue = this.getQueue(queueName); + if (!this.enabled) { + this.logger.debug( + `Skipping repeatable job "${jobName}" on "${queueName}" (local tier — BullMQ disabled)`, + ); + return; + } + const queue = this.getQueue(queueName)!; // eslint-disable-next-line @typescript-eslint/no-explicit-any await (queue as Queue).add(jobName, data, { repeat: { pattern: cronExpression }, @@ -165,8 +197,18 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Register a Worker for the given queue name with error handling and * exponential backoff. + * Returns null on Local tier where BullMQ is disabled. */ - registerWorker(queueName: string, handler: JobHandler): Worker { + registerWorker( + queueName: string, + handler: JobHandler, + ): Worker | null { + if (!this.enabled) { + this.logger.debug( + `Skipping worker registration for "${queueName}" (local tier — BullMQ disabled)`, + ); + return null; + } const worker = new Worker( queueName, async (job) => { @@ -223,8 +265,12 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Return queue health statistics for all managed queues. + * Returns an empty healthy result on Local tier. */ async getHealthStatus(): Promise { + if (!this.enabled) { + return { queues: {}, healthy: true }; + } const queues: QueueHealthStatus['queues'] = {}; let healthy = true; @@ -255,8 +301,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * List jobs across all managed queues, optionally filtered by status. * BullMQ jobs are fetched by state type from each queue. + * Returns empty array on Local tier. */ async listJobs(status?: JobStatus): Promise { + if (!this.enabled) return []; const jobs: JobDto[] = []; const states: JobStatus[] = status ? [status] @@ -283,8 +331,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Retry a specific failed job by its BullMQ job ID (format: "queueName:id"). * The caller passes "__" as the composite ID because BullMQ * job IDs are not globally unique — they are scoped to their queue. + * Returns an error on Local tier. */ async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const sep = compositeId.lastIndexOf('__'); if (sep === -1) { return { ok: false, message: 'Invalid job id format. Expected "__".' }; @@ -316,6 +366,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Pause a queue by name. */ async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const queue = this.queues.get(name); if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; await queue.pause(); @@ -327,6 +378,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Resume a paused queue by name. */ async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const queue = this.queues.get(name); if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; await queue.resume();