stack/docs/scratchpads/88-query-message-type.md
Jason Woltje 12abdfe81d feat(#93): implement agent spawn via federation
Implements FED-010: Agent Spawn via Federation feature that enables
spawning and managing Claude agents on remote federated Mosaic Stack
instances via COMMAND message type.

Features:
- Federation agent command types (spawn, status, kill)
- FederationAgentService for handling agent operations
- Integration with orchestrator's agent spawner/lifecycle services
- API endpoints for spawning, querying status, and killing agents
- Full command routing through federation COMMAND infrastructure
- Comprehensive test coverage (12/12 tests passing)

Architecture:
- Hub → Spoke: Spawn agents on remote instances
- Command flow: FederationController → FederationAgentService →
  CommandService → Remote Orchestrator
- Response handling: Remote orchestrator returns agent status/results
- Security: Connection validation, signature verification

Files created:
- apps/api/src/federation/types/federation-agent.types.ts
- apps/api/src/federation/federation-agent.service.ts
- apps/api/src/federation/federation-agent.service.spec.ts

Files modified:
- apps/api/src/federation/command.service.ts (agent command routing)
- apps/api/src/federation/federation.controller.ts (agent endpoints)
- apps/api/src/federation/federation.module.ts (service registration)
- apps/orchestrator/src/api/agents/agents.controller.ts (status endpoint)
- apps/orchestrator/src/api/agents/agents.module.ts (lifecycle integration)

Testing:
- 12/12 tests passing for FederationAgentService
- All command service tests passing
- TypeScript compilation successful
- Linting passed

Refs #93

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 14:37:06 -06:00


Issue #88: [FED-005] QUERY Message Type

Objective

Implement the QUERY message type for federation, building on the existing connection infrastructure from issues #84 and #85. This includes:

  • Query message structure and protocol
  • Request/response handling for federated queries
  • Query routing and authorization
  • API endpoints for sending and receiving queries
  • Proper TypeScript types (no explicit 'any')
  • Error handling and validation

Context

Previous issues provide the foundation:

  • Issue #84 (FED-001): Instance Identity Model with keypair signing
  • Issue #85 (FED-002): CONNECT/DISCONNECT Protocol with signature verification
  • Issue #86 (FED-003): Authentik OIDC Integration
  • Issue #87 (FED-004): Cross-Instance Identity Linking

Existing infrastructure:

  • SignatureService for message signing/verification
  • FederationConnection model with workspace scoping
  • FederationService for instance identity
  • Connection management with ACTIVE/PENDING/DISCONNECTED states

Approach

1. Database Schema Updates

Add FederationMessage model to track query messages:

enum FederationMessageType {
  QUERY
  COMMAND
  EVENT
}

enum FederationMessageStatus {
  PENDING
  DELIVERED
  FAILED
  TIMEOUT
}

model FederationMessage {
  id              String   @id @default(uuid()) @db.Uuid
  workspaceId     String   @map("workspace_id") @db.Uuid
  connectionId    String   @map("connection_id") @db.Uuid

  // Message metadata
  messageType     FederationMessageType @map("message_type")
  messageId       String   @unique @map("message_id") // UUID for deduplication
  correlationId   String?  @map("correlation_id") // For request/response tracking

  // Message content
  query           String?  @db.Text
  response        Json?    @default("{}")

  // Status tracking
  status          FederationMessageStatus @default(PENDING)
  error           String?  @db.Text

  // Security
  signature       String   @db.Text

  // Timestamps
  createdAt       DateTime @default(now()) @map("created_at") @db.Timestamptz
  updatedAt       DateTime @updatedAt @map("updated_at") @db.Timestamptz
  deliveredAt     DateTime? @map("delivered_at") @db.Timestamptz

  // Relations
  connection      FederationConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade)
  workspace       Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)

  @@index([workspaceId])
  @@index([connectionId])
  @@index([messageId])
  @@index([correlationId])
  @@map("federation_messages")
}

2. Create Types

Create /apps/api/src/federation/types/message.types.ts:

// Query message payload
export interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

// Query response payload
export interface QueryResponse {
  messageId: string;
  correlationId: string; // Original query messageId
  instanceId: string;
  success: boolean;
  data?: unknown;
  error?: string;
  timestamp: number;
  signature: string;
}

// Query request DTO
export interface SendQueryDto {
  connectionId: string;
  query: string;
  context?: Record<string, unknown>;
}

// Query message details
export interface QueryMessageDetails {
  id: string;
  workspaceId: string;
  connectionId: string;
  messageType: string;
  messageId: string;
  correlationId?: string;
  query?: string;
  response?: unknown;
  status: string;
  error?: string;
  createdAt: Date;
  updatedAt: Date;
  deliveredAt?: Date;
}
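
Because the incoming query endpoint is public, payloads arrive as untyped JSON. One way to narrow them to QueryMessage before any signature work is a runtime type guard. The sketch below is an illustration only and is not part of the planned types file; `isRecord` and `isQueryMessage` are hypothetical helper names.

```typescript
// Shape copied from the QueryMessage interface above.
interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

// Hypothetical helper: true for plain objects, false for null and arrays.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Hypothetical guard: checks field presence and primitive types only.
// It does NOT verify the signature or the timestamp window.
function isQueryMessage(value: unknown): value is QueryMessage {
  if (!isRecord(value)) return false;
  const { messageId, instanceId, query, context, timestamp, signature } = value;
  return (
    typeof messageId === "string" && messageId.length > 0 &&
    typeof instanceId === "string" && instanceId.length > 0 &&
    typeof query === "string" &&
    (context === undefined || isRecord(context)) &&
    typeof timestamp === "number" && Number.isFinite(timestamp) &&
    typeof signature === "string" && signature.length > 0
  );
}
```

A guard like this keeps `unknown` at the boundary and avoids an explicit `any`, which matches the typing goal stated in the Objective.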

3. Create Query Service

Create /apps/api/src/federation/query.service.ts:

Methods:

  • sendQuery(workspaceId, connectionId, query, context) - Send query to remote instance
  • handleIncomingQuery(queryMessage) - Process incoming query
  • getQueryMessages(workspaceId, filters) - List query messages
  • getQueryMessage(workspaceId, messageId) - Get single query message
  • createQueryMessage(workspaceId, connectionId, query) - Create signed query message
  • processQueryResponse(response) - Handle query response
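
As a rough illustration of what createQueryMessage might do, the sketch below builds and signs a QueryMessage using Node's built-in crypto module. Several things here are assumptions: the Ed25519 key type, the base64 signature encoding, the `canonicalize` helper, and the simplified parameter list (the planned method takes workspaceId and connectionId and would delegate signing to the existing SignatureService).

```typescript
import { generateKeyPairSync, randomUUID, sign, verify } from "node:crypto";
import type { KeyObject } from "node:crypto";

interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

type UnsignedQueryMessage = Omit<QueryMessage, "signature">;

// Hypothetical helper: JSON with sorted keys, so both sides sign identical bytes.
function canonicalize(value: unknown): string {
  if (value === undefined) return "null";
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .filter((key) => obj[key] !== undefined)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalize(obj[key])}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

// Assumption: Ed25519 keys; the real key type lives in SignatureService.
function createQueryMessage(
  instanceId: string,
  query: string,
  privateKey: KeyObject,
  context?: Record<string, unknown>,
  now: number = Date.now(),
): QueryMessage {
  const unsigned: UnsignedQueryMessage = {
    messageId: randomUUID(),
    instanceId,
    query,
    timestamp: now,
    ...(context ? { context } : {}),
  };
  const signature = sign(null, Buffer.from(canonicalize(unsigned)), privateKey).toString("base64");
  return { ...unsigned, signature };
}

// Recomputes the canonical bytes without the signature field and verifies them.
function verifyQueryMessage(message: QueryMessage, publicKey: KeyObject): boolean {
  const { signature, ...unsigned } = message;
  return verify(null, Buffer.from(canonicalize(unsigned)), publicKey, Buffer.from(signature, "base64"));
}

// Usage: sign with a throwaway key pair, then verify.
const keys = generateKeyPairSync("ed25519");
const signed = createQueryMessage("instance-a", "tasks.list", keys.privateKey, { limit: 5 });
console.log(verifyQueryMessage(signed, keys.publicKey)); // true
```

Signing a canonical form rather than the raw request body means key order and whitespace differences between instances cannot invalidate an otherwise correct signature.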

4. Add API Endpoints

Extend FederationController or create QueryController:

  • POST /api/v1/federation/query - Send query to remote instance
  • POST /api/v1/federation/incoming/query - Receive query from remote instance (public endpoint)
  • GET /api/v1/federation/queries - List query messages
  • GET /api/v1/federation/queries/:id - Get query message details

5. Query Protocol Flow

Sender (Instance A) → Receiver (Instance B)

  1. Instance A validates connection is ACTIVE
  2. Instance A creates QueryMessage with unique messageId
  3. Instance A signs query with private key
  4. Instance A sends signed query to POST {remoteUrl}/api/v1/federation/incoming/query
  5. Instance B receives query, validates signature
  6. Instance B checks connection is ACTIVE
  7. Instance B processes query (delegates to workspace services)
  8. Instance B creates QueryResponse with correlationId = original messageId
  9. Instance B signs response with private key
  10. Instance B sends response back to Instance A
  11. Instance A receives response, validates signature
  12. Instance A updates message status to DELIVERED
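
Steps 5 through 9 (the receiver side) can be sketched as a single function with its collaborators injected. This is a minimal sketch under stated assumptions: `ReceiverDeps` and all of its members are hypothetical stand-ins for SignatureService, the connection lookup, and the workspace services, and the real handleIncomingQuery would also persist a FederationMessage record.

```typescript
interface QueryMessage {
  messageId: string;
  instanceId: string;
  query: string;
  context?: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

interface QueryResponse {
  messageId: string;
  correlationId: string;
  instanceId: string;
  success: boolean;
  data?: unknown;
  error?: string;
  timestamp: number;
  signature: string;
}

type UnsignedResponse = Omit<QueryResponse, "signature">;
type ConnectionStatus = "PENDING" | "ACTIVE" | "DISCONNECTED";
type Outcome = { success: true; data: unknown } | { success: false; error: string };

// Hypothetical dependency bundle; none of these names come from the codebase.
interface ReceiverDeps {
  instanceId: string;
  verifySignature(message: QueryMessage): boolean;
  getConnectionStatus(remoteInstanceId: string): ConnectionStatus | undefined;
  runQuery(query: string, context?: Record<string, unknown>): unknown;
  sign(payload: UnsignedResponse): string;
  newId(): string;
  now(): number;
}

// Converts a thrown error into a failed outcome instead of propagating it.
function runSafely(fn: () => unknown): Outcome {
  try {
    return { success: true, data: fn() };
  } catch (err) {
    return { success: false, error: err instanceof Error ? err.message : String(err) };
  }
}

function handleIncomingQuery(message: QueryMessage, deps: ReceiverDeps): QueryResponse {
  // Step 5: reject anything that is not validly signed.
  if (!deps.verifySignature(message)) throw new Error("Invalid signature");
  // Step 6: only ACTIVE connections may query.
  if (deps.getConnectionStatus(message.instanceId) !== "ACTIVE") {
    throw new Error("Connection is not active");
  }
  // Step 7: delegate to the workspace services.
  const outcome = runSafely(() => deps.runQuery(message.query, message.context));
  // Step 8: correlationId links the response to the original query.
  const unsigned: UnsignedResponse = {
    messageId: deps.newId(),
    correlationId: message.messageId,
    instanceId: deps.instanceId,
    timestamp: deps.now(),
    ...outcome,
  };
  // Step 9: sign the response before it is returned to the sender.
  return { ...unsigned, signature: deps.sign(unsigned) };
}
```

In this sketch a query that fails during processing still produces a signed response with success set to false, while signature and connection failures are raised as errors so the HTTP layer can map them to a rejection.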

6. Security Considerations

  • All queries must be signed with instance private key
  • All responses must be verified using remote instance public key
  • Timestamps must be within 5 minutes of the receiver's clock; messages outside that window are rejected to limit replay attacks
  • Only ACTIVE connections can send/receive queries
  • Workspace isolation enforced via row-level security (RLS)
  • Message deduplication using messageId
  • Query content sanitization to prevent injection attacks
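
The timestamp window and messageId deduplication rules can be illustrated with two small helpers. This is a sketch under assumptions: timestamps are taken to be Unix epoch milliseconds, and `isTimestampFresh` and `SeenMessageIds` are hypothetical names. The schema above already enforces uniqueness on messageId at the database level; the in-memory cache here only demonstrates the idea.

```typescript
// Five-minute replay window from the security considerations above.
const MAX_CLOCK_SKEW_MS = 5 * 60 * 1000;

// Assumption: `timestamp` is epoch milliseconds. Both stale and
// far-future timestamps are rejected.
function isTimestampFresh(
  timestamp: number,
  now: number = Date.now(),
  windowMs: number = MAX_CLOCK_SKEW_MS,
): boolean {
  return Number.isFinite(timestamp) && Math.abs(now - timestamp) <= windowMs;
}

// Hypothetical in-memory dedup cache keyed by messageId.
class SeenMessageIds {
  private readonly seen = new Map<string, number>(); // messageId -> expiry time
  private readonly ttlMs: number;

  constructor(ttlMs: number = MAX_CLOCK_SKEW_MS * 2) {
    this.ttlMs = ttlMs;
  }

  // Returns true the first time an id is seen, false for a replay.
  markIfNew(messageId: string, now: number = Date.now()): boolean {
    // Drop expired entries so the map does not grow without bound.
    for (const [id, expiresAt] of this.seen) {
      if (expiresAt <= now) this.seen.delete(id);
    }
    if (this.seen.has(messageId)) return false;
    this.seen.set(messageId, now + this.ttlMs);
    return true;
  }
}
```

Entries only need to be remembered for about as long as the timestamp window, because anything older is already rejected by the freshness check.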

7. Query Authorization

Queries should be authorized based on:

  • Connection status (must be ACTIVE)
  • Workspace permissions (sender must have access)
  • Query type (different queries may have different permissions)
  • Rate limiting (prevent abuse; deferred to a future enhancement, see Notes)
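
The four criteria above could be combined into one check, as in the sketch below. Everything here is hypothetical: `authorizeQuery`, `AuthorizationInput`, and `FixedWindowRateLimiter` are illustrative names, the workspace permission is reduced to a boolean supplied by the caller, and a fixed-window counter is just one simple rate-limiting choice.

```typescript
type ConnectionStatus = "PENDING" | "ACTIVE" | "DISCONNECTED";

type AuthorizationResult = { allowed: true } | { allowed: false; reason: string };

// Hypothetical input; the real service would load these from the database.
interface AuthorizationInput {
  connectionId: string;
  connectionStatus: ConnectionStatus;
  senderHasWorkspaceAccess: boolean;
  queryType: string;
  allowedQueryTypes: ReadonlySet<string>;
}

// Allows at most `limit` calls per key in each window of `windowMs`.
class FixedWindowRateLimiter {
  private readonly windows = new Map<string, { start: number; count: number }>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  tryAcquire(key: string, now: number = Date.now()): boolean {
    const current = this.windows.get(key);
    if (!current || now - current.start >= this.windowMs) {
      this.windows.set(key, { start: now, count: 1 });
      return true;
    }
    if (current.count >= this.limit) return false;
    current.count += 1;
    return true;
  }
}

// Cheap checks run first; the rate limiter is consulted last so that
// rejected requests do not consume quota.
function authorizeQuery(
  input: AuthorizationInput,
  limiter: FixedWindowRateLimiter,
  now: number = Date.now(),
): AuthorizationResult {
  if (input.connectionStatus !== "ACTIVE") {
    return { allowed: false, reason: "connection is not ACTIVE" };
  }
  if (!input.senderHasWorkspaceAccess) {
    return { allowed: false, reason: "sender lacks workspace access" };
  }
  if (!input.allowedQueryTypes.has(input.queryType)) {
    return { allowed: false, reason: `query type not permitted: ${input.queryType}` };
  }
  if (!limiter.tryAcquire(input.connectionId, now)) {
    return { allowed: false, reason: "rate limit exceeded" };
  }
  return { allowed: true };
}
```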

8. Testing Strategy

Unit Tests (TDD approach):

  • QueryService.sendQuery() creates signed query message
  • QueryService.handleIncomingQuery() validates signature
  • QueryService.handleIncomingQuery() rejects invalid signatures
  • QueryService.handleIncomingQuery() rejects expired timestamps
  • QueryService.processQueryResponse() updates message status
  • Query deduplication works correctly
  • Workspace isolation enforced

Integration Tests:

  • POST /api/v1/federation/query sends query to remote instance
  • POST /api/v1/federation/incoming/query validates signature and processes query
  • POST /api/v1/federation/incoming/query rejects inactive connections
  • GET /api/v1/federation/queries returns workspace query messages
  • GET /api/v1/federation/queries/:id returns query message details
  • Workspace isolation (can't access other workspace queries)
  • Connection requirement (can't query without ACTIVE connection)

Progress

  • Create scratchpad
  • Create database schema for FederationMessage model
  • Create message.types.ts with protocol types
  • Write tests for QueryService (15 tests)
  • Implement QueryService
  • Write tests for query API endpoints (9 tests)
  • Implement query API endpoints
  • Update FederationModule with QueryService
  • Verify all tests pass (24/24 tests passing)
  • Verify type checking passes
  • Verify test coverage ≥85% (100% coverage on new code)
  • Commit changes (commit 1159ca4)

Design Decisions

  1. Message Model: Separate FederationMessage model for tracking all message types (QUERY, COMMAND, EVENT)
  2. Correlation IDs: Use correlationId to link responses to requests
  3. Message Deduplication: Use unique messageId to prevent duplicate processing
  4. Workspace Scoping: All messages belong to a workspace for RLS
  5. Stateless Protocol: Each message is independently signed and verified
  6. Public Query Endpoint: POST /api/v1/federation/incoming/query requires no user authentication but rejects any message without a valid signature
  7. Status Tracking: Track message status (PENDING, DELIVERED, FAILED, TIMEOUT)

Notes

  • Sending and listing query messages is workspace-scoped and requires an authenticated user
  • Incoming query endpoint is public but cryptographically verified
  • Need to handle network errors gracefully when calling remote instances
  • Should validate connection is ACTIVE before sending queries
  • Consider timeout handling for queries that don't receive responses
  • Rate limiting should be considered for production (future enhancement)
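
For the timeout note above, one option is a periodic sweep that moves stale PENDING messages to TIMEOUT. The sketch below shows only the selection logic as a pure function; `expirePending`, `TrackedMessage`, and the 30-second threshold in the usage lines are assumptions, and a real implementation would run this as a database update.

```typescript
type FederationMessageStatus = "PENDING" | "DELIVERED" | "FAILED" | "TIMEOUT";

// Hypothetical projection of the FederationMessage fields the sweep needs.
interface TrackedMessage {
  messageId: string;
  status: FederationMessageStatus;
  createdAt: Date;
}

// Returns a new array in which PENDING messages older than `timeoutMs`
// are marked TIMEOUT. All other messages are returned unchanged.
function expirePending(
  messages: readonly TrackedMessage[],
  now: Date,
  timeoutMs: number,
): TrackedMessage[] {
  return messages.map((message): TrackedMessage => {
    const ageMs = now.getTime() - message.createdAt.getTime();
    if (message.status === "PENDING" && ageMs > timeoutMs) {
      return { ...message, status: "TIMEOUT" };
    }
    return message;
  });
}

// Usage with an assumed 30 second timeout.
const sweepTime = new Date("2026-02-03T12:00:00Z");
const swept = expirePending(
  [
    { messageId: "old", status: "PENDING", createdAt: new Date("2026-02-03T11:58:00Z") },
    { messageId: "new", status: "PENDING", createdAt: new Date("2026-02-03T11:59:50Z") },
  ],
  sweepTime,
  30_000,
);
console.log(swept.map((m) => `${m.messageId}:${m.status}`).join(", "));
```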

Testing Plan

Unit Tests

  1. QueryService:
    • Should create signed query message
    • Should send query to remote instance
    • Should validate incoming query signature
    • Should reject queries with invalid signatures
    • Should reject queries with expired timestamps
    • Should reject queries from inactive connections
    • Should deduplicate messages by messageId
    • Should process query responses correctly
    • Should update message status appropriately
    • Should enforce workspace isolation

Integration Tests

  1. POST /api/v1/federation/query:

    • Should require authentication
    • Should require ACTIVE connection
    • Should create query message record
    • Should send signed query to remote instance
    • Should return query message details
  2. POST /api/v1/federation/incoming/query:

    • Should validate query signature
    • Should reject queries with invalid signatures
    • Should reject queries with expired timestamps
    • Should reject queries from inactive connections
    • Should process valid queries
    • Should return signed response
  3. GET /api/v1/federation/queries:

    • Should list workspace query messages
    • Should filter by status if provided
    • Should enforce workspace isolation
  4. GET /api/v1/federation/queries/:id:

    • Should return query message details
    • Should enforce workspace ownership