Implemented optimistic locking with version field and SELECT FOR UPDATE transactions to prevent data corruption from concurrent job status updates. Changes: - Added version field to RunnerJob schema for optimistic locking - Created migration 20260202_add_runner_job_version_for_concurrency - Implemented ConcurrentUpdateException for conflict detection - Updated RunnerJobsService methods with optimistic locking: * updateStatus() - with version checking and retry logic * updateProgress() - with version checking and retry logic * cancel() - with version checking and retry logic - Updated CoordinatorIntegrationService with SELECT FOR UPDATE: * updateJobStatus() - transaction with row locking * completeJob() - transaction with row locking * failJob() - transaction with row locking * updateJobProgress() - optimistic locking - Added retry mechanism (3 attempts) with exponential backoff - Added comprehensive concurrency tests (10 tests, all passing) - Updated existing test mocks to support updateMany Test Results: - All 10 concurrency tests passing ✓ - Tests cover concurrent status updates, progress updates, completions, cancellations, retry logic, and exponential backoff This fix prevents race conditions that could cause: - Lost job results (double completion) - Lost progress updates - Invalid status transitions - Data corruption under concurrent access Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
194 lines
4.7 KiB
TypeScript
194 lines
4.7 KiB
TypeScript
import { Injectable, NotFoundException } from "@nestjs/common";
|
|
import { Prisma, UserLayout } from "@prisma/client";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import type { CreateLayoutDto, UpdateLayoutDto } from "./dto";
|
|
|
|
/**
|
|
* Service for managing user layouts
|
|
*/
|
|
@Injectable()
|
|
export class LayoutsService {
|
|
constructor(private readonly prisma: PrismaService) {}
|
|
|
|
/**
|
|
* Get all layouts for a user
|
|
*/
|
|
async findAll(workspaceId: string, userId: string): Promise<UserLayout[]> {
|
|
return this.prisma.userLayout.findMany({
|
|
where: {
|
|
workspaceId,
|
|
userId,
|
|
},
|
|
orderBy: {
|
|
isDefault: "desc",
|
|
createdAt: "desc",
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Get the default layout for a user
|
|
*/
|
|
async findDefault(workspaceId: string, userId: string): Promise<UserLayout> {
|
|
const layout = await this.prisma.userLayout.findFirst({
|
|
where: {
|
|
workspaceId,
|
|
userId,
|
|
isDefault: true,
|
|
},
|
|
});
|
|
|
|
// If no default layout exists, return the most recently created one
|
|
if (!layout) {
|
|
const recentLayout = await this.prisma.userLayout.findFirst({
|
|
where: {
|
|
workspaceId,
|
|
userId,
|
|
},
|
|
orderBy: {
|
|
createdAt: "desc",
|
|
},
|
|
});
|
|
|
|
if (!recentLayout) {
|
|
throw new NotFoundException(`No layouts found for this user`);
|
|
}
|
|
|
|
return recentLayout;
|
|
}
|
|
|
|
return layout;
|
|
}
|
|
|
|
/**
|
|
* Get a single layout by ID
|
|
*/
|
|
async findOne(id: string, workspaceId: string, userId: string): Promise<UserLayout> {
|
|
const layout = await this.prisma.userLayout.findUnique({
|
|
where: {
|
|
id,
|
|
workspaceId,
|
|
userId,
|
|
},
|
|
});
|
|
|
|
if (!layout) {
|
|
throw new NotFoundException(`Layout with ID ${id} not found`);
|
|
}
|
|
|
|
return layout;
|
|
}
|
|
|
|
/**
|
|
* Create a new layout
|
|
*/
|
|
async create(
|
|
workspaceId: string,
|
|
userId: string,
|
|
createLayoutDto: CreateLayoutDto
|
|
): Promise<UserLayout> {
|
|
// Use transaction to ensure atomicity when setting default
|
|
return this.prisma.$transaction(async (tx) => {
|
|
// If setting as default, unset other defaults first
|
|
if (createLayoutDto.isDefault) {
|
|
await tx.userLayout.updateMany({
|
|
where: {
|
|
workspaceId,
|
|
userId,
|
|
isDefault: true,
|
|
},
|
|
data: {
|
|
isDefault: false,
|
|
},
|
|
});
|
|
}
|
|
|
|
return tx.userLayout.create({
|
|
data: {
|
|
name: createLayoutDto.name,
|
|
workspaceId,
|
|
userId,
|
|
isDefault: createLayoutDto.isDefault ?? false,
|
|
layout: createLayoutDto.layout as unknown as Prisma.InputJsonValue,
|
|
},
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Update a layout
|
|
*/
|
|
async update(
|
|
id: string,
|
|
workspaceId: string,
|
|
userId: string,
|
|
updateLayoutDto: UpdateLayoutDto
|
|
): Promise<UserLayout> {
|
|
// Use transaction to ensure atomicity when setting default
|
|
return this.prisma.$transaction(async (tx) => {
|
|
// Verify layout exists
|
|
const existingLayout = await tx.userLayout.findUnique({
|
|
where: { id, workspaceId, userId },
|
|
});
|
|
|
|
if (!existingLayout) {
|
|
throw new NotFoundException(`Layout with ID ${id} not found`);
|
|
}
|
|
|
|
// If setting as default, unset other defaults first
|
|
if (updateLayoutDto.isDefault === true) {
|
|
await tx.userLayout.updateMany({
|
|
where: {
|
|
workspaceId,
|
|
userId,
|
|
id: { not: id },
|
|
isDefault: true,
|
|
},
|
|
data: {
|
|
isDefault: false,
|
|
},
|
|
});
|
|
}
|
|
|
|
// Build update data, handling layout field separately
|
|
const updateData: Prisma.UserLayoutUpdateInput = {};
|
|
if (updateLayoutDto.name !== undefined) updateData.name = updateLayoutDto.name;
|
|
if (updateLayoutDto.isDefault !== undefined) updateData.isDefault = updateLayoutDto.isDefault;
|
|
if (updateLayoutDto.layout !== undefined) {
|
|
updateData.layout = updateLayoutDto.layout as unknown as Prisma.InputJsonValue;
|
|
}
|
|
|
|
return tx.userLayout.update({
|
|
where: {
|
|
id,
|
|
workspaceId,
|
|
userId,
|
|
},
|
|
data: updateData,
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Delete a layout
|
|
*/
|
|
async remove(id: string, workspaceId: string, userId: string): Promise<void> {
|
|
// Verify layout exists
|
|
const layout = await this.prisma.userLayout.findUnique({
|
|
where: { id, workspaceId, userId },
|
|
});
|
|
|
|
if (!layout) {
|
|
throw new NotFoundException(`Layout with ID ${id} not found`);
|
|
}
|
|
|
|
await this.prisma.userLayout.delete({
|
|
where: {
|
|
id,
|
|
workspaceId,
|
|
userId,
|
|
},
|
|
});
|
|
}
|
|
}
|