Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
77 lines
2.7 KiB
TypeScript
77 lines
2.7 KiB
TypeScript
import {
|
|
Inject,
|
|
Injectable,
|
|
Logger,
|
|
type OnModuleInit,
|
|
type OnModuleDestroy,
|
|
} from '@nestjs/common';
|
|
import { SummarizationService } from './summarization.service.js';
|
|
import { SessionGCService } from '../gc/session-gc.service.js';
|
|
import {
|
|
QueueService,
|
|
QUEUE_SUMMARIZATION,
|
|
QUEUE_GC,
|
|
QUEUE_TIER_MANAGEMENT,
|
|
} from '../queue/queue.service.js';
|
|
import type { Worker } from 'bullmq';
|
|
import type { MosaicJobData } from '../queue/queue.service.js';
|
|
|
|
/**
 * Registers the recurring background jobs with QueueService when the module
 * starts: summarization, tier management, and session garbage collection.
 *
 * For each job it adds a repeatable job to the matching queue and registers a
 * worker whose handler calls the corresponding service method. The cron
 * expressions can be overridden through the environment variables
 * `SUMMARIZATION_CRON`, `TIER_MANAGEMENT_CRON`, and `SESSION_GC_CRON`.
 */
@Injectable()
export class CronService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(CronService.name);

  /** Workers returned by QueueService.registerWorker, kept for bookkeeping only. */
  private readonly registeredWorkers: Worker<MosaicJobData>[] = [];

  constructor(
    @Inject(SummarizationService) private readonly summarization: SummarizationService,
    @Inject(SessionGCService) private readonly sessionGC: SessionGCService,
    @Inject(QueueService) private readonly queueService: QueueService,
  ) {}

  /**
   * Schedules the three repeatable jobs and registers one worker per queue.
   *
   * Jobs are set up sequentially in the order summarization, tier management,
   * GC; a rejection from any `addRepeatableJob` call propagates to the caller
   * and the remaining jobs are not scheduled.
   */
  async onModuleInit(): Promise<void> {
    const summarizationSchedule = this.resolveSchedule('SUMMARIZATION_CRON', '0 */6 * * *'); // every 6 hours
    const tierManagementSchedule = this.resolveSchedule('TIER_MANAGEMENT_CRON', '0 3 * * *'); // daily at 3am
    const gcSchedule = this.resolveSchedule('SESSION_GC_CRON', '0 4 * * *'); // daily at 4am

    // M6-003: Summarization repeatable job
    await this.queueService.addRepeatableJob(
      QUEUE_SUMMARIZATION,
      'summarization',
      {},
      summarizationSchedule,
    );
    const summarizationWorker = this.queueService.registerWorker(QUEUE_SUMMARIZATION, async () => {
      await this.summarization.runSummarization();
    });
    this.registeredWorkers.push(summarizationWorker);

    // M6-005: Tier management repeatable job
    await this.queueService.addRepeatableJob(
      QUEUE_TIER_MANAGEMENT,
      'tier-management',
      {},
      tierManagementSchedule,
    );
    const tierWorker = this.queueService.registerWorker(QUEUE_TIER_MANAGEMENT, async () => {
      await this.summarization.runTierManagement();
    });
    this.registeredWorkers.push(tierWorker);

    // M6-004: GC repeatable job
    await this.queueService.addRepeatableJob(QUEUE_GC, 'session-gc', {}, gcSchedule);
    const gcWorker = this.queueService.registerWorker(QUEUE_GC, async () => {
      await this.sessionGC.sweepOrphans();
    });
    this.registeredWorkers.push(gcWorker);

    this.logger.log(
      `BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`,
    );
  }

  /**
   * Drops this service's references to the registered workers.
   *
   * The workers themselves are not closed here; per the note below that is
   * expected to happen in QueueService.onModuleDestroy.
   */
  async onModuleDestroy(): Promise<void> {
    // Workers are closed by QueueService.onModuleDestroy — nothing extra needed here.
    this.registeredWorkers.length = 0;
    this.logger.log('CronService destroyed (workers managed by QueueService)');
  }

  /**
   * Reads a cron expression from the environment, falling back to a default.
   *
   * A variable that is set but blank (empty or whitespace-only) is treated the
   * same as an unset one, so an empty assignment in an env file cannot hand an
   * empty cron expression to the queue. Non-blank values are returned exactly
   * as configured, without trimming.
   *
   * @param envName - Name of the environment variable to read.
   * @param fallback - Cron expression used when the variable is unset or blank.
   * @returns The configured expression, or `fallback`.
   */
  private resolveSchedule(envName: string, fallback: string): string {
    const configured = process.env[envName];
    if (configured === undefined || configured.trim() === '') {
      return fallback;
    }
    return configured;
  }
}
|