Implemented optimistic locking with version field and SELECT FOR UPDATE transactions to prevent data corruption from concurrent job status updates. Changes: - Added version field to RunnerJob schema for optimistic locking - Created migration 20260202_add_runner_job_version_for_concurrency - Implemented ConcurrentUpdateException for conflict detection - Updated RunnerJobsService methods with optimistic locking: * updateStatus() - with version checking and retry logic * updateProgress() - with version checking and retry logic * cancel() - with version checking and retry logic - Updated CoordinatorIntegrationService with SELECT FOR UPDATE: * updateJobStatus() - transaction with row locking * completeJob() - transaction with row locking * failJob() - transaction with row locking * updateJobProgress() - optimistic locking - Added retry mechanism (3 attempts) with exponential backoff - Added comprehensive concurrency tests (10 tests, all passing) - Updated existing test mocks to support updateMany Test Results: - All 10 concurrency tests passing ✓ - Tests cover concurrent status updates, progress updates, completions, cancellations, retry logic, and exponential backoff This fix prevents race conditions that could cause: - Lost job results (double completion) - Lost progress updates - Invalid status transitions - Data corruption under concurrent access Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
251 lines
7.1 KiB
Markdown
251 lines
7.1 KiB
Markdown
# Issue #196: Fix race condition in job status updates
|
|
|
|
## Objective
|
|
|
|
Fix race condition in job status update logic that can cause data corruption when multiple processes attempt to update the same job simultaneously. This is a P2 RELIABILITY issue.
|
|
|
|
## Race Condition Analysis
|
|
|
|
### Current Implementation Problems
|
|
|
|
1. **RunnerJobsService.updateStatus() (lines 418-462)**
|
|
- Read job: `prisma.runnerJob.findUnique()`
|
|
- Make decision based on read data
|
|
- Update job: `prisma.runnerJob.update()`
|
|
- **RACE CONDITION**: Between read and update, another process can modify the job
|
|
|
|
2. **RunnerJobsService.updateProgress() (lines 467-485)**
|
|
- Same pattern: read, check, update
|
|
- **RACE CONDITION**: Progress updates can be lost or overwritten
|
|
|
|
3. **CoordinatorIntegrationService.updateJobStatus() (lines 103-152)**
|
|
- Reads job to validate status transition
|
|
- **RACE CONDITION**: Status can change between validation and update
|
|
|
|
4. **RunnerJobsService.cancel() (lines 149-178)**
|
|
- Similar pattern with race condition
|
|
|
|
### Concurrent Scenarios That Cause Issues
|
|
|
|
**Scenario 1: Double completion**
|
|
|
|
- Process A: Reads job (status=RUNNING), decides to complete it
|
|
- Process B: Reads job (status=RUNNING), decides to complete it
|
|
- Process A: Updates job to COMPLETED with resultA
|
|
- Process B: Updates job to COMPLETED with resultB (overwrites resultA)
|
|
- **Result**: Lost data (resultA lost)
|
|
|
|
**Scenario 2: Progress updates lost**
|
|
|
|
- Process A: Updates progress to 50%
|
|
- Process B: Updates progress to 75% (concurrent)
|
|
- **Result**: One update lost depending on race timing
|
|
|
|
**Scenario 3: Invalid status transitions**
|
|
|
|
- Process A: Reads job (status=RUNNING), validates transition to COMPLETED
|
|
- Process B: Reads job (status=RUNNING), validates transition to FAILED
|
|
- Process A: Updates to COMPLETED
|
|
- Process B: Updates to FAILED (overwrites COMPLETED)
|
|
- **Result**: Invalid state - job marked as FAILED when it actually completed
|
|
|
|
## Approach
|
|
|
|
### Solution 1: Add Version Field (Optimistic Locking)
|
|
|
|
Add a `version` field to RunnerJob model:
|
|
|
|
```prisma
|
|
model RunnerJob {
|
|
// ... existing fields
|
|
version Int @default(0)
|
|
}
|
|
```
|
|
|
|
Update pattern:
|
|
|
|
```typescript
|
|
const result = await prisma.runnerJob.updateMany({
|
|
where: {
|
|
id: jobId,
|
|
workspaceId: workspaceId,
|
|
version: currentVersion, // Only update if version matches
|
|
},
|
|
data: {
|
|
status: newStatus,
|
|
version: { increment: 1 },
|
|
},
|
|
});
|
|
|
|
if (result.count === 0) {
|
|
// Concurrent update detected - retry or throw error
|
|
}
|
|
```
|
|
|
|
### Solution 2: Use Database Transactions with SELECT FOR UPDATE
|
|
|
|
```typescript
|
|
await prisma.$transaction(async (tx) => {
|
|
// Lock the row
|
|
const job = await tx.$queryRaw`
|
|
SELECT * FROM "RunnerJob"
|
|
WHERE id = ${jobId} AND workspace_id = ${workspaceId}
|
|
FOR UPDATE
|
|
`;
|
|
|
|
// Validate and update
|
|
// Row is locked until transaction commits
|
|
});
|
|
```
|
|
|
|
### Solution 3: Hybrid Approach (Recommended)
|
|
|
|
- Use optimistic locking (version field) for most updates (better performance)
|
|
- Use SELECT FOR UPDATE for critical sections (status transitions)
|
|
- Implement retry logic for optimistic lock failures
|
|
|
|
## Progress
|
|
|
|
- [x] Analyze current implementation
|
|
- [x] Identify race conditions
|
|
- [x] Design solution approach
|
|
- [x] Write concurrency tests (RED phase)
|
|
- [x] Add version field to schema
|
|
- [x] Create migration for version field
|
|
- [x] Implement optimistic locking in updateStatus()
|
|
- [x] Implement optimistic locking in updateProgress()
|
|
- [x] Implement optimistic locking in cancel()
|
|
- [x] Implement SELECT FOR UPDATE for coordinator updates (updateJobStatus, completeJob, failJob)
|
|
- [x] Add retry logic for concurrent update conflicts
|
|
- [x] Create ConcurrentUpdateException
|
|
- [ ] Verify all tests pass
|
|
- [ ] Run coverage check (≥85%)
|
|
- [ ] Commit changes
|
|
|
|
## Testing Strategy
|
|
|
|
### Concurrency Tests to Write
|
|
|
|
1. **Test concurrent status updates**
|
|
- Simulate 2+ processes updating same job status
|
|
- Verify only one succeeds or updates are properly serialized
|
|
- Verify no data loss
|
|
|
|
2. **Test concurrent progress updates**
|
|
- Simulate rapid progress updates
|
|
- Verify all updates are recorded or properly merged
|
|
|
|
3. **Test status transition validation with concurrency**
|
|
- Simulate concurrent invalid transitions
|
|
- Verify invalid transitions are rejected
|
|
|
|
4. **Test completion race**
|
|
- Simulate concurrent completion with different results
|
|
- Verify only one completion succeeds and data isn't lost
|
|
|
|
5. **Test optimistic lock retry logic**
|
|
- Simulate version conflicts
|
|
- Verify retry mechanism works correctly
|
|
|
|
## Implementation Plan
|
|
|
|
### Phase 1: Schema Changes (with migration)
|
|
|
|
1. Add `version` field to RunnerJob model
|
|
2. Create migration
|
|
3. Run migration
|
|
|
|
### Phase 2: Update Methods (TDD)
|
|
|
|
1. **updateStatus()** - Add optimistic locking
|
|
2. **updateProgress()** - Add optimistic locking
|
|
3. **completeJob()** - Add optimistic locking
|
|
4. **failJob()** - Add optimistic locking
|
|
5. **cancel()** - Add optimistic locking
|
|
|
|
### Phase 3: Critical Sections
|
|
|
|
1. **updateJobStatus()** in coordinator integration - Add transaction with SELECT FOR UPDATE
|
|
2. Add retry logic wrapper
|
|
|
|
### Phase 4: Error Handling
|
|
|
|
1. Add custom exception for concurrent update conflicts
|
|
2. Implement retry logic (max 3 retries with exponential backoff)
|
|
3. Log concurrent update conflicts for monitoring
|
|
|
|
## Notes
|
|
|
|
### Version Field vs SELECT FOR UPDATE
|
|
|
|
**Optimistic Locking (version field):**
|
|
|
|
- ✅ Better performance (no row locks)
|
|
- ✅ Works well for high-concurrency scenarios
|
|
- ✅ Simple to implement
|
|
- ❌ Requires retry logic
|
|
- ❌ Client must handle conflicts
|
|
|
|
**Pessimistic Locking (SELECT FOR UPDATE):**
|
|
|
|
- ✅ Guarantees no conflicts
|
|
- ✅ No retry logic needed
|
|
- ❌ Locks rows (can cause contention)
|
|
- ❌ Risk of deadlocks if not careful
|
|
- ❌ Lower throughput under high concurrency
|
|
|
|
**Recommendation:** Use optimistic locking as default, SELECT FOR UPDATE only for critical status transitions.
|
|
|
|
### Prisma Limitations
|
|
|
|
Prisma doesn't have native optimistic locking support. We need to:
|
|
|
|
1. Add version field manually
|
|
2. Use `updateMany()` with version check (returns count)
|
|
3. Handle count=0 as conflict
|
|
|
|
### Retry Strategy
|
|
|
|
For optimistic lock failures:
|
|
|
|
```typescript
|
|
async function retryOnConflict<T>(operation: () => Promise<T>, maxRetries = 3): Promise<T> {
|
|
for (let i = 0; i < maxRetries; i++) {
|
|
try {
|
|
return await operation();
|
|
} catch (error) {
|
|
if (error instanceof ConcurrentUpdateError && i < maxRetries - 1) {
|
|
await sleep(Math.pow(2, i) * 100); // Exponential backoff
|
|
continue;
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Findings
|
|
|
|
### Current State
|
|
|
|
- No concurrency protection exists
|
|
- All update methods are vulnerable to race conditions
|
|
- No version tracking or locking mechanism
|
|
- High risk under concurrent job processing
|
|
|
|
### Risk Assessment
|
|
|
|
- **P2 RELIABILITY** is correct - can cause data corruption
|
|
- Most likely to occur when:
|
|
- Multiple workers process same job queue
|
|
- Coordinator and API update job simultaneously
|
|
- Retry logic causes concurrent updates
|
|
|
|
## Next Steps
|
|
|
|
1. Write failing concurrency tests
|
|
2. Implement version field with migration
|
|
3. Update all job update methods
|
|
4. Verify tests pass
|
|
5. Document behavior for developers
|