/** * Federation Agent Service * * Handles spawning and managing agents on remote federated instances. */ import { Injectable, Logger } from "@nestjs/common"; import { HttpService } from "@nestjs/axios"; import { ConfigService } from "@nestjs/config"; import { firstValueFrom } from "rxjs"; import { PrismaService } from "../prisma/prisma.service"; import { CommandService } from "./command.service"; import { FederationAuditService } from "./audit.service"; import { FederationConnectionStatus } from "@prisma/client"; import { validateUrl } from "./utils/url-validator"; import type { CommandMessageDetails } from "./types/message.types"; import type { SpawnAgentCommandPayload, AgentStatusCommandPayload, KillAgentCommandPayload, SpawnAgentResponseData, AgentStatusResponseData, KillAgentResponseData, } from "./types/federation-agent.types"; /** * Agent command response structure */ export interface AgentCommandResponse { /** Whether the command was successful */ success: boolean; /** Response data if successful */ data?: | SpawnAgentResponseData | AgentStatusResponseData | KillAgentResponseData | Record; /** Error message if failed */ error?: string; } @Injectable() export class FederationAgentService { private readonly logger = new Logger(FederationAgentService.name); private readonly orchestratorUrl: string; constructor( private readonly prisma: PrismaService, private readonly commandService: CommandService, private readonly httpService: HttpService, private readonly configService: ConfigService, private readonly auditService: FederationAuditService ) { const url = this.configService.get("orchestrator.url") ?? ""; const nodeEnv = this.configService.get("NODE_ENV") ?? "production"; const isDevelopment = nodeEnv === "development" || nodeEnv === "test"; // Validate orchestrator URL (SSRF prevention) const validationResult = validateUrl(url, isDevelopment); if (!validationResult.valid) { const errorMessage = validationResult.error ?? "Unknown validation error"; this.logger.error(`Invalid orchestrator URL: ${errorMessage}`); // Log security event this.auditService.logInvalidOrchestratorUrl(url, errorMessage); throw new Error(errorMessage); } this.orchestratorUrl = url; this.logger.log( `FederationAgentService initialized with orchestrator URL: ${this.orchestratorUrl}` ); } /** * Spawn an agent on a remote federated instance * @param workspaceId Workspace ID * @param connectionId Federation connection ID * @param payload Agent spawn command payload * @returns Command message details */ async spawnAgentOnRemote( workspaceId: string, connectionId: string, payload: SpawnAgentCommandPayload ): Promise { this.logger.log( `Spawning agent on remote instance via connection ${connectionId} for task ${payload.taskId}` ); // Validate connection exists and is active const connection = await this.prisma.federationConnection.findUnique({ where: { id: connectionId, workspaceId }, }); if (!connection) { throw new Error("Connection not found"); } if (connection.status !== FederationConnectionStatus.ACTIVE) { throw new Error("Connection is not active"); } // Send command via federation const result = await this.commandService.sendCommand( workspaceId, connectionId, "agent.spawn", payload as unknown as Record ); this.logger.log(`Agent spawn command sent successfully: ${result.messageId}`); return result; } /** * Get agent status from remote instance * @param workspaceId Workspace ID * @param connectionId Federation connection ID * @param agentId Agent ID * @returns Command message details */ async getAgentStatus( workspaceId: string, connectionId: string, agentId: string ): Promise { this.logger.log(`Getting agent status for ${agentId} via connection ${connectionId}`); // Validate connection exists and is active const connection = await this.prisma.federationConnection.findUnique({ where: { id: connectionId, workspaceId }, }); if (!connection) { throw new Error("Connection not found"); } if (connection.status !== FederationConnectionStatus.ACTIVE) { throw new Error("Connection is not active"); } // Send status command const payload: AgentStatusCommandPayload = { agentId }; const result = await this.commandService.sendCommand( workspaceId, connectionId, "agent.status", payload as unknown as Record ); this.logger.log(`Agent status command sent successfully: ${result.messageId}`); return result; } /** * Kill an agent on remote instance * @param workspaceId Workspace ID * @param connectionId Federation connection ID * @param agentId Agent ID * @returns Command message details */ async killAgentOnRemote( workspaceId: string, connectionId: string, agentId: string ): Promise { this.logger.log(`Killing agent ${agentId} via connection ${connectionId}`); // Validate connection exists and is active const connection = await this.prisma.federationConnection.findUnique({ where: { id: connectionId, workspaceId }, }); if (!connection) { throw new Error("Connection not found"); } if (connection.status !== FederationConnectionStatus.ACTIVE) { throw new Error("Connection is not active"); } // Send kill command const payload: KillAgentCommandPayload = { agentId }; const result = await this.commandService.sendCommand( workspaceId, connectionId, "agent.kill", payload as unknown as Record ); this.logger.log(`Agent kill command sent successfully: ${result.messageId}`); return result; } /** * Handle incoming agent command from remote instance * @param remoteInstanceId Remote instance ID that sent the command * @param commandType Command type (agent.spawn, agent.status, agent.kill) * @param payload Command payload * @returns Agent command response */ async handleAgentCommand( remoteInstanceId: string, commandType: string, payload: Record ): Promise { this.logger.log(`Handling agent command ${commandType} from ${remoteInstanceId}`); // Verify connection exists for remote instance const connection = await this.prisma.federationConnection.findFirst({ where: { remoteInstanceId, status: FederationConnectionStatus.ACTIVE, }, }); if (!connection) { throw new Error("No connection found for remote instance"); } // Route command to appropriate handler try { switch (commandType) { case "agent.spawn": return await this.handleSpawnCommand(payload as unknown as SpawnAgentCommandPayload); case "agent.status": return await this.handleStatusCommand(payload as unknown as AgentStatusCommandPayload); case "agent.kill": return await this.handleKillCommand(payload as unknown as KillAgentCommandPayload); default: throw new Error(`Unknown agent command type: ${commandType}`); } } catch (error) { this.logger.error(`Error handling agent command: ${String(error)}`); return { success: false, error: error instanceof Error ? error.message : "Unknown error", }; } } /** * Handle agent spawn command by calling local orchestrator * @param payload Spawn command payload * @returns Spawn response */ private async handleSpawnCommand( payload: SpawnAgentCommandPayload ): Promise { this.logger.log(`Processing spawn command for task ${payload.taskId}`); try { const orchestratorPayload = { taskId: payload.taskId, agentType: payload.agentType, context: payload.context, options: payload.options, }; const response = await firstValueFrom( this.httpService.post<{ agentId: string; status: string }>( `${this.orchestratorUrl}/agents/spawn`, orchestratorPayload ) ); const spawnedAt = new Date().toISOString(); const responseData: SpawnAgentResponseData = { agentId: response.data.agentId, status: response.data.status as "spawning", spawnedAt, }; this.logger.log(`Agent spawned successfully: ${responseData.agentId}`); return { success: true, data: responseData, }; } catch (error) { this.logger.error(`Failed to spawn agent: ${String(error)}`); throw error; } } /** * Handle agent status command by calling local orchestrator * @param payload Status command payload * @returns Status response */ private async handleStatusCommand( payload: AgentStatusCommandPayload ): Promise { this.logger.log(`Processing status command for agent ${payload.agentId}`); try { const response = await firstValueFrom( this.httpService.get(`${this.orchestratorUrl}/agents/${payload.agentId}/status`) ); const responseData: AgentStatusResponseData = response.data as AgentStatusResponseData; this.logger.log(`Agent status retrieved: ${responseData.status}`); return { success: true, data: responseData, }; } catch (error) { this.logger.error(`Failed to get agent status: ${String(error)}`); throw error; } } /** * Handle agent kill command by calling local orchestrator * @param payload Kill command payload * @returns Kill response */ private async handleKillCommand(payload: KillAgentCommandPayload): Promise { this.logger.log(`Processing kill command for agent ${payload.agentId}`); try { await firstValueFrom( this.httpService.post(`${this.orchestratorUrl}/agents/${payload.agentId}/kill`, {}) ); const killedAt = new Date().toISOString(); const responseData: KillAgentResponseData = { agentId: payload.agentId, status: "killed", killedAt, }; this.logger.log(`Agent killed successfully: ${payload.agentId}`); return { success: true, data: responseData, }; } catch (error) { this.logger.error(`Failed to kill agent: ${String(error)}`); throw error; } } }