feat(#90): implement EVENT subscriptions for federation
Implement event pub/sub messaging for federation to enable real-time event streaming between federated instances. Features: - Event subscription management (subscribe/unsubscribe) - Event publishing to subscribed instances - Event acknowledgment protocol - Server-side event filtering based on subscriptions - Full signature verification and connection validation Implementation: - FederationEventSubscription model for storing subscriptions - EventService with complete event lifecycle management - EventController with authenticated and public endpoints - EventMessage, EventAck, and SubscriptionDetails types - Comprehensive DTOs for all event operations API Endpoints: - POST /api/v1/federation/events/subscribe - POST /api/v1/federation/events/unsubscribe - POST /api/v1/federation/events/publish - GET /api/v1/federation/events/subscriptions - GET /api/v1/federation/events/messages - POST /api/v1/federation/incoming/event (public) - POST /api/v1/federation/incoming/event/ack (public) Testing: - 18 unit tests for EventService (89.09% coverage) - 11 unit tests for EventController (83.87% coverage) - All 29 tests passing - Follows TDD red-green-refactor cycle Technical Notes: - Reuses existing FederationMessage model with eventType field - Follows patterns from QueryService and CommandService - Uses existing signature and connection infrastructure - Supports hierarchical event type naming (e.g., "task.created") Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
500
apps/api/src/federation/event.service.ts
Normal file
500
apps/api/src/federation/event.service.ts
Normal file
@@ -0,0 +1,500 @@
|
||||
/**
|
||||
* Event Service
|
||||
*
|
||||
* Handles federated event messages and subscriptions.
|
||||
*/
|
||||
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import { randomUUID } from "crypto";
|
||||
import { firstValueFrom } from "rxjs";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { FederationService } from "./federation.service";
|
||||
import { SignatureService } from "./signature.service";
|
||||
import {
|
||||
FederationConnectionStatus,
|
||||
FederationMessageType,
|
||||
FederationMessageStatus,
|
||||
} from "@prisma/client";
|
||||
import type {
|
||||
EventMessage,
|
||||
EventAck,
|
||||
EventMessageDetails,
|
||||
SubscriptionDetails,
|
||||
} from "./types/message.types";
|
||||
|
||||
@Injectable()
|
||||
export class EventService {
|
||||
private readonly logger = new Logger(EventService.name);
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly federationService: FederationService,
|
||||
private readonly signatureService: SignatureService,
|
||||
private readonly httpService: HttpService
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Subscribe to an event type from a remote instance
|
||||
*/
|
||||
async subscribeToEventType(
|
||||
workspaceId: string,
|
||||
connectionId: string,
|
||||
eventType: string,
|
||||
metadata?: Record<string, unknown>
|
||||
): Promise<SubscriptionDetails> {
|
||||
// Validate connection exists and is active
|
||||
const connection = await this.prisma.federationConnection.findUnique({
|
||||
where: { id: connectionId, workspaceId },
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new Error("Connection not found");
|
||||
}
|
||||
|
||||
if (connection.status !== FederationConnectionStatus.ACTIVE) {
|
||||
throw new Error("Connection is not active");
|
||||
}
|
||||
|
||||
// Create subscription
|
||||
const subscription = await this.prisma.federationEventSubscription.create({
|
||||
data: {
|
||||
workspaceId,
|
||||
connectionId,
|
||||
eventType,
|
||||
metadata: (metadata ?? {}) as never,
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Subscribed to event type ${eventType} on connection ${connectionId}`);
|
||||
|
||||
return this.mapToSubscriptionDetails(subscription);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe from an event type
|
||||
*/
|
||||
async unsubscribeFromEventType(
|
||||
workspaceId: string,
|
||||
connectionId: string,
|
||||
eventType: string
|
||||
): Promise<void> {
|
||||
// Find subscription
|
||||
const subscription = await this.prisma.federationEventSubscription.findFirst({
|
||||
where: {
|
||||
workspaceId,
|
||||
connectionId,
|
||||
eventType,
|
||||
},
|
||||
});
|
||||
|
||||
if (!subscription) {
|
||||
throw new Error("Subscription not found");
|
||||
}
|
||||
|
||||
// Delete subscription
|
||||
await this.prisma.federationEventSubscription.delete({
|
||||
where: { id: subscription.id },
|
||||
});
|
||||
|
||||
this.logger.log(`Unsubscribed from event type ${eventType} on connection ${connectionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish an event to all subscribed instances
|
||||
*/
|
||||
async publishEvent(
|
||||
workspaceId: string,
|
||||
eventType: string,
|
||||
payload: Record<string, unknown>
|
||||
): Promise<EventMessageDetails[]> {
|
||||
// Find all active subscriptions for this event type
|
||||
const subscriptions = await this.prisma.federationEventSubscription.findMany({
|
||||
where: {
|
||||
workspaceId,
|
||||
eventType,
|
||||
isActive: true,
|
||||
},
|
||||
include: {
|
||||
connection: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (subscriptions.length === 0) {
|
||||
this.logger.debug(`No active subscriptions for event type ${eventType}`);
|
||||
return [];
|
||||
}
|
||||
|
||||
// Get local instance identity
|
||||
const identity = await this.federationService.getInstanceIdentity();
|
||||
|
||||
const results: EventMessageDetails[] = [];
|
||||
|
||||
// Publish to each subscribed connection
|
||||
for (const subscription of subscriptions) {
|
||||
const connection = subscription.connection;
|
||||
|
||||
// Skip if connection is not active
|
||||
if (connection.status !== FederationConnectionStatus.ACTIVE) {
|
||||
this.logger.warn(`Skipping inactive connection ${connection.id} for event ${eventType}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Create event message
|
||||
const messageId = randomUUID();
|
||||
const timestamp = Date.now();
|
||||
|
||||
const eventPayload: Record<string, unknown> = {
|
||||
messageId,
|
||||
instanceId: identity.instanceId,
|
||||
eventType,
|
||||
payload,
|
||||
timestamp,
|
||||
};
|
||||
|
||||
// Sign the event
|
||||
const signature = await this.signatureService.signMessage(eventPayload);
|
||||
|
||||
const signedEvent = {
|
||||
messageId,
|
||||
instanceId: identity.instanceId,
|
||||
eventType,
|
||||
payload,
|
||||
timestamp,
|
||||
signature,
|
||||
} as EventMessage;
|
||||
|
||||
// Store message in database
|
||||
const message = await this.prisma.federationMessage.create({
|
||||
data: {
|
||||
workspaceId,
|
||||
connectionId: connection.id,
|
||||
messageType: FederationMessageType.EVENT,
|
||||
messageId,
|
||||
eventType,
|
||||
payload: payload as never,
|
||||
status: FederationMessageStatus.PENDING,
|
||||
signature,
|
||||
},
|
||||
});
|
||||
|
||||
// Send event to remote instance
|
||||
try {
|
||||
const remoteUrl = `${connection.remoteUrl}/api/v1/federation/incoming/event`;
|
||||
await firstValueFrom(this.httpService.post(remoteUrl, signedEvent));
|
||||
|
||||
this.logger.log(`Event sent to ${connection.remoteUrl}: ${messageId}`);
|
||||
results.push(this.mapToEventMessageDetails(message));
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to send event to ${connection.remoteUrl}`, error);
|
||||
|
||||
// Update message status to failed
|
||||
await this.prisma.federationMessage.update({
|
||||
where: { id: message.id },
|
||||
data: {
|
||||
status: FederationMessageStatus.FAILED,
|
||||
error: error instanceof Error ? error.message : "Unknown error",
|
||||
},
|
||||
});
|
||||
|
||||
results.push(
|
||||
this.mapToEventMessageDetails({
|
||||
...message,
|
||||
status: FederationMessageStatus.FAILED,
|
||||
error: error instanceof Error ? error.message : "Unknown error",
|
||||
})
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to publish event to connection ${connection.id}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming event from remote instance
|
||||
*/
|
||||
async handleIncomingEvent(eventMessage: EventMessage): Promise<EventAck> {
|
||||
this.logger.log(`Received event from ${eventMessage.instanceId}: ${eventMessage.messageId}`);
|
||||
|
||||
// Validate timestamp
|
||||
if (!this.signatureService.validateTimestamp(eventMessage.timestamp)) {
|
||||
throw new Error("Event timestamp is outside acceptable range");
|
||||
}
|
||||
|
||||
// Find connection for remote instance
|
||||
const connection = await this.prisma.federationConnection.findFirst({
|
||||
where: {
|
||||
remoteInstanceId: eventMessage.instanceId,
|
||||
status: FederationConnectionStatus.ACTIVE,
|
||||
},
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new Error("No connection found for remote instance");
|
||||
}
|
||||
|
||||
// Validate connection is active
|
||||
if (connection.status !== FederationConnectionStatus.ACTIVE) {
|
||||
throw new Error("Connection is not active");
|
||||
}
|
||||
|
||||
// Verify signature
|
||||
const { signature, ...messageToVerify } = eventMessage;
|
||||
const verificationResult = await this.signatureService.verifyMessage(
|
||||
messageToVerify,
|
||||
signature,
|
||||
eventMessage.instanceId
|
||||
);
|
||||
|
||||
if (!verificationResult.valid) {
|
||||
throw new Error(verificationResult.error ?? "Invalid signature");
|
||||
}
|
||||
|
||||
// Store received event
|
||||
await this.prisma.federationMessage.create({
|
||||
data: {
|
||||
workspaceId: connection.workspaceId,
|
||||
connectionId: connection.id,
|
||||
messageType: FederationMessageType.EVENT,
|
||||
messageId: eventMessage.messageId,
|
||||
eventType: eventMessage.eventType,
|
||||
payload: eventMessage.payload as never,
|
||||
status: FederationMessageStatus.DELIVERED,
|
||||
signature: eventMessage.signature,
|
||||
deliveredAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
// Get local instance identity
|
||||
const identity = await this.federationService.getInstanceIdentity();
|
||||
|
||||
// Create acknowledgment
|
||||
const ackMessageId = randomUUID();
|
||||
const ackTimestamp = Date.now();
|
||||
|
||||
const ackPayload: Record<string, unknown> = {
|
||||
messageId: ackMessageId,
|
||||
correlationId: eventMessage.messageId,
|
||||
instanceId: identity.instanceId,
|
||||
received: true,
|
||||
timestamp: ackTimestamp,
|
||||
};
|
||||
|
||||
// Sign the acknowledgment
|
||||
const ackSignature = await this.signatureService.signMessage(ackPayload);
|
||||
|
||||
const ack = {
|
||||
messageId: ackMessageId,
|
||||
correlationId: eventMessage.messageId,
|
||||
instanceId: identity.instanceId,
|
||||
received: true,
|
||||
timestamp: ackTimestamp,
|
||||
signature: ackSignature,
|
||||
} as EventAck;
|
||||
|
||||
return ack;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process an event acknowledgment from remote instance
|
||||
*/
|
||||
async processEventAck(ack: EventAck): Promise<void> {
|
||||
this.logger.log(`Received acknowledgment for event: ${ack.correlationId}`);
|
||||
|
||||
// Validate timestamp
|
||||
if (!this.signatureService.validateTimestamp(ack.timestamp)) {
|
||||
throw new Error("Acknowledgment timestamp is outside acceptable range");
|
||||
}
|
||||
|
||||
// Find original event message
|
||||
const message = await this.prisma.federationMessage.findFirst({
|
||||
where: {
|
||||
messageId: ack.correlationId,
|
||||
messageType: FederationMessageType.EVENT,
|
||||
},
|
||||
});
|
||||
|
||||
if (!message) {
|
||||
throw new Error("Original event message not found");
|
||||
}
|
||||
|
||||
// Verify signature
|
||||
const { signature, ...ackToVerify } = ack;
|
||||
const verificationResult = await this.signatureService.verifyMessage(
|
||||
ackToVerify,
|
||||
signature,
|
||||
ack.instanceId
|
||||
);
|
||||
|
||||
if (!verificationResult.valid) {
|
||||
throw new Error(verificationResult.error ?? "Invalid signature");
|
||||
}
|
||||
|
||||
// Update message with acknowledgment
|
||||
const updateData: Record<string, unknown> = {
|
||||
status: ack.received ? FederationMessageStatus.DELIVERED : FederationMessageStatus.FAILED,
|
||||
deliveredAt: new Date(),
|
||||
};
|
||||
|
||||
if (ack.error !== undefined) {
|
||||
updateData.error = ack.error;
|
||||
}
|
||||
|
||||
await this.prisma.federationMessage.update({
|
||||
where: { id: message.id },
|
||||
data: updateData,
|
||||
});
|
||||
|
||||
this.logger.log(`Event acknowledgment processed: ${ack.correlationId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all event subscriptions for a workspace
|
||||
*/
|
||||
async getEventSubscriptions(
|
||||
workspaceId: string,
|
||||
connectionId?: string
|
||||
): Promise<SubscriptionDetails[]> {
|
||||
const where: Record<string, unknown> = {
|
||||
workspaceId,
|
||||
};
|
||||
|
||||
if (connectionId) {
|
||||
where.connectionId = connectionId;
|
||||
}
|
||||
|
||||
const subscriptions = await this.prisma.federationEventSubscription.findMany({
|
||||
where,
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
return subscriptions.map((sub) => this.mapToSubscriptionDetails(sub));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all event messages for a workspace
|
||||
*/
|
||||
async getEventMessages(
|
||||
workspaceId: string,
|
||||
status?: FederationMessageStatus
|
||||
): Promise<EventMessageDetails[]> {
|
||||
const where: Record<string, unknown> = {
|
||||
workspaceId,
|
||||
messageType: FederationMessageType.EVENT,
|
||||
};
|
||||
|
||||
if (status) {
|
||||
where.status = status;
|
||||
}
|
||||
|
||||
const messages = await this.prisma.federationMessage.findMany({
|
||||
where,
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
return messages.map((msg) => this.mapToEventMessageDetails(msg));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single event message
|
||||
*/
|
||||
async getEventMessage(workspaceId: string, messageId: string): Promise<EventMessageDetails> {
|
||||
const message = await this.prisma.federationMessage.findUnique({
|
||||
where: { id: messageId, workspaceId },
|
||||
});
|
||||
|
||||
if (!message) {
|
||||
throw new Error("Event message not found");
|
||||
}
|
||||
|
||||
return this.mapToEventMessageDetails(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Map Prisma FederationMessage to EventMessageDetails
|
||||
*/
|
||||
private mapToEventMessageDetails(message: {
|
||||
id: string;
|
||||
workspaceId: string;
|
||||
connectionId: string;
|
||||
messageType: FederationMessageType;
|
||||
messageId: string;
|
||||
correlationId: string | null;
|
||||
query: string | null;
|
||||
commandType: string | null;
|
||||
eventType: string | null;
|
||||
payload: unknown;
|
||||
response: unknown;
|
||||
status: FederationMessageStatus;
|
||||
error: string | null;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
deliveredAt: Date | null;
|
||||
}): EventMessageDetails {
|
||||
const details: EventMessageDetails = {
|
||||
id: message.id,
|
||||
workspaceId: message.workspaceId,
|
||||
connectionId: message.connectionId,
|
||||
messageType: message.messageType,
|
||||
messageId: message.messageId,
|
||||
response: message.response,
|
||||
status: message.status,
|
||||
createdAt: message.createdAt,
|
||||
updatedAt: message.updatedAt,
|
||||
};
|
||||
|
||||
if (message.correlationId !== null) {
|
||||
details.correlationId = message.correlationId;
|
||||
}
|
||||
|
||||
if (message.eventType !== null) {
|
||||
details.eventType = message.eventType;
|
||||
}
|
||||
|
||||
if (message.payload !== null && typeof message.payload === "object") {
|
||||
details.payload = message.payload as Record<string, unknown>;
|
||||
}
|
||||
|
||||
if (message.error !== null) {
|
||||
details.error = message.error;
|
||||
}
|
||||
|
||||
if (message.deliveredAt !== null) {
|
||||
details.deliveredAt = message.deliveredAt;
|
||||
}
|
||||
|
||||
return details;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map Prisma FederationEventSubscription to SubscriptionDetails
|
||||
*/
|
||||
private mapToSubscriptionDetails(subscription: {
|
||||
id: string;
|
||||
workspaceId: string;
|
||||
connectionId: string;
|
||||
eventType: string;
|
||||
metadata: unknown;
|
||||
isActive: boolean;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}): SubscriptionDetails {
|
||||
return {
|
||||
id: subscription.id,
|
||||
workspaceId: subscription.workspaceId,
|
||||
connectionId: subscription.connectionId,
|
||||
eventType: subscription.eventType,
|
||||
metadata:
|
||||
typeof subscription.metadata === "object" && subscription.metadata !== null
|
||||
? (subscription.metadata as Record<string, unknown>)
|
||||
: {},
|
||||
isActive: subscription.isActive,
|
||||
createdAt: subscription.createdAt,
|
||||
updatedAt: subscription.updatedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user