/**
 * Connection Service
 *
 * Manages federation connections between instances.
 */

import {
  Injectable,
  Logger,
  NotFoundException,
  UnauthorizedException,
  ServiceUnavailableException,
  BadRequestException,
} from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { FederationConnectionStatus, Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import { FederationAuditService } from "./audit.service";
import { firstValueFrom } from "rxjs";
import type { ConnectionRequest, ConnectionDetails } from "./types/connection.types";
import type { PublicInstanceIdentity } from "./types/instance.types";
import { FEDERATION_PROTOCOL_VERSION } from "./constants";
import { withRetry } from "./utils/retry";

/**
 * Creates, transitions and queries `federationConnection` records, and performs
 * the HTTP handshake with remote instances.
 *
 * Status transitions written by this service:
 * - `initiateConnection` / `handleIncomingConnectionRequest` create a PENDING record
 * - `acceptConnection` sets ACTIVE
 * - `rejectConnection` / `disconnect` set DISCONNECTED
 */
@Injectable()
export class ConnectionService {
  private readonly logger = new Logger(ConnectionService.name);

  /** Upper bound on connection records per workspace, enforced by `initiateConnection`. */
  private readonly MAX_CONNECTIONS_PER_WORKSPACE = 100;

  constructor(
    private readonly prisma: PrismaService,
    private readonly federationService: FederationService,
    private readonly signatureService: SignatureService,
    private readonly httpService: HttpService,
    private readonly auditService: FederationAuditService
  ) {}

  /**
   * Initiate a connection to a remote instance.
   *
   * Fetches the remote identity, validates its protocol version, stores a
   * PENDING connection record and sends a signed connection request to the
   * remote instance. If the request cannot be delivered, the PENDING record is
   * removed again.
   *
   * @param workspaceId - Workspace that will own the connection
   * @param remoteUrl - Base URL of the remote instance (a trailing slash is tolerated)
   * @returns The newly created connection in PENDING state
   * @throws BadRequestException when the workspace connection limit is reached,
   *   the protocol version is missing/incompatible, or the request cannot be sent
   * @throws ServiceUnavailableException when the remote identity cannot be fetched
   */
  async initiateConnection(workspaceId: string, remoteUrl: string): Promise<ConnectionDetails> {
    this.logger.log(`Initiating connection to ${remoteUrl} for workspace ${workspaceId}`);

    // Check connection limit for workspace.
    // The count is filtered by workspace only, so records in every status count
    // toward the limit.
    const connectionCount = await this.prisma.federationConnection.count({
      where: { workspaceId },
    });

    if (connectionCount >= this.MAX_CONNECTIONS_PER_WORKSPACE) {
      throw new BadRequestException(
        `Connection limit reached for workspace. Maximum ${String(this.MAX_CONNECTIONS_PER_WORKSPACE)} connections allowed per workspace.`
      );
    }

    // Fetch remote instance identity
    const remoteIdentity = await this.fetchRemoteIdentity(remoteUrl);

    // Validate protocol version compatibility
    this.validateProtocolVersion(remoteIdentity.capabilities.protocolVersion);

    // Get our instance identity
    const localIdentity = await this.federationService.getInstanceIdentity();

    // Normalize once so the stored URL and the URL we POST to always agree;
    // a trailing slash would otherwise yield ".../​/api/..." style double slashes.
    const normalizedUrl = this.normalizeUrl(remoteUrl);

    // Create connection record with PENDING status
    const connection = await this.prisma.federationConnection.create({
      data: {
        workspaceId,
        remoteInstanceId: remoteIdentity.instanceId,
        remoteUrl: normalizedUrl,
        remotePublicKey: remoteIdentity.publicKey,
        remoteCapabilities: remoteIdentity.capabilities as Prisma.JsonObject,
        status: FederationConnectionStatus.PENDING,
        metadata: {},
      },
    });

    // Create signed connection request
    const request: Omit<ConnectionRequest, "signature"> = {
      instanceId: localIdentity.instanceId,
      instanceUrl: localIdentity.url,
      publicKey: localIdentity.publicKey,
      capabilities: localIdentity.capabilities,
      timestamp: Date.now(),
    };

    const signature = await this.signatureService.signMessage(request);
    const signedRequest: ConnectionRequest = { ...request, signature };

    // Send connection request to remote instance with retry logic
    try {
      await withRetry(
        async () => {
          return await firstValueFrom(
            this.httpService.post(
              `${normalizedUrl}/api/v1/federation/incoming/connect`,
              signedRequest
            )
          );
        },
        {
          maxRetries: 3,
          initialDelay: 1000, // 1s
          maxDelay: 8000, // 8s
        }
      );
      this.logger.log(`Connection request sent to ${remoteUrl}`);
    } catch (error: unknown) {
      this.logger.error(`Failed to send connection request to ${remoteUrl}`, error);

      // Delete the failed connection to prevent zombie connections in PENDING state.
      // A failing cleanup is logged rather than rethrown so that the caller still
      // receives the error describing why the request could not be sent.
      try {
        await this.prisma.federationConnection.delete({
          where: { id: connection.id },
        });
      } catch (cleanupError: unknown) {
        this.logger.error(
          `Failed to remove pending connection ${connection.id} after send failure`,
          cleanupError
        );
      }

      const errorMessage = error instanceof Error ? error.message : "Unknown error";
      throw new BadRequestException(
        `Failed to initiate connection to ${remoteUrl}: ${errorMessage}`
      );
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Accept a pending connection.
   *
   * Sets the connection to ACTIVE and stamps `connectedAt`. When `metadata` is
   * provided it replaces the stored metadata; otherwise the stored metadata is kept.
   *
   * NOTE(review): the current status is not checked, so a connection in any
   * status can be activated through this method — confirm this is intended.
   *
   * @param workspaceId - Workspace the connection must belong to
   * @param connectionId - Connection to accept
   * @param metadata - Optional replacement metadata
   * @returns The updated connection
   * @throws NotFoundException when no connection with this id exists in the workspace
   */
  async acceptConnection(
    workspaceId: string,
    connectionId: string,
    metadata?: Record<string, unknown>
  ): Promise<ConnectionDetails> {
    this.logger.log(`Accepting connection ${connectionId} for workspace ${workspaceId}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to ACTIVE
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.ACTIVE,
        connectedAt: new Date(),
        metadata: (metadata ?? connection.metadata) as Prisma.JsonObject,
      },
    });

    this.logger.log(`Connection ${connectionId} activated`);

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Reject a pending connection.
   *
   * Sets the connection to DISCONNECTED and records `rejectionReason` in the
   * metadata alongside the existing metadata keys. `disconnectedAt` is not
   * modified by this method.
   *
   * NOTE(review): the current status is not checked before the update.
   *
   * @param workspaceId - Workspace the connection must belong to
   * @param connectionId - Connection to reject
   * @param reason - Reason stored as `metadata.rejectionReason`
   * @returns The updated connection
   * @throws NotFoundException when no connection with this id exists in the workspace
   */
  async rejectConnection(
    workspaceId: string,
    connectionId: string,
    reason: string
  ): Promise<ConnectionDetails> {
    this.logger.log(`Rejecting connection ${connectionId}: ${reason}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to DISCONNECTED with rejection reason
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.DISCONNECTED,
        metadata: {
          ...(connection.metadata as Record<string, unknown>),
          rejectionReason: reason,
        } as Prisma.JsonObject,
      },
    });

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Disconnect an active connection.
   *
   * Sets the connection to DISCONNECTED, stamps `disconnectedAt` and, when a
   * non-empty `reason` is given, records it as `metadata.disconnectReason`.
   *
   * NOTE(review): the current status is not checked before the update.
   *
   * @param workspaceId - Workspace the connection must belong to
   * @param connectionId - Connection to disconnect
   * @param reason - Optional reason stored in the metadata
   * @returns The updated connection
   * @throws NotFoundException when no connection with this id exists in the workspace
   */
  async disconnect(
    workspaceId: string,
    connectionId: string,
    reason?: string
  ): Promise<ConnectionDetails> {
    this.logger.log(`Disconnecting connection ${connectionId}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to DISCONNECTED
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.DISCONNECTED,
        disconnectedAt: new Date(),
        metadata: {
          ...(connection.metadata as Record<string, unknown>),
          ...(reason ? { disconnectReason: reason } : {}),
        } as Prisma.JsonObject,
      },
    });

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Get all connections for a workspace, newest first.
   *
   * @param workspaceId - Workspace whose connections are listed
   * @param status - When given, only connections in this status are returned
   * @returns Connections ordered by `createdAt` descending
   */
  async getConnections(
    workspaceId: string,
    status?: FederationConnectionStatus
  ): Promise<ConnectionDetails[]> {
    const connections = await this.prisma.federationConnection.findMany({
      where: {
        workspaceId,
        ...(status ? { status } : {}),
      },
      orderBy: {
        createdAt: "desc",
      },
    });

    return connections.map((conn) => this.mapToConnectionDetails(conn));
  }

  /**
   * Get a single connection.
   *
   * @param workspaceId - Workspace the connection must belong to
   * @param connectionId - Connection to look up
   * @returns The matching connection
   * @throws NotFoundException when no connection with this id exists in the workspace
   */
  async getConnection(workspaceId: string, connectionId: string): Promise<ConnectionDetails> {
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Get connection by ID (without workspace filter).
   * Used by CapabilityGuard for authorization checks.
   *
   * @param connectionId - Connection to look up
   * @returns The connection, or `null` when it does not exist
   */
  async getConnectionById(connectionId: string): Promise<ConnectionDetails | null> {
    const connection = await this.prisma.federationConnection.findUnique({
      where: {
        id: connectionId,
      },
    });

    if (!connection) {
      return null;
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Handle incoming connection request from remote instance.
   *
   * Verifies the request signature and protocol version, then stores a PENDING
   * connection carrying the request timestamp in its metadata. Every outcome
   * (attempt, rejection, creation) is reported to the audit service.
   *
   * @param workspaceId - Workspace that receives the connection
   * @param request - Signed connection request sent by the remote instance
   * @returns The newly created connection in PENDING state
   * @throws UnauthorizedException when signature verification fails
   * @throws BadRequestException when the protocol version is missing or incompatible
   */
  async handleIncomingConnectionRequest(
    workspaceId: string,
    request: ConnectionRequest
  ): Promise<ConnectionDetails> {
    this.logger.log(`Received connection request from ${request.instanceId}`);

    // Audit log: Incoming connection attempt
    this.auditService.logIncomingConnectionAttempt({
      workspaceId,
      remoteInstanceId: request.instanceId,
      remoteUrl: request.instanceUrl,
      timestamp: request.timestamp,
    });

    // Verify signature
    const validation = await this.signatureService.verifyConnectionRequest(request);

    if (!validation.valid) {
      const errorMsg: string = validation.error ?? "Unknown error";
      this.logger.warn(`Invalid connection request from ${request.instanceId}: ${errorMsg}`);

      // Audit log: Connection rejected
      this.auditService.logIncomingConnectionRejected({
        workspaceId,
        remoteInstanceId: request.instanceId,
        remoteUrl: request.instanceUrl,
        reason: "Invalid signature",
        error: errorMsg,
      });

      throw new UnauthorizedException("Invalid connection request signature");
    }

    // Validate protocol version compatibility
    try {
      this.validateProtocolVersion(request.capabilities.protocolVersion);
    } catch (error: unknown) {
      const errorMsg = error instanceof Error ? error.message : "Unknown error";
      this.logger.warn(`Incompatible protocol version from ${request.instanceId}: ${errorMsg}`);

      // Audit log: Connection rejected
      this.auditService.logIncomingConnectionRejected({
        workspaceId,
        remoteInstanceId: request.instanceId,
        remoteUrl: request.instanceUrl,
        reason: "Incompatible protocol version",
        error: errorMsg,
      });

      // Rethrow the original exception so the caller sees the validation error.
      throw error;
    }

    // Create pending connection
    const connection = await this.prisma.federationConnection.create({
      data: {
        workspaceId,
        remoteInstanceId: request.instanceId,
        remoteUrl: this.normalizeUrl(request.instanceUrl),
        remotePublicKey: request.publicKey,
        remoteCapabilities: request.capabilities as Prisma.JsonObject,
        status: FederationConnectionStatus.PENDING,
        metadata: {
          requestTimestamp: request.timestamp,
        } as Prisma.JsonObject,
      },
    });

    this.logger.log(`Created pending connection ${connection.id} from ${request.instanceId}`);

    // Audit log: Connection created
    this.auditService.logIncomingConnectionCreated({
      workspaceId,
      connectionId: connection.id,
      remoteInstanceId: request.instanceId,
      remoteUrl: request.instanceUrl,
    });

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Fetch remote instance identity via HTTP with retry logic.
   *
   * @param remoteUrl - Base URL of the remote instance
   * @returns The response body of `GET {remoteUrl}/api/v1/federation/instance`
   * @throws ServiceUnavailableException when the request fails after retries
   */
  private async fetchRemoteIdentity(remoteUrl: string): Promise<PublicInstanceIdentity> {
    try {
      const normalizedUrl = this.normalizeUrl(remoteUrl);
      const response = await withRetry(
        async () => {
          return await firstValueFrom(
            this.httpService.get<PublicInstanceIdentity>(
              `${normalizedUrl}/api/v1/federation/instance`
            )
          );
        },
        {
          maxRetries: 3,
          initialDelay: 1000, // 1s
          maxDelay: 8000, // 8s
        }
      );

      // NOTE(review): the body is returned as-is; its shape is not validated here.
      return response.data;
    } catch (error: unknown) {
      this.logger.error(`Failed to fetch remote identity from ${remoteUrl}`, error);
      const errorMessage = error instanceof Error ? error.message : "Unknown error";
      throw new ServiceUnavailableException(
        `Could not connect to remote instance: ${remoteUrl}: ${errorMessage}`
      );
    }
  }

  /**
   * Normalize URL (remove trailing slash).
   *
   * Only a single trailing `/` is removed; the rest of the URL is untouched.
   */
  private normalizeUrl(url: string): string {
    return url.replace(/\/$/, "");
  }

  /**
   * Map Prisma FederationConnection to ConnectionDetails type.
   *
   * Copies every field unchanged; the JSON columns are only re-typed as
   * string-keyed records.
   */
  private mapToConnectionDetails(connection: {
    id: string;
    workspaceId: string;
    remoteInstanceId: string;
    remoteUrl: string;
    remotePublicKey: string;
    remoteCapabilities: unknown;
    status: FederationConnectionStatus;
    metadata: unknown;
    createdAt: Date;
    updatedAt: Date;
    connectedAt: Date | null;
    disconnectedAt: Date | null;
  }): ConnectionDetails {
    return {
      id: connection.id,
      workspaceId: connection.workspaceId,
      remoteInstanceId: connection.remoteInstanceId,
      remoteUrl: connection.remoteUrl,
      remotePublicKey: connection.remotePublicKey,
      remoteCapabilities: connection.remoteCapabilities as Record<string, unknown>,
      status: connection.status,
      metadata: connection.metadata as Record<string, unknown>,
      createdAt: connection.createdAt,
      updatedAt: connection.updatedAt,
      connectedAt: connection.connectedAt,
      disconnectedAt: connection.disconnectedAt,
    };
  }

  /**
   * Validate protocol version compatibility.
   * Currently requires exact version match.
   *
   * @param remoteVersion - Protocol version advertised by the remote instance
   * @throws BadRequestException when the version is missing (or empty) or
   *   differs from `FEDERATION_PROTOCOL_VERSION`
   */
  private validateProtocolVersion(remoteVersion: string | undefined): void {
    if (!remoteVersion) {
      throw new BadRequestException(
        `Protocol version not specified. Expected ${FEDERATION_PROTOCOL_VERSION}`
      );
    }

    if (remoteVersion !== FEDERATION_PROTOCOL_VERSION) {
      throw new BadRequestException(
        `Incompatible protocol version. Expected ${FEDERATION_PROTOCOL_VERSION}, received ${remoteVersion}`
      );
    }
  }
}