/**
 * Event Service
 *
 * Handles federated event messages and subscriptions.
 */

import { Injectable, Logger } from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { randomUUID } from "crypto";
import { firstValueFrom } from "rxjs";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import {
  FederationConnectionStatus,
  FederationMessageType,
  FederationMessageStatus,
} from "@prisma/client";
import type {
  EventMessage,
  EventAck,
  EventMessageDetails,
  SubscriptionDetails,
} from "./types/message.types";

/**
 * Manages event subscriptions on federation connections, publishes signed
 * events to subscribed connections, and processes incoming events and their
 * acknowledgments.
 */
@Injectable()
export class EventService {
  private readonly logger = new Logger(EventService.name);

  constructor(
    private readonly prisma: PrismaService,
    private readonly federationService: FederationService,
    private readonly signatureService: SignatureService,
    private readonly httpService: HttpService
  ) {}

  /**
   * Subscribe to an event type from a remote instance.
   *
   * @param workspaceId - Workspace that owns the connection.
   * @param connectionId - Federation connection to subscribe on.
   * @param eventType - Event type to subscribe to.
   * @param metadata - Optional metadata stored with the subscription; defaults to `{}`.
   * @returns The newly created subscription.
   * @throws Error if the connection does not exist in the workspace or is not ACTIVE.
   */
  async subscribeToEventType(
    workspaceId: string,
    connectionId: string,
    eventType: string,
    metadata?: Record<string, unknown>
  ): Promise<SubscriptionDetails> {
    // Validate connection exists and is active
    const connection = await this.prisma.federationConnection.findUnique({
      where: { id: connectionId, workspaceId },
    });

    if (!connection) {
      throw new Error("Connection not found");
    }

    if (connection.status !== FederationConnectionStatus.ACTIVE) {
      throw new Error("Connection is not active");
    }

    // Create subscription
    const subscription = await this.prisma.federationEventSubscription.create({
      data: {
        workspaceId,
        connectionId,
        eventType,
        metadata: (metadata ?? {}) as never,
      },
    });

    this.logger.log(`Subscribed to event type ${eventType} on connection ${connectionId}`);

    return this.mapToSubscriptionDetails(subscription);
  }

  /**
   * Unsubscribe from an event type.
   *
   * Deletes the first subscription matching the workspace, connection and
   * event type.
   *
   * @param workspaceId - Workspace that owns the subscription.
   * @param connectionId - Federation connection the subscription belongs to.
   * @param eventType - Event type to unsubscribe from.
   * @throws Error if no matching subscription exists.
   */
  async unsubscribeFromEventType(
    workspaceId: string,
    connectionId: string,
    eventType: string
  ): Promise<void> {
    // Find subscription
    const subscription = await this.prisma.federationEventSubscription.findFirst({
      where: {
        workspaceId,
        connectionId,
        eventType,
      },
    });

    if (!subscription) {
      throw new Error("Subscription not found");
    }

    // Delete subscription
    await this.prisma.federationEventSubscription.delete({
      where: { id: subscription.id },
    });

    this.logger.log(`Unsubscribed from event type ${eventType} on connection ${connectionId}`);
  }

  /**
   * Publish an event to all subscribed instances.
   *
   * For every active subscription whose connection is ACTIVE, a signed event
   * is stored as a PENDING message and POSTed to the remote instance. Delivery
   * is best-effort per connection: a failure for one connection is logged and
   * does not stop the remaining connections.
   *
   * @param workspaceId - Workspace publishing the event.
   * @param eventType - Event type being published.
   * @param payload - Event payload, included in the signed message.
   * @returns One entry per connection for which a message record was created:
   *   the stored message when the POST succeeded, or a copy marked FAILED with
   *   the error text when the POST failed. Connections that were skipped, or
   *   that failed before/while persisting, produce no entry.
   */
  async publishEvent(
    workspaceId: string,
    eventType: string,
    payload: Record<string, unknown>
  ): Promise<EventMessageDetails[]> {
    // Find all active subscriptions for this event type
    const subscriptions = await this.prisma.federationEventSubscription.findMany({
      where: {
        workspaceId,
        eventType,
        isActive: true,
      },
      include: {
        connection: true,
      },
    });

    if (subscriptions.length === 0) {
      this.logger.debug(`No active subscriptions for event type ${eventType}`);
      return [];
    }

    // Get local instance identity
    const identity = await this.federationService.getInstanceIdentity();

    const results: EventMessageDetails[] = [];

    // Publish to each subscribed connection
    for (const subscription of subscriptions) {
      const connection = subscription.connection;

      // Skip if connection is not active
      if (connection.status !== FederationConnectionStatus.ACTIVE) {
        this.logger.warn(`Skipping inactive connection ${connection.id} for event ${eventType}`);
        continue;
      }

      try {
        // Create event message
        const messageId = randomUUID();
        const timestamp = Date.now();

        const eventPayload: Record<string, unknown> = {
          messageId,
          instanceId: identity.instanceId,
          eventType,
          payload,
          timestamp,
        };

        // Sign the event (the signature covers every field except itself)
        const signature = await this.signatureService.signMessage(eventPayload);

        const signedEvent = {
          messageId,
          instanceId: identity.instanceId,
          eventType,
          payload,
          timestamp,
          signature,
        } as EventMessage;

        // Store message in database
        const message = await this.prisma.federationMessage.create({
          data: {
            workspaceId,
            connectionId: connection.id,
            messageType: FederationMessageType.EVENT,
            messageId,
            eventType,
            payload: payload as never,
            status: FederationMessageStatus.PENDING,
            signature,
          },
        });

        // Send event to remote instance
        try {
          const remoteUrl = `${connection.remoteUrl}/api/v1/federation/incoming/event`;
          await firstValueFrom(this.httpService.post(remoteUrl, signedEvent));

          this.logger.log(`Event sent to ${connection.remoteUrl}: ${messageId}`);
          results.push(this.mapToEventMessageDetails(message));
        } catch (error) {
          this.logger.error(`Failed to send event to ${connection.remoteUrl}`, error);

          // Derive the error text once so the stored row and the returned
          // details cannot drift apart.
          const errorMessage = this.describeError(error);

          // Update message status to failed
          await this.prisma.federationMessage.update({
            where: { id: message.id },
            data: {
              status: FederationMessageStatus.FAILED,
              error: errorMessage,
            },
          });

          results.push(
            this.mapToEventMessageDetails({
              ...message,
              status: FederationMessageStatus.FAILED,
              error: errorMessage,
            })
          );
        }
      } catch (error) {
        // Best-effort: signing/persistence failures for one connection are
        // logged and the loop moves on to the next connection.
        this.logger.error(`Failed to publish event to connection ${connection.id}`, error);
      }
    }

    return results;
  }

  /**
   * Handle incoming event from remote instance.
   *
   * Validates the timestamp, looks up an ACTIVE connection for the sending
   * instance, verifies the signature over every field except `signature`,
   * stores the event as DELIVERED and returns a signed acknowledgment.
   *
   * @param eventMessage - Signed event received from the remote instance.
   * @returns A signed acknowledgment whose `correlationId` is the event's `messageId`.
   * @throws Error if the timestamp is rejected, no active connection exists
   *   for the sender, or signature verification fails.
   */
  async handleIncomingEvent(eventMessage: EventMessage): Promise<EventAck> {
    this.logger.log(`Received event from ${eventMessage.instanceId}: ${eventMessage.messageId}`);

    // Validate timestamp
    if (!this.signatureService.validateTimestamp(eventMessage.timestamp)) {
      throw new Error("Event timestamp is outside acceptable range");
    }

    // Find connection for remote instance
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        remoteInstanceId: eventMessage.instanceId,
        status: FederationConnectionStatus.ACTIVE,
      },
    });

    if (!connection) {
      throw new Error("No connection found for remote instance");
    }

    // Validate connection is active.
    // The query above already filters on ACTIVE, so this is a defensive
    // re-check of the returned row.
    if (connection.status !== FederationConnectionStatus.ACTIVE) {
      throw new Error("Connection is not active");
    }

    // Verify signature
    const { signature, ...messageToVerify } = eventMessage;
    const verificationResult = await this.signatureService.verifyMessage(
      messageToVerify,
      signature,
      eventMessage.instanceId
    );

    if (!verificationResult.valid) {
      throw new Error(verificationResult.error ?? "Invalid signature");
    }

    // Store received event
    await this.prisma.federationMessage.create({
      data: {
        workspaceId: connection.workspaceId,
        connectionId: connection.id,
        messageType: FederationMessageType.EVENT,
        messageId: eventMessage.messageId,
        eventType: eventMessage.eventType,
        payload: eventMessage.payload as never,
        status: FederationMessageStatus.DELIVERED,
        signature: eventMessage.signature,
        deliveredAt: new Date(),
      },
    });

    // Get local instance identity
    const identity = await this.federationService.getInstanceIdentity();

    // Create acknowledgment
    const ackMessageId = randomUUID();
    const ackTimestamp = Date.now();

    const ackPayload: Record<string, unknown> = {
      messageId: ackMessageId,
      correlationId: eventMessage.messageId,
      instanceId: identity.instanceId,
      received: true,
      timestamp: ackTimestamp,
    };

    // Sign the acknowledgment
    const ackSignature = await this.signatureService.signMessage(ackPayload);

    const ack = {
      messageId: ackMessageId,
      correlationId: eventMessage.messageId,
      instanceId: identity.instanceId,
      received: true,
      timestamp: ackTimestamp,
      signature: ackSignature,
    } as EventAck;

    return ack;
  }

  /**
   * Process an event acknowledgment from remote instance.
   *
   * Marks the original event message DELIVERED when `ack.received` is true,
   * otherwise FAILED. `deliveredAt` is only written for a positive
   * acknowledgment; `ack.error`, when present, is stored on the message.
   *
   * @param ack - Signed acknowledgment received from the remote instance.
   * @throws Error if the timestamp is rejected, the original event message is
   *   not found, or signature verification fails.
   */
  async processEventAck(ack: EventAck): Promise<void> {
    this.logger.log(`Received acknowledgment for event: ${ack.correlationId}`);

    // Validate timestamp
    if (!this.signatureService.validateTimestamp(ack.timestamp)) {
      throw new Error("Acknowledgment timestamp is outside acceptable range");
    }

    // Find original event message
    const message = await this.prisma.federationMessage.findFirst({
      where: {
        messageId: ack.correlationId,
        messageType: FederationMessageType.EVENT,
      },
    });

    if (!message) {
      throw new Error("Original event message not found");
    }

    // Verify signature
    // NOTE(review): the signature is checked against ack.instanceId, but
    // nothing here checks that this instance is the remote end of
    // message.connectionId — confirm whether that is enforced by the caller.
    const { signature, ...ackToVerify } = ack;
    const verificationResult = await this.signatureService.verifyMessage(
      ackToVerify,
      signature,
      ack.instanceId
    );

    if (!verificationResult.valid) {
      throw new Error(verificationResult.error ?? "Invalid signature");
    }

    // Update message with acknowledgment
    const updateData: Record<string, unknown> = {
      status: ack.received ? FederationMessageStatus.DELIVERED : FederationMessageStatus.FAILED,
    };

    // A negative acknowledgment means the event was not delivered, so it must
    // not receive a delivery timestamp.
    if (ack.received) {
      updateData.deliveredAt = new Date();
    }

    if (ack.error !== undefined) {
      updateData.error = ack.error;
    }

    await this.prisma.federationMessage.update({
      where: { id: message.id },
      data: updateData,
    });

    this.logger.log(`Event acknowledgment processed: ${ack.correlationId}`);
  }

  /**
   * Get all event subscriptions for a workspace.
   *
   * @param workspaceId - Workspace whose subscriptions are listed.
   * @param connectionId - Optional connection filter; ignored when omitted or empty.
   * @returns Matching subscriptions, newest first by `createdAt`.
   */
  async getEventSubscriptions(
    workspaceId: string,
    connectionId?: string
  ): Promise<SubscriptionDetails[]> {
    const where: Record<string, unknown> = {
      workspaceId,
    };

    if (connectionId) {
      where.connectionId = connectionId;
    }

    const subscriptions = await this.prisma.federationEventSubscription.findMany({
      where,
      orderBy: { createdAt: "desc" },
    });

    return subscriptions.map((sub) => this.mapToSubscriptionDetails(sub));
  }

  /**
   * Get all event messages for a workspace.
   *
   * @param workspaceId - Workspace whose EVENT messages are listed.
   * @param status - Optional status filter.
   * @returns Matching messages, newest first by `createdAt`.
   */
  async getEventMessages(
    workspaceId: string,
    status?: FederationMessageStatus
  ): Promise<EventMessageDetails[]> {
    const where: Record<string, unknown> = {
      workspaceId,
      messageType: FederationMessageType.EVENT,
    };

    if (status) {
      where.status = status;
    }

    const messages = await this.prisma.federationMessage.findMany({
      where,
      orderBy: { createdAt: "desc" },
    });

    return messages.map((msg) => this.mapToEventMessageDetails(msg));
  }

  /**
   * Get a single event message.
   *
   * @param workspaceId - Workspace the message must belong to.
   * @param messageId - Database `id` of the message row (the lookup is on
   *   `id`, not on the federation-level `messageId` field).
   * @returns The message details.
   * @throws Error if no message with that id exists in the workspace.
   */
  async getEventMessage(workspaceId: string, messageId: string): Promise<EventMessageDetails> {
    // NOTE(review): unlike getEventMessages, this lookup does not filter on
    // messageType — confirm whether non-EVENT rows should be rejected here.
    const message = await this.prisma.federationMessage.findUnique({
      where: { id: messageId, workspaceId },
    });

    if (!message) {
      throw new Error("Event message not found");
    }

    return this.mapToEventMessageDetails(message);
  }

  /**
   * Convert a caught value into the error text stored on a message.
   *
   * @param error - Value caught from a failed operation.
   * @returns `error.message` for `Error` instances, otherwise `"Unknown error"`.
   */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : "Unknown error";
  }

  /**
   * Map Prisma FederationMessage to EventMessageDetails.
   *
   * Nullable columns are copied only when non-null, and `payload` only when it
   * is a non-null object, so absent values are omitted rather than set to null.
   */
  private mapToEventMessageDetails(message: {
    id: string;
    workspaceId: string;
    connectionId: string;
    messageType: FederationMessageType;
    messageId: string;
    correlationId: string | null;
    query: string | null;
    commandType: string | null;
    eventType: string | null;
    payload: unknown;
    response: unknown;
    status: FederationMessageStatus;
    error: string | null;
    createdAt: Date;
    updatedAt: Date;
    deliveredAt: Date | null;
  }): EventMessageDetails {
    const details: EventMessageDetails = {
      id: message.id,
      workspaceId: message.workspaceId,
      connectionId: message.connectionId,
      messageType: message.messageType,
      messageId: message.messageId,
      response: message.response,
      status: message.status,
      createdAt: message.createdAt,
      updatedAt: message.updatedAt,
    };

    if (message.correlationId !== null) {
      details.correlationId = message.correlationId;
    }

    if (message.eventType !== null) {
      details.eventType = message.eventType;
    }

    if (message.payload !== null && typeof message.payload === "object") {
      details.payload = message.payload as Record<string, unknown>;
    }

    if (message.error !== null) {
      details.error = message.error;
    }

    if (message.deliveredAt !== null) {
      details.deliveredAt = message.deliveredAt;
    }

    return details;
  }

  /**
   * Map Prisma FederationEventSubscription to SubscriptionDetails.
   *
   * `metadata` falls back to `{}` when the stored value is not a non-null object.
   */
  private mapToSubscriptionDetails(subscription: {
    id: string;
    workspaceId: string;
    connectionId: string;
    eventType: string;
    metadata: unknown;
    isActive: boolean;
    createdAt: Date;
    updatedAt: Date;
  }): SubscriptionDetails {
    return {
      id: subscription.id,
      workspaceId: subscription.workspaceId,
      connectionId: subscription.connectionId,
      eventType: subscription.eventType,
      metadata:
        typeof subscription.metadata === "object" && subscription.metadata !== null
          ? (subscription.metadata as Record<string, unknown>)
          : {},
      isActive: subscription.isActive,
      createdAt: subscription.createdAt,
      updatedAt: subscription.updatedAt,
    };
  }
}