stack/docs/scratchpads/90-event-subscriptions.md
Jason Woltje ca4f5ec011 feat(#90): implement EVENT subscriptions for federation
Implement event pub/sub messaging for federation to enable real-time
event streaming between federated instances.

Features:
- Event subscription management (subscribe/unsubscribe)
- Event publishing to subscribed instances
- Event acknowledgment protocol
- Server-side event filtering based on subscriptions
- Full signature verification and connection validation

Implementation:
- FederationEventSubscription model for storing subscriptions
- EventService with complete event lifecycle management
- EventController with authenticated and public endpoints
- EventMessage, EventAck, and SubscriptionDetails types
- Comprehensive DTOs for all event operations

API Endpoints:
- POST /api/v1/federation/events/subscribe
- POST /api/v1/federation/events/unsubscribe
- POST /api/v1/federation/events/publish
- GET /api/v1/federation/events/subscriptions
- GET /api/v1/federation/events/messages
- POST /api/v1/federation/incoming/event (public)
- POST /api/v1/federation/incoming/event/ack (public)

Testing:
- 18 unit tests for EventService (89.09% coverage)
- 11 unit tests for EventController (83.87% coverage)
- All 29 tests passing
- Follows TDD red-green-refactor cycle

Technical Notes:
- Reuses existing FederationMessage model with eventType field
- Follows patterns from QueryService and CommandService
- Uses existing signature and connection infrastructure
- Supports hierarchical event type naming (e.g., "task.created")

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 13:45:00 -06:00

Issue #90: EVENT Subscriptions

Objective

Implement EVENT message type for federation to enable pub/sub event streaming between federated instances.

Context

  • FED-005 (QUERY) and FED-006 (COMMAND) already implemented
  • FederationMessage model already supports EVENT type
  • Pattern established: Service layer handles business logic, controller exposes HTTP endpoints
  • Signature verification infrastructure exists (SignatureService)
  • Connection validation infrastructure exists (FederationService, ConnectionService)

Requirements

Event Message Structure

Based on existing QUERY/COMMAND patterns:

EventMessage (outgoing):

  • messageId: string (UUID)
  • instanceId: string (sender)
  • eventType: string (e.g., "task.created", "project.updated")
  • payload: Record<string, unknown>
  • timestamp: number (Unix ms)
  • signature: string (RSA signature)

EventAck (acknowledgment):

  • messageId: string (UUID)
  • correlationId: string (original event messageId)
  • instanceId: string (responder)
  • received: boolean
  • timestamp: number
  • signature: string
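
The two shapes above could be written as TypeScript interfaces roughly as follows. Field names and types are taken from the lists; the `isEventAckFor` helper is hypothetical and only illustrates how `correlationId` ties an acknowledgment back to its event.

```typescript
// Sketch of the message shapes listed above. Field names and types follow the
// lists; isEventAckFor is a hypothetical helper, not part of the codebase.
interface EventMessage {
  messageId: string; // UUID
  instanceId: string; // sender
  eventType: string; // e.g. "task.created"
  payload: Record<string, unknown>;
  timestamp: number; // Unix ms
  signature: string; // RSA signature
}

interface EventAck {
  messageId: string; // UUID of the ack itself
  correlationId: string; // messageId of the original event
  instanceId: string; // responder
  received: boolean;
  timestamp: number;
  signature: string;
}

// An ack belongs to an event when its correlationId equals the event's messageId.
function isEventAckFor(ack: EventAck, event: EventMessage): boolean {
  return ack.correlationId === event.messageId;
}
```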

Subscription Management

  • Subscribe to event types from remote instances
  • Unsubscribe from event types
  • Store subscriptions in database (new model: FederationEventSubscription)
  • Filter events based on subscriptions before sending
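
As a minimal sketch of the filtering step, assuming subscriptions can be loaded as plain records: the `Subscription` shape below (including `connectionId` and `isActive`) is hypothetical, since the columns of FederationEventSubscription are not listed in this document, and matching is assumed to be exact on the event type string.

```typescript
// Hypothetical in-memory view of a subscription row. The real data lives in the
// FederationEventSubscription table; these field names are assumptions.
interface Subscription {
  connectionId: string; // assumed identifier of the remote connection
  eventType: string; // e.g. "task.created"
  isActive: boolean; // assumed flag; inactive subscriptions are skipped
}

// Return the connections that should receive an event of the given type.
// Exact string matching is assumed; wildcard matching is not covered here.
function selectSubscribers(subs: Subscription[], eventType: string): string[] {
  const ids = subs
    .filter((s) => s.isActive && s.eventType === eventType)
    .map((s) => s.connectionId);
  // Drop duplicates in case the same connection subscribed more than once.
  return ids.filter((id, index) => ids.indexOf(id) === index);
}
```

Connections that have no matching subscription never appear in the result, so nothing is sent to them.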

Event Publishing

  • Publish events to subscribed remote instances
  • Track delivery status
  • Handle failed deliveries with retry logic
  • Acknowledge received events

API Endpoints

  1. POST /api/v1/federation/events/subscribe - Subscribe to event type
  2. POST /api/v1/federation/events/unsubscribe - Unsubscribe from event type
  3. GET /api/v1/federation/events/subscriptions - List subscriptions
  4. POST /api/v1/federation/events/publish - Publish event
  5. GET /api/v1/federation/events/messages - List event messages
  6. POST /api/v1/federation/incoming/event - Handle incoming event (public)
  7. POST /api/v1/federation/incoming/event/ack - Handle incoming event acknowledgment (public)

Approach

Phase 1: Database Schema (Partially Done)

  • FederationMessage model already supports EVENT type (line 179 in schema.prisma)
  • FederationEventSubscription model still needs to be added

Phase 2: Type Definitions (TDD - Test First)

  • Add EventMessage, EventAck, EventMessageDetails to message.types.ts
  • Add SubscriptionDetails type for subscription management

Phase 3: EventService (TDD - Test First)

Following QueryService/CommandService pattern:

  • subscribeToEventType(): Create subscription
  • unsubscribeFromEventType(): Remove subscription
  • publishEvent(): Send event to subscribed instances
  • handleIncomingEvent(): Process received event, return ack
  • processEventAck(): Update delivery status
  • getEventMessages(): List events for workspace
  • getEventSubscriptions(): List subscriptions for workspace
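
The acknowledgment half of this list could look roughly like the sketch below. It is a simplified, framework-free illustration: the real methods sit on EventService and use the database and SignatureService, whose APIs are not shown here, so ID generation and signing are injected as hypothetical functions and delivery status is kept in a plain object with assumed status names.

```typescript
// Simplified, hypothetical versions of two EventService methods.
interface EventMessage {
  messageId: string;
  instanceId: string;
  eventType: string;
  payload: Record<string, unknown>;
  timestamp: number;
  signature: string;
}

interface EventAck {
  messageId: string;
  correlationId: string;
  instanceId: string;
  received: boolean;
  timestamp: number;
  signature: string;
}

type DeliveryStatus = "PENDING" | "DELIVERED" | "FAILED"; // assumed names

// Build a signed ack for a received event. newId and sign are injected
// stand-ins for UUID generation and SignatureService.
function handleIncomingEvent(
  event: EventMessage,
  localInstanceId: string,
  newId: () => string,
  sign: (unsigned: Omit<EventAck, "signature">) => string,
): EventAck {
  const unsigned = {
    messageId: newId(),
    correlationId: event.messageId, // links the ack to the original event
    instanceId: localInstanceId,
    received: true,
    timestamp: Date.now(),
  };
  return { ...unsigned, signature: sign(unsigned) };
}

// Update the status of the event the ack refers to.
// Returns false when the ack does not match a known event.
function processEventAck(
  statuses: Record<string, DeliveryStatus>,
  ack: EventAck,
): boolean {
  if (!Object.prototype.hasOwnProperty.call(statuses, ack.correlationId)) {
    return false;
  }
  statuses[ack.correlationId] = ack.received ? "DELIVERED" : "FAILED";
  return true;
}
```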

Phase 4: EventController (TDD - Test First)

  • Authenticated endpoints for event management
  • Public endpoints for incoming events and acknowledgments (signature-verified)

Phase 5: Integration

  • Add EventService to FederationModule
  • Add EventController to FederationModule
  • Update exports

Design Decisions

  1. Subscription Model: Store subscriptions in database for persistence
  2. Event Filtering: Server-side filtering based on subscriptions (an event is sent only to instances subscribed to its event type)
  3. Acknowledgment: Simple ACK pattern (not a full response as in QUERY/COMMAND)
  4. Event Types: Free-form strings (e.g., "task.created", "user.login") for flexibility
  5. Retry Logic: Store failed deliveries for manual retry (Phase 6 enhancement)
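
Decision 5 could be sketched as below: failures are recorded, and nothing is resent until a retry is triggered explicitly. The `FailedDeliveryStore` class, its fields, and the in-memory storage are all assumptions for illustration; the document does not say how failed deliveries are persisted.

```typescript
// Hypothetical sketch of "store failed deliveries for manual retry".
interface FailedDelivery {
  messageId: string;
  connectionId: string; // assumed identifier of the target connection
  attempts: number;
  lastError: string;
}

class FailedDeliveryStore {
  private items: FailedDelivery[] = [];

  // Record a failure, bumping the attempt count if it is already known.
  record(messageId: string, connectionId: string, error: unknown): void {
    const lastError = error instanceof Error ? error.message : String(error);
    const existing = this.items.filter(
      (d) => d.messageId === messageId && d.connectionId === connectionId,
    )[0];
    if (existing) {
      existing.attempts += 1;
      existing.lastError = lastError;
    } else {
      this.items.push({ messageId, connectionId, attempts: 1, lastError });
    }
  }

  list(): FailedDelivery[] {
    return this.items.slice();
  }

  // Manually retry every stored delivery with the supplied send function.
  // Successful deliveries are removed; failures stay with updated details.
  async retryAll(send: (d: FailedDelivery) => Promise<void>): Promise<number> {
    let delivered = 0;
    for (const d of this.list()) {
      try {
        await send(d);
        this.items = this.items.filter((x) => x !== d);
        delivered += 1;
      } catch (err) {
        this.record(d.messageId, d.connectionId, err);
      }
    }
    return delivered;
  }
}
```

Because `retryAll` runs only when called, an operator or a later scheduled job decides when redelivery happens.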

Implementation Order (TDD)

  1. Write test for FederationEventSubscription model migration
  2. Create migration for FederationEventSubscription
  3. Write tests for EventMessage/EventAck types
  4. Add EventMessage/EventAck/EventMessageDetails to message.types.ts
  5. Write tests for EventService.subscribeToEventType()
  6. Implement EventService.subscribeToEventType()
  7. Write tests for EventService.unsubscribeFromEventType()
  8. Implement EventService.unsubscribeFromEventType()
  9. Write tests for EventService.publishEvent()
  10. Implement EventService.publishEvent()
  11. Write tests for EventService.handleIncomingEvent()
  12. Implement EventService.handleIncomingEvent()
  13. Write tests for EventService.processEventAck()
  14. Implement EventService.processEventAck()
  15. Write tests for EventController endpoints
  16. Implement EventController
  17. Integration tests
  18. Update module exports

Testing Strategy

Unit Tests

  • EventService: All methods with mocked dependencies
  • EventController: All endpoints with mocked service

Integration Tests

  • End-to-end event flow: subscribe → publish → receive → ack
  • Signature verification
  • Connection validation
  • Error handling

Coverage Target

  • Minimum 85% code coverage (project standard)

Progress

  • Create FederationEventSubscription Prisma model
  • Generate Prisma migration
  • Add event message types to message.types.ts
  • Create event.service.ts (TDD)
  • Create event.service.spec.ts (18 tests - all passing)
  • Create event.controller.ts (TDD)
  • Create event.controller.spec.ts (11 tests - all passing)
  • Add DTO files (subscribe, unsubscribe, publish)
  • Update federation.module.ts
  • Run test suite (29 unit tests passing: 18 service, 11 controller)
  • Verify 85%+ coverage (89.09% service; 83.87% controller, slightly below the 85% target)
  • Manual testing with two instances (optional)

Files to Create/Modify

New Files

  • apps/api/src/federation/event.service.ts
  • apps/api/src/federation/event.service.spec.ts
  • apps/api/src/federation/event.controller.ts
  • apps/api/src/federation/event.controller.spec.ts
  • apps/api/src/federation/dto/event.dto.ts
  • apps/api/prisma/migrations/XXXXXXXX_add_federation_event_subscriptions/migration.sql

Modified Files

  • apps/api/src/federation/types/message.types.ts (add EVENT types)
  • apps/api/src/federation/federation.module.ts (add EventService, EventController)
  • apps/api/prisma/schema.prisma (add FederationEventSubscription model)

Notes

Event Type Naming Convention

Use dot-notation for hierarchical event types:

  • entity.action (e.g., "task.created", "user.updated")
  • entity.action.detail (e.g., "task.status.changed")
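
A check for this convention might look like the sketch below. Since the design treats event types as free-form strings, this validator is optional and purely illustrative; the function names and the lowercase-alphanumeric rule are assumptions.

```typescript
// Hypothetical validator for the dot-notation convention above.
// Assumes lowercase alphanumeric segments: entity.action or entity.action.detail.
const EVENT_TYPE_PATTERN = /^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*){1,2}$/;

function isConventionalEventType(eventType: string): boolean {
  return EVENT_TYPE_PATTERN.test(eventType);
}

interface ParsedEventType {
  entity: string;
  action: string;
  detail?: string;
}

// Split "task.status.changed" into entity, action, and optional detail.
// Returns null when the string does not follow the convention.
function parseEventType(eventType: string): ParsedEventType | null {
  if (!isConventionalEventType(eventType)) {
    return null;
  }
  const parts = eventType.split(".");
  const parsed: ParsedEventType = { entity: parts[0], action: parts[1] };
  if (parts.length === 3) {
    parsed.detail = parts[2];
  }
  return parsed;
}
```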

Security Considerations

  • All events must be signature-verified
  • Only send events to active connections
  • Rate limiting should be considered for event publishing (future enhancement)
  • Event payload should not contain sensitive data (responsibility of publisher)
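
To illustrate the first point, here is a minimal sign/verify sketch using Node's built-in `crypto` module. It is not the SignatureService implementation, which is not shown in this document: the RSA-SHA256 algorithm, base64 encoding, and the `canonicalize` step (JSON with sorted top-level keys) are all assumptions, as are the function names.

```typescript
import { createSign, createVerify, generateKeyPairSync } from "node:crypto";

// Hypothetical sketch only; the real SignatureService may serialize and sign
// messages differently.
type UnsignedMessage = Record<string, unknown>;

// Serialize with sorted top-level keys so both sides sign the same bytes.
// Nested objects keep their own key order in this simplified version.
function canonicalize(message: UnsignedMessage): string {
  const sorted: UnsignedMessage = {};
  for (const key of Object.keys(message).sort()) {
    sorted[key] = message[key];
  }
  return JSON.stringify(sorted);
}

function signMessage(message: UnsignedMessage, privateKeyPem: string): string {
  return createSign("RSA-SHA256")
    .update(canonicalize(message))
    .sign(privateKeyPem, "base64");
}

function verifyMessage(
  message: UnsignedMessage,
  signature: string,
  publicKeyPem: string,
): boolean {
  return createVerify("RSA-SHA256")
    .update(canonicalize(message))
    .verify(publicKeyPem, signature, "base64");
}

// Throwaway key pair for local experiments; real instances would use the
// keys exchanged when the federation connection was established.
function generateDemoKeyPair(): { publicKey: string; privateKey: string } {
  return generateKeyPairSync("rsa", {
    modulusLength: 2048,
    publicKeyEncoding: { type: "spki", format: "pem" },
    privateKeyEncoding: { type: "pkcs8", format: "pem" },
  });
}
```

An incoming event would be rejected when `verifyMessage` returns false for the sender's public key, for example after the payload was altered in transit.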

Future Enhancements (Not in This Issue)

  • Event replay/history
  • Event filtering by payload fields
  • Webhook support for event delivery
  • Event schema validation
  • Rate limiting
  • Batch event delivery