87 lines
3.3 KiB
TypeScript
87 lines
3.3 KiB
TypeScript
import {
|
|
Inject,
|
|
Injectable,
|
|
Logger,
|
|
type OnModuleInit,
|
|
type OnModuleDestroy,
|
|
} from '@nestjs/common';
|
|
import { SummarizationService } from './summarization.service.js';
|
|
import { SessionGCService } from '../gc/session-gc.service.js';
|
|
import {
|
|
QueueService,
|
|
QUEUE_SUMMARIZATION,
|
|
QUEUE_GC,
|
|
QUEUE_TIER_MANAGEMENT,
|
|
} from '../queue/queue.service.js';
|
|
import type { Worker } from 'bullmq';
|
|
import type { MosaicJobData } from '../queue/queue.service.js';
|
|
|
|
@Injectable()
|
|
export class CronService implements OnModuleInit, OnModuleDestroy {
|
|
private readonly logger = new Logger(CronService.name);
|
|
private readonly registeredWorkers: Array<Worker<MosaicJobData>> = [];
|
|
|
|
constructor(
|
|
@Inject(SummarizationService) private readonly summarization: SummarizationService,
|
|
@Inject(SessionGCService) private readonly sessionGC: SessionGCService,
|
|
@Inject(QueueService) private readonly queueService: QueueService,
|
|
) {}
|
|
|
|
async onModuleInit(): Promise<void> {
|
|
// On Local tier BullMQ is disabled — skip all job scheduling.
|
|
// NOTE: this means summarization, tier management, and Valkey GC jobs do not
|
|
// run on Local installs. For a single-user local install this is acceptable.
|
|
// If periodic background work is needed on Local in the future, add a
|
|
// setInterval-based scheduler here.
|
|
if (!this.queueService.isEnabled()) {
|
|
this.logger.log('CronService: BullMQ disabled on local tier — no jobs will be scheduled');
|
|
return;
|
|
}
|
|
|
|
const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours
|
|
const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am
|
|
const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am
|
|
|
|
// M6-003: Summarization repeatable job
|
|
await this.queueService.addRepeatableJob(
|
|
QUEUE_SUMMARIZATION,
|
|
'summarization',
|
|
{},
|
|
summarizationSchedule,
|
|
);
|
|
const summarizationWorker = this.queueService.registerWorker(QUEUE_SUMMARIZATION, async () => {
|
|
await this.summarization.runSummarization();
|
|
});
|
|
if (summarizationWorker) this.registeredWorkers.push(summarizationWorker);
|
|
|
|
// M6-005: Tier management repeatable job
|
|
await this.queueService.addRepeatableJob(
|
|
QUEUE_TIER_MANAGEMENT,
|
|
'tier-management',
|
|
{},
|
|
tierManagementSchedule,
|
|
);
|
|
const tierWorker = this.queueService.registerWorker(QUEUE_TIER_MANAGEMENT, async () => {
|
|
await this.summarization.runTierManagement();
|
|
});
|
|
if (tierWorker) this.registeredWorkers.push(tierWorker);
|
|
|
|
// M6-004: GC repeatable job
|
|
await this.queueService.addRepeatableJob(QUEUE_GC, 'session-gc', {}, gcSchedule);
|
|
const gcWorker = this.queueService.registerWorker(QUEUE_GC, async () => {
|
|
await this.sessionGC.sweepOrphans();
|
|
});
|
|
if (gcWorker) this.registeredWorkers.push(gcWorker);
|
|
|
|
this.logger.log(
|
|
`BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`,
|
|
);
|
|
}
|
|
|
|
async onModuleDestroy(): Promise<void> {
|
|
// Workers are closed by QueueService.onModuleDestroy — nothing extra needed here.
|
|
this.registeredWorkers.length = 0;
|
|
this.logger.log('CronService destroyed (workers managed by QueueService)');
|
|
}
|
|
}
|