Files
stack/apps/api/src/runner-jobs/runner-jobs.controller.ts
Jason Woltje a3b48dd631 fix(#187): implement server-side SSE error recovery
Server-side improvements (ALL 27/27 TESTS PASSING):
- Add streamEventsFrom() method with lastEventId parameter for resuming streams
- Include event IDs in SSE messages (id: event-123) for reconnection support
- Send retry interval header (retry: 3000ms) to clients
- Classify errors as retryable vs non-retryable
- Handle transient errors gracefully with retry logic
- Support Last-Event-ID header in controller for automatic reconnection

Files modified:
- apps/api/src/runner-jobs/runner-jobs.service.ts (new streamEventsFrom method)
- apps/api/src/runner-jobs/runner-jobs.controller.ts (Last-Event-ID header support)
- apps/api/src/runner-jobs/runner-jobs.service.spec.ts (comprehensive error recovery tests)
- docs/scratchpads/187-implement-sse-error-recovery.md (implementation notes)

This ensures robust real-time updates with automatic recovery from network issues.
Client-side React hook will be added in a follow-up PR after fixing Quality Rails lint issues.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-02 12:41:12 -06:00

123 lines
4.0 KiB
TypeScript

import { Controller, Get, Post, Body, Param, Query, UseGuards, Res, Headers } from "@nestjs/common";
import { Response } from "express";
import { RunnerJobsService } from "./runner-jobs.service";
import { CreateJobDto, QueryJobsDto } from "./dto";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { Workspace, Permission, RequirePermission } from "../common/decorators";
import { CurrentUser } from "../auth/decorators/current-user.decorator";
import type { AuthenticatedUser } from "../common/types/user.types";
/**
 * Controller for runner job endpoints.
 * All endpoints require authentication and workspace context.
 *
 * Guards are applied in order:
 * 1. AuthGuard - Verifies user authentication
 * 2. WorkspaceGuard - Validates workspace access and sets RLS context
 * 3. PermissionGuard - Checks role-based permissions
 */
@Controller("runner-jobs")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class RunnerJobsController {
  constructor(private readonly runnerJobsService: RunnerJobsService) {}

  /**
   * POST /api/runner-jobs
   * Create a new runner job and queue it.
   * Requires: MEMBER role or higher
   *
   * @param createJobDto - Request body describing the job to create
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.create` resolves to
   */
  @Post()
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async create(
    @Body() createJobDto: CreateJobDto,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.create(workspaceId, createJobDto);
  }

  /**
   * GET /api/runner-jobs
   * Get paginated jobs with optional filters.
   * Requires: Any workspace member (including GUEST)
   *
   * @param query - Query-string filters
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @returns Whatever `RunnerJobsService.findAll` resolves to
   */
  @Get()
  @RequirePermission(Permission.WORKSPACE_ANY)
  async findAll(@Query() query: QueryJobsDto, @Workspace() workspaceId: string) {
    // workspaceId is spread last so it always overrides any same-named key in the query.
    return this.runnerJobsService.findAll({ ...query, workspaceId });
  }

  /**
   * GET /api/runner-jobs/:id
   * Get a single job by ID.
   * Requires: Any workspace member
   *
   * @param id - Job identifier taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @returns Whatever `RunnerJobsService.findOne` resolves to
   */
  @Get(":id")
  @RequirePermission(Permission.WORKSPACE_ANY)
  async findOne(@Param("id") id: string, @Workspace() workspaceId: string) {
    return this.runnerJobsService.findOne(id, workspaceId);
  }

  /**
   * POST /api/runner-jobs/:id/cancel
   * Cancel a running or queued job.
   * Requires: MEMBER role or higher
   *
   * @param id - Job identifier taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.cancel` resolves to
   */
  @Post(":id/cancel")
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async cancel(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.cancel(id, workspaceId);
  }

  /**
   * POST /api/runner-jobs/:id/retry
   * Retry a failed job.
   * Requires: MEMBER role or higher
   *
   * @param id - Job identifier taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param _user - Authenticated user; injected but not used by this handler
   * @returns Whatever `RunnerJobsService.retry` resolves to
   */
  @Post(":id/retry")
  @RequirePermission(Permission.WORKSPACE_MEMBER)
  async retry(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @CurrentUser() _user: AuthenticatedUser
  ) {
    return this.runnerJobsService.retry(id, workspaceId);
  }

  /**
   * GET /api/runner-jobs/:id/events/stream
   * Stream job events via Server-Sent Events (SSE).
   * Requires: Any workspace member
   * Supports automatic reconnection via Last-Event-ID header.
   *
   * The response is written manually (`@Res()`), so nothing is returned to the
   * framework. If the service throws, an SSE `error` event carrying the message
   * is written and the response is ended, provided the response is still writable.
   *
   * @param id - Job identifier taken from the route
   * @param workspaceId - Workspace resolved by the `@Workspace()` decorator
   * @param lastEventId - Value of the `Last-Event-ID` request header, if present
   * @param res - Express response used as the SSE stream
   */
  @Get(":id/events/stream")
  @RequirePermission(Permission.WORKSPACE_ANY)
  async streamEvents(
    @Param("id") id: string,
    @Workspace() workspaceId: string,
    @Headers("last-event-id") lastEventId: string | undefined,
    @Res() res: Response
  ): Promise<void> {
    // Set SSE headers
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering

    try {
      await this.runnerJobsService.streamEvents(id, workspaceId, res, lastEventId);
    } catch (error: unknown) {
      // The response may already be finished (service ended it, or the client
      // disconnected). Writing then would raise a write-after-end error that
      // masks the original failure, so there is nothing left to report to.
      if (res.writableEnded || res.destroyed) {
        return;
      }

      // Write error to stream
      const errorMessage = error instanceof Error ? error.message : String(error);
      res.write(`event: error\n`);
      res.write(`data: ${JSON.stringify({ error: errorMessage })}\n\n`);
      res.end();
    }
  }
}