Implement event pub/sub messaging for federation to enable real-time event streaming between federated instances. Features: - Event subscription management (subscribe/unsubscribe) - Event publishing to subscribed instances - Event acknowledgment protocol - Server-side event filtering based on subscriptions - Full signature verification and connection validation Implementation: - FederationEventSubscription model for storing subscriptions - EventService with complete event lifecycle management - EventController with authenticated and public endpoints - EventMessage, EventAck, and SubscriptionDetails types - Comprehensive DTOs for all event operations API Endpoints: - POST /api/v1/federation/events/subscribe - POST /api/v1/federation/events/unsubscribe - POST /api/v1/federation/events/publish - GET /api/v1/federation/events/subscriptions - GET /api/v1/federation/events/messages - POST /api/v1/federation/incoming/event (public) - POST /api/v1/federation/incoming/event/ack (public) Testing: - 18 unit tests for EventService (89.09% coverage) - 11 unit tests for EventController (83.87% coverage) - All 29 tests passing - Follows TDD red-green-refactor cycle Technical Notes: - Reuses existing FederationMessage model with eventType field - Follows patterns from QueryService and CommandService - Uses existing signature and connection infrastructure - Supports hierarchical event type naming (e.g., "task.created") Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
501 lines
14 KiB
TypeScript
501 lines
14 KiB
TypeScript
/**
|
|
* Event Service
|
|
*
|
|
* Handles federated event messages and subscriptions.
|
|
*/
|
|
|
|
import { Injectable, Logger } from "@nestjs/common";
|
|
import { HttpService } from "@nestjs/axios";
|
|
import { randomUUID } from "crypto";
|
|
import { firstValueFrom } from "rxjs";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import { FederationService } from "./federation.service";
|
|
import { SignatureService } from "./signature.service";
|
|
import {
|
|
FederationConnectionStatus,
|
|
FederationMessageType,
|
|
FederationMessageStatus,
|
|
} from "@prisma/client";
|
|
import type {
|
|
EventMessage,
|
|
EventAck,
|
|
EventMessageDetails,
|
|
SubscriptionDetails,
|
|
} from "./types/message.types";
|
|
|
|
@Injectable()
|
|
export class EventService {
|
|
/** Logger labelled with this service's class name. */
private readonly logger = new Logger(EventService.name);

/**
 * Wires up the collaborators used by the event methods of this service.
 *
 * @param prisma - Database client for federation connections, event
 *   subscriptions and federation messages.
 * @param federationService - Provides the local instance identity.
 * @param signatureService - Signs outgoing messages and validates the
 *   timestamp and signature of incoming ones.
 * @param httpService - HTTP client used to POST events to remote instances.
 */
constructor(
  private readonly prisma: PrismaService,
  private readonly federationService: FederationService,
  private readonly signatureService: SignatureService,
  private readonly httpService: HttpService
) {}
|
|
|
|
/**
 * Register a subscription to an event type on a federation connection.
 *
 * The connection is looked up by id within the given workspace and must be
 * ACTIVE before the subscription row is created.
 *
 * @param workspaceId - Workspace that owns the connection.
 * @param connectionId - Connection the subscription is attached to.
 * @param eventType - Event type to subscribe to.
 * @param metadata - Optional data stored with the subscription; an empty
 *   object is stored when omitted.
 * @returns The stored subscription.
 * @throws Error "Connection not found" when no such connection exists in the
 *   workspace, or "Connection is not active" when its status is not ACTIVE.
 */
async subscribeToEventType(
  workspaceId: string,
  connectionId: string,
  eventType: string,
  metadata?: Record<string, unknown>
): Promise<SubscriptionDetails> {
  const target = await this.prisma.federationConnection.findUnique({
    where: { id: connectionId, workspaceId },
  });

  // Guard clauses: the connection has to exist and be usable.
  if (!target) {
    throw new Error("Connection not found");
  }
  if (target.status !== FederationConnectionStatus.ACTIVE) {
    throw new Error("Connection is not active");
  }

  const created = await this.prisma.federationEventSubscription.create({
    data: {
      workspaceId,
      connectionId,
      eventType,
      metadata: (metadata ?? {}) as never,
    },
  });

  this.logger.log(`Subscribed to event type ${eventType} on connection ${connectionId}`);

  return this.mapToSubscriptionDetails(created);
}
|
|
|
|
/**
 * Remove the subscription to an event type on a federation connection.
 *
 * @param workspaceId - Workspace that owns the subscription.
 * @param connectionId - Connection the subscription is attached to.
 * @param eventType - Event type to stop receiving.
 * @throws Error "Subscription not found" when no subscription matches the
 *   workspace, connection and event type.
 */
async unsubscribeFromEventType(
  workspaceId: string,
  connectionId: string,
  eventType: string
): Promise<void> {
  const existing = await this.prisma.federationEventSubscription.findFirst({
    where: { workspaceId, connectionId, eventType },
  });

  if (!existing) {
    throw new Error("Subscription not found");
  }

  // Delete by primary key of the row that was just located.
  const { id } = existing;
  await this.prisma.federationEventSubscription.delete({ where: { id } });

  this.logger.log(`Unsubscribed from event type ${eventType} on connection ${connectionId}`);
}
|
|
|
|
/**
 * Publish an event to every instance subscribed to its type.
 *
 * Loads the workspace's active subscriptions for `eventType` and, for each
 * one whose connection is ACTIVE, signs an event message, stores it with
 * status PENDING and POSTs it to the remote instance's incoming-event
 * endpoint. Delivery is best-effort per subscription: a failure for one
 * subscriber is logged and does not stop delivery to the others.
 *
 * @param workspaceId - Workspace that owns the subscriptions.
 * @param eventType - Event type being published.
 * @param payload - Event body forwarded to subscribers.
 * @returns One entry per message that was stored: the stored record after a
 *   successful send, or the record with status FAILED and the error text
 *   after a failed send. Subscriptions with inactive connections, and
 *   subscriptions for which signing or storing threw, produce no entry.
 */
async publishEvent(
  workspaceId: string,
  eventType: string,
  payload: Record<string, unknown>
): Promise<EventMessageDetails[]> {
  const subscriptions = await this.prisma.federationEventSubscription.findMany({
    where: {
      workspaceId,
      eventType,
      isActive: true,
    },
    include: {
      connection: true,
    },
  });

  if (subscriptions.length === 0) {
    this.logger.debug(`No active subscriptions for event type ${eventType}`);
    return [];
  }

  // The local identity is the same for every recipient, so fetch it once.
  const identity = await this.federationService.getInstanceIdentity();

  const results: EventMessageDetails[] = [];

  for (const subscription of subscriptions) {
    const connection = subscription.connection;

    if (connection.status !== FederationConnectionStatus.ACTIVE) {
      this.logger.warn(`Skipping inactive connection ${connection.id} for event ${eventType}`);
      continue;
    }

    try {
      // Each recipient gets its own message id and timestamp.
      const messageId = randomUUID();
      const timestamp = Date.now();

      const eventPayload: Record<string, unknown> = {
        messageId,
        instanceId: identity.instanceId,
        eventType,
        payload,
        timestamp,
      };

      // The signature covers exactly the fields above (everything but itself).
      const signature = await this.signatureService.signMessage(eventPayload);

      const signedEvent = {
        messageId,
        instanceId: identity.instanceId,
        eventType,
        payload,
        timestamp,
        signature,
      } as EventMessage;

      // Persist before sending so a record exists even when delivery fails.
      const message = await this.prisma.federationMessage.create({
        data: {
          workspaceId,
          connectionId: connection.id,
          messageType: FederationMessageType.EVENT,
          messageId,
          eventType,
          payload: payload as never,
          status: FederationMessageStatus.PENDING,
          signature,
        },
      });

      try {
        // Trim trailing slashes so a base URL stored as "https://host/" does
        // not produce "https://host//api/...", which would miss the route.
        const baseUrl = connection.remoteUrl.replace(/\/+$/, "");
        const remoteUrl = `${baseUrl}/api/v1/federation/incoming/event`;
        await firstValueFrom(this.httpService.post(remoteUrl, signedEvent));

        this.logger.log(`Event sent to ${connection.remoteUrl}: ${messageId}`);
        results.push(this.mapToEventMessageDetails(message));
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : "Unknown error";
        this.logger.error(`Failed to send event to ${connection.remoteUrl}`, error);

        // Report the failure to the caller first: if persisting the FAILED
        // status throws below, the outer catch logs it and the caller still
        // sees that this delivery did not succeed.
        results.push(
          this.mapToEventMessageDetails({
            ...message,
            status: FederationMessageStatus.FAILED,
            error: errorMessage,
          })
        );

        await this.prisma.federationMessage.update({
          where: { id: message.id },
          data: {
            status: FederationMessageStatus.FAILED,
            error: errorMessage,
          },
        });
      }
    } catch (error) {
      // Deliberately best-effort: log and move on to the next subscriber.
      this.logger.error(`Failed to publish event to connection ${connection.id}`, error);
    }
  }

  return results;
}
|
|
|
|
/**
 * Accept an event sent by a remote instance and answer with a signed
 * acknowledgment.
 *
 * Checks, in order: the message timestamp, that an ACTIVE connection exists
 * for the sending instance, and the message signature. A message that passes
 * is stored as a DELIVERED event on that connection.
 *
 * @param eventMessage - Signed event received from the remote instance.
 * @returns A signed acknowledgment whose `correlationId` is the id of the
 *   received event and whose `received` flag is true.
 * @throws Error when the timestamp is rejected, no active connection matches
 *   the sender, or signature verification fails.
 */
async handleIncomingEvent(eventMessage: EventMessage): Promise<EventAck> {
  const senderId = eventMessage.instanceId;
  const incomingId = eventMessage.messageId;

  this.logger.log(`Received event from ${senderId}: ${incomingId}`);

  const timestampOk = this.signatureService.validateTimestamp(eventMessage.timestamp);
  if (!timestampOk) {
    throw new Error("Event timestamp is outside acceptable range");
  }

  // The sender must be a known peer with an active connection.
  const peer = await this.prisma.federationConnection.findFirst({
    where: {
      remoteInstanceId: senderId,
      status: FederationConnectionStatus.ACTIVE,
    },
  });

  if (!peer) {
    throw new Error("No connection found for remote instance");
  }

  // Defensive re-check of the status the query already filtered on.
  if (peer.status !== FederationConnectionStatus.ACTIVE) {
    throw new Error("Connection is not active");
  }

  // The signature covers every field of the message except itself.
  const { signature, ...unsignedEvent } = eventMessage;
  const verdict = await this.signatureService.verifyMessage(unsignedEvent, signature, senderId);

  if (!verdict.valid) {
    throw new Error(verdict.error ?? "Invalid signature");
  }

  await this.prisma.federationMessage.create({
    data: {
      workspaceId: peer.workspaceId,
      connectionId: peer.id,
      messageType: FederationMessageType.EVENT,
      messageId: incomingId,
      eventType: eventMessage.eventType,
      payload: eventMessage.payload as never,
      status: FederationMessageStatus.DELIVERED,
      signature,
      deliveredAt: new Date(),
    },
  });

  const localIdentity = await this.federationService.getInstanceIdentity();

  const ackMessageId = randomUUID();
  const ackTimestamp = Date.now();

  // Fields that get signed; the signature is attached afterwards.
  const unsignedAck: Record<string, unknown> = {
    messageId: ackMessageId,
    correlationId: incomingId,
    instanceId: localIdentity.instanceId,
    received: true,
    timestamp: ackTimestamp,
  };

  const ackSignature = await this.signatureService.signMessage(unsignedAck);

  return {
    messageId: ackMessageId,
    correlationId: incomingId,
    instanceId: localIdentity.instanceId,
    received: true,
    timestamp: ackTimestamp,
    signature: ackSignature,
  } as EventAck;
}
|
|
|
|
/**
 * Apply an acknowledgment received from a remote instance to the event it
 * refers to.
 *
 * The acknowledgment is accepted only when its timestamp is valid, an EVENT
 * message with `messageId === ack.correlationId` exists, its signature
 * verifies for `ack.instanceId`, and that instance is the remote end of the
 * connection the event was recorded on. The message is then marked DELIVERED
 * (with a delivery time) or FAILED, depending on `ack.received`.
 *
 * @param ack - Signed acknowledgment from the remote instance.
 * @throws Error when the timestamp is rejected, the original event cannot be
 *   found, signature verification fails, or the acknowledging instance does
 *   not match the event's connection.
 */
async processEventAck(ack: EventAck): Promise<void> {
  this.logger.log(`Received acknowledgment for event: ${ack.correlationId}`);

  if (!this.signatureService.validateTimestamp(ack.timestamp)) {
    throw new Error("Acknowledgment timestamp is outside acceptable range");
  }

  const message = await this.prisma.federationMessage.findFirst({
    where: {
      messageId: ack.correlationId,
      messageType: FederationMessageType.EVENT,
    },
  });

  if (!message) {
    throw new Error("Original event message not found");
  }

  // The signature covers every field of the acknowledgment except itself.
  const { signature, ...ackToVerify } = ack;
  const verificationResult = await this.signatureService.verifyMessage(
    ackToVerify,
    signature,
    ack.instanceId
  );

  if (!verificationResult.valid) {
    throw new Error(verificationResult.error ?? "Invalid signature");
  }

  // A valid signature only proves who sent the acknowledgment. Also require
  // that the sender is the remote end of the connection this event belongs
  // to, so one peer cannot acknowledge (or fail) an event sent to another.
  const connection = await this.prisma.federationConnection.findUnique({
    where: { id: message.connectionId },
  });

  if (!connection || connection.remoteInstanceId !== ack.instanceId) {
    throw new Error("Acknowledgment sender does not match the event's connection");
  }

  const updateData: Record<string, unknown> = {
    status: ack.received ? FederationMessageStatus.DELIVERED : FederationMessageStatus.FAILED,
  };

  // Only stamp a delivery time when the remote actually reports receipt.
  if (ack.received) {
    updateData.deliveredAt = new Date();
  }

  if (ack.error !== undefined) {
    updateData.error = ack.error;
  }

  await this.prisma.federationMessage.update({
    where: { id: message.id },
    data: updateData,
  });

  this.logger.log(`Event acknowledgment processed: ${ack.correlationId}`);
}
|
|
|
|
/**
 * List a workspace's event subscriptions, newest first.
 *
 * @param workspaceId - Workspace whose subscriptions are listed.
 * @param connectionId - When given (and non-empty), only subscriptions on
 *   this connection are returned.
 * @returns The matching subscriptions ordered by creation time, descending.
 */
async getEventSubscriptions(
  workspaceId: string,
  connectionId?: string
): Promise<SubscriptionDetails[]> {
  // The connection filter is added only when a connection id was supplied.
  const where: Record<string, unknown> = {
    workspaceId,
    ...(connectionId ? { connectionId } : {}),
  };

  const rows = await this.prisma.federationEventSubscription.findMany({
    where,
    orderBy: { createdAt: "desc" },
  });

  return rows.map((row) => this.mapToSubscriptionDetails(row));
}
|
|
|
|
/**
 * List a workspace's event messages, newest first.
 *
 * @param workspaceId - Workspace whose messages are listed.
 * @param status - When given, only messages with this status are returned.
 * @returns The matching EVENT messages ordered by creation time, descending.
 */
async getEventMessages(
  workspaceId: string,
  status?: FederationMessageStatus
): Promise<EventMessageDetails[]> {
  // Always restricted to EVENT messages; the status filter is optional.
  const where: Record<string, unknown> = {
    workspaceId,
    messageType: FederationMessageType.EVENT,
    ...(status ? { status } : {}),
  };

  const rows = await this.prisma.federationMessage.findMany({
    where,
    orderBy: { createdAt: "desc" },
  });

  return rows.map((row) => this.mapToEventMessageDetails(row));
}
|
|
|
|
/**
 * Get a single event message by its record id.
 *
 * @param workspaceId - Workspace the message must belong to.
 * @param messageId - Record id (`id`) of the federation message.
 * @returns The event message details.
 * @throws Error "Event message not found" when no message with that id
 *   exists in the workspace, or when the record is not an EVENT message.
 */
async getEventMessage(workspaceId: string, messageId: string): Promise<EventMessageDetails> {
  const message = await this.prisma.federationMessage.findUnique({
    where: { id: messageId, workspaceId },
  });

  // Federation messages carry a messageType; only EVENT records belong to
  // this API (the list endpoint applies the same filter), so any other type
  // is treated as not found.
  if (!message || message.messageType !== FederationMessageType.EVENT) {
    throw new Error("Event message not found");
  }

  return this.mapToEventMessageDetails(message);
}
|
|
|
|
/**
 * Convert a stored federation message into EventMessageDetails.
 *
 * Required fields are copied as-is. The nullable columns `correlationId`,
 * `eventType`, `error` and `deliveredAt` are included only when they are not
 * null, and `payload` only when it is a non-null object.
 *
 * @param message - Federation message record as stored.
 * @returns The details object built from the record.
 */
private mapToEventMessageDetails(message: {
  id: string;
  workspaceId: string;
  connectionId: string;
  messageType: FederationMessageType;
  messageId: string;
  correlationId: string | null;
  query: string | null;
  commandType: string | null;
  eventType: string | null;
  payload: unknown;
  response: unknown;
  status: FederationMessageStatus;
  error: string | null;
  createdAt: Date;
  updatedAt: Date;
  deliveredAt: Date | null;
}): EventMessageDetails {
  // Pull out the optional columns up front; each is attached below only when present.
  const { correlationId, eventType, payload, error, deliveredAt } = message;

  const details: EventMessageDetails = {
    id: message.id,
    workspaceId: message.workspaceId,
    connectionId: message.connectionId,
    messageType: message.messageType,
    messageId: message.messageId,
    response: message.response,
    status: message.status,
    createdAt: message.createdAt,
    updatedAt: message.updatedAt,
  };

  if (correlationId !== null) {
    details.correlationId = correlationId;
  }
  if (eventType !== null) {
    details.eventType = eventType;
  }
  if (payload !== null && typeof payload === "object") {
    details.payload = payload as Record<string, unknown>;
  }
  if (error !== null) {
    details.error = error;
  }
  if (deliveredAt !== null) {
    details.deliveredAt = deliveredAt;
  }

  return details;
}
|
|
|
|
/**
 * Convert a stored event subscription into SubscriptionDetails.
 *
 * All fields are copied as-is except `metadata`, which falls back to an
 * empty object when the stored value is null or not an object.
 *
 * @param subscription - Subscription record as stored.
 * @returns The details object built from the record.
 */
private mapToSubscriptionDetails(subscription: {
  id: string;
  workspaceId: string;
  connectionId: string;
  eventType: string;
  metadata: unknown;
  isActive: boolean;
  createdAt: Date;
  updatedAt: Date;
}): SubscriptionDetails {
  const { metadata: storedMetadata } = subscription;

  // Anything that is not a non-null object is normalised to {}.
  const metadata =
    storedMetadata !== null && typeof storedMetadata === "object"
      ? (storedMetadata as Record<string, unknown>)
      : {};

  return {
    id: subscription.id,
    workspaceId: subscription.workspaceId,
    connectionId: subscription.connectionId,
    eventType: subscription.eventType,
    metadata,
    isActive: subscription.isActive,
    createdAt: subscription.createdAt,
    updatedAt: subscription.updatedAt,
  };
}
|
|
}
|