import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";

/** Polling period of the scheduler, in milliseconds (one minute). */
const CHECK_INTERVAL_MS = 60_000;

/**
 * Outcome of a single schedule execution attempt.
 * `error` is only present when `success` is false.
 */
export interface CronExecutionResult {
  scheduleId: string;
  command: string;
  executedAt: Date;
  success: boolean;
  error?: string;
}

/**
 * Polls the `cronSchedule` table once per minute and executes every enabled
 * schedule whose `nextRun` is unset or in the past.
 *
 * The scheduler is started in `onModuleInit` and stopped in `onModuleDestroy`.
 */
@Injectable()
export class CronSchedulerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(CronSchedulerService.name);
  private isRunning = false;
  private checkInterval: ReturnType<typeof setInterval> | null = null;

  constructor(
    private readonly prisma: PrismaService,
    private readonly wsGateway: WebSocketGateway
  ) {}

  onModuleInit(): void {
    this.startScheduler();
    this.logger.log("Cron scheduler started");
  }

  onModuleDestroy(): void {
    this.stopScheduler();
    this.logger.log("Cron scheduler stopped");
  }

  /**
   * Start the scheduler: poll every minute for due schedules and run one
   * pass immediately. Calling this while already running is a no-op.
   */
  startScheduler(): void {
    if (this.isRunning) return;

    this.isRunning = true;
    // NOTE(review): ticks are not guarded against overlap; a pass that takes
    // longer than the interval would run concurrently with the next one.
    this.checkInterval = setInterval(() => void this.processDueSchedules(), CHECK_INTERVAL_MS);

    // Also run immediately on start
    void this.processDueSchedules();
  }

  /**
   * Stop the scheduler and clear the polling timer. Safe to call when the
   * scheduler is not running.
   */
  stopScheduler(): void {
    this.isRunning = false;
    if (this.checkInterval) {
      clearInterval(this.checkInterval);
      this.checkInterval = null;
    }
  }

  /**
   * Process all due cron schedules, one after another.
   * Called every minute and on scheduler start.
   *
   * @returns One result per schedule that was attempted. If the lookup (or
   *   anything else outside a single execution) throws, the error is logged
   *   and the results collected so far are returned; this method never rejects.
   */
  async processDueSchedules(): Promise<CronExecutionResult[]> {
    const now = new Date();
    const results: CronExecutionResult[] = [];

    try {
      // Find all enabled schedules that are due (nextRun <= now) or never run
      const dueSchedules = await this.prisma.cronSchedule.findMany({
        where: {
          enabled: true,
          OR: [{ nextRun: null }, { nextRun: { lte: now } }],
        },
      });

      this.logger.debug(`Found ${dueSchedules.length.toString()} due schedules`);

      for (const schedule of dueSchedules) {
        const result = await this.executeSchedule(
          schedule.id,
          schedule.command,
          schedule.workspaceId
        );
        results.push(result);
      }

      return results;
    } catch (error) {
      this.logger.error("Error processing due schedules", error);
      return results;
    }
  }

  /**
   * Execute a single cron schedule.
   *
   * On success the schedule's `lastRun` and `nextRun` are updated and a
   * WebSocket event is emitted. On failure only `lastRun` is updated
   * (best-effort) and `nextRun` is left untouched.
   *
   * @returns The execution result; failures are reported through
   *   `success: false` and `error` rather than by throwing.
   */
  async executeSchedule(
    scheduleId: string,
    command: string,
    workspaceId: string
  ): Promise<CronExecutionResult> {
    const executedAt = new Date();
    let success = true;
    let errorMessage: string | undefined;

    try {
      this.logger.log(`Executing schedule ${scheduleId}: ${command}`);

      // TODO: Trigger actual MoltBot command here
      // For now, we just log it and emit the WebSocket event
      // In production, this would call the MoltBot API or internal command dispatcher
      this.triggerMoltBotCommand(workspaceId, command);

      // Calculate next run time
      const nextRun = this.calculateNextRun(scheduleId);

      // Update schedule with execution info
      await this.prisma.cronSchedule.update({
        where: { id: scheduleId },
        data: {
          lastRun: executedAt,
          nextRun,
        },
      });

      // Emit WebSocket event
      this.wsGateway.emitCronExecuted(workspaceId, {
        scheduleId,
        command,
        executedAt,
      });

      this.logger.log(
        `Schedule ${scheduleId} executed successfully, next run: ${nextRun.toISOString()}`
      );
    } catch (err) {
      success = false;
      errorMessage = err instanceof Error ? err.message : "Unknown error";
      this.logger.error(`Schedule ${scheduleId} failed: ${errorMessage}`);

      // Still update lastRun even on failure, but keep nextRun as-is
      await this.recordFailedRun(scheduleId, executedAt);
    }

    // Build result with conditional error property for exactOptionalPropertyTypes
    const result: CronExecutionResult = {
      scheduleId,
      command,
      executedAt,
      success,
    };

    if (errorMessage !== undefined) {
      result.error = errorMessage;
    }

    return result;
  }

  /**
   * Best-effort bookkeeping after a failed execution: stamp `lastRun` only.
   *
   * This write must never throw. If it did, the rejection would escape
   * `executeSchedule`, hide the original failure, and abort the remaining
   * schedules of the current batch in `processDueSchedules`.
   */
  private async recordFailedRun(scheduleId: string, executedAt: Date): Promise<void> {
    try {
      await this.prisma.cronSchedule.update({
        where: { id: scheduleId },
        data: {
          lastRun: executedAt,
        },
      });
    } catch (updateErr) {
      const reason = updateErr instanceof Error ? updateErr.message : "Unknown error";
      this.logger.error(`Failed to record lastRun for schedule ${scheduleId}: ${reason}`);
    }
  }

  /**
   * Trigger a MoltBot command (placeholder for actual integration).
   * Currently this only writes a debug log line.
   */
  private triggerMoltBotCommand(workspaceId: string, command: string): void {
    // TODO: Implement actual MoltBot command triggering
    // Options:
    // 1. Internal API call if MoltBot runs in same process
    // 2. HTTP webhook to MoltBot endpoint
    // 3. Message queue (Bull/RabbitMQ) for async processing
    // 4. WebSocket message to MoltBot client

    this.logger.debug(`[MOLTBOT-TRIGGER] workspaceId=${workspaceId} command="${command}"`);

    // Placeholder: In production, this would actually trigger the command
    // For now, we just log the intent
  }

  /**
   * Calculate the next run time for a schedule.
   *
   * Placeholder: the cron expression is NOT consulted yet. Every schedule is
   * simply rescheduled one minute from now, and the `_scheduleId` argument is
   * unused.
   */
  private calculateNextRun(_scheduleId: string): Date {
    // TODO: Implement proper cron parsing with a library
    // (e.g. 'cron-parser' or 'cron-schedule') based on the schedule's expression.
    const next = new Date();
    next.setMinutes(next.getMinutes() + 1); // Default: next minute
    return next;
  }

  /**
   * Manually trigger a schedule (for testing or on-demand execution).
   *
   * @returns The execution result, or `null` when the schedule does not exist
   *   or is disabled.
   */
  async triggerManual(scheduleId: string): Promise<CronExecutionResult | null> {
    const schedule = await this.prisma.cronSchedule.findUnique({
      where: { id: scheduleId },
    });

    if (!schedule?.enabled) {
      return null;
    }

    return this.executeSchedule(scheduleId, schedule.command, schedule.workspaceId);
  }

  /**
   * Get scheduler status.
   *
   * @returns Whether the scheduler is running and, if so, its polling period
   *   in milliseconds (`null` when stopped).
   */
  getStatus(): { running: boolean; checkIntervalMs: number | null } {
    return {
      running: this.isRunning,
      checkIntervalMs: this.isRunning ? CHECK_INTERVAL_MS : null,
    };
  }
}