/**
 * Connection Service
 *
 * Manages federation connections between instances.
 */

import {
  Injectable,
  Logger,
  NotFoundException,
  UnauthorizedException,
  ServiceUnavailableException,
  BadRequestException,
} from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { FederationConnectionStatus, Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { FederationService } from "./federation.service";
import { SignatureService } from "./signature.service";
import { firstValueFrom } from "rxjs";
import type { ConnectionRequest, ConnectionDetails } from "./types/connection.types";
import type { PublicInstanceIdentity } from "./types/instance.types";

@Injectable()
export class ConnectionService {
  private readonly logger = new Logger(ConnectionService.name);

  constructor(
    private readonly prisma: PrismaService,
    private readonly federationService: FederationService,
    private readonly signatureService: SignatureService,
    private readonly httpService: HttpService
  ) {}

  /**
   * Initiate a connection to a remote instance.
   *
   * Fetches the remote identity, signs a connection request with the local
   * identity, stores a PENDING connection record and posts the signed request
   * to the remote instance. If the POST fails, the PENDING record is removed.
   *
   * @param workspaceId - Workspace that owns the new connection.
   * @param remoteUrl - Base URL of the remote instance (trailing slashes are ignored).
   * @returns The connection as stored with PENDING status.
   * @throws ServiceUnavailableException if the remote identity cannot be fetched.
   * @throws BadRequestException if the connection request cannot be delivered.
   */
  async initiateConnection(workspaceId: string, remoteUrl: string): Promise<ConnectionDetails> {
    this.logger.log(`Initiating connection to ${remoteUrl} for workspace ${workspaceId}`);

    // Normalize once so the stored URL and the URL we POST to always agree.
    const normalizedUrl = this.normalizeUrl(remoteUrl);

    // Fetch remote instance identity
    const remoteIdentity = await this.fetchRemoteIdentity(remoteUrl);

    // Get our instance identity
    const localIdentity = await this.federationService.getInstanceIdentity();

    // Build and sign the request BEFORE persisting anything, so a signing
    // failure cannot leave an orphaned PENDING record behind.
    const request: Omit<ConnectionRequest, "signature"> = {
      instanceId: localIdentity.instanceId,
      instanceUrl: localIdentity.url,
      publicKey: localIdentity.publicKey,
      capabilities: localIdentity.capabilities,
      timestamp: Date.now(),
    };
    const signature = await this.signatureService.signMessage(request);
    const signedRequest: ConnectionRequest = { ...request, signature };

    // Create connection record with PENDING status
    const connection = await this.prisma.federationConnection.create({
      data: {
        workspaceId,
        remoteInstanceId: remoteIdentity.instanceId,
        remoteUrl: normalizedUrl,
        remotePublicKey: remoteIdentity.publicKey,
        remoteCapabilities: remoteIdentity.capabilities as Prisma.JsonObject,
        status: FederationConnectionStatus.PENDING,
        metadata: {},
      },
    });

    // Send connection request to remote instance
    try {
      await firstValueFrom(
        this.httpService.post(`${normalizedUrl}/api/v1/federation/incoming/connect`, signedRequest)
      );
      this.logger.log(`Connection request sent to ${remoteUrl}`);
    } catch (error: unknown) {
      this.logger.error(`Failed to send connection request to ${remoteUrl}`, error);

      // Delete the failed connection to prevent zombie connections in PENDING state.
      // A failure here is logged but must not replace the delivery error reported below.
      try {
        await this.prisma.federationConnection.delete({
          where: { id: connection.id },
        });
      } catch (cleanupError: unknown) {
        this.logger.error(
          `Failed to delete pending connection ${connection.id} after send failure`,
          cleanupError
        );
      }

      const errorMessage = error instanceof Error ? error.message : "Unknown error";
      throw new BadRequestException(
        `Failed to initiate connection to ${remoteUrl}: ${errorMessage}`
      );
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Accept a pending connection.
   *
   * Sets the connection status to ACTIVE and stamps `connectedAt`. The current
   * status of the connection is not checked before the update.
   *
   * @param workspaceId - Workspace the connection must belong to.
   * @param connectionId - ID of the connection to accept.
   * @param metadata - Optional metadata that replaces the stored metadata; when
   *   omitted the existing metadata is kept.
   * @returns The updated connection.
   * @throws NotFoundException if no connection with that ID exists in the workspace.
   */
  async acceptConnection(
    workspaceId: string,
    connectionId: string,
    metadata?: Record<string, unknown>
  ): Promise<ConnectionDetails> {
    this.logger.log(`Accepting connection ${connectionId} for workspace ${workspaceId}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to ACTIVE
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.ACTIVE,
        connectedAt: new Date(),
        metadata: (metadata ?? connection.metadata) as Prisma.JsonObject,
      },
    });

    this.logger.log(`Connection ${connectionId} activated`);

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Reject a pending connection.
   *
   * Sets the status to DISCONNECTED and records the reason under
   * `metadata.rejectionReason`, preserving the other metadata keys.
   *
   * @param workspaceId - Workspace the connection must belong to.
   * @param connectionId - ID of the connection to reject.
   * @param reason - Reason stored alongside the connection.
   * @returns The updated connection.
   * @throws NotFoundException if no connection with that ID exists in the workspace.
   */
  async rejectConnection(
    workspaceId: string,
    connectionId: string,
    reason: string
  ): Promise<ConnectionDetails> {
    this.logger.log(`Rejecting connection ${connectionId}: ${reason}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to DISCONNECTED with rejection reason
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.DISCONNECTED,
        metadata: {
          ...(connection.metadata as Record<string, unknown>),
          rejectionReason: reason,
        } as Prisma.JsonObject,
      },
    });

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Disconnect an active connection.
   *
   * Sets the status to DISCONNECTED and stamps `disconnectedAt`. When a reason
   * is given it is stored under `metadata.disconnectReason`.
   *
   * @param workspaceId - Workspace the connection must belong to.
   * @param connectionId - ID of the connection to disconnect.
   * @param reason - Optional reason; an empty string is treated as not provided.
   * @returns The updated connection.
   * @throws NotFoundException if no connection with that ID exists in the workspace.
   */
  async disconnect(
    workspaceId: string,
    connectionId: string,
    reason?: string
  ): Promise<ConnectionDetails> {
    this.logger.log(`Disconnecting connection ${connectionId}`);

    // Verify connection exists and belongs to workspace
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    // Update status to DISCONNECTED
    const updated = await this.prisma.federationConnection.update({
      where: {
        id: connectionId,
      },
      data: {
        status: FederationConnectionStatus.DISCONNECTED,
        disconnectedAt: new Date(),
        metadata: {
          ...(connection.metadata as Record<string, unknown>),
          ...(reason ? { disconnectReason: reason } : {}),
        } as Prisma.JsonObject,
      },
    });

    return this.mapToConnectionDetails(updated);
  }

  /**
   * Get all connections for a workspace, newest first.
   *
   * @param workspaceId - Workspace whose connections are listed.
   * @param status - Optional status filter; when omitted all statuses are returned.
   * @returns The matching connections ordered by `createdAt` descending.
   */
  async getConnections(
    workspaceId: string,
    status?: FederationConnectionStatus
  ): Promise<ConnectionDetails[]> {
    const connections = await this.prisma.federationConnection.findMany({
      where: {
        workspaceId,
        ...(status ? { status } : {}),
      },
      orderBy: {
        createdAt: "desc",
      },
    });

    return connections.map((conn) => this.mapToConnectionDetails(conn));
  }

  /**
   * Get a single connection.
   *
   * @param workspaceId - Workspace the connection must belong to.
   * @param connectionId - ID of the connection to load.
   * @returns The connection.
   * @throws NotFoundException if no connection with that ID exists in the workspace.
   */
  async getConnection(workspaceId: string, connectionId: string): Promise<ConnectionDetails> {
    const connection = await this.prisma.federationConnection.findFirst({
      where: {
        id: connectionId,
        workspaceId,
      },
    });

    if (!connection) {
      throw new NotFoundException("Connection not found");
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Get connection by ID (without workspace filter).
   * Used by CapabilityGuard for authorization checks.
   *
   * @param connectionId - ID of the connection to load.
   * @returns The connection, or `null` when no connection has that ID.
   */
  async getConnectionById(connectionId: string): Promise<ConnectionDetails | null> {
    const connection = await this.prisma.federationConnection.findUnique({
      where: {
        id: connectionId,
      },
    });

    if (!connection) {
      return null;
    }

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Handle incoming connection request from remote instance.
   *
   * Verifies the request signature and, when valid, stores a PENDING
   * connection whose metadata records the request timestamp.
   *
   * @param workspaceId - Workspace that receives the connection.
   * @param request - Signed connection request sent by the remote instance.
   * @returns The newly created PENDING connection.
   * @throws UnauthorizedException if signature verification fails.
   */
  async handleIncomingConnectionRequest(
    workspaceId: string,
    request: ConnectionRequest
  ): Promise<ConnectionDetails> {
    this.logger.log(`Received connection request from ${request.instanceId}`);

    // Verify signature
    const validation = this.signatureService.verifyConnectionRequest(request);

    if (!validation.valid) {
      const errorMsg = validation.error ?? "Unknown error";
      this.logger.warn(`Invalid connection request from ${request.instanceId}: ${errorMsg}`);
      // The detailed reason is only logged; the caller gets a generic message.
      throw new UnauthorizedException("Invalid connection request signature");
    }

    // Create pending connection
    const connection = await this.prisma.federationConnection.create({
      data: {
        workspaceId,
        remoteInstanceId: request.instanceId,
        remoteUrl: this.normalizeUrl(request.instanceUrl),
        remotePublicKey: request.publicKey,
        remoteCapabilities: request.capabilities as Prisma.JsonObject,
        status: FederationConnectionStatus.PENDING,
        metadata: {
          requestTimestamp: request.timestamp,
        } as Prisma.JsonObject,
      },
    });

    this.logger.log(`Created pending connection ${connection.id} from ${request.instanceId}`);

    return this.mapToConnectionDetails(connection);
  }

  /**
   * Fetch remote instance identity via HTTP.
   *
   * @param remoteUrl - Base URL of the remote instance.
   * @returns The response body of the remote `/api/v1/federation/instance` endpoint.
   *   NOTE(review): the body is returned without runtime validation — confirm
   *   whether callers should validate it before trusting its fields.
   * @throws ServiceUnavailableException if the request fails for any reason.
   */
  private async fetchRemoteIdentity(remoteUrl: string): Promise<PublicInstanceIdentity> {
    try {
      const normalizedUrl = this.normalizeUrl(remoteUrl);
      const response = await firstValueFrom(
        this.httpService.get<PublicInstanceIdentity>(`${normalizedUrl}/api/v1/federation/instance`)
      );

      return response.data;
    } catch (error: unknown) {
      this.logger.error(`Failed to fetch remote identity from ${remoteUrl}`, error);
      const errorMessage = error instanceof Error ? error.message : "Unknown error";
      throw new ServiceUnavailableException(
        `Could not connect to remote instance: ${remoteUrl}: ${errorMessage}`
      );
    }
  }

  /**
   * Normalize URL by removing every trailing slash, so that appending an
   * absolute path never produces `//`.
   *
   * @param url - URL to normalize.
   * @returns The URL without trailing slashes.
   */
  private normalizeUrl(url: string): string {
    return url.replace(/\/+$/, "");
  }

  /**
   * Map Prisma FederationConnection to ConnectionDetails type.
   *
   * Copies each field through unchanged; the JSON columns are cast to plain
   * records without inspecting their contents.
   *
   * @param connection - Connection row as returned by Prisma.
   * @returns The connection in the shape exposed by this service.
   */
  private mapToConnectionDetails(connection: {
    id: string;
    workspaceId: string;
    remoteInstanceId: string;
    remoteUrl: string;
    remotePublicKey: string;
    remoteCapabilities: unknown;
    status: FederationConnectionStatus;
    metadata: unknown;
    createdAt: Date;
    updatedAt: Date;
    connectedAt: Date | null;
    disconnectedAt: Date | null;
  }): ConnectionDetails {
    return {
      id: connection.id,
      workspaceId: connection.workspaceId,
      remoteInstanceId: connection.remoteInstanceId,
      remoteUrl: connection.remoteUrl,
      remotePublicKey: connection.remotePublicKey,
      remoteCapabilities: connection.remoteCapabilities as Record<string, unknown>,
      status: connection.status,
      metadata: connection.metadata as Record<string, unknown>,
      createdAt: connection.createdAt,
      updatedAt: connection.updatedAt,
      connectedAt: connection.connectedAt,
      disconnectedAt: connection.disconnectedAt,
    };
  }
}