Issue #88: [FED-005] QUERY Message Type
Objective
Implement the QUERY message type for federation, building on the existing connection infrastructure from issues #84 and #85. This includes:
- Query message structure and protocol
- Request/response handling for federated queries
- Query routing and authorization
- API endpoints for sending and receiving queries
- Proper TypeScript types (no explicit 'any')
- Error handling and validation
Context
Previous issues provide the foundation:
- Issue #84 (FED-001): Instance Identity Model with keypair signing
- Issue #85 (FED-002): CONNECT/DISCONNECT Protocol with signature verification
- Issue #86 (FED-003): Authentik OIDC Integration
- Issue #87 (FED-004): Cross-Instance Identity Linking
Existing infrastructure:
- SignatureService for message signing/verification
- FederationConnection model with workspace scoping
- FederationService for instance identity
- Connection management with ACTIVE/PENDING/DISCONNECTED states
Approach
1. Database Schema Updates
Add FederationMessage model to track query messages:
enum FederationMessageType {
  QUERY
  COMMAND
  EVENT
}
enum FederationMessageStatus {
  PENDING
  DELIVERED
  FAILED
  TIMEOUT
}
model FederationMessage {
  id String @id @default(uuid()) @db.Uuid
  workspaceId String @map("workspace_id") @db.Uuid
  connectionId String @map("connection_id") @db.Uuid
  // Message metadata
  messageType FederationMessageType @map("message_type")
  messageId String @unique @map("message_id") // UUID for deduplication
  correlationId String? @map("correlation_id") // For request/response tracking
  // Message content
  query String? @db.Text
  response Json? @default("{}")
  // Status tracking
  status FederationMessageStatus @default(PENDING)
  error String? @db.Text
  // Security
  signature String @db.Text
  // Timestamps
  createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
  updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
  deliveredAt DateTime? @map("delivered_at") @db.Timestamptz
  // Relations
  connection FederationConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade)
  workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
  @@index([workspaceId])
  @@index([connectionId])
  @@index([messageId])
  @@index([correlationId])
  @@map("federation_messages")
}
2. Create Types
Create /apps/api/src/federation/types/message.types.ts:
// Query message payload
interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}
// Query response payload
interface QueryResponse {
  messageId: string;
  correlationId: string; // Original query messageId
  instanceId: string;
  success: boolean;
  data?: unknown;
  error?: string;
  timestamp: number;
  signature: string;
}
// Query request DTO
interface SendQueryDto {
  connectionId: string;
  query: string;
  context?: Record<string, unknown>;
}
// Query message details
interface QueryMessageDetails {
  id: string;
  workspaceId: string;
  connectionId: string;
  messageType: string;
  messageId: string;
  correlationId?: string;
  query?: string;
  response?: unknown;
  status: string;
  error?: string;
  createdAt: Date;
  updatedAt: Date;
  deliveredAt?: Date;
}
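Because the incoming query endpoint receives untrusted JSON, a runtime check is needed before a body can be treated as a QueryMessage without resorting to `any`. The following is a minimal sketch of such a type guard. The helper names `isRecord` and `isQueryMessage` are hypothetical and not taken from the codebase.

```typescript
// Shape copied from the QueryMessage interface above.
interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

// Hypothetical helper: true for non-null, non-array objects.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Hypothetical type guard: narrows an untrusted body to QueryMessage.
function isQueryMessage(value: unknown): value is QueryMessage {
  if (!isRecord(value)) return false;
  const timestamp = value["timestamp"];
  const context = value["context"];
  return (
    typeof value["messageId"] === "string" &&
    typeof value["instanceId"] === "string" &&
    typeof value["query"] === "string" &&
    typeof timestamp === "number" &&
    Number.isFinite(timestamp) &&
    typeof value["signature"] === "string" &&
    (context === undefined || isRecord(context))
  );
}

// Usage: parse first, narrow second, and only then touch typed fields.
const parsedBody: unknown = JSON.parse(
  '{"messageId":"m-1","instanceId":"instance-a","query":"tasks","timestamp":1700000000000,"signature":"sig"}'
);
if (isQueryMessage(parsedBody)) {
  console.log(parsedBody.query);
}
```

The guard only checks shape. Signature, timestamp freshness, and connection status still have to be verified separately.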
3. Create Query Service
Create /apps/api/src/federation/query.service.ts:
Methods:
- sendQuery(workspaceId, connectionId, query, context) - Send query to remote instance
- handleIncomingQuery(queryMessage) - Process incoming query
- getQueryMessages(workspaceId, filters) - List query messages
- getQueryMessage(workspaceId, messageId) - Get single query message
- createQueryMessage(workspaceId, connectionId, query) - Create signed query message
- processQueryResponse(response) - Handle query response
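To illustrate how the two read methods can enforce workspace scoping, here is a small in-memory sketch. It is not the real QueryService, which would go through the database. The `InMemoryQueryStore` class and the `MessageFilters` shape are assumptions; the source only says "filters".

```typescript
type StoredStatus = "PENDING" | "DELIVERED" | "FAILED" | "TIMEOUT";

// Subset of QueryMessageDetails, enough to show the scoping logic.
interface StoredMessage {
  messageId: string;
  workspaceId: string;
  connectionId: string;
  status: StoredStatus;
}

// Hypothetical filter shape.
interface MessageFilters {
  status?: StoredStatus;
  connectionId?: string;
}

class InMemoryQueryStore {
  private readonly rows: StoredMessage[];

  constructor(rows: StoredMessage[]) {
    this.rows = rows;
  }

  // Every lookup is keyed by workspaceId first, so rows from other
  // workspaces are never visible regardless of the filters passed.
  getQueryMessages(workspaceId: string, filters: MessageFilters = {}): StoredMessage[] {
    return this.rows.filter(
      (row) =>
        row.workspaceId === workspaceId &&
        (filters.status === undefined || row.status === filters.status) &&
        (filters.connectionId === undefined || row.connectionId === filters.connectionId)
    );
  }

  // Returns undefined when the message exists but belongs to another workspace.
  getQueryMessage(workspaceId: string, messageId: string): StoredMessage | undefined {
    return this.rows.find(
      (row) => row.messageId === messageId && row.workspaceId === workspaceId
    );
  }
}
```

Treating "wrong workspace" the same as "not found" avoids leaking the existence of messages across workspaces.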
4. Add API Endpoints
Extend FederationController or create QueryController:
- POST /api/v1/federation/query - Send query to remote instance
- POST /api/v1/federation/incoming/query - Receive query from remote instance (public endpoint)
- GET /api/v1/federation/queries - List query messages
- GET /api/v1/federation/queries/:id - Get query message details
5. Query Protocol Flow
Sender (Instance A) → Receiver (Instance B)
- Instance A validates connection is ACTIVE
- Instance A creates QueryMessage with unique messageId
- Instance A signs query with private key
- Instance A sends signed query to POST {remoteUrl}/api/v1/federation/incoming/query
- Instance B receives query, validates signature
- Instance B checks connection is ACTIVE
- Instance B processes query (delegates to workspace services)
- Instance B creates QueryResponse with correlationId = original messageId
- Instance B signs response with private key
- Instance B sends response back to Instance A
- Instance A receives response, validates signature
- Instance A updates message status to DELIVERED
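The signing and correlation steps in the flow above can be sketched end to end in memory. This is an illustration under stated assumptions, not the existing SignatureService. It assumes Ed25519 keys from Node's built-in crypto module, base64 signatures, and a canonical form built from a fixed-order JSON array. None of these are specified in this document. All function names are hypothetical.

```typescript
import { generateKeyPairSync, randomUUID, sign, verify, type KeyObject } from "node:crypto";

interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

interface QueryResponse {
  messageId: string;
  correlationId: string;
  instanceId: string;
  success: boolean;
  data?: unknown;
  error?: string;
  timestamp: number;
  signature: string;
}

// Assumed canonical form: every field except `signature`, in a fixed order.
function canonicalQuery(m: Omit<QueryMessage, "signature">): Buffer {
  return Buffer.from(
    JSON.stringify([m.messageId, m.instanceId, m.query, m.context ?? null, m.timestamp])
  );
}

function canonicalResponse(r: Omit<QueryResponse, "signature">): Buffer {
  return Buffer.from(
    JSON.stringify([r.messageId, r.correlationId, r.instanceId, r.success, r.data ?? null, r.error ?? null, r.timestamp])
  );
}

// Sender side: unique messageId, then sign with the sender's private key.
function createQueryMessage(instanceId: string, query: string, privateKey: KeyObject, now: number): QueryMessage {
  const unsigned = { messageId: randomUUID(), instanceId, query, timestamp: now };
  const signature = sign(null, canonicalQuery(unsigned), privateKey).toString("base64");
  return { ...unsigned, signature };
}

// Receiver side: verify with the sender's public key.
function verifyQuery(message: QueryMessage, publicKey: KeyObject): boolean {
  const { signature, ...unsigned } = message;
  return verify(null, canonicalQuery(unsigned), publicKey, Buffer.from(signature, "base64"));
}

// Receiver side: correlationId links the response to the original messageId.
function createQueryResponse(query: QueryMessage, instanceId: string, data: unknown, privateKey: KeyObject, now: number): QueryResponse {
  const unsigned = {
    messageId: randomUUID(),
    correlationId: query.messageId,
    instanceId,
    success: true,
    data,
    timestamp: now,
  };
  const signature = sign(null, canonicalResponse(unsigned), privateKey).toString("base64");
  return { ...unsigned, signature };
}

function verifyResponse(response: QueryResponse, publicKey: KeyObject): boolean {
  const { signature, ...unsigned } = response;
  return verify(null, canonicalResponse(unsigned), publicKey, Buffer.from(signature, "base64"));
}

// Usage: Instance A queries, Instance B verifies and answers.
const instanceA = generateKeyPairSync("ed25519");
const instanceB = generateKeyPairSync("ed25519");
const outgoing = createQueryMessage("instance-a", "tasks.list", instanceA.privateKey, Date.now());
if (verifyQuery(outgoing, instanceA.publicKey)) {
  const reply = createQueryResponse(outgoing, "instance-b", { count: 3 }, instanceB.privateKey, Date.now());
  console.log(verifyResponse(reply, instanceB.publicKey), reply.correlationId === outgoing.messageId);
}
```

A fixed-order array sidesteps key-ordering differences at the top level. Nested `context` and `data` objects would still need a stable serialization if the two instances can produce different key orders.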
6. Security Considerations
- All queries must be signed with instance private key
- All responses must be verified using remote instance public key
- Timestamps must be within 5 minutes of the receiving instance's clock to prevent replay attacks
- Only ACTIVE connections can send/receive queries
- Workspace isolation enforced via row-level security (RLS)
- Message deduplication using messageId
- Query content sanitization to prevent injection attacks
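The timestamp window and messageId deduplication rules work together against replays, which the sketch below tries to make concrete. It assumes `timestamp` is in epoch milliseconds, since the types only say `number`. The in-memory `SeenMessageIds` class is hypothetical. In the schema above, the unique `messageId` column would provide the durable equivalent.

```typescript
const REPLAY_WINDOW_MS = 5 * 60 * 1000; // the five-minute window from the list above

// Assumes epoch milliseconds. Accepts skew in both directions.
function isTimestampFresh(timestamp: number, now: number, windowMs: number = REPLAY_WINDOW_MS): boolean {
  return Math.abs(now - timestamp) <= windowMs;
}

// Hypothetical in-memory replay cache keyed by messageId.
class SeenMessageIds {
  private readonly seenAt = new Map<string, number>();

  // Returns true the first time an id is seen, false for a duplicate.
  markIfNew(messageId: string, now: number): boolean {
    this.evict(now);
    if (this.seenAt.has(messageId)) return false;
    this.seenAt.set(messageId, now);
    return true;
  }

  // Ids are kept for twice the window: a message accepted with a timestamp
  // up to one window in the future stays fresh for up to two windows after
  // it was first received, so evicting earlier would reopen a replay gap.
  private evict(now: number): void {
    const cutoff = now - 2 * REPLAY_WINDOW_MS;
    this.seenAt.forEach((receivedAt, id) => {
      if (receivedAt < cutoff) this.seenAt.delete(id);
    });
  }
}

// Usage: both checks must pass before a query is processed.
const replayCache = new SeenMessageIds();
function acceptIncoming(messageId: string, timestamp: number, now: number): boolean {
  return isTimestampFresh(timestamp, now) && replayCache.markIfNew(messageId, now);
}
```

The freshness check runs first so that stale messages never occupy space in the cache.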
7. Query Authorization
Queries should be authorized based on:
- Connection status (must be ACTIVE)
- Workspace permissions (sender must have access)
- Query type (different queries may have different permissions)
- Rate limiting (prevent abuse)
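One way to combine these four criteria is a single pure function that returns either an allow result or a reason for rejection. This is a sketch only. The `AuthorizationInput` fields, the allow-list approach to query types, and the per-window counter are assumptions, not the implemented policy.

```typescript
type ConnectionStatus = "ACTIVE" | "PENDING" | "DISCONNECTED";

// Hypothetical input: everything the decision needs, gathered by the caller.
interface AuthorizationInput {
  connectionStatus: ConnectionStatus;
  senderHasWorkspaceAccess: boolean;
  queryType: string;
  allowedQueryTypes: readonly string[];
  recentQueryCount: number; // queries already seen in the current rate window
  maxQueriesPerWindow: number;
}

type AuthorizationResult = { allowed: true } | { allowed: false; reason: string };

// Checks run cheapest-first and stop at the first failure.
function authorizeQuery(input: AuthorizationInput): AuthorizationResult {
  if (input.connectionStatus !== "ACTIVE") {
    return { allowed: false, reason: "connection is not ACTIVE" };
  }
  if (!input.senderHasWorkspaceAccess) {
    return { allowed: false, reason: "sender lacks workspace access" };
  }
  if (!input.allowedQueryTypes.includes(input.queryType)) {
    return { allowed: false, reason: "query type not permitted" };
  }
  if (input.recentQueryCount >= input.maxQueriesPerWindow) {
    return { allowed: false, reason: "rate limit exceeded" };
  }
  return { allowed: true };
}

// Usage
const decision = authorizeQuery({
  connectionStatus: "ACTIVE",
  senderHasWorkspaceAccess: true,
  queryType: "tasks.list",
  allowedQueryTypes: ["tasks.list"],
  recentQueryCount: 0,
  maxQueriesPerWindow: 60,
});
console.log(decision.allowed);
```

Keeping the decision free of I/O makes each rule straightforward to unit test in isolation.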
8. Testing Strategy
Unit Tests (TDD approach):
- QueryService.sendQuery() creates signed query message
- QueryService.handleIncomingQuery() validates signature
- QueryService.handleIncomingQuery() rejects invalid signatures
- QueryService.handleIncomingQuery() rejects expired timestamps
- QueryService.processQueryResponse() updates message status
- Query deduplication works correctly
- Workspace isolation enforced
Integration Tests:
- POST /federation/query sends query to remote instance
- POST /incoming/query validates signature and processes query
- POST /incoming/query rejects inactive connections
- GET /queries returns workspace query messages
- GET /queries/:id returns query message details
- Workspace isolation (can't access other workspace queries)
- Connection requirement (can't query without ACTIVE connection)
Progress
- Create scratchpad
- Create database schema for FederationMessage model
- Create message.types.ts with protocol types
- Write tests for QueryService (15 tests)
- Implement QueryService
- Write tests for query API endpoints (9 tests)
- Implement query API endpoints
- Update FederationModule with QueryService
- Verify all tests pass (24/24 tests passing)
- Verify type checking passes
- Verify test coverage ≥85% (100% coverage on new code)
- Commit changes (commit 1159ca4)
Design Decisions
- Message Model: Separate FederationMessage model for tracking all message types (QUERY, COMMAND, EVENT)
- Correlation IDs: Use correlationId to link responses to requests
- Message Deduplication: Use unique messageId to prevent duplicate processing
- Workspace Scoping: All messages belong to a workspace for RLS
- Stateless Protocol: Each message is independently signed and verified
- Public Query Endpoint: /incoming/query is public (no auth) but requires valid signature
- Status Tracking: Track message status (PENDING, DELIVERED, FAILED, TIMEOUT)
Notes
- Query messages are workspace-scoped (authenticated users only)
- Incoming query endpoint is public but cryptographically verified
- Need to handle network errors gracefully when calling remote instances
- Should validate connection is ACTIVE before sending queries
- Consider timeout handling for queries that don't receive responses
- Rate limiting should be considered for production (future enhancement)
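For the timeout note above, one possible approach is a periodic sweep that moves stale PENDING messages to TIMEOUT. The sketch below shows only the pure decision logic. The `markTimedOut` name, the `TrackedMessage` shape, and the 30-second threshold in the usage example are all hypothetical.

```typescript
type MessageStatus = "PENDING" | "DELIVERED" | "FAILED" | "TIMEOUT";

// Subset of the FederationMessage fields relevant to timeout handling.
interface TrackedMessage {
  messageId: string;
  status: MessageStatus;
  createdAt: Date;
}

// Returns a new array in which PENDING messages older than timeoutMs are
// marked TIMEOUT. Messages in any other status are left untouched.
function markTimedOut(messages: TrackedMessage[], now: Date, timeoutMs: number): TrackedMessage[] {
  return messages.map((message): TrackedMessage => {
    const ageMs = now.getTime() - message.createdAt.getTime();
    if (message.status === "PENDING" && ageMs > timeoutMs) {
      return { ...message, status: "TIMEOUT" };
    }
    return message;
  });
}

// Usage: with an assumed 30-second timeout, only the older PENDING message changes.
const sweepNow = new Date("2024-01-01T00:01:00Z");
const swept = markTimedOut(
  [
    { messageId: "m-1", status: "PENDING", createdAt: new Date("2024-01-01T00:00:00Z") },
    { messageId: "m-2", status: "PENDING", createdAt: new Date("2024-01-01T00:00:45Z") },
  ],
  sweepNow,
  30_000
);
console.log(swept.map((m) => m.status));
```

A response that arrives after the sweep would find the message already in TIMEOUT, so processQueryResponse would need a rule for whether late responses are accepted or dropped.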
Testing Plan
Unit Tests
- QueryService:
  - Should create signed query message
  - Should send query to remote instance
  - Should validate incoming query signature
  - Should reject queries with invalid signatures
  - Should reject queries with expired timestamps
  - Should reject queries from inactive connections
  - Should deduplicate messages by messageId
  - Should process query responses correctly
  - Should update message status appropriately
  - Should enforce workspace isolation
Integration Tests
- POST /api/v1/federation/query:
  - Should require authentication
  - Should require ACTIVE connection
  - Should create query message record
  - Should send signed query to remote instance
  - Should return query message details
- POST /api/v1/federation/incoming/query:
  - Should validate query signature
  - Should reject queries with invalid signatures
  - Should reject queries with old timestamps
  - Should reject queries from inactive connections
  - Should process valid queries
  - Should return signed response
- GET /api/v1/federation/queries:
  - Should list workspace query messages
  - Should filter by status if provided
  - Should enforce workspace isolation
- GET /api/v1/federation/queries/:id:
  - Should return query message details
  - Should enforce workspace ownership