Files
stack/apps/api/src/runner-jobs/runner-jobs.service.concurrency.spec.ts
Jason Woltje 458cac7cdd
All checks were successful
ci/woodpecker/push/api Pipeline was successful
ci/woodpecker/push/web Pipeline was successful
Phase 3: Agent Cycle Visibility (#461) (#462)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-02-23 01:07:29 +00:00

406 lines
13 KiB
TypeScript

import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { RunnerJobStatus } from "@prisma/client";
import { ConflictException, BadRequestException } from "@nestjs/common";
/**
* Concurrency tests for RunnerJobsService
* These tests verify that race conditions in job status updates are properly handled
*/
describe("RunnerJobsService - Concurrency", () => {
let service: RunnerJobsService;
let prisma: PrismaService;
const mockBullMqService = {
addJob: vi.fn(),
getQueue: vi.fn(),
};
const mockWebSocketGateway = {
emitJobCreated: vi.fn(),
emitJobStatusChanged: vi.fn(),
emitJobProgress: vi.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
RunnerJobsService,
{
provide: PrismaService,
useValue: {
runnerJob: {
findUnique: vi.fn(),
update: vi.fn(),
updateMany: vi.fn(),
},
},
},
{
provide: BullMqService,
useValue: mockBullMqService,
},
{
provide: WebSocketGateway,
useValue: mockWebSocketGateway,
},
],
}).compile();
service = module.get<RunnerJobsService>(RunnerJobsService);
prisma = module.get<PrismaService>(PrismaService);
vi.clearAllMocks();
});
describe("concurrent status updates", () => {
it("should detect concurrent status update conflict using version field", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
// Mock job with version 1
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
startedAt: new Date(),
};
// First findUnique returns job with version 1
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
// updateMany returns 0 (no rows updated - version mismatch)
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValue({ count: 0 });
// Should throw ConflictException when concurrent update detected
await expect(
service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED)
).rejects.toThrow(ConflictException);
// Verify updateMany was called with version check
expect(prisma.runnerJob.updateMany).toHaveBeenCalledWith(
expect.objectContaining({
where: expect.objectContaining({
id: jobId,
workspaceId,
version: 1,
}),
})
);
});
it("should successfully update when no concurrent conflict exists", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
startedAt: new Date(),
};
const updatedJob = {
...mockJob,
status: RunnerJobStatus.COMPLETED,
version: 2,
completedAt: new Date(),
};
// First call for initial read
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJob as any)
// Second call after updateMany succeeds
.mockResolvedValueOnce(updatedJob as any);
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValue({ count: 1 });
const result = await service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED);
expect(result.status).toBe(RunnerJobStatus.COMPLETED);
expect(result.version).toBe(2);
});
it("should retry on conflict and succeed on second attempt", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJobV1 = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
};
const mockJobV2 = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 2,
};
const updatedJob = {
...mockJobV2,
status: RunnerJobStatus.COMPLETED,
version: 3,
completedAt: new Date(),
};
// First attempt: version 1, updateMany returns 0 (conflict)
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJobV1 as any) // Initial read
.mockResolvedValueOnce(mockJobV2 as any) // Retry read
.mockResolvedValueOnce(updatedJob as any); // Final read after update
vi.mocked(prisma.runnerJob.updateMany)
.mockResolvedValueOnce({ count: 0 }) // First attempt fails
.mockResolvedValueOnce({ count: 1 }); // Retry succeeds
const result = await service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED);
expect(result.status).toBe(RunnerJobStatus.COMPLETED);
expect(prisma.runnerJob.updateMany).toHaveBeenCalledTimes(2);
});
});
describe("concurrent progress updates", () => {
it("should detect concurrent progress update conflict", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
progressPercent: 50,
version: 5,
};
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValue({ count: 0 });
await expect(service.updateProgress(jobId, workspaceId, 75)).rejects.toThrow(
ConflictException
);
});
it("should handle rapid sequential progress updates", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
// Simulate 5 rapid progress updates
const progressValues = [20, 40, 60, 80, 100];
let version = 1;
for (const progress of progressValues) {
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
progressPercent: progress - 20,
version,
};
const updatedJob = {
...mockJob,
progressPercent: progress,
version: version + 1,
};
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJob as any)
.mockResolvedValueOnce(updatedJob as any);
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValueOnce({ count: 1 });
const result = await service.updateProgress(jobId, workspaceId, progress);
expect(result.progressPercent).toBe(progress);
expect(result.version).toBe(version + 1);
version++;
}
});
});
describe("concurrent completion", () => {
it("should prevent double completion with different results", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
startedAt: new Date(),
};
const updatedJob = {
...mockJob,
status: RunnerJobStatus.COMPLETED,
version: 2,
result: { outcome: "success-A" },
completedAt: new Date(),
};
// Test first completion (succeeds)
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJob as any) // First completion - initial read
.mockResolvedValueOnce(updatedJob as any); // First completion - after update
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValueOnce({ count: 1 });
const result1 = await service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED, {
result: { outcome: "success-A" },
});
expect(result1.status).toBe(RunnerJobStatus.COMPLETED);
// Test second completion (fails due to version mismatch - will retry 3 times)
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJob as any) // Attempt 1: Reads stale version
.mockResolvedValueOnce(mockJob as any) // Attempt 2: Retry reads stale version
.mockResolvedValueOnce(mockJob as any); // Attempt 3: Final retry reads stale version
vi.mocked(prisma.runnerJob.updateMany)
.mockResolvedValueOnce({ count: 0 }) // Attempt 1: Version conflict
.mockResolvedValueOnce({ count: 0 }) // Attempt 2: Version conflict
.mockResolvedValueOnce({ count: 0 }); // Attempt 3: Version conflict
await expect(
service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED, {
result: { outcome: "success-B" },
})
).rejects.toThrow(ConflictException);
});
});
describe("concurrent cancel operations", () => {
it("should handle concurrent cancel attempts", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
};
const cancelledJob = {
...mockJob,
status: RunnerJobStatus.CANCELLED,
version: 2,
completedAt: new Date(),
};
// Setup mocks
vi.mocked(prisma.runnerJob.findUnique)
.mockResolvedValueOnce(mockJob as any) // First cancel - initial read
.mockResolvedValueOnce(cancelledJob as any) // First cancel - after update
.mockResolvedValueOnce(cancelledJob as any); // Second cancel - sees already cancelled
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValueOnce({ count: 1 });
const result1 = await service.cancel(jobId, workspaceId);
expect(result1.status).toBe(RunnerJobStatus.CANCELLED);
// Second cancel attempt should fail (job already cancelled)
await expect(service.cancel(jobId, workspaceId)).rejects.toThrow(BadRequestException);
});
});
describe("retry mechanism", () => {
it("should retry up to max attempts on version conflicts", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
};
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
// All retry attempts fail
vi.mocked(prisma.runnerJob.updateMany)
.mockResolvedValueOnce({ count: 0 })
.mockResolvedValueOnce({ count: 0 })
.mockResolvedValueOnce({ count: 0 });
// Should throw after max retries (3)
await expect(
service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED)
).rejects.toThrow(ConflictException);
expect(prisma.runnerJob.updateMany).toHaveBeenCalledTimes(3);
});
it("should use exponential backoff between retries", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.RUNNING,
version: 1,
};
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
const updateManyCalls: number[] = [];
vi.mocked(prisma.runnerJob.updateMany).mockImplementation(async () => {
updateManyCalls.push(Date.now());
return { count: 0 };
});
await expect(
service.updateStatus(jobId, workspaceId, RunnerJobStatus.COMPLETED)
).rejects.toThrow(ConflictException);
// Verify delays between calls increase (exponential backoff)
expect(updateManyCalls.length).toBe(3);
if (updateManyCalls.length >= 3) {
const delay1 = updateManyCalls[1] - updateManyCalls[0];
const delay2 = updateManyCalls[2] - updateManyCalls[1];
// Second delay should be >= first delay (exponential)
expect(delay2).toBeGreaterThanOrEqual(delay1);
}
});
});
describe("status transition validation with concurrency", () => {
it("should prevent invalid transitions even under concurrent updates", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
// Job is already completed
const mockJob = {
id: jobId,
workspaceId,
status: RunnerJobStatus.COMPLETED,
version: 5,
completedAt: new Date(),
};
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
// Should reject transition from COMPLETED to RUNNING
await expect(
service.updateStatus(jobId, workspaceId, RunnerJobStatus.RUNNING)
).rejects.toThrow();
});
});
});