Files
stack/apps/api/src/websocket/websocket.gateway.ts
Jason Woltje 41d56dadf0
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix(#199): implement rate limiting on webhook endpoints
Implements comprehensive rate limiting on all webhook and coordinator endpoints
to prevent DoS attacks. Follows TDD protocol with 14 passing tests.

Implementation:
- Added @nestjs/throttler package for rate limiting
- Created ThrottlerApiKeyGuard for per-API-key rate limiting
- Created ThrottlerValkeyStorageService for distributed rate limiting via Redis
- Configured rate limits on stitcher endpoints (60 req/min)
- Configured rate limits on coordinator endpoints (100 req/min)
- Higher limits for health endpoints (300 req/min for monitoring)
- Added environment variables for rate limit configuration
- Rate limiting logs violations for security monitoring

Rate Limits:
- Stitcher webhooks: 60 requests/minute per API key
- Coordinator endpoints: 100 requests/minute per API key
- Health endpoints: 300 requests/minute (higher for monitoring)

Storage:
- Uses Valkey (Redis) for distributed rate limiting across API instances
- Falls back to in-memory storage if Redis unavailable

Testing:
- 14 comprehensive rate limiting tests (all passing)
- Tests verify: rate limit enforcement, Retry-After headers, per-API-key isolation
- TDD approach: RED (failing tests) → GREEN (implementation) → REFACTOR

Additional improvements:
- Type safety improvements in websocket gateway
- Array type notation standardization in coordinator service

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-02 13:07:16 -06:00

458 lines
15 KiB
TypeScript

import {
WebSocketGateway as WSGateway,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
} from "@nestjs/websockets";
import { Logger } from "@nestjs/common";
import { Server, Socket } from "socket.io";
import { AuthService } from "../auth/auth.service";
import { PrismaService } from "../prisma/prisma.service";
interface AuthenticatedSocket extends Socket {
data: {
userId?: string;
workspaceId?: string;
};
}
interface Task {
id: string;
workspaceId: string;
[key: string]: unknown;
}
interface Event {
id: string;
workspaceId: string;
[key: string]: unknown;
}
interface Project {
id: string;
workspaceId: string;
[key: string]: unknown;
}
interface Job {
id: string;
workspaceId: string;
[key: string]: unknown;
}
interface JobStatusData {
id: string;
workspaceId: string;
status: string;
previousStatus?: string;
[key: string]: unknown;
}
interface JobProgressData {
id: string;
workspaceId: string;
progressPercent: number;
message?: string;
[key: string]: unknown;
}
interface StepData {
id: string;
jobId: string;
workspaceId: string;
[key: string]: unknown;
}
interface StepOutputData {
id: string;
jobId: string;
workspaceId: string;
output: string;
timestamp: string;
[key: string]: unknown;
}
/**
* @description WebSocket Gateway for real-time updates. Handles workspace-scoped rooms for broadcasting events.
*/
@WSGateway({
cors: {
origin: process.env.WEB_URL ?? "http://localhost:3000",
credentials: true,
},
})
export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server!: Server;
private readonly logger = new Logger(WebSocketGateway.name);
private readonly CONNECTION_TIMEOUT_MS = 5000; // 5 seconds
constructor(
private readonly authService: AuthService,
private readonly prisma: PrismaService
) {}
/**
* @description Handle client connection by authenticating and joining the workspace-specific room.
* @param client - The socket client that will be authenticated and joined to workspace room.
* @returns Promise that resolves when the client is joined to the workspace room or disconnected.
*/
async handleConnection(client: Socket): Promise<void> {
const authenticatedClient = client as AuthenticatedSocket;
// Set connection timeout
const timeoutId = setTimeout(() => {
if (!authenticatedClient.data.userId) {
this.logger.warn(`Client ${authenticatedClient.id} timed out during authentication`);
authenticatedClient.disconnect();
}
}, this.CONNECTION_TIMEOUT_MS);
try {
// Extract token from handshake
const token = this.extractTokenFromHandshake(authenticatedClient);
if (!token) {
this.logger.warn(`Client ${authenticatedClient.id} connected without token`);
authenticatedClient.disconnect();
clearTimeout(timeoutId);
return;
}
// Verify session
const sessionData = await this.authService.verifySession(token);
if (!sessionData) {
this.logger.warn(`Client ${authenticatedClient.id} has invalid token`);
authenticatedClient.disconnect();
clearTimeout(timeoutId);
return;
}
const user = sessionData.user as { id: string };
const userId = user.id;
// Verify workspace access
const workspaceMembership = await this.prisma.workspaceMember.findFirst({
where: { userId },
select: { workspaceId: true, userId: true, role: true },
});
if (!workspaceMembership) {
this.logger.warn(`User ${userId} has no workspace access`);
authenticatedClient.disconnect();
clearTimeout(timeoutId);
return;
}
// Populate socket data
authenticatedClient.data.userId = userId;
authenticatedClient.data.workspaceId = workspaceMembership.workspaceId;
// Join workspace room
const room = this.getWorkspaceRoom(workspaceMembership.workspaceId);
await authenticatedClient.join(room);
clearTimeout(timeoutId);
this.logger.log(`Client ${authenticatedClient.id} joined room ${room}`);
} catch (error) {
clearTimeout(timeoutId);
this.logger.error(
`Authentication failed for client ${authenticatedClient.id}:`,
error instanceof Error ? error.message : "Unknown error"
);
authenticatedClient.disconnect();
}
}
/**
* @description Extract authentication token from Socket.IO handshake
* @param client - The socket client
* @returns The token string or undefined if not found
*/
private extractTokenFromHandshake(client: Socket): string | undefined {
// Check handshake.auth.token (preferred method)
const authToken = client.handshake.auth.token as unknown;
if (typeof authToken === "string" && authToken.length > 0) {
return authToken;
}
// Fallback: check query parameters
const queryToken = client.handshake.query.token as unknown;
if (typeof queryToken === "string" && queryToken.length > 0) {
return queryToken;
}
// Fallback: check Authorization header
const authHeader = client.handshake.headers.authorization as unknown;
if (typeof authHeader === "string") {
const parts = authHeader.split(" ");
const [type, token] = parts;
if (type === "Bearer" && token) {
return token;
}
}
return undefined;
}
/**
* @description Handle client disconnect by leaving the workspace room.
* @param client - The socket client containing workspaceId in data.
* @returns void
*/
handleDisconnect(client: Socket): void {
const authenticatedClient = client as AuthenticatedSocket;
const { workspaceId } = authenticatedClient.data;
if (workspaceId) {
const room = this.getWorkspaceRoom(workspaceId);
void authenticatedClient.leave(room);
this.logger.log(`Client ${authenticatedClient.id} left room ${room}`);
}
}
/**
* @description Emit task:created event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param task - The task object that was created.
* @returns void
*/
emitTaskCreated(workspaceId: string, task: Task): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("task:created", task);
this.logger.debug(`Emitted task:created to ${room}`);
}
/**
* @description Emit task:updated event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param task - The task object that was updated.
* @returns void
*/
emitTaskUpdated(workspaceId: string, task: Task): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("task:updated", task);
this.logger.debug(`Emitted task:updated to ${room}`);
}
/**
* @description Emit task:deleted event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param taskId - The ID of the task that was deleted.
* @returns void
*/
emitTaskDeleted(workspaceId: string, taskId: string): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("task:deleted", { id: taskId });
this.logger.debug(`Emitted task:deleted to ${room}`);
}
/**
* @description Emit event:created event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param event - The event object that was created.
* @returns void
*/
emitEventCreated(workspaceId: string, event: Event): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("event:created", event);
this.logger.debug(`Emitted event:created to ${room}`);
}
/**
* @description Emit event:updated event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param event - The event object that was updated.
* @returns void
*/
emitEventUpdated(workspaceId: string, event: Event): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("event:updated", event);
this.logger.debug(`Emitted event:updated to ${room}`);
}
/**
* @description Emit event:deleted event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param eventId - The ID of the event that was deleted.
* @returns void
*/
emitEventDeleted(workspaceId: string, eventId: string): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("event:deleted", { id: eventId });
this.logger.debug(`Emitted event:deleted to ${room}`);
}
/**
* @description Emit project:created event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param project - The project object that was created.
* @returns void
*/
emitProjectCreated(workspaceId: string, project: Project): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("project:created", project);
this.logger.debug(`Emitted project:created to ${room}`);
}
/**
* @description Emit project:updated event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param project - The project object that was updated.
* @returns void
*/
emitProjectUpdated(workspaceId: string, project: Project): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("project:updated", project);
this.logger.debug(`Emitted project:updated to ${room}`);
}
/**
* @description Emit project:deleted event to all clients in the workspace room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param projectId - The ID of the project that was deleted.
* @returns void
*/
emitProjectDeleted(workspaceId: string, projectId: string): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("project:deleted", { id: projectId });
this.logger.debug(`Emitted project:deleted to ${room}`);
}
/**
* Emit cron:executed event when a scheduled command fires
*/
emitCronExecuted(
workspaceId: string,
data: { scheduleId: string; command: string; executedAt: Date }
): void {
const room = this.getWorkspaceRoom(workspaceId);
this.server.to(room).emit("cron:executed", data);
this.logger.debug(`Emitted cron:executed to ${room}`);
}
/**
* @description Emit job:created event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param job - The job object that was created.
* @returns void
*/
emitJobCreated(workspaceId: string, job: Job): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(job.id);
this.server.to(workspaceJobsRoom).emit("job:created", job);
this.server.to(jobRoom).emit("job:created", job);
this.logger.debug(`Emitted job:created to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* @description Emit job:status event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param jobId - The job identifier.
* @param data - The status change data including current and previous status.
* @returns void
*/
emitJobStatusChanged(workspaceId: string, jobId: string, data: JobStatusData): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(jobId);
this.server.to(workspaceJobsRoom).emit("job:status", data);
this.server.to(jobRoom).emit("job:status", data);
this.logger.debug(`Emitted job:status to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* @description Emit job:progress event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param jobId - The job identifier.
* @param data - The progress data including percentage and optional message.
* @returns void
*/
emitJobProgress(workspaceId: string, jobId: string, data: JobProgressData): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(jobId);
this.server.to(workspaceJobsRoom).emit("job:progress", data);
this.server.to(jobRoom).emit("job:progress", data);
this.logger.debug(`Emitted job:progress to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* @description Emit step:started event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param jobId - The job identifier.
* @param data - The step data including step ID and name.
* @returns void
*/
emitStepStarted(workspaceId: string, jobId: string, data: StepData): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(jobId);
this.server.to(workspaceJobsRoom).emit("step:started", data);
this.server.to(jobRoom).emit("step:started", data);
this.logger.debug(`Emitted step:started to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* @description Emit step:completed event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param jobId - The job identifier.
* @param data - The step completion data including success status.
* @returns void
*/
emitStepCompleted(workspaceId: string, jobId: string, data: StepData): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(jobId);
this.server.to(workspaceJobsRoom).emit("step:completed", data);
this.server.to(jobRoom).emit("step:completed", data);
this.logger.debug(`Emitted step:completed to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* @description Emit step:output event to workspace jobs room and specific job room.
* @param workspaceId - The workspace identifier for the room to broadcast to.
* @param jobId - The job identifier.
* @param data - The step output data including output text and timestamp.
* @returns void
*/
emitStepOutput(workspaceId: string, jobId: string, data: StepOutputData): void {
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
const jobRoom = this.getJobRoom(jobId);
this.server.to(workspaceJobsRoom).emit("step:output", data);
this.server.to(jobRoom).emit("step:output", data);
this.logger.debug(`Emitted step:output to ${workspaceJobsRoom} and ${jobRoom}`);
}
/**
* Get workspace room name
*/
private getWorkspaceRoom(workspaceId: string): string {
return `workspace:${workspaceId}`;
}
/**
* Get workspace jobs room name
*/
private getWorkspaceJobsRoom(workspaceId: string): string {
return `workspace:${workspaceId}:jobs`;
}
/**
* Get job-specific room name
*/
private getJobRoom(jobId: string): string {
return `job:${jobId}`;
}
}