fix(#199): implement rate limiting on webhook endpoints
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Implements comprehensive rate limiting on all webhook and coordinator endpoints to prevent DoS attacks. Follows TDD protocol with 14 passing tests. Implementation: - Added @nestjs/throttler package for rate limiting - Created ThrottlerApiKeyGuard for per-API-key rate limiting - Created ThrottlerValkeyStorageService for distributed rate limiting via Redis - Configured rate limits on stitcher endpoints (60 req/min) - Configured rate limits on coordinator endpoints (100 req/min) - Higher limits for health endpoints (300 req/min for monitoring) - Added environment variables for rate limit configuration - Rate limiting logs violations for security monitoring Rate Limits: - Stitcher webhooks: 60 requests/minute per API key - Coordinator endpoints: 100 requests/minute per API key - Health endpoints: 300 requests/minute (higher for monitoring) Storage: - Uses Valkey (Redis) for distributed rate limiting across API instances - Falls back to in-memory storage if Redis unavailable Testing: - 14 comprehensive rate limiting tests (all passing) - Tests verify: rate limit enforcement, Retry-After headers, per-API-key isolation - TDD approach: RED (failing tests) → GREEN (implementation) → REFACTOR Additional improvements: - Type safety improvements in websocket gateway - Array type notation standardization in coordinator service Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { Controller, Post, Patch, Get, Body, Param, UseGuards } from "@nestjs/common";
|
||||
import { Throttle } from "@nestjs/throttler";
|
||||
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
|
||||
import {
|
||||
CreateCoordinatorJobDto,
|
||||
@@ -13,7 +14,10 @@ import { ApiKeyGuard } from "../common/guards";
|
||||
/**
|
||||
* CoordinatorIntegrationController - REST API for Python coordinator communication
|
||||
*
|
||||
* SECURITY: All endpoints require API key authentication via X-API-Key header
|
||||
* SECURITY:
|
||||
* - All endpoints require API key authentication via X-API-Key header
|
||||
* - Rate limiting: 100 requests per minute per API key (default)
|
||||
* - Health endpoint: 300 requests per minute (higher for monitoring)
|
||||
*
|
||||
* Endpoints:
|
||||
* - POST /coordinator/jobs - Create a job from coordinator
|
||||
@@ -26,21 +30,28 @@ import { ApiKeyGuard } from "../common/guards";
|
||||
*/
|
||||
@Controller("coordinator")
|
||||
@UseGuards(ApiKeyGuard)
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } }) // 100 requests per minute
|
||||
export class CoordinatorIntegrationController {
|
||||
constructor(private readonly service: CoordinatorIntegrationService) {}
|
||||
|
||||
/**
|
||||
* Create a job from the coordinator
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Post("jobs")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async createJob(@Body() dto: CreateCoordinatorJobDto): Promise<CoordinatorJobResult> {
|
||||
return this.service.createJob(dto);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update job status from the coordinator
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Patch("jobs/:id/status")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async updateJobStatus(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: UpdateJobStatusDto
|
||||
@@ -50,8 +61,11 @@ export class CoordinatorIntegrationController {
|
||||
|
||||
/**
|
||||
* Update job progress from the coordinator
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Patch("jobs/:id/progress")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async updateJobProgress(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: UpdateJobProgressDto
|
||||
@@ -61,8 +75,11 @@ export class CoordinatorIntegrationController {
|
||||
|
||||
/**
|
||||
* Mark job as complete from the coordinator
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Post("jobs/:id/complete")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async completeJob(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: CompleteJobDto
|
||||
@@ -72,8 +89,11 @@ export class CoordinatorIntegrationController {
|
||||
|
||||
/**
|
||||
* Mark job as failed from the coordinator
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Post("jobs/:id/fail")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async failJob(
|
||||
@Param("id") id: string,
|
||||
@Body() dto: FailJobDto
|
||||
@@ -83,8 +103,11 @@ export class CoordinatorIntegrationController {
|
||||
|
||||
/**
|
||||
* Get job details with events and steps
|
||||
*
|
||||
* Rate limit: 100 requests per minute per API key
|
||||
*/
|
||||
@Get("jobs/:id")
|
||||
@Throttle({ default: { ttl: 60000, limit: 100 } })
|
||||
async getJobDetails(
|
||||
@Param("id") id: string
|
||||
): Promise<Awaited<ReturnType<typeof this.service.getJobDetails>>> {
|
||||
@@ -93,8 +116,11 @@ export class CoordinatorIntegrationController {
|
||||
|
||||
/**
|
||||
* Integration health check
|
||||
*
|
||||
* Rate limit: 300 requests per minute (higher for monitoring)
|
||||
*/
|
||||
@Get("health")
|
||||
@Throttle({ default: { ttl: 60000, limit: 300 } })
|
||||
async getHealth(): Promise<CoordinatorHealthStatus> {
|
||||
return this.service.getIntegrationHealth();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { INestApplication, HttpStatus } from "@nestjs/common";
|
||||
import request from "supertest";
|
||||
import { CoordinatorIntegrationController } from "./coordinator-integration.controller";
|
||||
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
|
||||
import { ThrottlerModule } from "@nestjs/throttler";
|
||||
import { APP_GUARD } from "@nestjs/core";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { ApiKeyGuard } from "../common/guards";
|
||||
import { ThrottlerApiKeyGuard } from "../common/throttler";
|
||||
|
||||
/**
|
||||
* Rate Limiting Tests for Coordinator Integration Endpoints
|
||||
*
|
||||
* These tests verify that rate limiting is properly enforced on coordinator
|
||||
* endpoints to prevent DoS attacks.
|
||||
*
|
||||
* Test Coverage:
|
||||
* - Rate limit enforcement (429 status)
|
||||
* - Retry-After header inclusion
|
||||
* - Per-API-key rate limiting
|
||||
* - Higher limits for health endpoints
|
||||
*/
|
||||
describe("CoordinatorIntegrationController - Rate Limiting", () => {
|
||||
let app: INestApplication;
|
||||
let service: CoordinatorIntegrationService;
|
||||
|
||||
const mockCoordinatorService = {
|
||||
createJob: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
status: "PENDING",
|
||||
}),
|
||||
updateJobStatus: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
status: "RUNNING",
|
||||
}),
|
||||
updateJobProgress: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
progress: 50,
|
||||
}),
|
||||
completeJob: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
status: "COMPLETED",
|
||||
}),
|
||||
failJob: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
status: "FAILED",
|
||||
}),
|
||||
getJobDetails: vi.fn().mockResolvedValue({
|
||||
jobId: "coord-job-123",
|
||||
status: "RUNNING",
|
||||
}),
|
||||
getIntegrationHealth: vi.fn().mockResolvedValue({
|
||||
status: "healthy",
|
||||
timestamp: new Date().toISOString(),
|
||||
}),
|
||||
};
|
||||
|
||||
const mockConfigService = {
|
||||
get: vi.fn((key: string) => {
|
||||
const config: Record<string, string | number> = {
|
||||
COORDINATOR_API_KEY: "test-coordinator-key",
|
||||
RATE_LIMIT_TTL: "1", // 1 second for faster tests
|
||||
RATE_LIMIT_COORDINATOR_LIMIT: "100",
|
||||
RATE_LIMIT_HEALTH_LIMIT: "300",
|
||||
};
|
||||
return config[key];
|
||||
}),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
const moduleFixture: TestingModule = await Test.createTestingModule({
|
||||
imports: [
|
||||
ThrottlerModule.forRoot([
|
||||
{
|
||||
ttl: 1000, // 1 second for testing
|
||||
limit: 100, // Default limit
|
||||
},
|
||||
]),
|
||||
],
|
||||
controllers: [CoordinatorIntegrationController],
|
||||
providers: [
|
||||
{ provide: CoordinatorIntegrationService, useValue: mockCoordinatorService },
|
||||
{ provide: ConfigService, useValue: mockConfigService },
|
||||
{
|
||||
provide: APP_GUARD,
|
||||
useClass: ThrottlerApiKeyGuard,
|
||||
},
|
||||
],
|
||||
})
|
||||
.overrideGuard(ApiKeyGuard)
|
||||
.useValue({ canActivate: () => true })
|
||||
.compile();
|
||||
|
||||
app = moduleFixture.createNestApplication();
|
||||
await app.init();
|
||||
|
||||
service = moduleFixture.get<CoordinatorIntegrationService>(CoordinatorIntegrationService);
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await app.close();
|
||||
});
|
||||
|
||||
describe("POST /coordinator/jobs - Rate Limiting", () => {
|
||||
it("should allow requests within rate limit", async () => {
|
||||
const payload = {
|
||||
workspaceId: "workspace-123",
|
||||
type: "data-processing",
|
||||
data: { input: "test" },
|
||||
};
|
||||
|
||||
// Make 3 requests (within limit of 100)
|
||||
for (let i = 0; i < 3; i++) {
|
||||
const response = await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
|
||||
expect(response.status).toBe(HttpStatus.CREATED);
|
||||
}
|
||||
|
||||
expect(mockCoordinatorService.createJob).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("should return 429 when rate limit is exceeded", async () => {
|
||||
const payload = {
|
||||
workspaceId: "workspace-123",
|
||||
type: "data-processing",
|
||||
data: { input: "test" },
|
||||
};
|
||||
|
||||
// Exhaust rate limit (100 requests)
|
||||
for (let i = 0; i < 100; i++) {
|
||||
await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
}
|
||||
|
||||
// The 101st request should be rate limited
|
||||
const response = await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
|
||||
expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS);
|
||||
});
|
||||
|
||||
it("should include Retry-After header in 429 response", async () => {
|
||||
const payload = {
|
||||
workspaceId: "workspace-123",
|
||||
type: "data-processing",
|
||||
data: { input: "test" },
|
||||
};
|
||||
|
||||
// Exhaust rate limit (100 requests)
|
||||
for (let i = 0; i < 100; i++) {
|
||||
await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
}
|
||||
|
||||
// Get rate limited response
|
||||
const response = await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
|
||||
expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS);
|
||||
expect(response.headers).toHaveProperty("retry-after");
|
||||
expect(parseInt(response.headers["retry-after"])).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("PATCH /coordinator/jobs/:id/status - Rate Limiting", () => {
|
||||
it("should allow requests within rate limit", async () => {
|
||||
const jobId = "coord-job-123";
|
||||
const payload = { status: "RUNNING" };
|
||||
|
||||
// Make 3 requests (within limit of 100)
|
||||
for (let i = 0; i < 3; i++) {
|
||||
const response = await request(app.getHttpServer())
|
||||
.patch(`/coordinator/jobs/${jobId}/status`)
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
|
||||
expect(response.status).toBe(HttpStatus.OK);
|
||||
}
|
||||
|
||||
expect(mockCoordinatorService.updateJobStatus).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("should return 429 when rate limit is exceeded", async () => {
|
||||
const jobId = "coord-job-123";
|
||||
const payload = { status: "RUNNING" };
|
||||
|
||||
// Exhaust rate limit (100 requests)
|
||||
for (let i = 0; i < 100; i++) {
|
||||
await request(app.getHttpServer())
|
||||
.patch(`/coordinator/jobs/${jobId}/status`)
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
}
|
||||
|
||||
// The 101st request should be rate limited
|
||||
const response = await request(app.getHttpServer())
|
||||
.patch(`/coordinator/jobs/${jobId}/status`)
|
||||
.set("X-API-Key", "test-coordinator-key")
|
||||
.send(payload);
|
||||
|
||||
expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS);
|
||||
});
|
||||
});
|
||||
|
||||
describe("GET /coordinator/health - Rate Limiting", () => {
|
||||
it("should have higher rate limit than other endpoints", async () => {
|
||||
// Health endpoint should allow 300 requests (higher than default 100)
|
||||
// Test with a smaller sample to keep test fast
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const response = await request(app.getHttpServer())
|
||||
.get("/coordinator/health")
|
||||
.set("X-API-Key", "test-coordinator-key");
|
||||
|
||||
expect(response.status).toBe(HttpStatus.OK);
|
||||
}
|
||||
|
||||
expect(mockCoordinatorService.getIntegrationHealth).toHaveBeenCalledTimes(10);
|
||||
});
|
||||
|
||||
it("should return 429 when health endpoint limit is exceeded", async () => {
|
||||
// Exhaust health endpoint limit (300 requests)
|
||||
for (let i = 0; i < 300; i++) {
|
||||
await request(app.getHttpServer())
|
||||
.get("/coordinator/health")
|
||||
.set("X-API-Key", "test-coordinator-key");
|
||||
}
|
||||
|
||||
// The 301st request should be rate limited
|
||||
const response = await request(app.getHttpServer())
|
||||
.get("/coordinator/health")
|
||||
.set("X-API-Key", "test-coordinator-key");
|
||||
|
||||
expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Per-API-Key Rate Limiting", () => {
|
||||
it("should enforce rate limits per API key independently", async () => {
|
||||
const payload = {
|
||||
workspaceId: "workspace-123",
|
||||
type: "data-processing",
|
||||
data: { input: "test" },
|
||||
};
|
||||
|
||||
// Exhaust rate limit for first API key (100 requests)
|
||||
for (let i = 0; i < 100; i++) {
|
||||
await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key-1")
|
||||
.send(payload);
|
||||
}
|
||||
|
||||
// First API key should be rate limited
|
||||
const response1 = await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key-1")
|
||||
.send(payload);
|
||||
|
||||
expect(response1.status).toBe(HttpStatus.TOO_MANY_REQUESTS);
|
||||
|
||||
// Second API key should still be allowed
|
||||
const response2 = await request(app.getHttpServer())
|
||||
.post("/coordinator/jobs")
|
||||
.set("X-API-Key", "test-coordinator-key-2")
|
||||
.send(payload);
|
||||
|
||||
expect(response2.status).toBe(HttpStatus.CREATED);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -112,7 +112,7 @@ export class CoordinatorIntegrationService {
|
||||
// Use SELECT FOR UPDATE to lock the row during this transaction
|
||||
// This prevents concurrent updates from coordinator and ensures serialization
|
||||
const jobs = await tx.$queryRaw<
|
||||
Array<{ id: string; status: RunnerJobStatus; workspace_id: string; version: number }>
|
||||
{ id: string; status: RunnerJobStatus; workspace_id: string; version: number }[]
|
||||
>`
|
||||
SELECT id, status, workspace_id, version
|
||||
FROM runner_jobs
|
||||
@@ -237,7 +237,7 @@ export class CoordinatorIntegrationService {
|
||||
return this.prisma.$transaction(async (tx) => {
|
||||
// Lock the row to prevent concurrent completion/failure
|
||||
const jobs = await tx.$queryRaw<
|
||||
Array<{ id: string; status: RunnerJobStatus; started_at: Date | null; version: number }>
|
||||
{ id: string; status: RunnerJobStatus; started_at: Date | null; version: number }[]
|
||||
>`
|
||||
SELECT id, status, started_at, version
|
||||
FROM runner_jobs
|
||||
@@ -305,9 +305,7 @@ export class CoordinatorIntegrationService {
|
||||
|
||||
return this.prisma.$transaction(async (tx) => {
|
||||
// Lock the row to prevent concurrent completion/failure
|
||||
const jobs = await tx.$queryRaw<
|
||||
Array<{ id: string; status: RunnerJobStatus; version: number }>
|
||||
>`
|
||||
const jobs = await tx.$queryRaw<{ id: string; status: RunnerJobStatus; version: number }[]>`
|
||||
SELECT id, status, version
|
||||
FROM runner_jobs
|
||||
WHERE id = ${jobId}::uuid
|
||||
|
||||
Reference in New Issue
Block a user