Files
stack/apps/api/src/federation/event.controller.ts
Jason Woltje ca4f5ec011 feat(#90): implement EVENT subscriptions for federation
Implement event pub/sub messaging for federation to enable real-time
event streaming between federated instances.

Features:
- Event subscription management (subscribe/unsubscribe)
- Event publishing to subscribed instances
- Event acknowledgment protocol
- Server-side event filtering based on subscriptions
- Full signature verification and connection validation

Implementation:
- FederationEventSubscription model for storing subscriptions
- EventService with complete event lifecycle management
- EventController with authenticated and public endpoints
- EventMessage, EventAck, and SubscriptionDetails types
- Comprehensive DTOs for all event operations

API Endpoints:
- POST /api/v1/federation/events/subscribe
- POST /api/v1/federation/events/unsubscribe
- POST /api/v1/federation/events/publish
- GET /api/v1/federation/events/subscriptions
- GET /api/v1/federation/events/messages
- POST /api/v1/federation/incoming/event (public)
- POST /api/v1/federation/incoming/event/ack (public)

Testing:
- 18 unit tests for EventService (89.09% coverage)
- 11 unit tests for EventController (83.87% coverage)
- All 29 tests passing
- Follows TDD red-green-refactor cycle

Technical Notes:
- Reuses existing FederationMessage model with eventType field
- Follows patterns from QueryService and CommandService
- Uses existing signature and connection infrastructure
- Supports hierarchical event type naming (e.g., "task.created")

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 13:45:00 -06:00

198 lines
5.5 KiB
TypeScript

/**
* Event Controller
*
* API endpoints for event subscriptions and publishing.
*/
import { Controller, Get, Post, UseGuards, Logger, Req, Body, Param, Query } from "@nestjs/common";
import { EventService } from "./event.service";
import { AuthGuard } from "../auth/guards/auth.guard";
import { FederationMessageStatus } from "@prisma/client";
import type { AuthenticatedRequest } from "../common/types/user.types";
import type {
EventMessage,
EventAck,
EventMessageDetails,
SubscriptionDetails,
} from "./types/message.types";
import {
SubscribeToEventDto,
UnsubscribeFromEventDto,
PublishEventDto,
IncomingEventDto,
IncomingEventAckDto,
} from "./dto/event.dto";
@Controller("api/v1/federation")
export class EventController {
private readonly logger = new Logger(EventController.name);
constructor(private readonly eventService: EventService) {}
/**
* Subscribe to an event type from a remote instance
* Requires authentication
*/
@Post("events/subscribe")
@UseGuards(AuthGuard)
async subscribeToEvent(
@Req() req: AuthenticatedRequest,
@Body() dto: SubscribeToEventDto
): Promise<SubscriptionDetails> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
this.logger.log(
`User ${req.user.id} subscribing to event type ${dto.eventType} on connection ${dto.connectionId}`
);
return this.eventService.subscribeToEventType(
req.user.workspaceId,
dto.connectionId,
dto.eventType,
dto.metadata
);
}
/**
* Unsubscribe from an event type
* Requires authentication
*/
@Post("events/unsubscribe")
@UseGuards(AuthGuard)
async unsubscribeFromEvent(
@Req() req: AuthenticatedRequest,
@Body() dto: UnsubscribeFromEventDto
): Promise<{ status: string }> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
this.logger.log(
`User ${req.user.id} unsubscribing from event type ${dto.eventType} on connection ${dto.connectionId}`
);
await this.eventService.unsubscribeFromEventType(
req.user.workspaceId,
dto.connectionId,
dto.eventType
);
return { status: "unsubscribed" };
}
/**
* Publish an event to subscribed instances
* Requires authentication
*/
@Post("events/publish")
@UseGuards(AuthGuard)
async publishEvent(
@Req() req: AuthenticatedRequest,
@Body() dto: PublishEventDto
): Promise<EventMessageDetails[]> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
this.logger.log(`User ${req.user.id} publishing event type ${dto.eventType}`);
return this.eventService.publishEvent(req.user.workspaceId, dto.eventType, dto.payload);
}
/**
* Get all event subscriptions for the workspace
* Requires authentication
*/
@Get("events/subscriptions")
@UseGuards(AuthGuard)
async getSubscriptions(
@Req() req: AuthenticatedRequest,
@Query("connectionId") connectionId?: string
): Promise<SubscriptionDetails[]> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
return this.eventService.getEventSubscriptions(req.user.workspaceId, connectionId);
}
/**
* Get all event messages for the workspace
* Requires authentication
*/
@Get("events/messages")
@UseGuards(AuthGuard)
async getEventMessages(
@Req() req: AuthenticatedRequest,
@Query("status") status?: FederationMessageStatus
): Promise<EventMessageDetails[]> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
return this.eventService.getEventMessages(req.user.workspaceId, status);
}
/**
* Get a single event message
* Requires authentication
*/
@Get("events/messages/:id")
@UseGuards(AuthGuard)
async getEventMessage(
@Req() req: AuthenticatedRequest,
@Param("id") messageId: string
): Promise<EventMessageDetails> {
if (!req.user?.workspaceId) {
throw new Error("Workspace ID not found in request");
}
return this.eventService.getEventMessage(req.user.workspaceId, messageId);
}
/**
* Handle incoming event from remote instance
* Public endpoint - no authentication required (signature-based verification)
*/
@Post("incoming/event")
async handleIncomingEvent(@Body() dto: IncomingEventDto): Promise<EventAck> {
this.logger.log(`Received event from ${dto.instanceId}: ${dto.messageId}`);
const eventMessage: EventMessage = {
messageId: dto.messageId,
instanceId: dto.instanceId,
eventType: dto.eventType,
payload: dto.payload,
timestamp: dto.timestamp,
signature: dto.signature,
};
return this.eventService.handleIncomingEvent(eventMessage);
}
/**
* Handle incoming event acknowledgment from remote instance
* Public endpoint - no authentication required (signature-based verification)
*/
@Post("incoming/event/ack")
async handleIncomingEventAck(@Body() dto: IncomingEventAckDto): Promise<{ status: string }> {
this.logger.log(`Received acknowledgment for event: ${dto.correlationId}`);
const ack: EventAck = {
messageId: dto.messageId,
correlationId: dto.correlationId,
instanceId: dto.instanceId,
received: dto.received,
...(dto.error !== undefined ? { error: dto.error } : {}),
timestamp: dto.timestamp,
signature: dto.signature,
};
await this.eventService.processEventAck(ack);
return { status: "acknowledged" };
}
}