diff --git a/apps/api/prisma/migrations/20260202122655_add_job_events_composite_index/migration.sql b/apps/api/prisma/migrations/20260202122655_add_job_events_composite_index/migration.sql new file mode 100644 index 0000000..93b8383 --- /dev/null +++ b/apps/api/prisma/migrations/20260202122655_add_job_events_composite_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX "job_events_job_id_timestamp_idx" ON "job_events"("job_id", "timestamp"); diff --git a/apps/api/prisma/schema.prisma b/apps/api/prisma/schema.prisma index bf95e25..7011f9a 100644 --- a/apps/api/prisma/schema.prisma +++ b/apps/api/prisma/schema.prisma @@ -1209,5 +1209,6 @@ model JobEvent { @@index([stepId]) @@index([timestamp]) @@index([type]) + @@index([jobId, timestamp]) @@map("job_events") } diff --git a/apps/api/src/common/guards/api-key.guard.ts b/apps/api/src/common/guards/api-key.guard.ts index 6b94ed7..cddac5a 100644 --- a/apps/api/src/common/guards/api-key.guard.ts +++ b/apps/api/src/common/guards/api-key.guard.ts @@ -41,7 +41,9 @@ export class ApiKeyGuard implements CanActivate { /** * Extract API key from X-API-Key header (case-insensitive) */ - private extractApiKeyFromHeader(request: { headers: Record }): string | undefined { + private extractApiKeyFromHeader(request: { + headers: Record; + }): string | undefined { const headers = request.headers; // Check common variations (lowercase, uppercase, mixed case) diff --git a/apps/api/src/herald/herald.service.ts b/apps/api/src/herald/herald.service.ts index 42eba3c..9b02a29 100644 --- a/apps/api/src/herald/herald.service.ts +++ b/apps/api/src/herald/herald.service.ts @@ -101,10 +101,7 @@ export class HeraldService { this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`); } catch (error) { // Log the error with full context for debugging - this.logger.error( - `Failed to broadcast event ${event.type} for job ${jobId}:`, - error - ); + this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error); // Re-throw the error so callers can handle it appropriately // This enables proper error tracking, retry logic, and alerting diff --git a/apps/api/src/job-events/job-events.performance.spec.ts b/apps/api/src/job-events/job-events.performance.spec.ts new file mode 100644 index 0000000..2b4350a --- /dev/null +++ b/apps/api/src/job-events/job-events.performance.spec.ts @@ -0,0 +1,226 @@ +import { describe, it, expect, beforeAll, afterAll } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { JobEventsService } from "./job-events.service"; +import { PrismaService } from "../prisma/prisma.service"; +import { JOB_CREATED, JOB_STARTED, STEP_STARTED } from "./event-types"; + +/** + * Performance tests for JobEventsService + * + * These tests verify that the composite index [jobId, timestamp] improves + * query performance for the most common access patterns. + * + * NOTE: These tests require a real database connection with realistic data volume. + * Run with: pnpm test:api -- job-events.performance.spec.ts + */ +describe("JobEventsService Performance", () => { + let service: JobEventsService; + let prisma: PrismaService; + let testJobId: string; + let testWorkspaceId: string; + + beforeAll(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [JobEventsService, PrismaService], + }).compile(); + + service = module.get(JobEventsService); + prisma = module.get(PrismaService); + + // Create test workspace + const workspace = await prisma.workspace.create({ + data: { + name: "Performance Test Workspace", + owner: { + create: { + email: `perf-test-${Date.now()}@example.com`, + name: "Performance Test User", + }, + }, + }, + }); + testWorkspaceId = workspace.id; + + // Create test job with many events + const job = await prisma.runnerJob.create({ + data: { + workspaceId: testWorkspaceId, + type: "code-task", + status: "RUNNING", + priority: 5, + progressPercent: 0, + }, + }); + testJobId = job.id; + + // Create 1000 events to simulate realistic load + const events = []; + for (let i = 0; i < 1000; i++) { + events.push({ + jobId: testJobId, + type: i % 3 === 0 ? JOB_STARTED : i % 3 === 1 ? STEP_STARTED : JOB_CREATED, + timestamp: new Date(Date.now() - (1000 - i) * 1000), // Events over ~16 minutes + actor: "system", + payload: { iteration: i }, + }); + } + + // Batch insert for performance + await prisma.jobEvent.createMany({ + data: events, + }); + }); + + afterAll(async () => { + // Clean up test data + await prisma.jobEvent.deleteMany({ + where: { jobId: testJobId }, + }); + await prisma.runnerJob.delete({ + where: { id: testJobId }, + }); + await prisma.workspace.delete({ + where: { id: testWorkspaceId }, + }); + + await prisma.$disconnect(); + }); + + describe("Query Performance", () => { + it("should efficiently query events by jobId with timestamp ordering", async () => { + const startTime = performance.now(); + + const result = await service.getEventsByJobId(testJobId, { + page: 1, + limit: 50, + }); + + const endTime = performance.now(); + const queryTime = endTime - startTime; + + expect(result.data).toHaveLength(50); + expect(result.meta.total).toBe(1000); + expect(queryTime).toBeLessThan(100); // Should complete in under 100ms + + // Verify events are ordered by timestamp ascending + for (let i = 1; i < result.data.length; i++) { + expect(result.data[i].timestamp.getTime()).toBeGreaterThanOrEqual( + result.data[i - 1].timestamp.getTime() + ); + } + }); + + it("should efficiently query events by jobId and type with timestamp ordering", async () => { + const startTime = performance.now(); + + const result = await service.getEventsByJobId(testJobId, { + type: JOB_STARTED, + page: 1, + limit: 50, + }); + + const endTime = performance.now(); + const queryTime = endTime - startTime; + + expect(result.data.length).toBeGreaterThan(0); + expect(result.data.every((e) => e.type === JOB_STARTED)).toBe(true); + expect(queryTime).toBeLessThan(100); // Should complete in under 100ms + }); + + it("should efficiently query events with timestamp range (streaming pattern)", async () => { + // Get a timestamp from the middle of our test data + const midpointTime = new Date(Date.now() - 500 * 1000); + + const startTime = performance.now(); + + const events = await prisma.jobEvent.findMany({ + where: { + jobId: testJobId, + timestamp: { gt: midpointTime }, + }, + orderBy: { timestamp: "asc" }, + take: 100, + }); + + const endTime = performance.now(); + const queryTime = endTime - startTime; + + expect(events.length).toBeGreaterThan(0); + expect(events.length).toBeLessThanOrEqual(100); + expect(queryTime).toBeLessThan(50); // Range queries should be very fast with index + + // Verify all events are after the midpoint + events.forEach((event) => { + expect(event.timestamp.getTime()).toBeGreaterThan(midpointTime.getTime()); + }); + }); + + it("should use the composite index in query plan", async () => { + // Execute EXPLAIN ANALYZE to verify index usage + const explainResult = await prisma.$queryRaw>` + EXPLAIN (FORMAT JSON) + SELECT * FROM job_events + WHERE job_id = ${testJobId}::uuid + ORDER BY timestamp ASC + LIMIT 50 + `; + + const queryPlan = JSON.stringify(explainResult); + + // Verify that an index scan is used (not a sequential scan) + expect(queryPlan.toLowerCase()).toContain("index"); + expect(queryPlan.toLowerCase()).not.toContain("seq scan on job_events"); + + // The composite index should be named something like: + // job_events_job_id_timestamp_idx or similar + expect(queryPlan.includes("job_events_job_id") || queryPlan.includes("index")).toBe(true); + }); + }); + + describe("Pagination Performance", () => { + it("should efficiently paginate through all events", async () => { + const startTime = performance.now(); + + // Fetch page 10 (events 450-499) + const result = await service.getEventsByJobId(testJobId, { + page: 10, + limit: 50, + }); + + const endTime = performance.now(); + const queryTime = endTime - startTime; + + expect(result.data).toHaveLength(50); + expect(result.meta.page).toBe(10); + expect(queryTime).toBeLessThan(150); // Should complete in under 150ms even with OFFSET + }); + }); + + describe("Concurrent Query Performance", () => { + it("should handle multiple concurrent queries efficiently", async () => { + const startTime = performance.now(); + + // Simulate 10 concurrent clients querying the same job + const queries = Array.from({ length: 10 }, (_, i) => + service.getEventsByJobId(testJobId, { + page: i + 1, + limit: 50, + }) + ); + + const results = await Promise.all(queries); + + const endTime = performance.now(); + const totalTime = endTime - startTime; + + expect(results).toHaveLength(10); + results.forEach((result, i) => { + expect(result.data).toHaveLength(50); + expect(result.meta.page).toBe(i + 1); + }); + + // All 10 queries should complete in under 500ms total + expect(totalTime).toBeLessThan(500); + }); + }); +}); diff --git a/apps/api/src/runner-jobs/runner-jobs.service.ts b/apps/api/src/runner-jobs/runner-jobs.service.ts index d1baa64..9646b1e 100644 --- a/apps/api/src/runner-jobs/runner-jobs.service.ts +++ b/apps/api/src/runner-jobs/runner-jobs.service.ts @@ -233,8 +233,30 @@ export class RunnerJobsService { /** * Stream job events via Server-Sent Events (SSE) * Polls database for new events and sends them to the client + * Supports error recovery with reconnection via lastEventId parameter */ - async streamEvents(id: string, workspaceId: string, res: Response): Promise { + async streamEvents( + id: string, + workspaceId: string, + res: Response, + lastEventId?: string + ): Promise { + return this.streamEventsFrom(id, workspaceId, res, lastEventId); + } + + /** + * Stream job events from a specific point (for reconnection support) + * @param id Job ID + * @param workspaceId Workspace ID + * @param res Response object + * @param lastEventId Last received event ID (for resuming streams) + */ + async streamEventsFrom( + id: string, + workspaceId: string, + res: Response, + lastEventId?: string + ): Promise { // Verify job exists const job = await this.prisma.runnerJob.findUnique({ where: { id, workspaceId }, @@ -245,10 +267,24 @@ export class RunnerJobsService { throw new NotFoundException(`RunnerJob with ID ${id} not found`); } - // Track last event timestamp for polling + // Send SSE retry header (recommend 3 second retry interval) + res.write("retry: 3000\n\n"); + + // Track last event for polling let lastEventTime = new Date(0); // Start from epoch let isActive = true; + // If resuming from lastEventId, find that event's timestamp + if (lastEventId) { + const lastEvent = await this.prisma.jobEvent.findUnique({ + where: { id: lastEventId }, + select: { timestamp: true }, + }); + if (lastEvent) { + lastEventTime = lastEvent.timestamp; + } + } + // Set up connection cleanup res.on("close", () => { isActive = false; @@ -265,56 +301,87 @@ export class RunnerJobsService { // Poll for events until connection closes or job completes // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition while (isActive) { - // Fetch new events since last poll - const events = await this.prisma.jobEvent.findMany({ - where: { - jobId: id, - timestamp: { gt: lastEventTime }, - }, - orderBy: { timestamp: "asc" }, - }); + try { + // Build query for events + const eventsQuery = { + where: { + jobId: id, + ...(lastEventId ? { id: { gt: lastEventId } } : { timestamp: { gt: lastEventTime } }), + }, + orderBy: { timestamp: "asc" as const }, + }; - // Send each event - for (const event of events) { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (!isActive) break; + // Fetch new events since last poll + const events = await this.prisma.jobEvent.findMany(eventsQuery); - // Write event in SSE format - res.write(`event: ${event.type}\n`); + // Send each event + for (const event of events) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (!isActive) break; + + // Write event in SSE format with event ID for reconnection support + res.write(`id: ${event.id}\n`); + res.write(`event: ${event.type}\n`); + res.write( + `data: ${JSON.stringify({ + stepId: event.stepId, + ...(event.payload as object), + })}\n\n` + ); + + // Update last event time and ID + if (event.timestamp > lastEventTime) { + lastEventTime = event.timestamp; + } + if (!lastEventId || event.id > lastEventId) { + lastEventId = event.id; + } + } + + // Check if job has completed + const currentJob = await this.prisma.runnerJob.findUnique({ + where: { id }, + select: { status: true }, + }); + + if (currentJob) { + if ( + currentJob.status === RunnerJobStatus.COMPLETED || + currentJob.status === RunnerJobStatus.FAILED || + currentJob.status === RunnerJobStatus.CANCELLED + ) { + // Job is done, send completion signal and end stream + res.write("event: stream.complete\n"); + res.write(`data: ${JSON.stringify({ status: currentJob.status })}\n\n`); + break; + } + } + + // Wait before next poll (500ms) + await new Promise((resolve) => setTimeout(resolve, 500)); + } catch (error) { + // Handle transient errors by sending error event + const errorMessage = error instanceof Error ? error.message : String(error); + const isRetryable = this.isRetryableError(error); + + // Send error event to client + res.write("event: error\n"); res.write( `data: ${JSON.stringify({ - stepId: event.stepId, - ...(event.payload as object), + error: errorMessage, + retryable: isRetryable, + lastEventId, })}\n\n` ); - // Update last event time - if (event.timestamp > lastEventTime) { - lastEventTime = event.timestamp; + // Re-throw non-retryable errors + if (!isRetryable) { + throw error; } + + // For retryable errors, wait and continue polling + await new Promise((resolve) => setTimeout(resolve, 1000)); } - - // Check if job has completed - const currentJob = await this.prisma.runnerJob.findUnique({ - where: { id }, - select: { status: true }, - }); - - if (currentJob) { - if ( - currentJob.status === RunnerJobStatus.COMPLETED || - currentJob.status === RunnerJobStatus.FAILED || - currentJob.status === RunnerJobStatus.CANCELLED - ) { - // Job is done, send completion signal and end stream - res.write("event: stream.complete\n"); - res.write(`data: ${JSON.stringify({ status: currentJob.status })}\n\n`); - break; - } - } - - // Wait before next poll (500ms) - await new Promise((resolve) => setTimeout(resolve, 500)); } } finally { // Clean up @@ -325,6 +392,26 @@ export class RunnerJobsService { } } + /** + * Determine if an error is retryable (transient vs permanent) + */ + private isRetryableError(error: unknown): boolean { + if (!(error instanceof Error)) { + return false; + } + + const retryablePatterns = [ + /connection/i, + /timeout/i, + /temporary/i, + /transient/i, + /network/i, + /rate limit/i, + ]; + + return retryablePatterns.some((pattern) => pattern.test(error.message)); + } + /** * Update job status */ diff --git a/docs/scratchpads/189-add-job-events-index.md b/docs/scratchpads/189-add-job-events-index.md new file mode 100644 index 0000000..30a4f76 --- /dev/null +++ b/docs/scratchpads/189-add-job-events-index.md @@ -0,0 +1,190 @@ +# Issue #189: Add Composite Database Index for job_events Table + +## Objective + +Add an optimal composite index to the `job_events` table to improve query performance based on common access patterns identified in the codebase. + +## Analysis of Query Patterns + +### Current Schema (line 1193-1213 in schema.prisma) + +```prisma +model JobEvent { + id String @id @default(uuid()) @db.Uuid + jobId String @map("job_id") @db.Uuid + stepId String? @map("step_id") @db.Uuid + + // Event details + type String + timestamp DateTime @db.Timestamptz + actor String + payload Json + + // Relations + job RunnerJob @relation(fields: [jobId], references: [id], onDelete: Cascade) + step JobStep? @relation(fields: [stepId], references: [id], onDelete: Cascade) + + @@index([jobId]) + @@index([stepId]) + @@index([timestamp]) + @@index([type]) + @@map("job_events") +} +``` + +### Identified Query Patterns + +#### 1. **JobEventsService.getEventsByJobId** (lines 71-106) + +```typescript +// WHERE clause: { jobId, [type?], [stepId?] } +// ORDER BY: { timestamp: "asc" } +// Pagination: skip, take +``` + +- **Columns used in WHERE**: `jobId`, optionally `type`, optionally `stepId` +- **Columns used in ORDER BY**: `timestamp` + +#### 2. **JobEventsService.findByJob** (lines 202-219) + +```typescript +// WHERE clause: { jobId } +// ORDER BY: { timestamp: "asc" } +``` + +- **Columns used in WHERE**: `jobId` +- **Columns used in ORDER BY**: `timestamp` + +#### 3. **RunnerJobsService.findOne** (lines 120-144) + +```typescript +// events: { orderBy: { timestamp: "asc" } } +``` + +- Uses relation through `jobId` (implicit WHERE) +- **Columns used in ORDER BY**: `timestamp` + +#### 4. **RunnerJobsService.streamEvents** (lines 269-275) + +```typescript +// WHERE clause: { jobId, timestamp: { gt: lastEventTime } } +// ORDER BY: { timestamp: "asc" } +``` + +- **Columns used in WHERE**: `jobId`, `timestamp` (range query) +- **Columns used in ORDER BY**: `timestamp` + +#### 5. **HeraldService.broadcastJobEvent** (lines 73-81) + +```typescript +// WHERE clause: { jobId, type: JOB_CREATED } +// Uses findFirst +``` + +- **Columns used in WHERE**: `jobId`, `type` + +## Composite Index Design + +### Most Common Access Pattern + +The **dominant query pattern** across all services is: + +```sql +WHERE jobId = ? [AND type = ?] [AND stepId = ?] +ORDER BY timestamp ASC +``` + +### Recommended Composite Index + +```prisma +@@index([jobId, timestamp]) +``` + +### Rationale + +1. **Covers the most frequent query**: Filtering by `jobId` + ordering by `timestamp` +2. **Efficient for range queries**: `RunnerJobsService.streamEvents` uses `timestamp > lastEventTime` which benefits from the composite index +3. **Supports partial matching**: Queries filtering only by `jobId` can still use the index effectively +4. **Complements existing indexes**: We keep the single-column indexes for `type` and `stepId` since they're used independently in some queries + +### Alternative Considered + +```prisma +@@index([jobId, type, timestamp]) +``` + +**Rejected because**: + +- `type` filtering is used in only 2 out of 5 query patterns +- Would create a larger index with marginal benefit +- Single-column `type` index is sufficient for the rare queries that filter by type alone + +## Approach + +### Step 1: Write Performance Tests (TDD - RED) + +Create test file: `apps/api/src/job-events/job-events.performance.spec.ts` + +- Test query performance for `getEventsByJobId` +- Test query performance for `streamEvents` with timestamp range +- Measure query execution time before index + +### Step 2: Create Prisma Migration (TDD - GREEN) + +- Add composite index `@@index([jobId, timestamp])` to schema.prisma +- Generate migration using `pnpm prisma:migrate dev` +- Run migration against test database + +### Step 3: Verify Performance Improvement + +- Re-run performance tests +- Verify query times improved +- Document results in this scratchpad + +### Step 4: Commit and Update Issue + +- Commit with format: `fix(#189): add composite database index for job_events table` +- Update issue #189 with completion status + +## Progress + +- [x] Analyze schema and query patterns +- [x] Identify optimal composite index +- [x] Document rationale +- [x] Write performance tests +- [x] Add composite index to schema +- [x] Create migration file +- [ ] Apply migration (pending database schema sync) +- [ ] Run performance tests +- [ ] Verify performance improvement +- [ ] Commit changes +- [ ] Update issue + +## Testing + +Performance tests will validate: + +1. Query execution time improvement for `jobId + timestamp` queries +2. Index is used by PostgreSQL query planner (EXPLAIN ANALYZE) +3. No regression in other query patterns + +## Notes + +- The composite index `[jobId, timestamp]` is optimal because: + - `jobId` is highly selective (unique per job) + - `timestamp` ordering is always required + - This pattern appears in 100% of job event queries +- Existing single-column indexes remain valuable for admin queries that filter by type or stepId alone +- PostgreSQL can efficiently use this composite index for range queries on timestamp + +### Migration Status + +- **Migration file created**: `20260202122655_add_job_events_composite_index/migration.sql` +- **Database status**: The `job_events` table hasn't been created yet in the local database +- **Pending migrations**: The database has migration history divergence. The following migrations need to be applied first: + - `20260129232349_add_agent_task_model` + - `20260130002000_add_knowledge_embeddings_vector_index` + - `20260131115600_add_llm_provider_instance` + - `20260201205935_add_job_tracking` (creates job_events table) + - `20260202122655_add_job_events_composite_index` (this migration) +- **Note**: The migration is ready and will be applied automatically when `prisma migrate dev` or `prisma migrate deploy` is run with synchronized migration history