Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Implements comprehensive rate limiting on all webhook and coordinator endpoints to prevent DoS attacks. Follows TDD protocol with 14 passing tests. Implementation: - Added @nestjs/throttler package for rate limiting - Created ThrottlerApiKeyGuard for per-API-key rate limiting - Created ThrottlerValkeyStorageService for distributed rate limiting via Redis - Configured rate limits on stitcher endpoints (60 req/min) - Configured rate limits on coordinator endpoints (100 req/min) - Higher limits for health endpoints (300 req/min for monitoring) - Added environment variables for rate limit configuration - Rate limiting logs violations for security monitoring Rate Limits: - Stitcher webhooks: 60 requests/minute per API key - Coordinator endpoints: 100 requests/minute per API key - Health endpoints: 300 requests/minute (higher for monitoring) Storage: - Uses Valkey (Redis) for distributed rate limiting across API instances - Falls back to in-memory storage if Redis unavailable Testing: - 14 comprehensive rate limiting tests (all passing) - Tests verify: rate limit enforcement, Retry-After headers, per-API-key isolation - TDD approach: RED (failing tests) → GREEN (implementation) → REFACTOR Additional improvements: - Type safety improvements in websocket gateway - Array type notation standardization in coordinator service Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
458 lines
15 KiB
TypeScript
458 lines
15 KiB
TypeScript
import {
|
|
WebSocketGateway as WSGateway,
|
|
WebSocketServer,
|
|
OnGatewayConnection,
|
|
OnGatewayDisconnect,
|
|
} from "@nestjs/websockets";
|
|
import { Logger } from "@nestjs/common";
|
|
import { Server, Socket } from "socket.io";
|
|
import { AuthService } from "../auth/auth.service";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
|
|
interface AuthenticatedSocket extends Socket {
|
|
data: {
|
|
userId?: string;
|
|
workspaceId?: string;
|
|
};
|
|
}
|
|
|
|
interface Task {
|
|
id: string;
|
|
workspaceId: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface Event {
|
|
id: string;
|
|
workspaceId: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface Project {
|
|
id: string;
|
|
workspaceId: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface Job {
|
|
id: string;
|
|
workspaceId: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface JobStatusData {
|
|
id: string;
|
|
workspaceId: string;
|
|
status: string;
|
|
previousStatus?: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface JobProgressData {
|
|
id: string;
|
|
workspaceId: string;
|
|
progressPercent: number;
|
|
message?: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface StepData {
|
|
id: string;
|
|
jobId: string;
|
|
workspaceId: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
interface StepOutputData {
|
|
id: string;
|
|
jobId: string;
|
|
workspaceId: string;
|
|
output: string;
|
|
timestamp: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
/**
|
|
* @description WebSocket Gateway for real-time updates. Handles workspace-scoped rooms for broadcasting events.
|
|
*/
|
|
@WSGateway({
|
|
cors: {
|
|
origin: process.env.WEB_URL ?? "http://localhost:3000",
|
|
credentials: true,
|
|
},
|
|
})
|
|
export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
|
@WebSocketServer()
|
|
server!: Server;
|
|
|
|
private readonly logger = new Logger(WebSocketGateway.name);
|
|
private readonly CONNECTION_TIMEOUT_MS = 5000; // 5 seconds
|
|
|
|
constructor(
|
|
private readonly authService: AuthService,
|
|
private readonly prisma: PrismaService
|
|
) {}
|
|
|
|
/**
|
|
* @description Handle client connection by authenticating and joining the workspace-specific room.
|
|
* @param client - The socket client that will be authenticated and joined to workspace room.
|
|
* @returns Promise that resolves when the client is joined to the workspace room or disconnected.
|
|
*/
|
|
async handleConnection(client: Socket): Promise<void> {
|
|
const authenticatedClient = client as AuthenticatedSocket;
|
|
|
|
// Set connection timeout
|
|
const timeoutId = setTimeout(() => {
|
|
if (!authenticatedClient.data.userId) {
|
|
this.logger.warn(`Client ${authenticatedClient.id} timed out during authentication`);
|
|
authenticatedClient.disconnect();
|
|
}
|
|
}, this.CONNECTION_TIMEOUT_MS);
|
|
|
|
try {
|
|
// Extract token from handshake
|
|
const token = this.extractTokenFromHandshake(authenticatedClient);
|
|
|
|
if (!token) {
|
|
this.logger.warn(`Client ${authenticatedClient.id} connected without token`);
|
|
authenticatedClient.disconnect();
|
|
clearTimeout(timeoutId);
|
|
return;
|
|
}
|
|
|
|
// Verify session
|
|
const sessionData = await this.authService.verifySession(token);
|
|
|
|
if (!sessionData) {
|
|
this.logger.warn(`Client ${authenticatedClient.id} has invalid token`);
|
|
authenticatedClient.disconnect();
|
|
clearTimeout(timeoutId);
|
|
return;
|
|
}
|
|
|
|
const user = sessionData.user as { id: string };
|
|
const userId = user.id;
|
|
|
|
// Verify workspace access
|
|
const workspaceMembership = await this.prisma.workspaceMember.findFirst({
|
|
where: { userId },
|
|
select: { workspaceId: true, userId: true, role: true },
|
|
});
|
|
|
|
if (!workspaceMembership) {
|
|
this.logger.warn(`User ${userId} has no workspace access`);
|
|
authenticatedClient.disconnect();
|
|
clearTimeout(timeoutId);
|
|
return;
|
|
}
|
|
|
|
// Populate socket data
|
|
authenticatedClient.data.userId = userId;
|
|
authenticatedClient.data.workspaceId = workspaceMembership.workspaceId;
|
|
|
|
// Join workspace room
|
|
const room = this.getWorkspaceRoom(workspaceMembership.workspaceId);
|
|
await authenticatedClient.join(room);
|
|
|
|
clearTimeout(timeoutId);
|
|
this.logger.log(`Client ${authenticatedClient.id} joined room ${room}`);
|
|
} catch (error) {
|
|
clearTimeout(timeoutId);
|
|
this.logger.error(
|
|
`Authentication failed for client ${authenticatedClient.id}:`,
|
|
error instanceof Error ? error.message : "Unknown error"
|
|
);
|
|
authenticatedClient.disconnect();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @description Extract authentication token from Socket.IO handshake
|
|
* @param client - The socket client
|
|
* @returns The token string or undefined if not found
|
|
*/
|
|
private extractTokenFromHandshake(client: Socket): string | undefined {
|
|
// Check handshake.auth.token (preferred method)
|
|
const authToken = client.handshake.auth.token as unknown;
|
|
if (typeof authToken === "string" && authToken.length > 0) {
|
|
return authToken;
|
|
}
|
|
|
|
// Fallback: check query parameters
|
|
const queryToken = client.handshake.query.token as unknown;
|
|
if (typeof queryToken === "string" && queryToken.length > 0) {
|
|
return queryToken;
|
|
}
|
|
|
|
// Fallback: check Authorization header
|
|
const authHeader = client.handshake.headers.authorization as unknown;
|
|
if (typeof authHeader === "string") {
|
|
const parts = authHeader.split(" ");
|
|
const [type, token] = parts;
|
|
if (type === "Bearer" && token) {
|
|
return token;
|
|
}
|
|
}
|
|
|
|
return undefined;
|
|
}
|
|
|
|
/**
|
|
* @description Handle client disconnect by leaving the workspace room.
|
|
* @param client - The socket client containing workspaceId in data.
|
|
* @returns void
|
|
*/
|
|
handleDisconnect(client: Socket): void {
|
|
const authenticatedClient = client as AuthenticatedSocket;
|
|
const { workspaceId } = authenticatedClient.data;
|
|
|
|
if (workspaceId) {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
void authenticatedClient.leave(room);
|
|
this.logger.log(`Client ${authenticatedClient.id} left room ${room}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @description Emit task:created event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param task - The task object that was created.
|
|
* @returns void
|
|
*/
|
|
emitTaskCreated(workspaceId: string, task: Task): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("task:created", task);
|
|
this.logger.debug(`Emitted task:created to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit task:updated event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param task - The task object that was updated.
|
|
* @returns void
|
|
*/
|
|
emitTaskUpdated(workspaceId: string, task: Task): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("task:updated", task);
|
|
this.logger.debug(`Emitted task:updated to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit task:deleted event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param taskId - The ID of the task that was deleted.
|
|
* @returns void
|
|
*/
|
|
emitTaskDeleted(workspaceId: string, taskId: string): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("task:deleted", { id: taskId });
|
|
this.logger.debug(`Emitted task:deleted to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit event:created event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param event - The event object that was created.
|
|
* @returns void
|
|
*/
|
|
emitEventCreated(workspaceId: string, event: Event): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("event:created", event);
|
|
this.logger.debug(`Emitted event:created to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit event:updated event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param event - The event object that was updated.
|
|
* @returns void
|
|
*/
|
|
emitEventUpdated(workspaceId: string, event: Event): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("event:updated", event);
|
|
this.logger.debug(`Emitted event:updated to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit event:deleted event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param eventId - The ID of the event that was deleted.
|
|
* @returns void
|
|
*/
|
|
emitEventDeleted(workspaceId: string, eventId: string): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("event:deleted", { id: eventId });
|
|
this.logger.debug(`Emitted event:deleted to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit project:created event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param project - The project object that was created.
|
|
* @returns void
|
|
*/
|
|
emitProjectCreated(workspaceId: string, project: Project): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("project:created", project);
|
|
this.logger.debug(`Emitted project:created to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit project:updated event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param project - The project object that was updated.
|
|
* @returns void
|
|
*/
|
|
emitProjectUpdated(workspaceId: string, project: Project): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("project:updated", project);
|
|
this.logger.debug(`Emitted project:updated to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit project:deleted event to all clients in the workspace room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param projectId - The ID of the project that was deleted.
|
|
* @returns void
|
|
*/
|
|
emitProjectDeleted(workspaceId: string, projectId: string): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("project:deleted", { id: projectId });
|
|
this.logger.debug(`Emitted project:deleted to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* Emit cron:executed event when a scheduled command fires
|
|
*/
|
|
emitCronExecuted(
|
|
workspaceId: string,
|
|
data: { scheduleId: string; command: string; executedAt: Date }
|
|
): void {
|
|
const room = this.getWorkspaceRoom(workspaceId);
|
|
this.server.to(room).emit("cron:executed", data);
|
|
this.logger.debug(`Emitted cron:executed to ${room}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit job:created event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param job - The job object that was created.
|
|
* @returns void
|
|
*/
|
|
emitJobCreated(workspaceId: string, job: Job): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(job.id);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("job:created", job);
|
|
this.server.to(jobRoom).emit("job:created", job);
|
|
|
|
this.logger.debug(`Emitted job:created to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit job:status event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param jobId - The job identifier.
|
|
* @param data - The status change data including current and previous status.
|
|
* @returns void
|
|
*/
|
|
emitJobStatusChanged(workspaceId: string, jobId: string, data: JobStatusData): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(jobId);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("job:status", data);
|
|
this.server.to(jobRoom).emit("job:status", data);
|
|
|
|
this.logger.debug(`Emitted job:status to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit job:progress event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param jobId - The job identifier.
|
|
* @param data - The progress data including percentage and optional message.
|
|
* @returns void
|
|
*/
|
|
emitJobProgress(workspaceId: string, jobId: string, data: JobProgressData): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(jobId);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("job:progress", data);
|
|
this.server.to(jobRoom).emit("job:progress", data);
|
|
|
|
this.logger.debug(`Emitted job:progress to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit step:started event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param jobId - The job identifier.
|
|
* @param data - The step data including step ID and name.
|
|
* @returns void
|
|
*/
|
|
emitStepStarted(workspaceId: string, jobId: string, data: StepData): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(jobId);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("step:started", data);
|
|
this.server.to(jobRoom).emit("step:started", data);
|
|
|
|
this.logger.debug(`Emitted step:started to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit step:completed event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param jobId - The job identifier.
|
|
* @param data - The step completion data including success status.
|
|
* @returns void
|
|
*/
|
|
emitStepCompleted(workspaceId: string, jobId: string, data: StepData): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(jobId);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("step:completed", data);
|
|
this.server.to(jobRoom).emit("step:completed", data);
|
|
|
|
this.logger.debug(`Emitted step:completed to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* @description Emit step:output event to workspace jobs room and specific job room.
|
|
* @param workspaceId - The workspace identifier for the room to broadcast to.
|
|
* @param jobId - The job identifier.
|
|
* @param data - The step output data including output text and timestamp.
|
|
* @returns void
|
|
*/
|
|
emitStepOutput(workspaceId: string, jobId: string, data: StepOutputData): void {
|
|
const workspaceJobsRoom = this.getWorkspaceJobsRoom(workspaceId);
|
|
const jobRoom = this.getJobRoom(jobId);
|
|
|
|
this.server.to(workspaceJobsRoom).emit("step:output", data);
|
|
this.server.to(jobRoom).emit("step:output", data);
|
|
|
|
this.logger.debug(`Emitted step:output to ${workspaceJobsRoom} and ${jobRoom}`);
|
|
}
|
|
|
|
/**
|
|
* Get workspace room name
|
|
*/
|
|
private getWorkspaceRoom(workspaceId: string): string {
|
|
return `workspace:${workspaceId}`;
|
|
}
|
|
|
|
/**
|
|
* Get workspace jobs room name
|
|
*/
|
|
private getWorkspaceJobsRoom(workspaceId: string): string {
|
|
return `workspace:${workspaceId}:jobs`;
|
|
}
|
|
|
|
/**
|
|
* Get job-specific room name
|
|
*/
|
|
private getJobRoom(jobId: string): string {
|
|
return `job:${jobId}`;
|
|
}
|
|
}
|