Server-side improvements (ALL 27/27 TESTS PASSING):
- Add streamEventsFrom() method with lastEventId parameter for resuming streams
- Include event IDs in SSE messages (id: event-123) for reconnection support
- Send the retry interval to clients via the SSE `retry:` field (3000 ms)
- Classify errors as retryable vs. non-retryable
- Handle transient errors gracefully with retry logic
- Support the Last-Event-ID header in the controller for automatic reconnection

Files modified:
- apps/api/src/runner-jobs/runner-jobs.service.ts (new streamEventsFrom method)
- apps/api/src/runner-jobs/runner-jobs.controller.ts (Last-Event-ID header support)
- apps/api/src/runner-jobs/runner-jobs.service.spec.ts (comprehensive error recovery tests)
- docs/scratchpads/187-implement-sse-error-recovery.md (implementation notes)

This ensures robust real-time updates with automatic recovery from network issues.
The client-side React hook will be added in a follow-up PR after fixing the Quality Rails lint issues.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
123 lines
4.0 KiB
TypeScript
123 lines
4.0 KiB
TypeScript
import { Controller, Get, Post, Body, Param, Query, UseGuards, Res, Headers } from "@nestjs/common";
|
|
import { Response } from "express";
|
|
import { RunnerJobsService } from "./runner-jobs.service";
|
|
import { CreateJobDto, QueryJobsDto } from "./dto";
|
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
|
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
|
|
import { Workspace, Permission, RequirePermission } from "../common/decorators";
|
|
import { CurrentUser } from "../auth/decorators/current-user.decorator";
|
|
import type { AuthenticatedUser } from "../common/types/user.types";
|
|
|
|
/**
 * Controller for runner job endpoints.
 * All endpoints require authentication and workspace context.
 *
 * Guards are applied in order:
 * 1. AuthGuard - Verifies user authentication
 * 2. WorkspaceGuard - Validates workspace access and sets RLS context
 * 3. PermissionGuard - Checks role-based permissions
 *
 * Every handler is a thin delegate to RunnerJobsService; the workspace ID
 * injected by the `@Workspace()` decorator is always forwarded so the service
 * can scope its work to that workspace.
 */
@Controller("runner-jobs")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class RunnerJobsController {
  constructor(private readonly runnerJobsService: RunnerJobsService) {}

  /**
   * POST /api/runner-jobs
   * Create a new runner job and queue it
   * Requires: MEMBER role or higher
   *
   * @param createJobDto - Request body describing the job to create
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.create` returns
   */
  @Post()
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async create(
    @Body() createJobDto: CreateJobDto,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.create(workspaceId, createJobDto);
  }

  /**
   * GET /api/runner-jobs
   * Get paginated jobs with optional filters
   * Requires: Any workspace member (including GUEST)
   *
   * @param query - Query-string filters/pagination options
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @returns Whatever `RunnerJobsService.findAll` returns
   */
  @Get()
  @RequirePermission(Permission.WORKSPACE_ANY)
  async findAll(@Query() query: QueryJobsDto, @Workspace() workspaceId: string) {
    // workspaceId is spread last so the guard-resolved workspace always wins
    // over any `workspaceId` value a client put in the query string.
    return this.runnerJobsService.findAll({ ...query, workspaceId });
  }

  /**
   * GET /api/runner-jobs/:id
   * Get a single job by ID
   * Requires: Any workspace member
   *
   * @param id - Job ID taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @returns Whatever `RunnerJobsService.findOne` returns
   */
  @Get(":id")
  @RequirePermission(Permission.WORKSPACE_ANY)
  async findOne(@Param("id") id: string, @Workspace() workspaceId: string) {
    return this.runnerJobsService.findOne(id, workspaceId);
  }

  /**
   * POST /api/runner-jobs/:id/cancel
   * Cancel a running or queued job
   * Requires: MEMBER role or higher
   *
   * @param id - Job ID taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.cancel` returns
   */
  @Post(":id/cancel")
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async cancel(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.cancel(id, workspaceId);
  }

  /**
   * POST /api/runner-jobs/:id/retry
   * Retry a failed job
   * Requires: MEMBER role or higher
   *
   * @param id - Job ID taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.retry` returns
   */
  @Post(":id/retry")
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async retry(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.retry(id, workspaceId);
  }

  /**
   * GET /api/runner-jobs/:id/events/stream
   * Stream job events via Server-Sent Events (SSE)
   * Requires: Any workspace member
   * Supports automatic reconnection via Last-Event-ID header
   *
   * The handler sets the SSE response headers, then hands the raw response to
   * `RunnerJobsService.streamEvents`, forwarding the `Last-Event-ID` request
   * header unchanged. How the service resumes from that ID, and whether it
   * ends the response on normal completion, is not visible here.
   * NOTE(review): confirm the service calls `res.end()` on success — with
   * `@Res()` injected the framework does not finish the response for us.
   *
   * @param id - Job ID taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param lastEventId - Value of the `Last-Event-ID` header, if the client sent one
   * @param res - Raw Express response the event stream is written to
   */
  @Get(":id/events/stream")
  @RequirePermission(Permission.WORKSPACE_ANY)
  async streamEvents(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @Headers("last-event-id") lastEventId: string | undefined,
    @Res() res: Response
  ): Promise<void> {
    // Set SSE headers
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering

    try {
      await this.runnerJobsService.streamEvents(id, workspaceId, res, lastEventId);
    } catch (error: unknown) {
      // If the response was already ended (by the service) or the socket is
      // gone (client disconnected), there is nobody to deliver the error to,
      // and writing would raise a write-after-end error on the response.
      if (res.writableEnded || res.destroyed) {
        return;
      }

      // Write error to stream as a named SSE `error` event, then close it.
      const errorMessage = error instanceof Error ? error.message : String(error);
      res.write(`event: error\n`);
      res.write(`data: ${JSON.stringify({ error: errorMessage })}\n\n`);
      res.end();
    }
  }
}
|