Implement error recovery in SSE streaming #187

Closed
opened 2026-02-02 17:24:20 +00:00 by jason.woltje

Problem

The SSE streaming implementation polls the database in an unbounded loop with no error handling. Database failures can therefore cause infinite loops, memory leaks, and zombie connections.

Location

apps/api/src/runner-jobs/runner-jobs.service.ts:264-318

Issues

  1. No try-catch around database operations
  2. No timeout (infinite polling)
  3. Job deletion mid-stream never exits loop
  4. Client disconnect flag may be ignored
  5. Database outage causes connection accumulation
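Issue 4 arises when an `isActive`-style flag is checked only at the top of the loop: a client that disconnects during a sleep or a slow query is not noticed until that wait ends. A minimal sketch, assuming the handler owns an `AbortController` that is aborted from the response's `close` event; `abortableSleep` is a hypothetical helper, not part of the existing service.

```typescript
// Hypothetical helper: a sleep that resolves early when `signal` is aborted,
// so a polling loop notices a client disconnect without waiting out the delay.
// Resolves true if the full delay elapsed, false if woken by an abort.
function abortableSleep(ms: number, signal: AbortSignal): Promise<boolean> {
  return new Promise((resolve) => {
    // Already disconnected: do not sleep at all.
    if (signal.aborted) {
      resolve(false);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // drop the pending timer so nothing keeps running
      resolve(false);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve(true);
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

In the handler this would be wired as `res.on("close", () => controller.abort())`, with the loop exiting as soon as `abortableSleep(...)` resolves to `false`.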

Hidden Failures

  • Database connection loss → unhandled exception
  • Job deleted mid-stream → null currentJob, loop never exits
  • Network partition → polling continues consuming resources
  • Client disconnect → zombie polling loops
  • Memory leak from failed streams never cleaned up
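The last two hidden failures are hard to see without a count of live streams. A minimal sketch of a hypothetical in-process gauge: it increments when a stream starts and decrements in `finally`, so every exit path, including a thrown database error, releases its slot. `ActiveStreamGauge` is an illustrative name; exporting the value to whatever metrics system the API uses is left as an assumption.

```typescript
// Hypothetical in-process gauge for active SSE streams.
class ActiveStreamGauge {
  private active = 0;

  get value(): number {
    return this.active;
  }

  // Runs `stream` and guarantees the gauge is decremented on every exit path:
  // normal completion, an early return, or a thrown database error.
  async track<T>(stream: () => Promise<T>): Promise<T> {
    this.active++;
    try {
      return await stream();
    } finally {
      this.active--;
    }
  }
}
```

A `value` that keeps rising while traffic is flat would point to zombie polling loops.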

User Impact

  • Database outage causes all SSE connections to fail
  • Server accumulates zombie polling loops
  • Users see the connection hang with no timeout
  • No error message explains the failure
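An explicit error message can be delivered as a named SSE event. In the event-stream format, each field is a `name: value` line, a blank line ends the event, and the optional `retry:` field tells a browser `EventSource` how many milliseconds to wait before reconnecting. A minimal sketch; `formatSseEvent` and the `retryAfterMs` payload field are hypothetical, while the `error` field matches the payloads already used in this issue.

```typescript
// Hypothetical helper that serializes one Server-Sent Event.
interface SseEvent {
  event?: string; // named event, e.g. "error"
  data: unknown; // JSON-serialized into the data field
  retryMs?: number; // reconnection delay hint for EventSource
}

function formatSseEvent({ event, data, retryMs }: SseEvent): string {
  let out = "";
  if (event) out += `event: ${event}\n`;
  if (retryMs !== undefined) out += `retry: ${Math.max(0, Math.floor(retryMs))}\n`;
  // JSON.stringify (without indentation) escapes newlines, so one data line is safe.
  out += `data: ${JSON.stringify(data)}\n`;
  return out + "\n"; // blank line terminates the event
}
```

For example, `res.write(formatSseEvent({ event: "error", data: { error: "Job no longer exists" } }))` writes the same bytes as the two `res.write` calls used for a deleted job.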

Acceptance Criteria

  • Add try-catch around database operations
  • Implement 1-hour maximum stream duration
  • Handle job deletion gracefully with error event
  • Send error events with retry guidance
  • Implement exponential backoff on database errors
  • Add cleanup on client disconnect
  • Add tests for error scenarios
  • Add metrics for active SSE connections
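A minimal sketch of the capped exponential delay the backoff criterion calls for. The 1 s base and 30 s cap are assumed values, and the optional `jitter` hook (for example `Math.random`, giving "full jitter") is an addition not requested in the issue; it spreads out retries when many streams hit the same outage at once.

```typescript
// Capped exponential backoff. baseMs and maxMs are assumed values, not from the issue.
function backoffDelayMs(
  consecutiveErrors: number, // 1 for the first failure, 2 for the second, ...
  baseMs = 1_000,
  maxMs = 30_000,
  jitter: (() => number) | null = null, // e.g. Math.random for "full jitter"
): number {
  const exponent = Math.max(0, consecutiveErrors - 1);
  // 1 s, 2 s, 4 s, 8 s, ... never exceeding maxMs (Infinity also clamps to maxMs).
  const ceiling = Math.min(maxMs, baseMs * 2 ** exponent);
  return jitter ? Math.floor(jitter() * ceiling) : ceiling;
}
```

The caller should reset its error counter to zero after a successful poll, so one transient failure does not slow the stream permanently.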

Implementation

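```typescript
// Method excerpt from runner-jobs.service.ts. `Response` is assumed to be Express's
// Response type; the `{...}` query arguments are elided as in the original report.
async streamEvents(id: string, workspaceId: string, res: Response): Promise<void> {
  const POLL_INTERVAL_MS = 500;
  const MAX_STREAM_MS = 60 * 60 * 1000; // 1-hour cap on wall-clock time, not poll count
  const MAX_BACKOFF_MS = 30_000; // assumed cap for exponential backoff
  const deadline = Date.now() + MAX_STREAM_MS;
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

  // Stop polling once the client disconnects.
  let isActive = true;
  res.on("close", () => {
    isActive = false;
  });

  // SSE comment lines keep proxies from closing an idle connection (15 s is an assumed interval).
  const keepAliveInterval = setInterval(() => res.write(": keep-alive\n\n"), 15_000);
  let consecutiveErrors = 0;

  try {
    while (isActive && Date.now() < deadline) {
      let delayMs = POLL_INTERVAL_MS;
      try {
        const events = await this.prisma.jobEvent.findMany({...});
        const currentJob = await this.prisma.runnerJob.findUnique({...});
        consecutiveErrors = 0; // a successful poll resets the backoff

        if (!currentJob) {
          res.write("event: error\n");
          res.write('data: {"error":"Job no longer exists"}\n\n');
          break;
        }
        // ...write `events` to the client (elided in the original report)
      } catch (dbError) {
        consecutiveErrors++;
        // Exponential backoff: 1 s, 2 s, 4 s, ... capped at MAX_BACKOFF_MS.
        delayMs = Math.min(MAX_BACKOFF_MS, 1000 * 2 ** (consecutiveErrors - 1));
        this.logger.error(`SSE polling error for job ${id}: ${String(dbError)}`);
        res.write("event: error\n");
        // `retryAfterMs` is a hypothetical field carrying retry guidance to the client.
        res.write(`data: ${JSON.stringify({ error: "Database temporarily unavailable", retryAfterMs: delayMs })}\n\n`);
      }
      await sleep(delayMs);
    }
  } finally {
    clearInterval(keepAliveInterval);
    if (!res.writableEnded) res.end();
  }
}
```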
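Testing the error scenarios is simpler if the loop does not call Prisma, Express, or the real clock directly. A minimal sketch, assuming the loop is extracted into a hypothetical `pollJobStream` function that receives its dependencies; `PollDeps`, `fetchJob`, `emit`, and the `StopReason` values are illustrative names, not the service's API. Because error backoff makes each iteration's length variable, the sketch caps the stream with a wall-clock deadline instead of a poll count.

```typescript
// Hypothetical, framework-free polling loop with injected dependencies.
type StopReason = "job-deleted" | "client-closed" | "timeout";

interface PollDeps {
  fetchJob: () => Promise<object | null>; // stands in for the Prisma queries
  emit: (event: string, data: unknown) => void; // stands in for res.write
  isActive: () => boolean; // becomes false when the client disconnects
  sleep: (ms: number) => Promise<void>;
  now: () => number; // milliseconds, like Date.now
}

async function pollJobStream(
  deps: PollDeps,
  pollMs = 500,
  maxStreamMs = 60 * 60 * 1000, // 1-hour maximum stream duration
  maxBackoffMs = 30_000, // assumed backoff cap
): Promise<StopReason> {
  const deadline = deps.now() + maxStreamMs;
  let consecutiveErrors = 0;

  while (deps.now() < deadline) {
    if (!deps.isActive()) return "client-closed";
    let delayMs = pollMs;
    try {
      const job = await deps.fetchJob();
      consecutiveErrors = 0; // a successful query resets the backoff
      if (job === null) {
        deps.emit("error", { error: "Job no longer exists" });
        return "job-deleted";
      }
      // New job events would be written to the client here.
    } catch {
      consecutiveErrors++;
      delayMs = Math.min(maxBackoffMs, 1_000 * 2 ** (consecutiveErrors - 1));
      deps.emit("error", { error: "Database temporarily unavailable", retryAfterMs: delayMs });
    }
    await deps.sleep(delayMs);
  }
  return "timeout";
}
```

Injecting `now` and `sleep` lets a test advance a fake clock, so an hour-long database outage runs in milliseconds.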

References

M4.2-Infrastructure verification report (2026-02-02)
Security review agent ID: a1b8b3f

jason.woltje added this to the M4.2-Infrastructure (0.0.4) milestone 2026-02-02 17:24:20 +00:00
jason.woltje added the apiapip1 labels 2026-02-02 17:24:20 +00:00
Reference: mosaic/stack#187