Files
stack/apps/api/src/domains/domains.service.ts
Jason Woltje ef25167c24 fix(#196): fix race condition in job status updates
Implemented optimistic locking with version field and SELECT FOR UPDATE
transactions to prevent data corruption from concurrent job status updates.

Changes:
- Added version field to RunnerJob schema for optimistic locking
- Created migration 20260202_add_runner_job_version_for_concurrency
- Implemented ConcurrentUpdateException for conflict detection
- Updated RunnerJobsService methods with optimistic locking:
  * updateStatus() - with version checking and retry logic
  * updateProgress() - with version checking and retry logic
  * cancel() - with version checking and retry logic
- Updated CoordinatorIntegrationService with SELECT FOR UPDATE:
  * updateJobStatus() - transaction with row locking
  * completeJob() - transaction with row locking
  * failJob() - transaction with row locking
  * updateJobProgress() - optimistic locking
- Added retry mechanism (3 attempts) with exponential backoff
- Added comprehensive concurrency tests (10 tests, all passing)
- Updated existing test mocks to support updateMany

Test Results:
- All 10 concurrency tests passing ✓
- Tests cover concurrent status updates, progress updates, completions,
  cancellations, retry logic, and exponential backoff

This fix prevents race conditions that could cause:
- Lost job results (double completion)
- Lost progress updates
- Invalid status transitions
- Data corruption under concurrent access

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-02 12:51:17 -06:00

217 lines
5.5 KiB
TypeScript

import { Injectable, NotFoundException } from "@nestjs/common";
import { Prisma, Domain } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { ActivityService } from "../activity/activity.service";
import type { CreateDomainDto, UpdateDomainDto, QueryDomainsDto } from "./dto";
type DomainWithCount = Domain & {
_count: { tasks: number; events: number; projects: number; ideas: number };
};
/**
* Service for managing domains
*/
@Injectable()
export class DomainsService {
constructor(
private readonly prisma: PrismaService,
private readonly activityService: ActivityService
) {}
/**
* Create a new domain
*/
async create(
workspaceId: string,
userId: string,
createDomainDto: CreateDomainDto
): Promise<DomainWithCount> {
const domain = await this.prisma.domain.create({
data: {
name: createDomainDto.name,
slug: createDomainDto.slug,
description: createDomainDto.description ?? null,
color: createDomainDto.color ?? null,
icon: createDomainDto.icon ?? null,
workspace: {
connect: { id: workspaceId },
},
metadata: (createDomainDto.metadata ?? {}) as unknown as Prisma.InputJsonValue,
sortOrder: 0, // Default to 0, consistent with other services
},
include: {
_count: {
select: { tasks: true, events: true, projects: true, ideas: true },
},
},
});
// Log activity
await this.activityService.logDomainCreated(workspaceId, userId, domain.id, {
name: domain.name,
});
return domain;
}
/**
* Get paginated domains with filters
*/
async findAll(query: QueryDomainsDto): Promise<{
data: DomainWithCount[];
meta: {
total: number;
page: number;
limit: number;
totalPages: number;
};
}> {
const page = query.page ?? 1;
const limit = query.limit ?? 50;
const skip = (page - 1) * limit;
// Build where clause
const where: Prisma.DomainWhereInput = query.workspaceId
? {
workspaceId: query.workspaceId,
}
: {};
// Add search filter if provided
if (query.search) {
where.OR = [
{ name: { contains: query.search, mode: "insensitive" } },
{ description: { contains: query.search, mode: "insensitive" } },
];
}
// Execute queries in parallel
const [data, total] = await Promise.all([
this.prisma.domain.findMany({
where,
include: {
_count: {
select: { tasks: true, events: true, projects: true, ideas: true },
},
},
orderBy: {
sortOrder: "asc",
},
skip,
take: limit,
}),
this.prisma.domain.count({ where }),
]);
return {
data,
meta: {
total,
page,
limit,
totalPages: Math.ceil(total / limit),
},
};
}
/**
* Get a single domain by ID
*/
async findOne(id: string, workspaceId: string): Promise<DomainWithCount> {
const domain = await this.prisma.domain.findUnique({
where: {
id,
workspaceId,
},
include: {
_count: {
select: { tasks: true, events: true, projects: true, ideas: true },
},
},
});
if (!domain) {
throw new NotFoundException(`Domain with ID ${id} not found`);
}
return domain;
}
/**
* Update a domain
*/
async update(
id: string,
workspaceId: string,
userId: string,
updateDomainDto: UpdateDomainDto
): Promise<DomainWithCount> {
// Verify domain exists
const existingDomain = await this.prisma.domain.findUnique({
where: { id, workspaceId },
});
if (!existingDomain) {
throw new NotFoundException(`Domain with ID ${id} not found`);
}
// Build update data, only including defined fields
const updateData: Prisma.DomainUpdateInput = {};
if (updateDomainDto.name !== undefined) updateData.name = updateDomainDto.name;
if (updateDomainDto.slug !== undefined) updateData.slug = updateDomainDto.slug;
if (updateDomainDto.description !== undefined)
updateData.description = updateDomainDto.description;
if (updateDomainDto.color !== undefined) updateData.color = updateDomainDto.color;
if (updateDomainDto.icon !== undefined) updateData.icon = updateDomainDto.icon;
if (updateDomainDto.metadata !== undefined) {
updateData.metadata = updateDomainDto.metadata as unknown as Prisma.InputJsonValue;
}
const domain = await this.prisma.domain.update({
where: {
id,
workspaceId,
},
data: updateData,
include: {
_count: {
select: { tasks: true, events: true, projects: true, ideas: true },
},
},
});
// Log activity
await this.activityService.logDomainUpdated(workspaceId, userId, id, {
changes: updateDomainDto as Prisma.JsonValue,
});
return domain;
}
/**
* Delete a domain
*/
async remove(id: string, workspaceId: string, userId: string): Promise<void> {
// Verify domain exists
const domain = await this.prisma.domain.findUnique({
where: { id, workspaceId },
});
if (!domain) {
throw new NotFoundException(`Domain with ID ${id} not found`);
}
await this.prisma.domain.delete({
where: {
id,
workspaceId,
},
});
// Log activity
await this.activityService.logDomainDeleted(workspaceId, userId, id, {
name: domain.name,
});
}
}