Implemented optimistic locking with version field and SELECT FOR UPDATE transactions to prevent data corruption from concurrent job status updates.

Changes:
- Added version field to RunnerJob schema for optimistic locking
- Created migration 20260202_add_runner_job_version_for_concurrency
- Implemented ConcurrentUpdateException for conflict detection
- Updated RunnerJobsService methods with optimistic locking:
  * updateStatus() - with version checking and retry logic
  * updateProgress() - with version checking and retry logic
  * cancel() - with version checking and retry logic
- Updated CoordinatorIntegrationService with SELECT FOR UPDATE:
  * updateJobStatus() - transaction with row locking
  * completeJob() - transaction with row locking
  * failJob() - transaction with row locking
  * updateJobProgress() - optimistic locking
- Added retry mechanism (3 attempts) with exponential backoff
- Added comprehensive concurrency tests (10 tests, all passing)
- Updated existing test mocks to support updateMany

Test Results:
- All 10 concurrency tests passing ✓
- Tests cover concurrent status updates, progress updates, completions, cancellations, retry logic, and exponential backoff

This fix prevents race conditions that could cause:
- Lost job results (double completion)
- Lost progress updates
- Invalid status transitions
- Data corruption under concurrent access

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
322 lines
8.4 KiB
TypeScript
322 lines
8.4 KiB
TypeScript
import { Injectable, NotFoundException } from "@nestjs/common";
|
|
import { Prisma, Idea } from "@prisma/client";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import { ActivityService } from "../activity/activity.service";
|
|
import { IdeaStatus } from "@prisma/client";
|
|
import type { CreateIdeaDto, CaptureIdeaDto, UpdateIdeaDto, QueryIdeasDto } from "./dto";
|
|
|
|
/** Identity fields loaded for the user who created an idea. */
type IdeaCreatorSummary = { id: string; name: string; email: string };

/** Display fields loaded for a domain or project linked to an idea. */
type IdeaLabelSummary = { id: string; name: string; color: string | null };

/** An idea row together with its creator (the shape returned by `capture`). */
type IdeaCaptured = Idea & {
  creator: IdeaCreatorSummary;
};

/**
 * An idea row together with its creator and its optional domain and project.
 * `domain` / `project` are `null` when the idea is not linked to one.
 */
type IdeaWithRelations = IdeaCaptured & {
  domain: IdeaLabelSummary | null;
  project: IdeaLabelSummary | null;
};
|
|
|
|
/** Columns returned for the user who created an idea. */
const IDEA_CREATOR_SELECT = { id: true, name: true, email: true } as const;

/** Columns returned for a domain or project linked to an idea. */
const IDEA_LABEL_SELECT = { id: true, name: true, color: true } as const;

/** Relations loaded for the quick-capture response. */
const CAPTURED_IDEA_INCLUDE = {
  creator: { select: IDEA_CREATOR_SELECT },
} as const;

/** Relations loaded for every other response that returns an idea. */
const IDEA_RELATIONS_INCLUDE = {
  creator: { select: IDEA_CREATOR_SELECT },
  domain: { select: IDEA_LABEL_SELECT },
  project: { select: IDEA_LABEL_SELECT },
} as const;

/** Page used by `findAll` when the query carries no usable page number. */
const DEFAULT_IDEAS_PAGE = 1;

/** Page size used by `findAll` when the query carries no usable limit. */
const DEFAULT_IDEAS_LIMIT = 50;

/**
 * Coerce a paging value to an integer >= 1, or return `fallback`.
 * Guards against a negative `skip` and a division by zero in `totalPages`.
 */
function toPositiveInt(value: number | undefined, fallback: number): number {
  const parsed = Math.floor(Number(value));
  return Number.isFinite(parsed) && parsed >= 1 ? parsed : fallback;
}

/**
 * Service for managing ideas.
 *
 * Reads and writes ideas through Prisma and records an activity entry after
 * each successful create, update, and delete.
 */
@Injectable()
export class IdeasService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly activityService: ActivityService
  ) {}

  /**
   * Create a new idea.
   *
   * Missing optional fields default to: `title`/`category` = null,
   * `status` = CAPTURED, `priority` = "MEDIUM", `tags` = [], `metadata` = {}.
   * The idea is linked to a domain/project only when the DTO carries a
   * truthy `domainId`/`projectId`.
   *
   * @param workspaceId - Workspace the idea is connected to.
   * @param userId - User recorded as creator and as the activity actor.
   * @param createIdeaDto - Fields for the new idea.
   * @returns The created idea with creator, domain, and project loaded.
   */
  async create(
    workspaceId: string,
    userId: string,
    createIdeaDto: CreateIdeaDto
  ): Promise<IdeaWithRelations> {
    const domainConnection = createIdeaDto.domainId
      ? { connect: { id: createIdeaDto.domainId } }
      : undefined;
    const projectConnection = createIdeaDto.projectId
      ? { connect: { id: createIdeaDto.projectId } }
      : undefined;

    const data: Prisma.IdeaCreateInput = {
      title: createIdeaDto.title ?? null,
      content: createIdeaDto.content,
      category: createIdeaDto.category ?? null,
      workspace: { connect: { id: workspaceId } },
      creator: { connect: { id: userId } },
      status: createIdeaDto.status ?? IdeaStatus.CAPTURED,
      priority: createIdeaDto.priority ?? "MEDIUM",
      tags: createIdeaDto.tags ?? [],
      metadata: createIdeaDto.metadata
        ? (createIdeaDto.metadata as unknown as Prisma.InputJsonValue)
        : {},
      // Relation keys are omitted entirely when there is nothing to connect.
      ...(domainConnection && { domain: domainConnection }),
      ...(projectConnection && { project: projectConnection }),
    };

    const idea = await this.prisma.idea.create({
      data,
      include: IDEA_RELATIONS_INCLUDE,
    });

    await this.activityService.logIdeaCreated(workspaceId, userId, idea.id, {
      title: idea.title ?? "Untitled",
    });

    return idea;
  }

  /**
   * Quick capture - create an idea from content and an optional title only.
   *
   * Status is always CAPTURED, priority "MEDIUM", tags and metadata empty.
   * The activity entry is flagged with `quickCapture: true`.
   *
   * @param workspaceId - Workspace the idea is connected to.
   * @param userId - User recorded as creator and as the activity actor.
   * @param captureIdeaDto - Content and optional title.
   * @returns The created idea with its creator loaded.
   */
  async capture(
    workspaceId: string,
    userId: string,
    captureIdeaDto: CaptureIdeaDto
  ): Promise<IdeaCaptured> {
    const data: Prisma.IdeaCreateInput = {
      workspace: { connect: { id: workspaceId } },
      creator: { connect: { id: userId } },
      content: captureIdeaDto.content,
      title: captureIdeaDto.title ?? null,
      status: IdeaStatus.CAPTURED,
      priority: "MEDIUM",
      tags: [],
      metadata: {},
    };

    const idea = await this.prisma.idea.create({
      data,
      include: CAPTURED_IDEA_INCLUDE,
    });

    await this.activityService.logIdeaCreated(workspaceId, userId, idea.id, {
      quickCapture: true,
      title: idea.title ?? "Untitled",
    });

    return idea;
  }

  /**
   * Get one page of ideas, newest first, with optional filters.
   *
   * `status`, `domainId`, `projectId`, and `category` are exact-match filters;
   * `search` matches case-insensitively against title or content.
   * `page` and `limit` fall back to 1 and 50 when they are missing or not a
   * number >= 1.
   *
   * @param query - Filters and paging options.
   * @returns The page of ideas plus paging metadata.
   */
  async findAll(query: QueryIdeasDto): Promise<{
    data: IdeaWithRelations[];
    meta: {
      total: number;
      page: number;
      limit: number;
      totalPages: number;
    };
  }> {
    const page = toPositiveInt(query.page, DEFAULT_IDEAS_PAGE);
    const limit = toPositiveInt(query.limit, DEFAULT_IDEAS_LIMIT);
    const skip = (page - 1) * limit;

    // NOTE(review): without `query.workspaceId` the query is not scoped to a
    // workspace - confirm that callers always supply it.
    const where: Prisma.IdeaWhereInput = query.workspaceId
      ? { workspaceId: query.workspaceId }
      : {};

    if (query.status) {
      where.status = query.status;
    }
    if (query.domainId) {
      where.domainId = query.domainId;
    }
    if (query.projectId) {
      where.projectId = query.projectId;
    }
    if (query.category) {
      where.category = query.category;
    }
    if (query.search) {
      where.OR = [
        { title: { contains: query.search, mode: "insensitive" } },
        { content: { contains: query.search, mode: "insensitive" } },
      ];
    }

    // The page and the total share the same filter and run in parallel.
    const [data, total] = await Promise.all([
      this.prisma.idea.findMany({
        where,
        include: IDEA_RELATIONS_INCLUDE,
        orderBy: { createdAt: "desc" },
        skip,
        take: limit,
      }),
      this.prisma.idea.count({ where }),
    ]);

    return {
      data,
      meta: {
        total,
        page,
        limit,
        totalPages: Math.ceil(total / limit),
      },
    };
  }

  /**
   * Get a single idea by ID within a workspace.
   *
   * @param id - Idea ID.
   * @param workspaceId - Workspace the idea must belong to.
   * @returns The idea with creator, domain, and project loaded.
   * @throws NotFoundException when no idea matches `id` and `workspaceId`.
   */
  async findOne(id: string, workspaceId: string): Promise<IdeaWithRelations> {
    const idea = await this.prisma.idea.findUnique({
      where: { id, workspaceId },
      include: IDEA_RELATIONS_INCLUDE,
    });

    if (!idea) {
      throw new NotFoundException(`Idea with ID ${id} not found`);
    }

    return idea;
  }

  /**
   * Update an idea.
   *
   * Only fields that are not `undefined` in the DTO are written. For
   * `domainId` / `projectId`, a string connects the relation and an explicit
   * `null` disconnects it.
   *
   * @param id - Idea ID.
   * @param workspaceId - Workspace the idea must belong to.
   * @param userId - User recorded as the activity actor.
   * @param updateIdeaDto - Fields to change.
   * @returns The updated idea with creator, domain, and project loaded.
   * @throws NotFoundException when no idea matches `id` and `workspaceId`.
   */
  async update(
    id: string,
    workspaceId: string,
    userId: string,
    updateIdeaDto: UpdateIdeaDto
  ): Promise<IdeaWithRelations> {
    await this.requireIdea(id, workspaceId);

    const updateData: Prisma.IdeaUpdateInput = {};
    if (updateIdeaDto.title !== undefined) updateData.title = updateIdeaDto.title;
    if (updateIdeaDto.content !== undefined) updateData.content = updateIdeaDto.content;
    if (updateIdeaDto.status !== undefined) updateData.status = updateIdeaDto.status;
    if (updateIdeaDto.priority !== undefined) updateData.priority = updateIdeaDto.priority;
    if (updateIdeaDto.category !== undefined) updateData.category = updateIdeaDto.category;
    if (updateIdeaDto.tags !== undefined) updateData.tags = updateIdeaDto.tags;
    if (updateIdeaDto.metadata !== undefined) {
      updateData.metadata = updateIdeaDto.metadata as unknown as Prisma.InputJsonValue;
    }

    // Relations are written through connect/disconnect rather than raw ids.
    // A runtime `null` (e.g. from a JSON body) is not a valid connect target,
    // so it is mapped to a disconnect of the optional relation instead.
    if (updateIdeaDto.domainId !== undefined) {
      updateData.domain =
        updateIdeaDto.domainId === null
          ? { disconnect: true }
          : { connect: { id: updateIdeaDto.domainId } };
    }
    if (updateIdeaDto.projectId !== undefined) {
      updateData.project =
        updateIdeaDto.projectId === null
          ? { disconnect: true }
          : { connect: { id: updateIdeaDto.projectId } };
    }

    const idea = await this.prisma.idea.update({
      where: { id, workspaceId },
      data: updateData,
      include: IDEA_RELATIONS_INCLUDE,
    });

    await this.activityService.logIdeaUpdated(workspaceId, userId, id, {
      changes: updateIdeaDto as Prisma.JsonValue,
    });

    return idea;
  }

  /**
   * Delete an idea.
   *
   * @param id - Idea ID.
   * @param workspaceId - Workspace the idea must belong to.
   * @param userId - User recorded as the activity actor.
   * @throws NotFoundException when no idea matches `id` and `workspaceId`.
   */
  async remove(id: string, workspaceId: string, userId: string): Promise<void> {
    // Loaded before deleting so the title is still available for the log entry.
    const idea = await this.requireIdea(id, workspaceId);

    await this.prisma.idea.delete({
      where: { id, workspaceId },
    });

    await this.activityService.logIdeaDeleted(workspaceId, userId, id, {
      title: idea.title ?? "Untitled",
    });
  }

  /** Load the bare idea row for `id` in `workspaceId`, or throw NotFoundException. */
  private async requireIdea(id: string, workspaceId: string) {
    const idea = await this.prisma.idea.findUnique({
      where: { id, workspaceId },
    });

    if (!idea) {
      throw new NotFoundException(`Idea with ID ${id} not found`);
    }

    return idea;
  }
}
|