Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
198 lines
5.5 KiB
TypeScript
198 lines
5.5 KiB
TypeScript
/**
 * Event Controller
 *
 * API endpoints for event subscriptions and publishing.
 */
|
|
|
|
import { Controller, Get, Post, UseGuards, Logger, Req, Body, Param, Query } from "@nestjs/common";
|
|
import { EventService } from "./event.service";
|
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
|
import { FederationMessageStatus } from "@prisma/client";
|
|
import type { AuthenticatedRequest } from "../common/types/user.types";
|
|
import type {
|
|
EventMessage,
|
|
EventAck,
|
|
EventMessageDetails,
|
|
SubscriptionDetails,
|
|
} from "./types/message.types";
|
|
import {
|
|
SubscribeToEventDto,
|
|
UnsubscribeFromEventDto,
|
|
PublishEventDto,
|
|
IncomingEventDto,
|
|
IncomingEventAckDto,
|
|
} from "./dto/event.dto";
|
|
|
|
/**
 * HTTP controller for federation events, mounted under `v1/federation`.
 *
 * The `events/*` routes are decorated with `AuthGuard` and operate on the
 * workspace found on `req.user`. The `incoming/*` routes carry no guard and
 * hand the received payload straight to `EventService`.
 *
 * NOTE(review): a missing workspace ID is reported by throwing a plain `Error`;
 * Nest's default exception handling turns that into a 500 response — confirm
 * whether an HTTP exception was intended.
 */
@Controller("v1/federation")
export class EventController {
  private readonly logger = new Logger(EventController.name);

  constructor(private readonly eventService: EventService) {}

  /**
   * POST `events/subscribe` — subscribes the caller's workspace to an event
   * type on the given connection.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param dto - Connection ID, event type and metadata for the subscription.
   * @returns The value resolved by `EventService.subscribeToEventType`.
   * @throws Error when the request carries no workspace ID.
   */
  @Post("events/subscribe")
  @UseGuards(AuthGuard)
  async subscribeToEvent(
    @Req() req: AuthenticatedRequest,
    @Body() dto: SubscribeToEventDto
  ): Promise<SubscriptionDetails> {
    const user = req.user;
    if (!user?.workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    const { connectionId, eventType, metadata } = dto;
    this.logger.log(
      `User ${user.id} subscribing to event type ${eventType} on connection ${connectionId}`
    );

    return this.eventService.subscribeToEventType(
      user.workspaceId,
      connectionId,
      eventType,
      metadata
    );
  }

  /**
   * POST `events/unsubscribe` — removes the caller's workspace subscription
   * for an event type on the given connection.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param dto - Connection ID and event type to unsubscribe from.
   * @returns `{ status: "unsubscribed" }` once the service call has completed.
   * @throws Error when the request carries no workspace ID.
   */
  @Post("events/unsubscribe")
  @UseGuards(AuthGuard)
  async unsubscribeFromEvent(
    @Req() req: AuthenticatedRequest,
    @Body() dto: UnsubscribeFromEventDto
  ): Promise<{ status: string }> {
    const user = req.user;
    if (!user?.workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    const { connectionId, eventType } = dto;
    this.logger.log(
      `User ${user.id} unsubscribing from event type ${eventType} on connection ${connectionId}`
    );

    await this.eventService.unsubscribeFromEventType(user.workspaceId, connectionId, eventType);

    return { status: "unsubscribed" };
  }

  /**
   * POST `events/publish` — publishes an event on behalf of the caller's
   * workspace.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param dto - Event type and payload to publish.
   * @returns The message details resolved by `EventService.publishEvent`.
   * @throws Error when the request carries no workspace ID.
   */
  @Post("events/publish")
  @UseGuards(AuthGuard)
  async publishEvent(
    @Req() req: AuthenticatedRequest,
    @Body() dto: PublishEventDto
  ): Promise<EventMessageDetails[]> {
    const user = req.user;
    if (!user?.workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    const { eventType, payload } = dto;
    this.logger.log(`User ${user.id} publishing event type ${eventType}`);

    return this.eventService.publishEvent(user.workspaceId, eventType, payload);
  }

  /**
   * GET `events/subscriptions` — lists event subscriptions for the caller's
   * workspace.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param connectionId - Optional `connectionId` query value, passed through
   *   to the service as-is.
   * @returns The subscriptions resolved by `EventService.getEventSubscriptions`.
   * @throws Error when the request carries no workspace ID.
   */
  @Get("events/subscriptions")
  @UseGuards(AuthGuard)
  async getSubscriptions(
    @Req() req: AuthenticatedRequest,
    @Query("connectionId") connectionId?: string
  ): Promise<SubscriptionDetails[]> {
    const workspaceId = req.user?.workspaceId;
    if (!workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    return this.eventService.getEventSubscriptions(workspaceId, connectionId);
  }

  /**
   * GET `events/messages` — lists event messages for the caller's workspace.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param status - Optional `status` query value, passed through to the
   *   service as-is. NOTE(review): no runtime validation against
   *   `FederationMessageStatus` is visible here — confirm it happens elsewhere.
   * @returns The messages resolved by `EventService.getEventMessages`.
   * @throws Error when the request carries no workspace ID.
   */
  @Get("events/messages")
  @UseGuards(AuthGuard)
  async getEventMessages(
    @Req() req: AuthenticatedRequest,
    @Query("status") status?: FederationMessageStatus
  ): Promise<EventMessageDetails[]> {
    const workspaceId = req.user?.workspaceId;
    if (!workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    return this.eventService.getEventMessages(workspaceId, status);
  }

  /**
   * GET `events/messages/:id` — fetches one event message for the caller's
   * workspace.
   *
   * @param req - Authenticated request; `user.workspaceId` must be set.
   * @param messageId - The `:id` route parameter.
   * @returns The message resolved by `EventService.getEventMessage`.
   * @throws Error when the request carries no workspace ID.
   */
  @Get("events/messages/:id")
  @UseGuards(AuthGuard)
  async getEventMessage(
    @Req() req: AuthenticatedRequest,
    @Param("id") messageId: string
  ): Promise<EventMessageDetails> {
    const workspaceId = req.user?.workspaceId;
    if (!workspaceId) {
      throw new Error("Workspace ID not found in request");
    }

    return this.eventService.getEventMessage(workspaceId, messageId);
  }

  /**
   * POST `incoming/event` — receives an event sent by a remote instance.
   *
   * No guard is applied to this route; the message, including its
   * `signature`, is forwarded to `EventService.handleIncomingEvent`
   * (presumably where the signature is verified — not visible in this file).
   *
   * @param dto - The incoming event as received in the request body.
   * @returns The acknowledgment resolved by the service.
   */
  @Post("incoming/event")
  async handleIncomingEvent(@Body() dto: IncomingEventDto): Promise<EventAck> {
    const { messageId, instanceId, eventType, payload, timestamp, signature } = dto;
    this.logger.log(`Received event from ${instanceId}: ${messageId}`);

    // Key order is kept stable in case downstream code serializes the object.
    const eventMessage: EventMessage = {
      messageId,
      instanceId,
      eventType,
      payload,
      timestamp,
      signature,
    };

    return this.eventService.handleIncomingEvent(eventMessage);
  }

  /**
   * POST `incoming/event/ack` — receives an acknowledgment sent by a remote
   * instance for a previously published event.
   *
   * No guard is applied to this route; the acknowledgment, including its
   * `signature`, is forwarded to `EventService.processEventAck`
   * (presumably where the signature is verified — not visible in this file).
   *
   * @param dto - The incoming acknowledgment as received in the request body.
   * @returns `{ status: "acknowledged" }` once the service call has completed.
   */
  @Post("incoming/event/ack")
  async handleIncomingEventAck(@Body() dto: IncomingEventAckDto): Promise<{ status: string }> {
    const { messageId, correlationId, instanceId, received, error, timestamp, signature } = dto;
    this.logger.log(`Received acknowledgment for event: ${correlationId}`);

    const ack: EventAck = {
      messageId,
      correlationId,
      instanceId,
      received,
      // `error` is only set as a key when the sender supplied one; its position
      // between `received` and `timestamp` is kept stable.
      ...(error !== undefined ? { error } : {}),
      timestamp,
      signature,
    };

    await this.eventService.processEventAck(ack);

    return { status: "acknowledged" };
  }
}
|