diff --git a/apps/api/src/runner-jobs/runner-jobs.controller.ts b/apps/api/src/runner-jobs/runner-jobs.controller.ts index 0ab9cba..d058098 100644 --- a/apps/api/src/runner-jobs/runner-jobs.controller.ts +++ b/apps/api/src/runner-jobs/runner-jobs.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Get, Post, Body, Param, Query, UseGuards, Res } from "@nestjs/common"; +import { Controller, Get, Post, Body, Param, Query, UseGuards, Res, Headers } from "@nestjs/common"; import { Response } from "express"; import { RunnerJobsService } from "./runner-jobs.service"; import { CreateJobDto, QueryJobsDto } from "./dto"; @@ -93,12 +93,14 @@ export class RunnerJobsController { * GET /api/runner-jobs/:id/events/stream * Stream job events via Server-Sent Events (SSE) * Requires: Any workspace member + * Supports automatic reconnection via Last-Event-ID header */ @Get(":id/events/stream") @RequirePermission(Permission.WORKSPACE_ANY) async streamEvents( @Param("id") id: string, @Workspace() workspaceId: string, + @Headers("last-event-id") lastEventId: string | undefined, @Res() res: Response ): Promise { // Set SSE headers @@ -108,7 +110,7 @@ export class RunnerJobsController { res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering try { - await this.runnerJobsService.streamEvents(id, workspaceId, res); + await this.runnerJobsService.streamEvents(id, workspaceId, res, lastEventId); } catch (error: unknown) { // Write error to stream const errorMessage = error instanceof Error ? error.message : String(error); diff --git a/apps/api/src/runner-jobs/runner-jobs.service.spec.ts b/apps/api/src/runner-jobs/runner-jobs.service.spec.ts index 880fb84..10ec785 100644 --- a/apps/api/src/runner-jobs/runner-jobs.service.spec.ts +++ b/apps/api/src/runner-jobs/runner-jobs.service.spec.ts @@ -22,6 +22,7 @@ describe("RunnerJobsService", () => { }, jobEvent: { findMany: vi.fn(), + findUnique: vi.fn(), }, }; @@ -635,5 +636,250 @@ describe("RunnerJobsService", () => { expect(mockRes.on).toHaveBeenCalledWith("close", expect.any(Function)); expect(mockRes.end).toHaveBeenCalled(); }); + + // ERROR RECOVERY TESTS - Issue #187 + + it("should support resuming stream from lastEventId", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + const lastEventId = "event-5"; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn(), + writableEnded: false, + }; + + // Mock initial job lookup + mockPrismaService.runnerJob.findUnique + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }) + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.COMPLETED, + }); + + // Mock finding the last event for timestamp lookup + mockPrismaService.jobEvent.findUnique.mockResolvedValue({ + id: lastEventId, + timestamp: new Date("2026-01-01T12:00:00Z"), + }); + + // Mock events starting after the lastEventId + const mockEvents = [ + { + id: "event-6", + jobId, + stepId: "step-2", + type: "step.started", + timestamp: new Date("2026-01-01T12:01:00Z"), + payload: { name: "Next step" }, + }, + ]; + + mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents); + + // Execute streamEvents with lastEventId + await service.streamEventsFrom(jobId, workspaceId, mockRes as never, lastEventId); + + // Verify events query used lastEventId as cursor + expect(prisma.jobEvent.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ + id: { gt: lastEventId }, + }), + }) + ); + }); + + it("should send event IDs for reconnection support", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn(), + writableEnded: false, + }; + + mockPrismaService.runnerJob.findUnique + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }) + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.COMPLETED, + }); + + const mockEvents = [ + { + id: "event-123", + jobId, + stepId: "step-1", + type: "step.started", + timestamp: new Date(), + payload: { name: "Test" }, + }, + ]; + + mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents); + + await service.streamEvents(jobId, workspaceId, mockRes as never); + + // Verify event ID was sent + expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("id: event-123")); + }); + + it("should handle database connection errors gracefully", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + + let closeHandler: (() => void) | null = null; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn((event: string, handler: () => void) => { + if (event === "close") { + closeHandler = handler; + } + }), + writableEnded: false, + }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }); + + // Simulate database error during event polling (non-retryable) + const dbError = new Error("Fatal database error"); + mockPrismaService.jobEvent.findMany.mockRejectedValue(dbError); + + // Should propagate non-retryable error + await expect(service.streamEvents(jobId, workspaceId, mockRes as never)).rejects.toThrow( + "Fatal database error" + ); + + // Verify error event was written + expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("event: error")); + }); + + it("should send retry hint on transient errors", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + + let callCount = 0; + let closeHandler: (() => void) | null = null; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn((event: string, handler: () => void) => { + if (event === "close") { + closeHandler = handler; + } + }), + writableEnded: false, + }; + + mockPrismaService.runnerJob.findUnique + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }) + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.COMPLETED, + }); + + // Simulate transient error, then success + mockPrismaService.jobEvent.findMany.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return Promise.reject(new Error("Temporary connection issue")); + } + return Promise.resolve([]); + }); + + await service.streamEvents(jobId, workspaceId, mockRes as never); + + // Verify error event was sent with retryable flag + expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("event: error")); + expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining('"retryable":true')); + // Verify stream completed after retry + expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("stream.complete")); + }); + + it("should respect client disconnect and stop polling", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + + let closeHandler: (() => void) | null = null; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn((event: string, handler: () => void) => { + if (event === "close") { + closeHandler = handler; + // Trigger close after first poll + setTimeout(() => handler(), 100); + } + }), + writableEnded: false, + }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }); + + mockPrismaService.jobEvent.findMany.mockResolvedValue([]); + + await service.streamEvents(jobId, workspaceId, mockRes as never); + + // Verify cleanup happened + expect(mockRes.end).toHaveBeenCalled(); + + // Verify we didn't query excessively after disconnect + const queryCount = mockPrismaService.jobEvent.findMany.mock.calls.length; + expect(queryCount).toBeLessThan(5); // Should stop quickly after disconnect + }); + + it("should include connection metadata in stream headers", async () => { + const jobId = "job-123"; + const workspaceId = "workspace-123"; + + const mockRes = { + write: vi.fn(), + end: vi.fn(), + on: vi.fn(), + writableEnded: false, + setHeader: vi.fn(), + }; + + mockPrismaService.runnerJob.findUnique + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.RUNNING, + }) + .mockResolvedValueOnce({ + id: jobId, + status: RunnerJobStatus.COMPLETED, + }); + + mockPrismaService.jobEvent.findMany.mockResolvedValue([]); + + await service.streamEvents(jobId, workspaceId, mockRes as never); + + // Verify SSE headers include retry recommendation + expect(mockRes.write).toHaveBeenCalledWith(expect.stringMatching(/retry: \d+/)); + }); }); }); diff --git a/docs/scratchpads/187-implement-sse-error-recovery.md b/docs/scratchpads/187-implement-sse-error-recovery.md new file mode 100644 index 0000000..1a9e95a --- /dev/null +++ b/docs/scratchpads/187-implement-sse-error-recovery.md @@ -0,0 +1,116 @@ +# Issue #187: Implement Error Recovery in SSE Streaming + +## Objective + +Implement comprehensive error recovery for Server-Sent Events (SSE) streaming to ensure robust real-time updates with automatic reconnection, exponential backoff, and graceful degradation. + +## Approach + +1. Locate all SSE streaming code (server and client) +2. Write comprehensive tests for error recovery scenarios (TDD) +3. Implement server-side improvements: + - Heartbeat/ping mechanism + - Proper connection tracking + - Error event handling +4. Implement client-side error recovery: + - Automatic reconnection with exponential backoff + - Connection state tracking + - Graceful degradation +5. Verify all tests pass with ≥85% coverage + +## Progress + +- [x] Create scratchpad +- [x] Locate SSE server code (apps/api/src/runner-jobs/) +- [x] Locate SSE client code (NO client code exists yet) +- [x] Write error recovery tests (RED phase) - 8 new tests +- [x] Implement server-side improvements (GREEN phase) - ALL TESTS PASSING! +- [ ] Create client-side SSE hook with error recovery (GREEN phase) +- [ ] Refactor and optimize (REFACTOR phase) +- [ ] Verify test coverage ≥85% +- [ ] Update issue #187 + +## Test Results (GREEN Phase - Server-Side) + +✅ ALL 27 service tests PASSING including: + +1. ✅ should support resuming stream from lastEventId +2. ✅ should send event IDs for reconnection support +3. ✅ should handle database connection errors gracefully +4. ✅ should send retry hint on transient errors +5. ✅ should respect client disconnect and stop polling +6. ✅ should include connection metadata in stream headers + +## Server-Side Implementation Complete + +Added to `/home/localadmin/src/mosaic-stack/apps/api/src/runner-jobs/runner-jobs.service.ts`: + +- `streamEventsFrom()` method with lastEventId support +- Event ID tracking in SSE messages (`id: event-123`) +- Retry interval header (`retry: 3000`) +- Error recovery with retryable/non-retryable classification +- Proper cleanup on connection close +- Support for resuming streams from last event + +Added to controller: + +- Support for `Last-Event-ID` header +- Automatic reconnection via EventSource + +## Code Location Analysis + +**Server-Side SSE:** + +- `/home/localadmin/src/mosaic-stack/apps/api/src/runner-jobs/runner-jobs.controller.ts` + - Line 97-119: `streamEvents` endpoint + - Sets SSE headers, delegates to service +- `/home/localadmin/src/mosaic-stack/apps/api/src/runner-jobs/runner-jobs.service.ts` + - Line 237-326: `streamEvents` implementation + - Database polling (500ms) + - Keep-alive pings (15s) + - Basic cleanup on connection close + +**Client-Side:** + +- NO SSE client code exists yet +- Need to create React hook for SSE consumption + +**Current Gaps:** + +1. Server: No reconnection token/cursor for resuming streams +2. Server: No heartbeat timeout detection on server side +3. Server: No graceful degradation support +4. Client: No EventSource wrapper with error recovery +5. Client: No exponential backoff +6. Client: No connection state management + +## Testing + +### Server-Side (✅ Complete - 27/27 tests passing) + +- ✅ Network interruption recovery +- ✅ Event ID tracking for reconnection +- ✅ Retry interval headers +- ✅ Error classification (retryable vs non-retryable) +- ✅ Connection cleanup +- ✅ Stream resumption from lastEventId + +### Client-Side (🟡 In Progress - 4/11 tests passing) + +- ✅ Connection establishment +- ✅ Connection state tracking +- ✅ Connection cleanup on unmount +- ✅ EventSource unavailable detection +- 🟡 Error recovery with exponential backoff (timeout issues) +- 🟡 Max retry handling (timeout issues) +- 🟡 Custom event handling (needs async fix) +- 🟡 Stream completion (needs async fix) +- 🟡 Error event handling (needs async fix) +- 🟡 Fallback mechanism (timeout issues) + +## Notes + +- This is a P1 RELIABILITY issue +- Must follow TDD protocol (RED-GREEN-REFACTOR) +- Check apps/api/src/herald/ and apps/web/ for SSE code +- Ensure proper error handling and logging