feat(#85): implement CONNECT/DISCONNECT protocol
Implemented connection handshake protocol for federation building on the Instance Identity Model from issue #84. **Services:** - SignatureService: Message signing/verification with RSA-SHA256 - ConnectionService: Federation connection management **API Endpoints:** - POST /api/v1/federation/connections/initiate - POST /api/v1/federation/connections/:id/accept - POST /api/v1/federation/connections/:id/reject - POST /api/v1/federation/connections/:id/disconnect - GET /api/v1/federation/connections - GET /api/v1/federation/connections/:id - POST /api/v1/federation/incoming/connect **Tests:** 70 tests pass (18 Signature + 20 Connection + 13 Controller + 19 existing) **Coverage:** 100% on new code **TDD Approach:** Tests written before implementation Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
330
apps/api/src/federation/connection.service.ts
Normal file
330
apps/api/src/federation/connection.service.ts
Normal file
@@ -0,0 +1,330 @@
|
||||
/**
|
||||
* Connection Service
|
||||
*
|
||||
* Manages federation connections between instances.
|
||||
*/
|
||||
|
||||
import { Injectable, Logger, NotFoundException } from "@nestjs/common";
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import { FederationConnectionStatus, Prisma } from "@prisma/client";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { FederationService } from "./federation.service";
|
||||
import { SignatureService } from "./signature.service";
|
||||
import { firstValueFrom } from "rxjs";
|
||||
import type { ConnectionRequest, ConnectionDetails } from "./types/connection.types";
|
||||
import type { PublicInstanceIdentity } from "./types/instance.types";
|
||||
|
||||
@Injectable()
|
||||
export class ConnectionService {
|
||||
private readonly logger = new Logger(ConnectionService.name);
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly federationService: FederationService,
|
||||
private readonly signatureService: SignatureService,
|
||||
private readonly httpService: HttpService
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Initiate a connection to a remote instance
|
||||
*/
|
||||
async initiateConnection(workspaceId: string, remoteUrl: string): Promise<ConnectionDetails> {
|
||||
this.logger.log(`Initiating connection to ${remoteUrl} for workspace ${workspaceId}`);
|
||||
|
||||
// Fetch remote instance identity
|
||||
const remoteIdentity = await this.fetchRemoteIdentity(remoteUrl);
|
||||
|
||||
// Get our instance identity
|
||||
const localIdentity = await this.federationService.getInstanceIdentity();
|
||||
|
||||
// Create connection record with PENDING status
|
||||
const connection = await this.prisma.federationConnection.create({
|
||||
data: {
|
||||
workspaceId,
|
||||
remoteInstanceId: remoteIdentity.instanceId,
|
||||
remoteUrl: this.normalizeUrl(remoteUrl),
|
||||
remotePublicKey: remoteIdentity.publicKey,
|
||||
remoteCapabilities: remoteIdentity.capabilities as Prisma.JsonObject,
|
||||
status: FederationConnectionStatus.PENDING,
|
||||
metadata: {},
|
||||
},
|
||||
});
|
||||
|
||||
// Create signed connection request
|
||||
const request: Omit<ConnectionRequest, "signature"> = {
|
||||
instanceId: localIdentity.instanceId,
|
||||
instanceUrl: localIdentity.url,
|
||||
publicKey: localIdentity.publicKey,
|
||||
capabilities: localIdentity.capabilities,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
const signature = await this.signatureService.signMessage(request);
|
||||
const signedRequest: ConnectionRequest = { ...request, signature };
|
||||
|
||||
// Send connection request to remote instance (fire-and-forget for now)
|
||||
try {
|
||||
await firstValueFrom(
|
||||
this.httpService.post(`${remoteUrl}/api/v1/federation/incoming/connect`, signedRequest)
|
||||
);
|
||||
this.logger.log(`Connection request sent to ${remoteUrl}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to send connection request to ${remoteUrl}`, error);
|
||||
// Connection is still created in PENDING state, can be retried
|
||||
}
|
||||
|
||||
return this.mapToConnectionDetails(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept a pending connection
|
||||
*/
|
||||
async acceptConnection(
|
||||
workspaceId: string,
|
||||
connectionId: string,
|
||||
metadata?: Record<string, unknown>
|
||||
): Promise<ConnectionDetails> {
|
||||
this.logger.log(`Accepting connection ${connectionId} for workspace ${workspaceId}`);
|
||||
|
||||
// Verify connection exists and belongs to workspace
|
||||
const connection = await this.prisma.federationConnection.findFirst({
|
||||
where: {
|
||||
id: connectionId,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new NotFoundException("Connection not found");
|
||||
}
|
||||
|
||||
// Update status to ACTIVE
|
||||
const updated = await this.prisma.federationConnection.update({
|
||||
where: {
|
||||
id: connectionId,
|
||||
},
|
||||
data: {
|
||||
status: FederationConnectionStatus.ACTIVE,
|
||||
connectedAt: new Date(),
|
||||
metadata: (metadata ?? connection.metadata) as Prisma.JsonObject,
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Connection ${connectionId} activated`);
|
||||
|
||||
return this.mapToConnectionDetails(updated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reject a pending connection
|
||||
*/
|
||||
async rejectConnection(
|
||||
workspaceId: string,
|
||||
connectionId: string,
|
||||
reason: string
|
||||
): Promise<ConnectionDetails> {
|
||||
this.logger.log(`Rejecting connection ${connectionId}: ${reason}`);
|
||||
|
||||
// Verify connection exists and belongs to workspace
|
||||
const connection = await this.prisma.federationConnection.findFirst({
|
||||
where: {
|
||||
id: connectionId,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new NotFoundException("Connection not found");
|
||||
}
|
||||
|
||||
// Update status to DISCONNECTED with rejection reason
|
||||
const updated = await this.prisma.federationConnection.update({
|
||||
where: {
|
||||
id: connectionId,
|
||||
},
|
||||
data: {
|
||||
status: FederationConnectionStatus.DISCONNECTED,
|
||||
metadata: {
|
||||
...(connection.metadata as Record<string, unknown>),
|
||||
rejectionReason: reason,
|
||||
} as Prisma.JsonObject,
|
||||
},
|
||||
});
|
||||
|
||||
return this.mapToConnectionDetails(updated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect an active connection
|
||||
*/
|
||||
async disconnect(
|
||||
workspaceId: string,
|
||||
connectionId: string,
|
||||
reason?: string
|
||||
): Promise<ConnectionDetails> {
|
||||
this.logger.log(`Disconnecting connection ${connectionId}`);
|
||||
|
||||
// Verify connection exists and belongs to workspace
|
||||
const connection = await this.prisma.federationConnection.findFirst({
|
||||
where: {
|
||||
id: connectionId,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new NotFoundException("Connection not found");
|
||||
}
|
||||
|
||||
// Update status to DISCONNECTED
|
||||
const updated = await this.prisma.federationConnection.update({
|
||||
where: {
|
||||
id: connectionId,
|
||||
},
|
||||
data: {
|
||||
status: FederationConnectionStatus.DISCONNECTED,
|
||||
disconnectedAt: new Date(),
|
||||
metadata: {
|
||||
...(connection.metadata as Record<string, unknown>),
|
||||
...(reason ? { disconnectReason: reason } : {}),
|
||||
} as Prisma.JsonObject,
|
||||
},
|
||||
});
|
||||
|
||||
return this.mapToConnectionDetails(updated);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all connections for a workspace
|
||||
*/
|
||||
async getConnections(
|
||||
workspaceId: string,
|
||||
status?: FederationConnectionStatus
|
||||
): Promise<ConnectionDetails[]> {
|
||||
const connections = await this.prisma.federationConnection.findMany({
|
||||
where: {
|
||||
workspaceId,
|
||||
...(status ? { status } : {}),
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
});
|
||||
|
||||
return connections.map((conn) => this.mapToConnectionDetails(conn));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single connection
|
||||
*/
|
||||
async getConnection(workspaceId: string, connectionId: string): Promise<ConnectionDetails> {
|
||||
const connection = await this.prisma.federationConnection.findFirst({
|
||||
where: {
|
||||
id: connectionId,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!connection) {
|
||||
throw new NotFoundException("Connection not found");
|
||||
}
|
||||
|
||||
return this.mapToConnectionDetails(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming connection request from remote instance
|
||||
*/
|
||||
async handleIncomingConnectionRequest(
|
||||
workspaceId: string,
|
||||
request: ConnectionRequest
|
||||
): Promise<ConnectionDetails> {
|
||||
this.logger.log(`Received connection request from ${request.instanceId}`);
|
||||
|
||||
// Verify signature
|
||||
const validation = this.signatureService.verifyConnectionRequest(request);
|
||||
|
||||
if (!validation.valid) {
|
||||
const errorMsg = validation.error ?? "Unknown error";
|
||||
this.logger.warn(`Invalid connection request from ${request.instanceId}: ${errorMsg}`);
|
||||
throw new Error("Invalid connection request signature");
|
||||
}
|
||||
|
||||
// Create pending connection
|
||||
const connection = await this.prisma.federationConnection.create({
|
||||
data: {
|
||||
workspaceId,
|
||||
remoteInstanceId: request.instanceId,
|
||||
remoteUrl: this.normalizeUrl(request.instanceUrl),
|
||||
remotePublicKey: request.publicKey,
|
||||
remoteCapabilities: request.capabilities as Prisma.JsonObject,
|
||||
status: FederationConnectionStatus.PENDING,
|
||||
metadata: {
|
||||
requestTimestamp: request.timestamp,
|
||||
} as Prisma.JsonObject,
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Created pending connection ${connection.id} from ${request.instanceId}`);
|
||||
|
||||
return this.mapToConnectionDetails(connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch remote instance identity via HTTP
|
||||
*/
|
||||
private async fetchRemoteIdentity(remoteUrl: string): Promise<PublicInstanceIdentity> {
|
||||
try {
|
||||
const normalizedUrl = this.normalizeUrl(remoteUrl);
|
||||
const response = await firstValueFrom(
|
||||
this.httpService.get<PublicInstanceIdentity>(`${normalizedUrl}/api/v1/federation/instance`)
|
||||
);
|
||||
|
||||
return response.data;
|
||||
} catch (error: unknown) {
|
||||
this.logger.error(`Failed to fetch remote identity from ${remoteUrl}`, error);
|
||||
const errorMessage = error instanceof Error ? error.message : "Unknown error";
|
||||
throw new Error(`Could not connect to remote instance: ${remoteUrl}: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize URL (remove trailing slash)
|
||||
*/
|
||||
private normalizeUrl(url: string): string {
|
||||
return url.replace(/\/$/, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Map Prisma FederationConnection to ConnectionDetails type
|
||||
*/
|
||||
private mapToConnectionDetails(connection: {
|
||||
id: string;
|
||||
workspaceId: string;
|
||||
remoteInstanceId: string;
|
||||
remoteUrl: string;
|
||||
remotePublicKey: string;
|
||||
remoteCapabilities: unknown;
|
||||
status: FederationConnectionStatus;
|
||||
metadata: unknown;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
connectedAt: Date | null;
|
||||
disconnectedAt: Date | null;
|
||||
}): ConnectionDetails {
|
||||
return {
|
||||
id: connection.id,
|
||||
workspaceId: connection.workspaceId,
|
||||
remoteInstanceId: connection.remoteInstanceId,
|
||||
remoteUrl: connection.remoteUrl,
|
||||
remotePublicKey: connection.remotePublicKey,
|
||||
remoteCapabilities: connection.remoteCapabilities as Record<string, unknown>,
|
||||
status: connection.status,
|
||||
metadata: connection.metadata as Record<string, unknown>,
|
||||
createdAt: connection.createdAt,
|
||||
updatedAt: connection.updatedAt,
|
||||
connectedAt: connection.connectedAt,
|
||||
disconnectedAt: connection.disconnectedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user