/** * Query Service * * Handles federated query messages. */ import { Injectable, Logger } from "@nestjs/common"; import { HttpService } from "@nestjs/axios"; import { randomUUID } from "crypto"; import { firstValueFrom } from "rxjs"; import { PrismaService } from "../prisma/prisma.service"; import { FederationService } from "./federation.service"; import { SignatureService } from "./signature.service"; import { FederationConnectionStatus, FederationMessageType, FederationMessageStatus, } from "@prisma/client"; import type { QueryMessage, QueryResponse, QueryMessageDetails } from "./types/message.types"; @Injectable() export class QueryService { private readonly logger = new Logger(QueryService.name); constructor( private readonly prisma: PrismaService, private readonly federationService: FederationService, private readonly signatureService: SignatureService, private readonly httpService: HttpService ) {} /** * Send a query to a remote instance */ async sendQuery( workspaceId: string, connectionId: string, query: string, context?: Record ): Promise { // Validate connection exists and is active const connection = await this.prisma.federationConnection.findUnique({ where: { id: connectionId, workspaceId }, }); if (!connection) { throw new Error("Connection not found"); } if (connection.status !== FederationConnectionStatus.ACTIVE) { throw new Error("Connection is not active"); } // Get local instance identity const identity = await this.federationService.getInstanceIdentity(); // Create query message const messageId = randomUUID(); const timestamp = Date.now(); const queryPayload: Record = { messageId, instanceId: identity.instanceId, query, timestamp, }; if (context) { queryPayload.context = context; } // Sign the query const signature = await this.signatureService.signMessage(queryPayload); const signedQuery = { messageId, instanceId: identity.instanceId, query, ...(context ? { context } : {}), timestamp, signature, } as QueryMessage; // Store message in database const message = await this.prisma.federationMessage.create({ data: { workspaceId, connectionId, messageType: FederationMessageType.QUERY, messageId, query, status: FederationMessageStatus.PENDING, signature, }, }); // Send query to remote instance try { const remoteUrl = `${connection.remoteUrl}/api/v1/federation/incoming/query`; await firstValueFrom(this.httpService.post(remoteUrl, signedQuery)); this.logger.log(`Query sent to ${connection.remoteUrl}: ${messageId}`); } catch (error) { this.logger.error(`Failed to send query to ${connection.remoteUrl}`, error); // Update message status to failed await this.prisma.federationMessage.update({ where: { id: message.id }, data: { status: FederationMessageStatus.FAILED, error: error instanceof Error ? error.message : "Unknown error", }, }); throw new Error("Failed to send query"); } return this.mapToQueryMessageDetails(message); } /** * Handle incoming query from remote instance */ async handleIncomingQuery(queryMessage: QueryMessage): Promise { this.logger.log(`Received query from ${queryMessage.instanceId}: ${queryMessage.messageId}`); // Validate timestamp if (!this.signatureService.validateTimestamp(queryMessage.timestamp)) { throw new Error("Query timestamp is outside acceptable range"); } // Find connection for remote instance const connection = await this.prisma.federationConnection.findFirst({ where: { remoteInstanceId: queryMessage.instanceId, status: FederationConnectionStatus.ACTIVE, }, }); if (!connection) { throw new Error("No connection found for remote instance"); } // Validate connection is active if (connection.status !== FederationConnectionStatus.ACTIVE) { throw new Error("Connection is not active"); } // Verify signature const { signature, ...messageToVerify } = queryMessage; const verificationResult = await this.signatureService.verifyMessage( messageToVerify, signature, queryMessage.instanceId ); if (!verificationResult.valid) { throw new Error(verificationResult.error ?? "Invalid signature"); } // Process query (placeholder - would delegate to actual query processor) let responseData: unknown; let success = true; let errorMessage: string | undefined; try { // TODO: Implement actual query processing // For now, return a placeholder response responseData = { message: "Query received and processed" }; } catch (error) { success = false; errorMessage = error instanceof Error ? error.message : "Query processing failed"; this.logger.error(`Query processing failed: ${errorMessage}`); } // Get local instance identity const identity = await this.federationService.getInstanceIdentity(); // Create response const responseMessageId = randomUUID(); const responseTimestamp = Date.now(); const responsePayload: Record = { messageId: responseMessageId, correlationId: queryMessage.messageId, instanceId: identity.instanceId, success, timestamp: responseTimestamp, }; if (responseData !== undefined) { responsePayload.data = responseData; } if (errorMessage !== undefined) { responsePayload.error = errorMessage; } // Sign the response const responseSignature = await this.signatureService.signMessage(responsePayload); const response = { messageId: responseMessageId, correlationId: queryMessage.messageId, instanceId: identity.instanceId, success, ...(responseData !== undefined ? { data: responseData } : {}), ...(errorMessage !== undefined ? { error: errorMessage } : {}), timestamp: responseTimestamp, signature: responseSignature, } as QueryResponse; return response; } /** * Get all query messages for a workspace */ async getQueryMessages( workspaceId: string, status?: FederationMessageStatus ): Promise { const where: Record = { workspaceId, messageType: FederationMessageType.QUERY, }; if (status) { where.status = status; } const messages = await this.prisma.federationMessage.findMany({ where, orderBy: { createdAt: "desc" }, }); return messages.map((msg) => this.mapToQueryMessageDetails(msg)); } /** * Get a single query message */ async getQueryMessage(workspaceId: string, messageId: string): Promise { const message = await this.prisma.federationMessage.findUnique({ where: { id: messageId, workspaceId }, }); if (!message) { throw new Error("Query message not found"); } return this.mapToQueryMessageDetails(message); } /** * Process a query response from remote instance */ async processQueryResponse(response: QueryResponse): Promise { this.logger.log(`Received response for query: ${response.correlationId}`); // Validate timestamp if (!this.signatureService.validateTimestamp(response.timestamp)) { throw new Error("Response timestamp is outside acceptable range"); } // Find original query message const message = await this.prisma.federationMessage.findFirst({ where: { messageId: response.correlationId, messageType: FederationMessageType.QUERY, }, }); if (!message) { throw new Error("Original query message not found"); } // Verify signature const { signature, ...responseToVerify } = response; const verificationResult = await this.signatureService.verifyMessage( responseToVerify, signature, response.instanceId ); if (!verificationResult.valid) { throw new Error(verificationResult.error ?? "Invalid signature"); } // Update message with response const updateData: Record = { status: response.success ? FederationMessageStatus.DELIVERED : FederationMessageStatus.FAILED, deliveredAt: new Date(), }; if (response.data !== undefined) { updateData.response = response.data; } if (response.error !== undefined) { updateData.error = response.error; } await this.prisma.federationMessage.update({ where: { id: message.id }, data: updateData, }); this.logger.log(`Query response processed: ${response.correlationId}`); } /** * Map Prisma FederationMessage to QueryMessageDetails */ private mapToQueryMessageDetails(message: { id: string; workspaceId: string; connectionId: string; messageType: FederationMessageType; messageId: string; correlationId: string | null; query: string | null; response: unknown; status: FederationMessageStatus; error: string | null; createdAt: Date; updatedAt: Date; deliveredAt: Date | null; }): QueryMessageDetails { const details: QueryMessageDetails = { id: message.id, workspaceId: message.workspaceId, connectionId: message.connectionId, messageType: message.messageType, messageId: message.messageId, response: message.response, status: message.status, createdAt: message.createdAt, updatedAt: message.updatedAt, }; if (message.correlationId !== null) { details.correlationId = message.correlationId; } if (message.query !== null) { details.query = message.query; } if (message.error !== null) { details.error = message.error; } if (message.deliveredAt !== null) { details.deliveredAt = message.deliveredAt; } return details; } }