fix(#196, #199): Fix TypeScript errors from race condition and throttler changes
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
- Regenerated Prisma client to include version field from #196 - Updated ThrottlerValkeyStorageService to match @nestjs/throttler v6.5 interface - increment() now returns ThrottlerStorageRecord with totalHits, timeToExpire, isBlocked - Added blockDuration and throttlerName parameters to match interface - Added null checks for job variable after length checks in coordinator-integration.service.ts - Fixed template literal type error in ConcurrentUpdateException - Removed unnecessary await in throttler-storage.service.ts - Fixes pipeline 79 typecheck failure Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -8,7 +8,7 @@ import { ConflictException } from "@nestjs/common";
|
|||||||
export class ConcurrentUpdateException extends ConflictException {
|
export class ConcurrentUpdateException extends ConflictException {
|
||||||
constructor(resourceType: string, resourceId: string, currentVersion?: number) {
|
constructor(resourceType: string, resourceId: string, currentVersion?: number) {
|
||||||
const message = currentVersion
|
const message = currentVersion
|
||||||
? `Concurrent update detected for ${resourceType} ${resourceId} at version ${currentVersion}. The record was modified by another process.`
|
? `Concurrent update detected for ${resourceType} ${resourceId} at version ${String(currentVersion)}. The record was modified by another process.`
|
||||||
: `Concurrent update detected for ${resourceType} ${resourceId}. The record was modified by another process.`;
|
: `Concurrent update detected for ${resourceType} ${resourceId}. The record was modified by another process.`;
|
||||||
|
|
||||||
super({
|
super({
|
||||||
|
|||||||
@@ -1,7 +1,18 @@
|
|||||||
import { Injectable, OnModuleInit, Logger } from "@nestjs/common";
|
import { Injectable, OnModuleInit, Logger } from "@nestjs/common";
|
||||||
import { ThrottlerStorageService } from "@nestjs/throttler";
|
import { ThrottlerStorage } from "@nestjs/throttler";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Throttler storage record interface
|
||||||
|
* Matches @nestjs/throttler's ThrottlerStorageRecord
|
||||||
|
*/
|
||||||
|
interface ThrottlerStorageRecord {
|
||||||
|
totalHits: number;
|
||||||
|
timeToExpire: number;
|
||||||
|
isBlocked: boolean;
|
||||||
|
timeToBlockExpire: number;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Redis-based storage for rate limiting using Valkey
|
* Redis-based storage for rate limiting using Valkey
|
||||||
*
|
*
|
||||||
@@ -12,9 +23,9 @@ import Redis from "ioredis";
|
|||||||
* If Redis is unavailable, falls back to in-memory storage.
|
* If Redis is unavailable, falls back to in-memory storage.
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class ThrottlerValkeyStorageService implements ThrottlerStorageService, OnModuleInit {
|
export class ThrottlerValkeyStorageService implements ThrottlerStorage, OnModuleInit {
|
||||||
private readonly logger = new Logger(ThrottlerValkeyStorageService.name);
|
private readonly logger = new Logger(ThrottlerValkeyStorageService.name);
|
||||||
private client?: Redis;
|
private client: Redis | undefined = undefined;
|
||||||
private readonly THROTTLER_PREFIX = "mosaic:throttler:";
|
private readonly THROTTLER_PREFIX = "mosaic:throttler:";
|
||||||
private readonly fallbackStorage = new Map<string, number[]>();
|
private readonly fallbackStorage = new Map<string, number[]>();
|
||||||
private useRedis = false;
|
private useRedis = false;
|
||||||
@@ -54,27 +65,49 @@ export class ThrottlerValkeyStorageService implements ThrottlerStorageService, O
|
|||||||
*
|
*
|
||||||
* @param key - Throttle key (e.g., "apikey:xxx" or "ip:192.168.1.1")
|
* @param key - Throttle key (e.g., "apikey:xxx" or "ip:192.168.1.1")
|
||||||
* @param ttl - Time to live in milliseconds
|
* @param ttl - Time to live in milliseconds
|
||||||
* @returns Promise resolving to the current number of requests
|
* @param limit - Maximum number of requests allowed
|
||||||
|
* @param blockDuration - Duration to block in milliseconds (not used in this implementation)
|
||||||
|
* @param _throttlerName - Name of the throttler (not used in this implementation)
|
||||||
|
* @returns Promise resolving to the current throttler storage record
|
||||||
*/
|
*/
|
||||||
async increment(key: string, ttl: number): Promise<number> {
|
async increment(
|
||||||
|
key: string,
|
||||||
|
ttl: number,
|
||||||
|
limit: number,
|
||||||
|
blockDuration: number,
|
||||||
|
_throttlerName: string
|
||||||
|
): Promise<ThrottlerStorageRecord> {
|
||||||
const throttleKey = this.getThrottleKey(key);
|
const throttleKey = this.getThrottleKey(key);
|
||||||
|
let totalHits: number;
|
||||||
|
|
||||||
if (this.useRedis && this.client) {
|
if (this.useRedis && this.client) {
|
||||||
try {
|
try {
|
||||||
const result = await this.client.multi().incr(throttleKey).pexpire(throttleKey, ttl).exec();
|
const result = await this.client.multi().incr(throttleKey).pexpire(throttleKey, ttl).exec();
|
||||||
|
|
||||||
if (result?.[0]?.[1]) {
|
if (result?.[0]?.[1]) {
|
||||||
return result[0][1] as number;
|
totalHits = result[0][1] as number;
|
||||||
|
} else {
|
||||||
|
totalHits = this.incrementMemory(throttleKey, ttl);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
this.logger.error(`Redis increment failed: ${errorMessage}`);
|
this.logger.error(`Redis increment failed: ${errorMessage}`);
|
||||||
// Fall through to in-memory
|
// Fall through to in-memory
|
||||||
|
totalHits = this.incrementMemory(throttleKey, ttl);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// In-memory fallback
|
||||||
|
totalHits = this.incrementMemory(throttleKey, ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
// In-memory fallback
|
// Return ThrottlerStorageRecord
|
||||||
return this.incrementMemory(throttleKey, ttl);
|
const isBlocked = totalHits > limit;
|
||||||
|
return {
|
||||||
|
totalHits,
|
||||||
|
timeToExpire: ttl,
|
||||||
|
isBlocked,
|
||||||
|
timeToBlockExpire: isBlocked ? blockDuration : 0,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -120,11 +120,14 @@ export class CoordinatorIntegrationService {
|
|||||||
FOR UPDATE
|
FOR UPDATE
|
||||||
`;
|
`;
|
||||||
|
|
||||||
if (!jobs || jobs.length === 0) {
|
if (jobs.length === 0) {
|
||||||
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const job = jobs[0];
|
const job = jobs[0];
|
||||||
|
if (!job) {
|
||||||
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
// Validate status transition
|
// Validate status transition
|
||||||
if (!this.isValidStatusTransition(job.status, dto.status as RunnerJobStatus)) {
|
if (!this.isValidStatusTransition(job.status, dto.status as RunnerJobStatus)) {
|
||||||
@@ -245,11 +248,14 @@ export class CoordinatorIntegrationService {
|
|||||||
FOR UPDATE
|
FOR UPDATE
|
||||||
`;
|
`;
|
||||||
|
|
||||||
if (!jobs || jobs.length === 0) {
|
if (jobs.length === 0) {
|
||||||
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const job = jobs[0];
|
const job = jobs[0];
|
||||||
|
if (!job) {
|
||||||
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
// Validate status transition
|
// Validate status transition
|
||||||
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.COMPLETED)) {
|
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.COMPLETED)) {
|
||||||
@@ -312,11 +318,14 @@ export class CoordinatorIntegrationService {
|
|||||||
FOR UPDATE
|
FOR UPDATE
|
||||||
`;
|
`;
|
||||||
|
|
||||||
if (!jobs || jobs.length === 0) {
|
if (jobs.length === 0) {
|
||||||
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const job = jobs[0];
|
const job = jobs[0];
|
||||||
|
if (!job) {
|
||||||
|
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
// Validate status transition
|
// Validate status transition
|
||||||
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.FAILED)) {
|
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.FAILED)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user