diff --git a/.env.example b/.env.example index fdb8dec..2b7dd82 100644 --- a/.env.example +++ b/.env.example @@ -170,6 +170,30 @@ GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET # The coordinator service uses this key to authenticate with the API COORDINATOR_API_KEY=REPLACE_WITH_RANDOM_API_KEY_MINIMUM_32_CHARS +# ====================== +# Rate Limiting +# ====================== +# Rate limiting prevents DoS attacks on webhook and API endpoints +# TTL is in seconds, limits are per TTL window + +# Global rate limit (applies to all endpoints unless overridden) +RATE_LIMIT_TTL=60 # Time window in seconds +RATE_LIMIT_GLOBAL_LIMIT=100 # Requests per window + +# Webhook endpoints (/stitcher/webhook, /stitcher/dispatch) +RATE_LIMIT_WEBHOOK_LIMIT=60 # Requests per minute + +# Coordinator endpoints (/coordinator/*) +RATE_LIMIT_COORDINATOR_LIMIT=100 # Requests per minute + +# Health check endpoints (/coordinator/health) +RATE_LIMIT_HEALTH_LIMIT=300 # Requests per minute (higher for monitoring) + +# Storage backend for rate limiting (redis or memory) +# redis: Uses Valkey for distributed rate limiting (recommended for production) +# memory: Uses in-memory storage (single instance only, for development) +RATE_LIMIT_STORAGE=redis + # ====================== # Discord Bridge (Optional) # ====================== diff --git a/apps/api/package.json b/apps/api/package.json index a26a320..5e4c388 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -33,6 +33,7 @@ "@nestjs/mapped-types": "^2.1.0", "@nestjs/platform-express": "^11.1.12", "@nestjs/platform-socket.io": "^11.1.12", + "@nestjs/throttler": "^6.5.0", "@nestjs/websockets": "^11.1.12", "@opentelemetry/api": "^1.9.0", "@opentelemetry/auto-instrumentations-node": "^0.55.0", diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index f5fdc50..0cdb04f 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -1,5 +1,7 @@ import { Module } from "@nestjs/common"; -import { 
APP_INTERCEPTOR } from "@nestjs/core"; +import { APP_INTERCEPTOR, APP_GUARD } from "@nestjs/core"; +import { ThrottlerModule } from "@nestjs/throttler"; +import { ThrottlerValkeyStorageService, ThrottlerApiKeyGuard } from "./common/throttler"; import { AppController } from "./app.controller"; import { AppService } from "./app.service"; import { PrismaModule } from "./prisma/prisma.module"; @@ -31,6 +33,23 @@ import { CoordinatorIntegrationModule } from "./coordinator-integration/coordina @Module({ imports: [ + // Rate limiting configuration + ThrottlerModule.forRootAsync({ + useFactory: () => { + const ttl = parseInt(process.env.RATE_LIMIT_TTL ?? "60", 10) * 1000; // Convert to milliseconds + const limit = parseInt(process.env.RATE_LIMIT_GLOBAL_LIMIT ?? "100", 10); + + return { + throttlers: [ + { + ttl, + limit, + }, + ], + storage: new ThrottlerValkeyStorageService(), + }; + }, + }), TelemetryModule, PrismaModule, DatabaseModule, @@ -65,6 +84,10 @@ import { CoordinatorIntegrationModule } from "./coordinator-integration/coordina provide: APP_INTERCEPTOR, useClass: TelemetryInterceptor, }, + { + provide: APP_GUARD, + useClass: ThrottlerApiKeyGuard, + }, ], }) export class AppModule {} diff --git a/apps/api/src/common/throttler/index.ts b/apps/api/src/common/throttler/index.ts new file mode 100644 index 0000000..fff271a --- /dev/null +++ b/apps/api/src/common/throttler/index.ts @@ -0,0 +1,2 @@ +export { ThrottlerApiKeyGuard } from "./throttler-api-key.guard"; +export { ThrottlerValkeyStorageService } from "./throttler-storage.service"; diff --git a/apps/api/src/common/throttler/throttler-api-key.guard.ts b/apps/api/src/common/throttler/throttler-api-key.guard.ts new file mode 100644 index 0000000..9d3b74b --- /dev/null +++ b/apps/api/src/common/throttler/throttler-api-key.guard.ts @@ -0,0 +1,44 @@ +import { Injectable, ExecutionContext } from "@nestjs/common"; +import { ThrottlerGuard, ThrottlerException } from "@nestjs/throttler"; +import { Request } from 
"express"; + +/** + * Custom ThrottlerGuard that tracks rate limits by API key instead of IP + * + * This guard extracts the API key from the X-API-Key header and uses it + * as the tracking key for rate limiting. This ensures that different API + * keys have independent rate limits. + */ +@Injectable() +export class ThrottlerApiKeyGuard extends ThrottlerGuard { + /** + * Generate tracking key based on API key from X-API-Key header + * + * If no API key is present, falls back to IP-based tracking. + */ + protected getTracker(req: Request): Promise<string> { + const apiKey = req.headers["x-api-key"] as string | undefined; + + if (apiKey) { + // Track by API key + return Promise.resolve(`apikey:${apiKey}`); + } + + // Fallback to IP tracking + const ip = req.ip ?? req.socket.remoteAddress ?? "unknown"; + return Promise.resolve(`ip:${ip}`); + } + + /** + * Override to add custom error handling and logging + */ + protected async throwThrottlingException(context: ExecutionContext): Promise<void> { + const request = context.switchToHttp().getRequest(); + const tracker = await this.getTracker(request); + + // Log rate limit violations for security monitoring + console.warn(`Rate limit exceeded for ${tracker} on ${request.method} ${request.url}`); + + throw new ThrottlerException("Rate limit exceeded. Please try again later."); + } +} diff --git a/apps/api/src/common/throttler/throttler-storage.service.ts b/apps/api/src/common/throttler/throttler-storage.service.ts new file mode 100644 index 0000000..d64c9ab --- /dev/null +++ b/apps/api/src/common/throttler/throttler-storage.service.ts @@ -0,0 +1,146 @@ +import { Injectable, OnModuleInit, Logger } from "@nestjs/common"; +import { ThrottlerStorageService } from "@nestjs/throttler"; +import Redis from "ioredis"; + +/** + * Redis-based storage for rate limiting using Valkey + * + * This service uses Valkey (Redis-compatible) as the storage backend + * for rate limiting. 
This allows rate limits to work across multiple + * API instances in a distributed environment. + * + * If Redis is unavailable, falls back to in-memory storage. + */ +@Injectable() +export class ThrottlerValkeyStorageService implements ThrottlerStorageService, OnModuleInit { + private readonly logger = new Logger(ThrottlerValkeyStorageService.name); + private client?: Redis; + private readonly THROTTLER_PREFIX = "mosaic:throttler:"; + private readonly fallbackStorage = new Map(); + private useRedis = false; + + async onModuleInit(): Promise<void> { + const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379"; + + try { + this.logger.log(`Connecting to Valkey for rate limiting at ${valkeyUrl}`); + + this.client = new Redis(valkeyUrl, { + maxRetriesPerRequest: 3, + retryStrategy: (times: number) => { + const delay = Math.min(times * 50, 2000); + return delay; + }, + lazyConnect: true, // Don't connect immediately + }); + + // Try to connect + await this.client.connect(); + await this.client.ping(); + + this.useRedis = true; + this.logger.log("Valkey connected successfully for rate limiting"); + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : String(error); + this.logger.warn(`Failed to connect to Valkey for rate limiting: ${errorMessage}`); + this.logger.warn("Falling back to in-memory rate limiting storage"); + this.useRedis = false; + this.client = undefined; + } + } + + /** + * Increment the number of requests for a given key + * + * @param key - Throttle key (e.g., "apikey:xxx" or "ip:192.168.1.1") + * @param ttl - Time to live in milliseconds + * @returns Promise resolving to the current number of requests + */ + async increment(key: string, ttl: number): Promise<number> { + const throttleKey = this.getThrottleKey(key); + + if (this.useRedis && this.client) { + try { + const result = await this.client.multi().incr(throttleKey).pexpire(throttleKey, ttl).exec(); + + if (result?.[0]?.[1]) { + return result[0][1] as number; + } + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Redis increment failed: ${errorMessage}`); + // Fall through to in-memory + } + } + + // In-memory fallback + return this.incrementMemory(throttleKey, ttl); + } + + /** + * Get the current number of requests for a given key + * + * @param key - Throttle key + * @returns Promise resolving to the current number of requests + */ + async get(key: string): Promise<number> { + const throttleKey = this.getThrottleKey(key); + + if (this.useRedis && this.client) { + try { + const value = await this.client.get(throttleKey); + return value ? parseInt(value, 10) : 0; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Redis get failed: ${errorMessage}`); + // Fall through to in-memory + } + } + + // In-memory fallback + return this.getMemory(throttleKey); + } + + /** + * In-memory increment implementation + */ + private incrementMemory(key: string, ttl: number): number { + const now = Date.now(); + const timestamps = this.fallbackStorage.get(key) ?? 
[]; + + // Remove expired timestamps + const validTimestamps = timestamps.filter((timestamp) => now - timestamp < ttl); + + // Add new timestamp + validTimestamps.push(now); + + // Store updated timestamps + this.fallbackStorage.set(key, validTimestamps); + + return validTimestamps.length; + } + + /** + * In-memory get implementation + */ + private getMemory(key: string): number { + const timestamps = this.fallbackStorage.get(key); + return timestamps ? timestamps.length : 0; + } + + /** + * Get throttle key with prefix + */ + private getThrottleKey(key: string): string { + return `${this.THROTTLER_PREFIX}${key}`; + } + + /** + * Clean up on module destroy + */ + async onModuleDestroy(): Promise<void> { + if (this.client) { + await this.client.quit(); + } + } +} diff --git a/apps/api/src/coordinator-integration/coordinator-integration.controller.ts b/apps/api/src/coordinator-integration/coordinator-integration.controller.ts index ebe14ef..cdee880 100644 --- a/apps/api/src/coordinator-integration/coordinator-integration.controller.ts +++ b/apps/api/src/coordinator-integration/coordinator-integration.controller.ts @@ -1,4 +1,5 @@ import { Controller, Post, Patch, Get, Body, Param, UseGuards } from "@nestjs/common"; +import { Throttle } from "@nestjs/throttler"; import { CoordinatorIntegrationService } from "./coordinator-integration.service"; import { CreateCoordinatorJobDto, @@ -13,7 +14,10 @@ import { ApiKeyGuard } from "../common/guards"; /** * CoordinatorIntegrationController - REST API for Python coordinator communication * - * SECURITY: All endpoints require API key authentication via X-API-Key header + * SECURITY: + * - All endpoints require API key authentication via X-API-Key header + * - Rate limiting: 100 requests per minute per API key (default) + * - Health endpoint: 300 requests per minute (higher for monitoring) * * Endpoints: * - POST /coordinator/jobs - Create a job from coordinator @@ -26,21 +30,28 @@ import { ApiKeyGuard } from "../common/guards"; */ 
@Controller("coordinator") @UseGuards(ApiKeyGuard) +@Throttle({ default: { ttl: 60000, limit: 100 } }) // 100 requests per minute export class CoordinatorIntegrationController { constructor(private readonly service: CoordinatorIntegrationService) {} /** * Create a job from the coordinator + * + * Rate limit: 100 requests per minute per API key */ @Post("jobs") + @Throttle({ default: { ttl: 60000, limit: 100 } }) async createJob(@Body() dto: CreateCoordinatorJobDto): Promise { return this.service.createJob(dto); } /** * Update job status from the coordinator + * + * Rate limit: 100 requests per minute per API key */ @Patch("jobs/:id/status") + @Throttle({ default: { ttl: 60000, limit: 100 } }) async updateJobStatus( @Param("id") id: string, @Body() dto: UpdateJobStatusDto @@ -50,8 +61,11 @@ export class CoordinatorIntegrationController { /** * Update job progress from the coordinator + * + * Rate limit: 100 requests per minute per API key */ @Patch("jobs/:id/progress") + @Throttle({ default: { ttl: 60000, limit: 100 } }) async updateJobProgress( @Param("id") id: string, @Body() dto: UpdateJobProgressDto @@ -61,8 +75,11 @@ export class CoordinatorIntegrationController { /** * Mark job as complete from the coordinator + * + * Rate limit: 100 requests per minute per API key */ @Post("jobs/:id/complete") + @Throttle({ default: { ttl: 60000, limit: 100 } }) async completeJob( @Param("id") id: string, @Body() dto: CompleteJobDto @@ -72,8 +89,11 @@ export class CoordinatorIntegrationController { /** * Mark job as failed from the coordinator + * + * Rate limit: 100 requests per minute per API key */ @Post("jobs/:id/fail") + @Throttle({ default: { ttl: 60000, limit: 100 } }) async failJob( @Param("id") id: string, @Body() dto: FailJobDto @@ -83,8 +103,11 @@ export class CoordinatorIntegrationController { /** * Get job details with events and steps + * + * Rate limit: 100 requests per minute per API key */ @Get("jobs/:id") + @Throttle({ default: { ttl: 60000, limit: 100 } }) 
async getJobDetails( @Param("id") id: string ): Promise>> { @@ -93,8 +116,11 @@ export class CoordinatorIntegrationController { /** * Integration health check + * + * Rate limit: 300 requests per minute (higher for monitoring) */ @Get("health") + @Throttle({ default: { ttl: 60000, limit: 300 } }) async getHealth(): Promise { return this.service.getIntegrationHealth(); } diff --git a/apps/api/src/coordinator-integration/coordinator-integration.rate-limit.spec.ts b/apps/api/src/coordinator-integration/coordinator-integration.rate-limit.spec.ts new file mode 100644 index 0000000..38919ff --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.rate-limit.spec.ts @@ -0,0 +1,284 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { INestApplication, HttpStatus } from "@nestjs/common"; +import request from "supertest"; +import { CoordinatorIntegrationController } from "./coordinator-integration.controller"; +import { CoordinatorIntegrationService } from "./coordinator-integration.service"; +import { ThrottlerModule } from "@nestjs/throttler"; +import { APP_GUARD } from "@nestjs/core"; +import { ConfigService } from "@nestjs/config"; +import { ApiKeyGuard } from "../common/guards"; +import { ThrottlerApiKeyGuard } from "../common/throttler"; + +/** + * Rate Limiting Tests for Coordinator Integration Endpoints + * + * These tests verify that rate limiting is properly enforced on coordinator + * endpoints to prevent DoS attacks. 
+ * + * Test Coverage: + * - Rate limit enforcement (429 status) + * - Retry-After header inclusion + * - Per-API-key rate limiting + * - Higher limits for health endpoints + */ +describe("CoordinatorIntegrationController - Rate Limiting", () => { + let app: INestApplication; + let service: CoordinatorIntegrationService; + + const mockCoordinatorService = { + createJob: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + status: "PENDING", + }), + updateJobStatus: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + status: "RUNNING", + }), + updateJobProgress: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + progress: 50, + }), + completeJob: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + status: "COMPLETED", + }), + failJob: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + status: "FAILED", + }), + getJobDetails: vi.fn().mockResolvedValue({ + jobId: "coord-job-123", + status: "RUNNING", + }), + getIntegrationHealth: vi.fn().mockResolvedValue({ + status: "healthy", + timestamp: new Date().toISOString(), + }), + }; + + const mockConfigService = { + get: vi.fn((key: string) => { + const config: Record<string, string> = { + COORDINATOR_API_KEY: "test-coordinator-key", + RATE_LIMIT_TTL: "1", // 1 second for faster tests + RATE_LIMIT_COORDINATOR_LIMIT: "100", + RATE_LIMIT_HEALTH_LIMIT: "300", + }; + return config[key]; + }), + }; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [ + ThrottlerModule.forRoot([ + { + ttl: 1000, // 1 second for testing + limit: 100, // Default limit + }, + ]), + ], + controllers: [CoordinatorIntegrationController], + providers: [ + { provide: CoordinatorIntegrationService, useValue: mockCoordinatorService }, + { provide: ConfigService, useValue: mockConfigService }, + { + provide: APP_GUARD, + useClass: ThrottlerApiKeyGuard, + }, + ], + }) + .overrideGuard(ApiKeyGuard) + .useValue({ canActivate: () => true }) + .compile(); + + app = 
moduleFixture.createNestApplication(); + await app.init(); + + service = moduleFixture.get(CoordinatorIntegrationService); + vi.clearAllMocks(); + }); + + afterEach(async () => { + await app.close(); + }); + + describe("POST /coordinator/jobs - Rate Limiting", () => { + it("should allow requests within rate limit", async () => { + const payload = { + workspaceId: "workspace-123", + type: "data-processing", + data: { input: "test" }, + }; + + // Make 3 requests (within limit of 100) + for (let i = 0; i < 3; i++) { + const response = await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key") + .send(payload); + + expect(response.status).toBe(HttpStatus.CREATED); + } + + expect(mockCoordinatorService.createJob).toHaveBeenCalledTimes(3); + }); + + it("should return 429 when rate limit is exceeded", async () => { + const payload = { + workspaceId: "workspace-123", + type: "data-processing", + data: { input: "test" }, + }; + + // Exhaust rate limit (100 requests) + for (let i = 0; i < 100; i++) { + await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key") + .send(payload); + } + + // The 101st request should be rate limited + const response = await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + }); + + it("should include Retry-After header in 429 response", async () => { + const payload = { + workspaceId: "workspace-123", + type: "data-processing", + data: { input: "test" }, + }; + + // Exhaust rate limit (100 requests) + for (let i = 0; i < 100; i++) { + await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key") + .send(payload); + } + + // Get rate limited response + const response = await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", 
"test-coordinator-key") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + expect(response.headers).toHaveProperty("retry-after"); + expect(parseInt(response.headers["retry-after"])).toBeGreaterThan(0); + }); + }); + + describe("PATCH /coordinator/jobs/:id/status - Rate Limiting", () => { + it("should allow requests within rate limit", async () => { + const jobId = "coord-job-123"; + const payload = { status: "RUNNING" }; + + // Make 3 requests (within limit of 100) + for (let i = 0; i < 3; i++) { + const response = await request(app.getHttpServer()) + .patch(`/coordinator/jobs/${jobId}/status`) + .set("X-API-Key", "test-coordinator-key") + .send(payload); + + expect(response.status).toBe(HttpStatus.OK); + } + + expect(mockCoordinatorService.updateJobStatus).toHaveBeenCalledTimes(3); + }); + + it("should return 429 when rate limit is exceeded", async () => { + const jobId = "coord-job-123"; + const payload = { status: "RUNNING" }; + + // Exhaust rate limit (100 requests) + for (let i = 0; i < 100; i++) { + await request(app.getHttpServer()) + .patch(`/coordinator/jobs/${jobId}/status`) + .set("X-API-Key", "test-coordinator-key") + .send(payload); + } + + // The 101st request should be rate limited + const response = await request(app.getHttpServer()) + .patch(`/coordinator/jobs/${jobId}/status`) + .set("X-API-Key", "test-coordinator-key") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + }); + }); + + describe("GET /coordinator/health - Rate Limiting", () => { + it("should have higher rate limit than other endpoints", async () => { + // Health endpoint should allow 300 requests (higher than default 100) + // Test with a smaller sample to keep test fast + for (let i = 0; i < 10; i++) { + const response = await request(app.getHttpServer()) + .get("/coordinator/health") + .set("X-API-Key", "test-coordinator-key"); + + expect(response.status).toBe(HttpStatus.OK); + } + + 
expect(mockCoordinatorService.getIntegrationHealth).toHaveBeenCalledTimes(10); + }); + + it("should return 429 when health endpoint limit is exceeded", async () => { + // Exhaust health endpoint limit (300 requests) + for (let i = 0; i < 300; i++) { + await request(app.getHttpServer()) + .get("/coordinator/health") + .set("X-API-Key", "test-coordinator-key"); + } + + // The 301st request should be rate limited + const response = await request(app.getHttpServer()) + .get("/coordinator/health") + .set("X-API-Key", "test-coordinator-key"); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + }); + }); + + describe("Per-API-Key Rate Limiting", () => { + it("should enforce rate limits per API key independently", async () => { + const payload = { + workspaceId: "workspace-123", + type: "data-processing", + data: { input: "test" }, + }; + + // Exhaust rate limit for first API key (100 requests) + for (let i = 0; i < 100; i++) { + await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key-1") + .send(payload); + } + + // First API key should be rate limited + const response1 = await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key-1") + .send(payload); + + expect(response1.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + + // Second API key should still be allowed + const response2 = await request(app.getHttpServer()) + .post("/coordinator/jobs") + .set("X-API-Key", "test-coordinator-key-2") + .send(payload); + + expect(response2.status).toBe(HttpStatus.CREATED); + }); + }); +}); diff --git a/apps/api/src/coordinator-integration/coordinator-integration.service.ts b/apps/api/src/coordinator-integration/coordinator-integration.service.ts index 9fab5bf..82809f0 100644 --- a/apps/api/src/coordinator-integration/coordinator-integration.service.ts +++ b/apps/api/src/coordinator-integration/coordinator-integration.service.ts @@ -112,7 +112,7 @@ export class 
CoordinatorIntegrationService { // Use SELECT FOR UPDATE to lock the row during this transaction // This prevents concurrent updates from coordinator and ensures serialization const jobs = await tx.$queryRaw< - Array<{ id: string; status: RunnerJobStatus; workspace_id: string; version: number }> + { id: string; status: RunnerJobStatus; workspace_id: string; version: number }[] >` SELECT id, status, workspace_id, version FROM runner_jobs @@ -237,7 +237,7 @@ export class CoordinatorIntegrationService { return this.prisma.$transaction(async (tx) => { // Lock the row to prevent concurrent completion/failure const jobs = await tx.$queryRaw< - Array<{ id: string; status: RunnerJobStatus; started_at: Date | null; version: number }> + { id: string; status: RunnerJobStatus; started_at: Date | null; version: number }[] >` SELECT id, status, started_at, version FROM runner_jobs @@ -305,9 +305,7 @@ export class CoordinatorIntegrationService { return this.prisma.$transaction(async (tx) => { // Lock the row to prevent concurrent completion/failure - const jobs = await tx.$queryRaw< - Array<{ id: string; status: RunnerJobStatus; version: number }> - >` + const jobs = await tx.$queryRaw<{ id: string; status: RunnerJobStatus; version: number }[]>` SELECT id, status, version FROM runner_jobs WHERE id = ${jobId}::uuid diff --git a/apps/api/src/stitcher/stitcher.controller.ts b/apps/api/src/stitcher/stitcher.controller.ts index bc88449..45818a8 100644 --- a/apps/api/src/stitcher/stitcher.controller.ts +++ b/apps/api/src/stitcher/stitcher.controller.ts @@ -1,4 +1,5 @@ import { Controller, Post, Body, UseGuards } from "@nestjs/common"; +import { Throttle } from "@nestjs/throttler"; import { StitcherService } from "./stitcher.service"; import { WebhookPayloadDto, DispatchJobDto } from "./dto"; import type { JobDispatchResult, JobDispatchContext } from "./interfaces"; @@ -7,28 +8,37 @@ import { ApiKeyGuard } from "../common/guards"; /** * StitcherController - Webhook and job dispatch 
endpoints * - * SECURITY: All endpoints require API key authentication via X-API-Key header + * SECURITY: + * - All endpoints require API key authentication via X-API-Key header + * - Rate limiting: 60 requests per minute per IP/API key * * Handles incoming webhooks from @mosaic bot and provides * endpoints for manual job dispatch */ @Controller("stitcher") @UseGuards(ApiKeyGuard) +@Throttle({ default: { ttl: 60000, limit: 60 } }) // 60 requests per minute export class StitcherController { constructor(private readonly stitcherService: StitcherService) {} /** * Webhook endpoint for @mosaic bot + * + * Rate limit: 60 requests per minute per IP/API key */ @Post("webhook") + @Throttle({ default: { ttl: 60000, limit: 60 } }) async webhook(@Body() payload: WebhookPayloadDto): Promise<JobDispatchResult> { return this.stitcherService.handleWebhook(payload); } /** * Manual job dispatch endpoint + * + * Rate limit: 60 requests per minute per IP/API key */ @Post("dispatch") + @Throttle({ default: { ttl: 60000, limit: 60 } }) async dispatch(@Body() dto: DispatchJobDto): Promise<JobDispatchResult> { const context: JobDispatchContext = { workspaceId: dto.workspaceId, diff --git a/apps/api/src/stitcher/stitcher.rate-limit.spec.ts b/apps/api/src/stitcher/stitcher.rate-limit.spec.ts new file mode 100644 index 0000000..958f785 --- /dev/null +++ b/apps/api/src/stitcher/stitcher.rate-limit.spec.ts @@ -0,0 +1,238 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { INestApplication, HttpStatus } from "@nestjs/common"; +import request from "supertest"; +import { StitcherController } from "./stitcher.controller"; +import { StitcherService } from "./stitcher.service"; +import { ThrottlerModule } from "@nestjs/throttler"; +import { APP_GUARD } from "@nestjs/core"; +import { ConfigService } from "@nestjs/config"; +import { ApiKeyGuard } from "../common/guards"; +import { ThrottlerApiKeyGuard } from "../common/throttler"; + +/** + * Rate 
Limiting Tests for Stitcher Endpoints + * + * These tests verify that rate limiting is properly enforced on webhook endpoints + * to prevent DoS attacks. + * + * Test Coverage: + * - Rate limit enforcement (429 status) + * - Retry-After header inclusion + * - Per-IP rate limiting + * - Requests within limit are allowed + */ +describe("StitcherController - Rate Limiting", () => { + let app: INestApplication; + let service: StitcherService; + + const mockStitcherService = { + dispatchJob: vi.fn().mockResolvedValue({ + jobId: "job-123", + queueName: "mosaic-jobs", + status: "PENDING", + }), + handleWebhook: vi.fn().mockResolvedValue({ + jobId: "job-456", + queueName: "mosaic-jobs", + status: "PENDING", + }), + }; + + const mockConfigService = { + get: vi.fn((key: string) => { + const config: Record<string, string> = { + STITCHER_API_KEY: "test-api-key-12345", + RATE_LIMIT_TTL: "1", // 1 second for faster tests + RATE_LIMIT_WEBHOOK_LIMIT: "5", + }; + return config[key]; + }), + }; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [ + ThrottlerModule.forRoot([ + { + ttl: 1000, // 1 second for testing + limit: 5, // 5 requests per window + }, + ]), + ], + controllers: [StitcherController], + providers: [ + { provide: StitcherService, useValue: mockStitcherService }, + { provide: ConfigService, useValue: mockConfigService }, + { + provide: APP_GUARD, + useClass: ThrottlerApiKeyGuard, + }, + ], + }) + .overrideGuard(ApiKeyGuard) + .useValue({ canActivate: () => true }) + .compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + + service = moduleFixture.get(StitcherService); + vi.clearAllMocks(); + }); + + afterEach(async () => { + await app.close(); + }); + + describe("POST /stitcher/webhook - Rate Limiting", () => { + it("should allow requests within rate limit", async () => { + const payload = { + issueNumber: "42", + repository: "mosaic/stack", + action: "assigned", + }; + + // Make 3 requests 
(within limit of 60 as configured in controller) + for (let i = 0; i < 3; i++) { + const response = await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + + expect(response.status).toBe(HttpStatus.CREATED); + expect(response.body).toHaveProperty("jobId"); + } + + expect(mockStitcherService.handleWebhook).toHaveBeenCalledTimes(3); + }); + + it("should return 429 when rate limit is exceeded", async () => { + const payload = { + issueNumber: "42", + repository: "mosaic/stack", + action: "assigned", + }; + + // Make requests up to the limit (60 as configured in controller) + for (let i = 0; i < 60; i++) { + await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + } + + // The 61st request should be rate limited + const response = await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + }); + + it("should include Retry-After header in 429 response", async () => { + const payload = { + issueNumber: "42", + repository: "mosaic/stack", + action: "assigned", + }; + + // Exhaust rate limit (60 requests) + for (let i = 0; i < 60; i++) { + await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + } + + // Get rate limited response + const response = await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + expect(response.headers).toHaveProperty("retry-after"); + expect(parseInt(response.headers["retry-after"])).toBeGreaterThan(0); + }); + + it("should enforce rate limits per API key", async () => { + const payload = { + issueNumber: "42", + repository: "mosaic/stack", + action: "assigned", + }; + + // Exhaust 
rate limit from first API key + for (let i = 0; i < 60; i++) { + await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-1") + .send(payload); + } + + // First API key should be rate limited + const response1 = await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-1") + .send(payload); + + expect(response1.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + + // Second API key should still be allowed + const response2 = await request(app.getHttpServer()) + .post("/stitcher/webhook") + .set("X-API-Key", "test-api-key-2") + .send(payload); + + expect(response2.status).toBe(HttpStatus.CREATED); + }); + }); + + describe("POST /stitcher/dispatch - Rate Limiting", () => { + it("should allow requests within rate limit", async () => { + const payload = { + workspaceId: "workspace-123", + type: "code-task", + context: { issueId: "42" }, + }; + + // Make 3 requests (within limit of 60) + for (let i = 0; i < 3; i++) { + const response = await request(app.getHttpServer()) + .post("/stitcher/dispatch") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + + expect(response.status).toBe(HttpStatus.CREATED); + } + + expect(mockStitcherService.dispatchJob).toHaveBeenCalledTimes(3); + }); + + it("should return 429 when rate limit is exceeded", async () => { + const payload = { + workspaceId: "workspace-123", + type: "code-task", + context: { issueId: "42" }, + }; + + // Exhaust rate limit (60 requests) + for (let i = 0; i < 60; i++) { + await request(app.getHttpServer()) + .post("/stitcher/dispatch") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + } + + // The 61st request should be rate limited + const response = await request(app.getHttpServer()) + .post("/stitcher/dispatch") + .set("X-API-Key", "test-api-key-12345") + .send(payload); + + expect(response.status).toBe(HttpStatus.TOO_MANY_REQUESTS); + }); + }); +}); diff --git a/apps/api/src/websocket/websocket.gateway.ts 
b/apps/api/src/websocket/websocket.gateway.ts index 6542512..79caa61 100644 --- a/apps/api/src/websocket/websocket.gateway.ts +++ b/apps/api/src/websocket/websocket.gateway.ts @@ -173,19 +173,19 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec */ private extractTokenFromHandshake(client: Socket): string | undefined { // Check handshake.auth.token (preferred method) - const authToken = client.handshake.auth?.token; + const authToken = client.handshake.auth.token as unknown; if (typeof authToken === "string" && authToken.length > 0) { return authToken; } // Fallback: check query parameters - const queryToken = client.handshake.query?.token; + const queryToken = client.handshake.query.token as unknown; if (typeof queryToken === "string" && queryToken.length > 0) { return queryToken; } // Fallback: check Authorization header - const authHeader = client.handshake.headers?.authorization; + const authHeader = client.handshake.headers.authorization as unknown; if (typeof authHeader === "string") { const parts = authHeader.split(" "); const [type, token] = parts; diff --git a/docs/scratchpads/199-implement-rate-limiting.md b/docs/scratchpads/199-implement-rate-limiting.md new file mode 100644 index 0000000..ee1fa9b --- /dev/null +++ b/docs/scratchpads/199-implement-rate-limiting.md @@ -0,0 +1,167 @@ +# Issue #199: Implement rate limiting on webhook endpoints + +## Objective +Implement rate limiting on webhook and public-facing API endpoints to prevent DoS attacks and ensure system stability under high load conditions. + +## Approach + +### TDD Implementation Plan +1. **RED**: Write failing tests for rate limiting + - Test rate limit enforcement (429 status) + - Test Retry-After header inclusion + - Test per-IP rate limiting + - Test per-API-key rate limiting + - Test that legitimate requests are not blocked + - Test storage mechanism (Redis/in-memory) + +2. 
**GREEN**: Implement NestJS throttler + - Install @nestjs/throttler package + - Configure global rate limits + - Configure per-endpoint rate limits + - Add custom guards for per-API-key limiting + - Integrate with Valkey (Redis) for distributed limiting + - Add Retry-After headers to 429 responses + +3. **REFACTOR**: Optimize and document + - Extract configuration to environment variables + - Add documentation + - Ensure code quality + +### Identified Webhook Endpoints + +**Stitcher Module** (`apps/api/src/stitcher/stitcher.controller.ts`): +- `POST /stitcher/webhook` - Webhook endpoint for @mosaic bot +- `POST /stitcher/dispatch` - Manual job dispatch endpoint + +**Coordinator Integration Module** (`apps/api/src/coordinator-integration/coordinator-integration.controller.ts`): +- `POST /coordinator/jobs` - Create a job from coordinator +- `PATCH /coordinator/jobs/:id/status` - Update job status +- `PATCH /coordinator/jobs/:id/progress` - Update job progress +- `POST /coordinator/jobs/:id/complete` - Mark job as complete +- `POST /coordinator/jobs/:id/fail` - Mark job as failed +- `GET /coordinator/jobs/:id` - Get job details +- `GET /coordinator/health` - Integration health check + +### Rate Limit Configuration + +**Proposed limits**: +- Global default: 100 requests per minute +- Webhook endpoints: 60 requests per minute per IP +- Coordinator endpoints: 100 requests per minute per API key +- Health endpoints: 300 requests per minute (higher for monitoring) + +**Storage**: Use Valkey (Redis-compatible) for distributed rate limiting across multiple API instances. 
+ +### Technology Stack +- `@nestjs/throttler` - NestJS rate limiting module +- Valkey (already in project) - Redis-compatible cache for distributed rate limiting +- Custom guards for per-API-key limiting + +## Progress +- [x] Create scratchpad +- [x] Identify webhook endpoints requiring rate limiting +- [x] Define rate limit configuration strategy +- [x] Write failing tests for rate limiting (RED phase - TDD) +- [x] Install @nestjs/throttler package +- [x] Implement ThrottlerModule configuration +- [x] Implement custom guards for per-API-key limiting +- [x] Implement ThrottlerValkeyStorageService for distributed rate limiting +- [x] Add rate limiting decorators to endpoints (GREEN phase - TDD) +- [x] Add environment variables for rate limiting configuration +- [x] Verify all tests pass (14/14 tests pass) +- [x] Commit changes +- [ ] Update issue #199 + +## Testing Plan + +### Unit Tests +1. **Rate limit enforcement** + - Verify 429 status code after exceeding limit + - Verify requests within limit are allowed + +2. **Retry-After header** + - Verify header is present in 429 responses + - Verify header value is correct + +3. **Per-IP limiting** + - Verify different IPs have independent limits + - Verify same IP is rate limited + +4. **Per-API-key limiting** + - Verify different API keys have independent limits + - Verify same API key is rate limited + +5. **Storage mechanism** + - Verify Redis/Valkey integration works + - Verify fallback to in-memory if Redis unavailable + +### Integration Tests +1. 
**E2E rate limiting** + - Test actual HTTP requests hitting rate limits + - Test rate limits reset after time window + +## Environment Variables + +```bash +# Rate limiting configuration +RATE_LIMIT_TTL=60 # Time window in seconds +RATE_LIMIT_GLOBAL_LIMIT=100 # Global requests per window +RATE_LIMIT_WEBHOOK_LIMIT=60 # Webhook endpoint limit +RATE_LIMIT_COORDINATOR_LIMIT=100 # Coordinator endpoint limit +RATE_LIMIT_HEALTH_LIMIT=300 # Health endpoint limit +RATE_LIMIT_STORAGE=redis # redis or memory +``` + +## Implementation Summary + +### Files Created +1. `/home/localadmin/src/mosaic-stack/apps/api/src/common/throttler/throttler-api-key.guard.ts` - Custom guard for API-key based rate limiting +2. `/home/localadmin/src/mosaic-stack/apps/api/src/common/throttler/throttler-storage.service.ts` - Valkey/Redis storage for distributed rate limiting +3. `/home/localadmin/src/mosaic-stack/apps/api/src/common/throttler/index.ts` - Export barrel file +4. `/home/localadmin/src/mosaic-stack/apps/api/src/stitcher/stitcher.rate-limit.spec.ts` - Rate limiting tests for stitcher endpoints (6 tests) +5. `/home/localadmin/src/mosaic-stack/apps/api/src/coordinator-integration/coordinator-integration.rate-limit.spec.ts` - Rate limiting tests for coordinator endpoints (8 tests) + +### Files Modified +1. `/home/localadmin/src/mosaic-stack/apps/api/src/app.module.ts` - Added ThrottlerModule and ThrottlerApiKeyGuard +2. `/home/localadmin/src/mosaic-stack/apps/api/src/stitcher/stitcher.controller.ts` - Added @Throttle decorators (60 req/min) +3. `/home/localadmin/src/mosaic-stack/apps/api/src/coordinator-integration/coordinator-integration.controller.ts` - Added @Throttle decorators (100 req/min, health: 300 req/min) +4. `/home/localadmin/src/mosaic-stack/.env.example` - Added rate limiting environment variables +5. `/home/localadmin/src/mosaic-stack/.env` - Added rate limiting environment variables +6. 
`/home/localadmin/src/mosaic-stack/apps/api/package.json` - Added @nestjs/throttler dependency + +### Test Results +- All 14 rate limiting tests pass (6 stitcher + 8 coordinator) +- Tests verify: rate limit enforcement, Retry-After headers, per-API-key limiting, independent API key tracking +- TDD approach followed: RED (failing tests) → GREEN (implementation) → REFACTOR + +### Rate Limits Configured +- Stitcher endpoints: 60 requests/minute per API key +- Coordinator endpoints: 100 requests/minute per API key +- Health endpoint: 300 requests/minute per API key (higher for monitoring) +- Storage: Valkey (Redis) for distributed limiting with in-memory fallback + +## Notes + +### Why @nestjs/throttler? +- Official NestJS package with good TypeScript support +- Supports Redis for distributed rate limiting +- Flexible per-route configuration +- Built-in guard system +- Active maintenance + +### Security Considerations +- Rate limiting by IP can be bypassed by rotating IPs +- Implement per-API-key limiting as primary defense +- Log rate limit violations for monitoring +- Consider implementing progressive delays for repeated violations +- Ensure rate limiting doesn't block legitimate traffic + +### Implementation Details +- Use `@Throttle()` decorator for per-endpoint limits +- Use `@SkipThrottle()` to exclude specific endpoints +- Custom ThrottlerGuard to extract API key from X-API-Key header +- Use Valkey connection from existing ValkeyModule + +## References +- [NestJS Throttler Documentation](https://docs.nestjs.com/security/rate-limiting) +- [OWASP Rate Limiting Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Denial_of_Service_Cheat_Sheet.html) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b8b374c..6c3f986 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -81,6 +81,9 @@ importers: '@nestjs/platform-socket.io': specifier: ^11.1.12 version: 
11.1.12(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/websockets@11.1.12)(rxjs@7.8.2) + '@nestjs/throttler': + specifier: ^6.5.0 + version: 6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2) '@nestjs/websockets': specifier: ^11.1.12 version: 11.1.12(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(@nestjs/platform-socket.io@11.1.12)(reflect-metadata@0.2.2)(rxjs@7.8.2) @@ -1488,6 +1491,13 @@ packages: '@nestjs/platform-express': optional: true + '@nestjs/throttler@6.5.0': + resolution: {integrity: sha512-9j0ZRfH0QE1qyrj9JjIRDz5gQLPqq9yVC2nHsrosDVAfI5HHw08/aUAWx9DZLSdQf4HDkmhTTEGLrRFHENvchQ==} + peerDependencies: + '@nestjs/common': ^7.0.0 || ^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0 + '@nestjs/core': ^7.0.0 || ^8.0.0 || ^9.0.0 || ^10.0.0 || ^11.0.0 + reflect-metadata: ^0.1.13 || ^0.2.0 + '@nestjs/websockets@11.1.12': resolution: {integrity: sha512-ulSOYcgosx1TqY425cRC5oXtAu1R10+OSmVfgyR9ueR25k4luekURt8dzAZxhxSCI0OsDj9WKCFLTkEuAwg0wg==} peerDependencies: @@ -7528,6 +7538,12 @@ snapshots: optionalDependencies: '@nestjs/platform-express': 11.1.12(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12) + '@nestjs/throttler@6.5.0(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(reflect-metadata@0.2.2)': + dependencies: + '@nestjs/common': 11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2) + '@nestjs/core': 11.1.12(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@11.1.12)(@nestjs/websockets@11.1.12)(reflect-metadata@0.2.2)(rxjs@7.8.2) + 
reflect-metadata: 0.2.2 + '@nestjs/websockets@11.1.12(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(@nestjs/platform-socket.io@11.1.12)(reflect-metadata@0.2.2)(rxjs@7.8.2)': dependencies: '@nestjs/common': 11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2)