import { Injectable, NotFoundException, BadRequestException } from "@nestjs/common"; import { Prisma, RunnerJobStatus } from "@prisma/client"; import { Response } from "express"; import { PrismaService } from "../prisma/prisma.service"; import { BullMqService } from "../bullmq/bullmq.service"; import { QUEUE_NAMES } from "../bullmq/queues"; import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception"; import type { CreateJobDto, QueryJobsDto } from "./dto"; /** * Service for managing runner jobs */ @Injectable() export class RunnerJobsService { constructor( private readonly prisma: PrismaService, private readonly bullMq: BullMqService ) {} /** * Create a new runner job and queue it in BullMQ */ async create(workspaceId: string, createJobDto: CreateJobDto) { const priority = createJobDto.priority ?? 5; // Build data object const data: Prisma.RunnerJobCreateInput = { workspace: { connect: { id: workspaceId } }, type: createJobDto.type, priority, status: RunnerJobStatus.PENDING, progressPercent: 0, }; // Add optional fields if (createJobDto.data) { data.result = createJobDto.data as unknown as Prisma.InputJsonValue; } if (createJobDto.agentTaskId) { data.agentTask = { connect: { id: createJobDto.agentTaskId } }; } // Create job in database const job = await this.prisma.runnerJob.create({ data }); // Add job to BullMQ queue await this.bullMq.addJob( QUEUE_NAMES.RUNNER, "runner-job", { jobId: job.id, workspaceId, type: createJobDto.type, data: createJobDto.data, }, { priority } ); return job; } /** * Get paginated jobs with filters */ async findAll(query: QueryJobsDto) { const page = query.page ?? 1; const limit = query.limit ?? 50; const skip = (page - 1) * limit; // Build where clause const where: Prisma.RunnerJobWhereInput = query.workspaceId ? { workspaceId: query.workspaceId, } : {}; if (query.status) { where.status = Array.isArray(query.status) ? { in: query.status } : query.status; } if (query.type) { where.type = query.type; } if (query.agentTaskId) { where.agentTaskId = query.agentTaskId; } // Execute queries in parallel const [data, total] = await Promise.all([ this.prisma.runnerJob.findMany({ where, include: { agentTask: { select: { id: true, title: true, status: true }, }, }, orderBy: { createdAt: "desc", }, skip, take: limit, }), this.prisma.runnerJob.count({ where }), ]); return { data, meta: { total, page, limit, totalPages: Math.ceil(total / limit), }, }; } /** * Get a single job by ID */ async findOne(id: string, workspaceId: string) { const job = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId, }, include: { agentTask: { select: { id: true, title: true, status: true }, }, steps: { orderBy: { ordinal: "asc" }, }, events: { orderBy: { timestamp: "asc" }, }, }, }); if (!job) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } return job; } /** * Cancel a running or queued job with optimistic locking */ async cancel(id: string, workspaceId: string) { return this.retryOnConflict(async () => { // Verify job exists const existingJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!existingJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } // Check if job can be cancelled if ( existingJob.status === RunnerJobStatus.COMPLETED || existingJob.status === RunnerJobStatus.CANCELLED || existingJob.status === RunnerJobStatus.FAILED ) { throw new BadRequestException(`Cannot cancel job with status ${existingJob.status}`); } // Update job status to cancelled with version check const result = await this.prisma.runnerJob.updateMany({ where: { id, workspaceId, version: existingJob.version, }, data: { status: RunnerJobStatus.CANCELLED, completedAt: new Date(), version: { increment: 1 }, }, }); if (result.count === 0) { throw new ConcurrentUpdateException("RunnerJob", id, existingJob.version); } // Fetch and return updated job const job = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!job) { throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`); } return job; }); } /** * Retry a failed job by creating a new job with the same parameters */ async retry(id: string, workspaceId: string) { // Verify job exists const existingJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!existingJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } // Check if job is failed if (existingJob.status !== RunnerJobStatus.FAILED) { throw new BadRequestException("Can only retry failed jobs"); } // Create new job with same parameters const retryData: Prisma.RunnerJobCreateInput = { workspace: { connect: { id: workspaceId } }, type: existingJob.type, priority: existingJob.priority, status: RunnerJobStatus.PENDING, progressPercent: 0, }; // Add optional fields if (existingJob.result) { retryData.result = existingJob.result as Prisma.InputJsonValue; } if (existingJob.agentTaskId) { retryData.agentTask = { connect: { id: existingJob.agentTaskId } }; } const newJob = await this.prisma.runnerJob.create({ data: retryData }); // Add job to BullMQ queue await this.bullMq.addJob( QUEUE_NAMES.RUNNER, "runner-job", { jobId: newJob.id, workspaceId, type: newJob.type, data: existingJob.result, }, { priority: existingJob.priority } ); return newJob; } /** * Stream job events via Server-Sent Events (SSE) * Polls database for new events and sends them to the client * Supports error recovery with reconnection via lastEventId parameter */ async streamEvents( id: string, workspaceId: string, res: Response, lastEventId?: string ): Promise { return this.streamEventsFrom(id, workspaceId, res, lastEventId); } /** * Stream job events from a specific point (for reconnection support) * @param id Job ID * @param workspaceId Workspace ID * @param res Response object * @param lastEventId Last received event ID (for resuming streams) */ async streamEventsFrom( id: string, workspaceId: string, res: Response, lastEventId?: string ): Promise { // Verify job exists const job = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, select: { id: true, status: true }, }); if (!job) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } // Send SSE retry header (recommend 3 second retry interval) res.write("retry: 3000\n\n"); // Track last event for polling let lastEventTime = new Date(0); // Start from epoch let isActive = true; // If resuming from lastEventId, find that event's timestamp if (lastEventId) { const lastEvent = await this.prisma.jobEvent.findUnique({ where: { id: lastEventId }, select: { timestamp: true }, }); if (lastEvent) { lastEventTime = lastEvent.timestamp; } } // Set up connection cleanup res.on("close", () => { isActive = false; }); // Keep-alive ping interval (every 15 seconds) const keepAliveInterval = setInterval(() => { if (isActive) { res.write(": ping\n\n"); } }, 15000); try { // Poll for events until connection closes or job completes // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition while (isActive) { try { // Build query for events const eventsQuery = { where: { jobId: id, ...(lastEventId ? { id: { gt: lastEventId } } : { timestamp: { gt: lastEventTime } }), }, orderBy: { timestamp: "asc" as const }, }; // Fetch new events since last poll const events = await this.prisma.jobEvent.findMany(eventsQuery); // Send each event for (const event of events) { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (!isActive) break; // Write event in SSE format with event ID for reconnection support res.write(`id: ${event.id}\n`); res.write(`event: ${event.type}\n`); res.write( `data: ${JSON.stringify({ stepId: event.stepId, ...(event.payload as object), })}\n\n` ); // Update last event time and ID if (event.timestamp > lastEventTime) { lastEventTime = event.timestamp; } if (!lastEventId || event.id > lastEventId) { lastEventId = event.id; } } // Check if job has completed const currentJob = await this.prisma.runnerJob.findUnique({ where: { id }, select: { status: true }, }); if (currentJob) { if ( currentJob.status === RunnerJobStatus.COMPLETED || currentJob.status === RunnerJobStatus.FAILED || currentJob.status === RunnerJobStatus.CANCELLED ) { // Job is done, send completion signal and end stream res.write("event: stream.complete\n"); res.write(`data: ${JSON.stringify({ status: currentJob.status })}\n\n`); break; } } // Wait before next poll (500ms) await new Promise((resolve) => setTimeout(resolve, 500)); } catch (error) { // Handle transient errors by sending error event const errorMessage = error instanceof Error ? error.message : String(error); const isRetryable = this.isRetryableError(error); // Send error event to client res.write("event: error\n"); res.write( `data: ${JSON.stringify({ error: errorMessage, retryable: isRetryable, lastEventId, })}\n\n` ); // Re-throw non-retryable errors if (!isRetryable) { throw error; } // For retryable errors, wait and continue polling await new Promise((resolve) => setTimeout(resolve, 1000)); } } } finally { // Clean up clearInterval(keepAliveInterval); if (!res.writableEnded) { res.end(); } } } /** * Determine if an error is retryable (transient vs permanent) */ private isRetryableError(error: unknown): boolean { if (!(error instanceof Error)) { return false; } const retryablePatterns = [ /connection/i, /timeout/i, /temporary/i, /transient/i, /network/i, /rate limit/i, ]; return retryablePatterns.some((pattern) => pattern.test(error.message)); } /** * Retry wrapper for optimistic locking conflicts * Retries the operation up to maxRetries times with exponential backoff */ private async retryOnConflict(operation: () => Promise, maxRetries = 3): Promise { for (let attempt = 0; attempt < maxRetries; attempt++) { try { return await operation(); } catch (error) { if (error instanceof ConcurrentUpdateException && attempt < maxRetries - 1) { // Exponential backoff: 100ms, 200ms, 400ms const delayMs = Math.pow(2, attempt) * 100; await new Promise((resolve) => setTimeout(resolve, delayMs)); continue; } throw error; } } throw new Error("Retry logic failed unexpectedly"); } /** * Update job status with optimistic locking */ async updateStatus( id: string, workspaceId: string, status: RunnerJobStatus, data?: { result?: unknown; error?: string } ): Promise>> { return this.retryOnConflict(async () => { // Read current job state const existingJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!existingJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } // Validate status transition (prevent invalid transitions even with concurrency) if (!this.isValidStatusTransition(existingJob.status, status)) { throw new BadRequestException( `Invalid status transition from ${existingJob.status} to ${status}` ); } const updateData: Prisma.RunnerJobUpdateInput = { status, version: { increment: 1 }, // Increment version for optimistic locking }; // Set timestamps based on status if (status === RunnerJobStatus.RUNNING && !existingJob.startedAt) { updateData.startedAt = new Date(); } if ( status === RunnerJobStatus.COMPLETED || status === RunnerJobStatus.FAILED || status === RunnerJobStatus.CANCELLED ) { updateData.completedAt = new Date(); } // Add optional data if (data?.result !== undefined) { updateData.result = data.result as Prisma.InputJsonValue; } if (data?.error !== undefined) { updateData.error = data.error; } // Use updateMany with version check for optimistic locking const result = await this.prisma.runnerJob.updateMany({ where: { id, workspaceId, version: existingJob.version, // Only update if version matches }, data: updateData, }); // If count is 0, version mismatch (concurrent update detected) if (result.count === 0) { throw new ConcurrentUpdateException("RunnerJob", id, existingJob.version); } // Fetch and return updated job const updatedJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!updatedJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); } return updatedJob; }); } /** * Validate status transitions */ private isValidStatusTransition( currentStatus: RunnerJobStatus, newStatus: RunnerJobStatus ): boolean { // Define valid transitions const validTransitions: Record = { [RunnerJobStatus.PENDING]: [ RunnerJobStatus.QUEUED, RunnerJobStatus.RUNNING, RunnerJobStatus.CANCELLED, ], [RunnerJobStatus.QUEUED]: [RunnerJobStatus.RUNNING, RunnerJobStatus.CANCELLED], [RunnerJobStatus.RUNNING]: [ RunnerJobStatus.COMPLETED, RunnerJobStatus.FAILED, RunnerJobStatus.CANCELLED, ], [RunnerJobStatus.COMPLETED]: [], [RunnerJobStatus.FAILED]: [], [RunnerJobStatus.CANCELLED]: [], }; return validTransitions[currentStatus].includes(newStatus); } /** * Update job progress percentage with optimistic locking */ async updateProgress( id: string, workspaceId: string, progressPercent: number ): Promise>> { return this.retryOnConflict(async () => { // Read current job state const existingJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!existingJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } // Use updateMany with version check for optimistic locking const result = await this.prisma.runnerJob.updateMany({ where: { id, workspaceId, version: existingJob.version, }, data: { progressPercent, version: { increment: 1 }, }, }); if (result.count === 0) { throw new ConcurrentUpdateException("RunnerJob", id, existingJob.version); } // Fetch and return updated job const updatedJob = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, }); if (!updatedJob) { throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); } return updatedJob; }); } }