From a532fd43b225118599dc81dc549c5ac5139d96bd Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Mon, 23 Mar 2026 01:21:03 +0000 Subject: [PATCH] feat(M6-006,M6-007,M7-001,M7-002): admin jobs API, job event logging, channel adapter interface, message protocol (#325) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- apps/gateway/package.json | 1 + .../src/__tests__/session-hardening.test.ts | 377 +++++++++++++++++ .../src/admin/admin-jobs.controller.ts | 128 ++++++ apps/gateway/src/admin/admin.module.ts | 3 +- apps/gateway/src/app.module.ts | 2 + apps/gateway/src/log/cron.service.ts | 71 ++-- apps/gateway/src/log/log.module.ts | 3 +- apps/gateway/src/queue/queue-admin.dto.ts | 34 ++ apps/gateway/src/queue/queue.module.ts | 9 + apps/gateway/src/queue/queue.service.ts | 386 ++++++++++++++++++ apps/gateway/src/queue/queue.tokens.ts | 2 + docs/architecture/channel-protocol.md | 333 +++++++++++++++ pnpm-lock.yaml | 140 +++++++ 13 files changed, 1458 insertions(+), 31 deletions(-) create mode 100644 apps/gateway/src/__tests__/session-hardening.test.ts create mode 100644 apps/gateway/src/admin/admin-jobs.controller.ts create mode 100644 apps/gateway/src/queue/queue-admin.dto.ts create mode 100644 apps/gateway/src/queue/queue.module.ts create mode 100644 apps/gateway/src/queue/queue.service.ts create mode 100644 apps/gateway/src/queue/queue.tokens.ts create mode 100644 docs/architecture/channel-protocol.md diff --git a/apps/gateway/package.json b/apps/gateway/package.json index 9ef6c48..6f7919d 100644 --- a/apps/gateway/package.json +++ b/apps/gateway/package.json @@ -42,6 +42,7 @@ "@opentelemetry/semantic-conventions": "^1.40.0", "@sinclair/typebox": "^0.34.48", "better-auth": "^1.5.5", + "bullmq": "^5.71.0", "class-transformer": "^0.5.1", "class-validator": "^0.15.1", "dotenv": "^17.3.1", diff --git a/apps/gateway/src/__tests__/session-hardening.test.ts b/apps/gateway/src/__tests__/session-hardening.test.ts new file mode 100644 index 0000000..9efd7eb --- /dev/null +++ b/apps/gateway/src/__tests__/session-hardening.test.ts @@ -0,0 +1,377 @@ +/** + * M5-008: Session hardening verification tests. + * + * Verifies: + * 1. /model command switches model → session:info reflects updated modelId + * 2. /agent command switches agent config → system prompt / agentName changes + * 3. Session resume binds to a conversation (history injected via conversationHistory option) + * 4. Session metrics track token usage and message count correctly + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { + AgentSession, + AgentSessionOptions, + ConversationHistoryMessage, +} from '../agent/agent.service.js'; +import type { SessionInfoDto, SessionMetrics, SessionTokenMetrics } from '../agent/session.dto.js'; + +// --------------------------------------------------------------------------- +// Helpers — minimal AgentSession fixture +// --------------------------------------------------------------------------- + +function makeMetrics(overrides?: Partial): SessionMetrics { + return { + tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + modelSwitches: 0, + messageCount: 0, + lastActivityAt: new Date().toISOString(), + ...overrides, + }; +} + +function makeSession(overrides?: Partial): AgentSession { + return { + id: 'session-001', + provider: 'anthropic', + modelId: 'claude-3-5-sonnet-20241022', + piSession: {} as AgentSession['piSession'], + listeners: new Set(), + unsubscribe: vi.fn(), + createdAt: Date.now(), + promptCount: 0, + channels: new Set(), + skillPromptAdditions: [], + sandboxDir: '/tmp', + allowedTools: null, + metrics: makeMetrics(), + ...overrides, + }; +} + +function sessionToInfo(session: AgentSession): SessionInfoDto { + return { + id: session.id, + provider: session.provider, + modelId: session.modelId, + ...(session.agentName ? { agentName: session.agentName } : {}), + createdAt: new Date(session.createdAt).toISOString(), + promptCount: session.promptCount, + channels: Array.from(session.channels), + durationMs: Date.now() - session.createdAt, + metrics: { ...session.metrics }, + }; +} + +// --------------------------------------------------------------------------- +// Replicated AgentService methods (tested in isolation without full DI setup) +// --------------------------------------------------------------------------- + +function updateSessionModel(session: AgentSession, modelId: string): void { + session.modelId = modelId; + session.metrics.modelSwitches += 1; + session.metrics.lastActivityAt = new Date().toISOString(); +} + +function applyAgentConfig( + session: AgentSession, + agentConfigId: string, + agentName: string, + modelId?: string, +): void { + session.agentConfigId = agentConfigId; + session.agentName = agentName; + if (modelId) { + updateSessionModel(session, modelId); + } +} + +function recordTokenUsage(session: AgentSession, tokens: SessionTokenMetrics): void { + session.metrics.tokens.input += tokens.input; + session.metrics.tokens.output += tokens.output; + session.metrics.tokens.cacheRead += tokens.cacheRead; + session.metrics.tokens.cacheWrite += tokens.cacheWrite; + session.metrics.tokens.total += tokens.total; + session.metrics.lastActivityAt = new Date().toISOString(); +} + +function recordMessage(session: AgentSession): void { + session.metrics.messageCount += 1; + session.metrics.lastActivityAt = new Date().toISOString(); +} + +// --------------------------------------------------------------------------- +// 1. /model command — switches model → session:info updated +// --------------------------------------------------------------------------- + +describe('/model command — model switch reflected in session:info', () => { + let session: AgentSession; + + beforeEach(() => { + session = makeSession(); + }); + + it('updates modelId when /model is called with a model name', () => { + updateSessionModel(session, 'claude-opus-4-5-20251001'); + + expect(session.modelId).toBe('claude-opus-4-5-20251001'); + }); + + it('increments modelSwitches metric after /model command', () => { + expect(session.metrics.modelSwitches).toBe(0); + + updateSessionModel(session, 'gpt-4o'); + expect(session.metrics.modelSwitches).toBe(1); + + updateSessionModel(session, 'claude-3-5-sonnet-20241022'); + expect(session.metrics.modelSwitches).toBe(2); + }); + + it('session:info DTO reflects the new modelId after switch', () => { + updateSessionModel(session, 'claude-haiku-3-5-20251001'); + + const info = sessionToInfo(session); + + expect(info.modelId).toBe('claude-haiku-3-5-20251001'); + expect(info.metrics.modelSwitches).toBe(1); + }); + + it('lastActivityAt is updated after model switch', () => { + const before = session.metrics.lastActivityAt; + // Ensure at least 1ms passes + vi.setSystemTime(Date.now() + 1); + updateSessionModel(session, 'new-model'); + vi.useRealTimers(); + + expect(session.metrics.lastActivityAt).not.toBe(before); + }); +}); + +// --------------------------------------------------------------------------- +// 2. /agent command — switches agent config → system prompt / agentName updated +// --------------------------------------------------------------------------- + +describe('/agent command — agent config applied to session', () => { + let session: AgentSession; + + beforeEach(() => { + session = makeSession(); + }); + + it('sets agentConfigId and agentName on the session', () => { + applyAgentConfig(session, 'agent-uuid-001', 'CodeReviewer'); + + expect(session.agentConfigId).toBe('agent-uuid-001'); + expect(session.agentName).toBe('CodeReviewer'); + }); + + it('also updates modelId when agent config carries a model', () => { + applyAgentConfig(session, 'agent-uuid-002', 'DataAnalyst', 'gpt-4o-mini'); + + expect(session.agentName).toBe('DataAnalyst'); + expect(session.modelId).toBe('gpt-4o-mini'); + expect(session.metrics.modelSwitches).toBe(1); + }); + + it('does NOT update modelId when agent config has no model', () => { + const originalModel = session.modelId; + applyAgentConfig(session, 'agent-uuid-003', 'Planner', undefined); + + expect(session.modelId).toBe(originalModel); + expect(session.metrics.modelSwitches).toBe(0); + }); + + it('session:info DTO reflects agentName after /agent switch', () => { + applyAgentConfig(session, 'agent-uuid-004', 'DevBot'); + + const info = sessionToInfo(session); + + expect(info.agentName).toBe('DevBot'); + }); + + it('multiple /agent calls update to the latest agent', () => { + applyAgentConfig(session, 'agent-001', 'FirstAgent'); + applyAgentConfig(session, 'agent-002', 'SecondAgent'); + + expect(session.agentConfigId).toBe('agent-002'); + expect(session.agentName).toBe('SecondAgent'); + }); +}); + +// --------------------------------------------------------------------------- +// 3. Session resume — binds to conversation via conversationHistory +// --------------------------------------------------------------------------- + +describe('Session resume — binds to conversation', () => { + it('conversationHistory option is preserved in session options', () => { + const history: ConversationHistoryMessage[] = [ + { + role: 'user', + content: 'Hello, what is TypeScript?', + createdAt: new Date('2026-01-01T00:01:00Z'), + }, + { + role: 'assistant', + content: 'TypeScript is a typed superset of JavaScript.', + createdAt: new Date('2026-01-01T00:01:05Z'), + }, + ]; + + const options: AgentSessionOptions = { + conversationHistory: history, + provider: 'anthropic', + modelId: 'claude-3-5-sonnet-20241022', + }; + + expect(options.conversationHistory).toHaveLength(2); + expect(options.conversationHistory![0]!.role).toBe('user'); + expect(options.conversationHistory![1]!.role).toBe('assistant'); + }); + + it('session with conversationHistory option carries the conversation binding', () => { + const CONV_ID = 'conv-resume-001'; + const history: ConversationHistoryMessage[] = [ + { role: 'user', content: 'Prior question', createdAt: new Date('2026-01-01T00:01:00Z') }, + ]; + + // Simulate what ChatGateway does: pass conversationId + history to createSession + const options: AgentSessionOptions = { + conversationHistory: history, + }; + + // The session ID is the conversationId in the gateway + const session = makeSession({ id: CONV_ID }); + + expect(session.id).toBe(CONV_ID); + expect(options.conversationHistory).toHaveLength(1); + }); + + it('empty conversationHistory is valid (new conversation)', () => { + const options: AgentSessionOptions = { + conversationHistory: [], + }; + + expect(options.conversationHistory).toHaveLength(0); + }); + + it('resumed session preserves all message roles', () => { + const history: ConversationHistoryMessage[] = [ + { role: 'system', content: 'You are a helpful assistant.', createdAt: new Date() }, + { role: 'user', content: 'Question 1', createdAt: new Date() }, + { role: 'assistant', content: 'Answer 1', createdAt: new Date() }, + { role: 'user', content: 'Question 2', createdAt: new Date() }, + ]; + + const roles = history.map((m) => m.role); + expect(roles).toEqual(['system', 'user', 'assistant', 'user']); + }); +}); + +// --------------------------------------------------------------------------- +// 4. Session metrics — token usage and message count +// --------------------------------------------------------------------------- + +describe('Session metrics — token usage and message count', () => { + let session: AgentSession; + + beforeEach(() => { + session = makeSession(); + }); + + it('starts with zero metrics', () => { + expect(session.metrics.tokens.input).toBe(0); + expect(session.metrics.tokens.output).toBe(0); + expect(session.metrics.tokens.total).toBe(0); + expect(session.metrics.messageCount).toBe(0); + expect(session.metrics.modelSwitches).toBe(0); + }); + + it('accumulates token usage across multiple turns', () => { + recordTokenUsage(session, { + input: 100, + output: 50, + cacheRead: 0, + cacheWrite: 0, + total: 150, + }); + recordTokenUsage(session, { + input: 200, + output: 80, + cacheRead: 10, + cacheWrite: 5, + total: 295, + }); + + expect(session.metrics.tokens.input).toBe(300); + expect(session.metrics.tokens.output).toBe(130); + expect(session.metrics.tokens.cacheRead).toBe(10); + expect(session.metrics.tokens.cacheWrite).toBe(5); + expect(session.metrics.tokens.total).toBe(445); + }); + + it('increments message count with each recordMessage call', () => { + expect(session.metrics.messageCount).toBe(0); + + recordMessage(session); + expect(session.metrics.messageCount).toBe(1); + + recordMessage(session); + recordMessage(session); + expect(session.metrics.messageCount).toBe(3); + }); + + it('session:info DTO exposes correct metrics snapshot', () => { + recordTokenUsage(session, { + input: 500, + output: 100, + cacheRead: 20, + cacheWrite: 10, + total: 630, + }); + recordMessage(session); + recordMessage(session); + updateSessionModel(session, 'claude-haiku-3-5-20251001'); + + const info = sessionToInfo(session); + + expect(info.metrics.tokens.input).toBe(500); + expect(info.metrics.tokens.output).toBe(100); + expect(info.metrics.tokens.total).toBe(630); + expect(info.metrics.messageCount).toBe(2); + expect(info.metrics.modelSwitches).toBe(1); + }); + + it('metrics are independent per session', () => { + const sessionA = makeSession({ id: 'session-A' }); + const sessionB = makeSession({ id: 'session-B' }); + + recordTokenUsage(sessionA, { input: 100, output: 50, cacheRead: 0, cacheWrite: 0, total: 150 }); + recordMessage(sessionA); + + // Session B should remain at zero + expect(sessionB.metrics.tokens.input).toBe(0); + expect(sessionB.metrics.messageCount).toBe(0); + + // Session A should have updated values + expect(sessionA.metrics.tokens.input).toBe(100); + expect(sessionA.metrics.messageCount).toBe(1); + }); + + it('lastActivityAt is updated after recording tokens', () => { + const before = session.metrics.lastActivityAt; + vi.setSystemTime(new Date(Date.now() + 100)); + recordTokenUsage(session, { input: 10, output: 5, cacheRead: 0, cacheWrite: 0, total: 15 }); + vi.useRealTimers(); + + expect(session.metrics.lastActivityAt).not.toBe(before); + }); + + it('lastActivityAt is updated after recording a message', () => { + const before = session.metrics.lastActivityAt; + vi.setSystemTime(new Date(Date.now() + 100)); + recordMessage(session); + vi.useRealTimers(); + + expect(session.metrics.lastActivityAt).not.toBe(before); + }); +}); diff --git a/apps/gateway/src/admin/admin-jobs.controller.ts b/apps/gateway/src/admin/admin-jobs.controller.ts new file mode 100644 index 0000000..63f3774 --- /dev/null +++ b/apps/gateway/src/admin/admin-jobs.controller.ts @@ -0,0 +1,128 @@ +import { + Controller, + Get, + HttpCode, + HttpStatus, + Inject, + NotFoundException, + Optional, + Param, + Post, + Query, + UseGuards, +} from '@nestjs/common'; +import { AdminGuard } from './admin.guard.js'; +import { QueueService } from '../queue/queue.service.js'; +import type { JobDto, JobListDto, JobStatus, QueueListDto } from '../queue/queue-admin.dto.js'; + +@Controller('api/admin/jobs') +@UseGuards(AdminGuard) +export class AdminJobsController { + constructor( + @Optional() + @Inject(QueueService) + private readonly queueService: QueueService | null, + ) {} + + /** + * GET /api/admin/jobs + * List jobs across all queues. Optional ?status=active|completed|failed|waiting|delayed + */ + @Get() + async listJobs(@Query('status') status?: string): Promise { + if (!this.queueService) { + return { jobs: [], total: 0 }; + } + + const validStatuses: JobStatus[] = ['active', 'completed', 'failed', 'waiting', 'delayed']; + const normalised = status as JobStatus | undefined; + + if (normalised && !validStatuses.includes(normalised)) { + return { jobs: [], total: 0 }; + } + + const jobs: JobDto[] = await this.queueService.listJobs(normalised); + return { jobs, total: jobs.length }; + } + + /** + * POST /api/admin/jobs/:id/retry + * Retry a specific failed job. The id is "__". + */ + @Post(':id/retry') + @HttpCode(HttpStatus.OK) + async retryJob(@Param('id') id: string): Promise<{ ok: boolean; message: string }> { + if (!this.queueService) { + throw new NotFoundException('Queue service is not available'); + } + + const result = await this.queueService.retryJob(id); + if (!result.ok) { + throw new NotFoundException(result.message); + } + + return result; + } + + /** + * GET /api/admin/jobs/queues + * Return status for all managed queues. + */ + @Get('queues') + async listQueues(): Promise { + if (!this.queueService) { + return { queues: [] }; + } + + const health = await this.queueService.getHealthStatus(); + const queues = Object.entries(health.queues).map(([name, stats]) => ({ + name, + waiting: stats.waiting, + active: stats.active, + completed: stats.completed, + failed: stats.failed, + delayed: 0, + paused: stats.paused, + })); + + return { queues }; + } + + /** + * POST /api/admin/jobs/queues/:name/pause + * Pause the named queue. + */ + @Post('queues/:name/pause') + @HttpCode(HttpStatus.OK) + async pauseQueue(@Param('name') name: string): Promise<{ ok: boolean; message: string }> { + if (!this.queueService) { + throw new NotFoundException('Queue service is not available'); + } + + const result = await this.queueService.pauseQueue(name); + if (!result.ok) { + throw new NotFoundException(result.message); + } + + return result; + } + + /** + * POST /api/admin/jobs/queues/:name/resume + * Resume the named queue. + */ + @Post('queues/:name/resume') + @HttpCode(HttpStatus.OK) + async resumeQueue(@Param('name') name: string): Promise<{ ok: boolean; message: string }> { + if (!this.queueService) { + throw new NotFoundException('Queue service is not available'); + } + + const result = await this.queueService.resumeQueue(name); + if (!result.ok) { + throw new NotFoundException(result.message); + } + + return result; + } +} diff --git a/apps/gateway/src/admin/admin.module.ts b/apps/gateway/src/admin/admin.module.ts index 72aff71..3fe00ea 100644 --- a/apps/gateway/src/admin/admin.module.ts +++ b/apps/gateway/src/admin/admin.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common'; import { AdminController } from './admin.controller.js'; import { AdminHealthController } from './admin-health.controller.js'; +import { AdminJobsController } from './admin-jobs.controller.js'; import { AdminGuard } from './admin.guard.js'; @Module({ - controllers: [AdminController, AdminHealthController], + controllers: [AdminController, AdminHealthController, AdminJobsController], providers: [AdminGuard], }) export class AdminModule {} diff --git a/apps/gateway/src/app.module.ts b/apps/gateway/src/app.module.ts index 44cd2be..8482785 100644 --- a/apps/gateway/src/app.module.ts +++ b/apps/gateway/src/app.module.ts @@ -22,6 +22,7 @@ import { PreferencesModule } from './preferences/preferences.module.js'; import { GCModule } from './gc/gc.module.js'; import { ReloadModule } from './reload/reload.module.js'; import { WorkspaceModule } from './workspace/workspace.module.js'; +import { QueueModule } from './queue/queue.module.js'; import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler'; @Module({ @@ -46,6 +47,7 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler'; PreferencesModule, CommandsModule, GCModule, + QueueModule, ReloadModule, WorkspaceModule, ], diff --git a/apps/gateway/src/log/cron.service.ts b/apps/gateway/src/log/cron.service.ts index f417b70..aa9b82f 100644 --- a/apps/gateway/src/log/cron.service.ts +++ b/apps/gateway/src/log/cron.service.ts @@ -5,59 +5,72 @@ import { type OnModuleInit, type OnModuleDestroy, } from '@nestjs/common'; -import cron from 'node-cron'; import { SummarizationService } from './summarization.service.js'; import { SessionGCService } from '../gc/session-gc.service.js'; +import { + QueueService, + QUEUE_SUMMARIZATION, + QUEUE_GC, + QUEUE_TIER_MANAGEMENT, +} from '../queue/queue.service.js'; +import type { Worker } from 'bullmq'; +import type { MosaicJobData } from '../queue/queue.service.js'; @Injectable() export class CronService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CronService.name); - private readonly tasks: cron.ScheduledTask[] = []; + private readonly registeredWorkers: Worker[] = []; constructor( @Inject(SummarizationService) private readonly summarization: SummarizationService, @Inject(SessionGCService) private readonly sessionGC: SessionGCService, + @Inject(QueueService) private readonly queueService: QueueService, ) {} - onModuleInit(): void { + async onModuleInit(): Promise { const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am - this.tasks.push( - cron.schedule(summarizationSchedule, () => { - this.summarization.runSummarization().catch((err) => { - this.logger.error(`Scheduled summarization failed: ${err}`); - }); - }), + // M6-003: Summarization repeatable job + await this.queueService.addRepeatableJob( + QUEUE_SUMMARIZATION, + 'summarization', + {}, + summarizationSchedule, ); + const summarizationWorker = this.queueService.registerWorker(QUEUE_SUMMARIZATION, async () => { + await this.summarization.runSummarization(); + }); + this.registeredWorkers.push(summarizationWorker); - this.tasks.push( - cron.schedule(tierManagementSchedule, () => { - this.summarization.runTierManagement().catch((err) => { - this.logger.error(`Scheduled tier management failed: ${err}`); - }); - }), + // M6-005: Tier management repeatable job + await this.queueService.addRepeatableJob( + QUEUE_TIER_MANAGEMENT, + 'tier-management', + {}, + tierManagementSchedule, ); + const tierWorker = this.queueService.registerWorker(QUEUE_TIER_MANAGEMENT, async () => { + await this.summarization.runTierManagement(); + }); + this.registeredWorkers.push(tierWorker); - this.tasks.push( - cron.schedule(gcSchedule, () => { - this.sessionGC.sweepOrphans().catch((err) => { - this.logger.error(`Session GC sweep failed: ${err}`); - }); - }), - ); + // M6-004: GC repeatable job + await this.queueService.addRepeatableJob(QUEUE_GC, 'session-gc', {}, gcSchedule); + const gcWorker = this.queueService.registerWorker(QUEUE_GC, async () => { + await this.sessionGC.sweepOrphans(); + }); + this.registeredWorkers.push(gcWorker); this.logger.log( - `Cron scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`, + `BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`, ); } - onModuleDestroy(): void { - for (const task of this.tasks) { - task.stop(); - } - this.tasks.length = 0; - this.logger.log('Cron tasks stopped'); + async onModuleDestroy(): Promise { + // Workers are closed by QueueService.onModuleDestroy — nothing extra needed here. + this.registeredWorkers.length = 0; + this.logger.log('CronService destroyed (workers managed by QueueService)'); } } diff --git a/apps/gateway/src/log/log.module.ts b/apps/gateway/src/log/log.module.ts index 59fff58..6351784 100644 --- a/apps/gateway/src/log/log.module.ts +++ b/apps/gateway/src/log/log.module.ts @@ -7,10 +7,11 @@ import { LogController } from './log.controller.js'; import { SummarizationService } from './summarization.service.js'; import { CronService } from './cron.service.js'; import { GCModule } from '../gc/gc.module.js'; +import { QueueModule } from '../queue/queue.module.js'; @Global() @Module({ - imports: [GCModule], + imports: [GCModule, QueueModule], providers: [ { provide: LOG_SERVICE, diff --git a/apps/gateway/src/queue/queue-admin.dto.ts b/apps/gateway/src/queue/queue-admin.dto.ts new file mode 100644 index 0000000..9dbf92f --- /dev/null +++ b/apps/gateway/src/queue/queue-admin.dto.ts @@ -0,0 +1,34 @@ +export type JobStatus = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'; + +export interface JobDto { + id: string; + name: string; + queue: string; + status: JobStatus; + attempts: number; + maxAttempts: number; + createdAt?: string; + processedAt?: string; + finishedAt?: string; + failedReason?: string; + data: Record; +} + +export interface JobListDto { + jobs: JobDto[]; + total: number; +} + +export interface QueueStatusDto { + name: string; + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + paused: boolean; +} + +export interface QueueListDto { + queues: QueueStatusDto[]; +} diff --git a/apps/gateway/src/queue/queue.module.ts b/apps/gateway/src/queue/queue.module.ts new file mode 100644 index 0000000..f0fbc5e --- /dev/null +++ b/apps/gateway/src/queue/queue.module.ts @@ -0,0 +1,9 @@ +import { Global, Module } from '@nestjs/common'; +import { QueueService } from './queue.service.js'; + +@Global() +@Module({ + providers: [QueueService], + exports: [QueueService], +}) +export class QueueModule {} diff --git a/apps/gateway/src/queue/queue.service.ts b/apps/gateway/src/queue/queue.service.ts new file mode 100644 index 0000000..7dca931 --- /dev/null +++ b/apps/gateway/src/queue/queue.service.ts @@ -0,0 +1,386 @@ +import { + Inject, + Injectable, + Logger, + Optional, + type OnModuleInit, + type OnModuleDestroy, +} from '@nestjs/common'; +import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq'; +import type { LogService } from '@mosaic/log'; +import { LOG_SERVICE } from '../log/log.tokens.js'; +import type { JobDto, JobStatus } from './queue-admin.dto.js'; + +// --------------------------------------------------------------------------- +// Typed job definitions +// --------------------------------------------------------------------------- + +export interface SummarizationJobData { + triggeredBy?: string; +} + +export interface GCJobData { + triggeredBy?: string; +} + +export interface TierManagementJobData { + triggeredBy?: string; +} + +export type MosaicJobData = SummarizationJobData | GCJobData | TierManagementJobData; + +// --------------------------------------------------------------------------- +// Queue health status +// --------------------------------------------------------------------------- + +export interface QueueHealthStatus { + queues: Record< + string, + { + waiting: number; + active: number; + failed: number; + completed: number; + paused: boolean; + } + >; + healthy: boolean; +} + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +export const QUEUE_SUMMARIZATION = 'mosaic:summarization'; +export const QUEUE_GC = 'mosaic:gc'; +export const QUEUE_TIER_MANAGEMENT = 'mosaic:tier-management'; + +const DEFAULT_VALKEY_URL = 'redis://localhost:6380'; + +function getConnection(): ConnectionOptions { + const url = process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL; + // BullMQ ConnectionOptions accepts a URL string (ioredis-compatible) + return url as unknown as ConnectionOptions; +} + +// --------------------------------------------------------------------------- +// Job handler type +// --------------------------------------------------------------------------- + +export type JobHandler = (job: Job) => Promise; + +/** System session ID used for job-event log entries (no real user session). */ +const SYSTEM_SESSION_ID = 'system'; + +// --------------------------------------------------------------------------- +// QueueService +// --------------------------------------------------------------------------- + +@Injectable() +export class QueueService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(QueueService.name); + private readonly connection: ConnectionOptions; + private readonly queues = new Map>(); + private readonly workers = new Map>(); + + constructor( + @Optional() + @Inject(LOG_SERVICE) + private readonly logService: LogService | null, + ) { + this.connection = getConnection(); + } + + onModuleInit(): void { + this.logger.log('QueueService initialised (BullMQ)'); + } + + async onModuleDestroy(): Promise { + await this.closeAll(); + } + + // ------------------------------------------------------------------------- + // Queue helpers + // ------------------------------------------------------------------------- + + /** + * Get or create a BullMQ Queue for the given queue name. + */ + getQueue(name: string): Queue { + let queue = this.queues.get(name) as Queue | undefined; + if (!queue) { + queue = new Queue(name, { connection: this.connection }); + this.queues.set(name, queue as unknown as Queue); + } + return queue; + } + + /** + * Add a BullMQ repeatable job (cron-style). + * Uses `jobId` as a deterministic key so duplicate registrations are idempotent. + */ + async addRepeatableJob( + queueName: string, + jobName: string, + data: T, + cronExpression: string, + ): Promise { + const queue = this.getQueue(queueName); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (queue as Queue).add(jobName, data, { + repeat: { pattern: cronExpression }, + jobId: `${queueName}:${jobName}:repeatable`, + }); + this.logger.log( + `Repeatable job "${jobName}" registered on "${queueName}" (cron: ${cronExpression})`, + ); + } + + /** + * Register a Worker for the given queue name with error handling and + * exponential backoff. + */ + registerWorker(queueName: string, handler: JobHandler): Worker { + const worker = new Worker( + queueName, + async (job) => { + this.logger.debug(`Processing job "${job.name}" (id=${job.id}) on queue "${queueName}"`); + await this.logJobEvent( + queueName, + job.name, + job.id ?? 'unknown', + 'started', + job.attemptsMade + 1, + ); + await handler(job); + }, + { + connection: this.connection, + // Exponential backoff: base 5s, factor 2, max 5 attempts + settings: { + backoffStrategy: (attemptsMade: number) => { + return Math.min(5000 * Math.pow(2, attemptsMade - 1), 60_000); + }, + }, + }, + ); + + worker.on('completed', (job) => { + this.logger.log(`Job "${job.name}" (id=${job.id}) completed on queue "${queueName}"`); + this.logJobEvent( + queueName, + job.name, + job.id ?? 'unknown', + 'completed', + job.attemptsMade, + ).catch((err) => this.logger.warn(`Failed to write completed job log: ${String(err)}`)); + }); + + worker.on('failed', (job, err) => { + const errMsg = err instanceof Error ? err.message : String(err); + this.logger.error( + `Job "${job?.name ?? 'unknown'}" (id=${job?.id ?? 'unknown'}) failed on queue "${queueName}": ${errMsg}`, + ); + this.logJobEvent( + queueName, + job?.name ?? 'unknown', + job?.id ?? 'unknown', + 'failed', + job?.attemptsMade ?? 0, + errMsg, + ).catch((e) => this.logger.warn(`Failed to write failed job log: ${String(e)}`)); + }); + + this.workers.set(queueName, worker as unknown as Worker); + return worker; + } + + /** + * Return queue health statistics for all managed queues. + */ + async getHealthStatus(): Promise { + const queues: QueueHealthStatus['queues'] = {}; + let healthy = true; + + for (const [name, queue] of this.queues) { + try { + const [waiting, active, failed, completed, paused] = await Promise.all([ + queue.getWaitingCount(), + queue.getActiveCount(), + queue.getFailedCount(), + queue.getCompletedCount(), + queue.isPaused(), + ]); + queues[name] = { waiting, active, failed, completed, paused }; + } catch (err) { + this.logger.error(`Failed to fetch health for queue "${name}": ${err}`); + healthy = false; + queues[name] = { waiting: 0, active: 0, failed: 0, completed: 0, paused: false }; + } + } + + return { queues, healthy }; + } + + // ------------------------------------------------------------------------- + // Admin API helpers (M6-006) + // ------------------------------------------------------------------------- + + /** + * List jobs across all managed queues, optionally filtered by status. + * BullMQ jobs are fetched by state type from each queue. + */ + async listJobs(status?: JobStatus): Promise { + const jobs: JobDto[] = []; + const states: JobStatus[] = status + ? [status] + : ['active', 'completed', 'failed', 'waiting', 'delayed']; + + for (const [queueName, queue] of this.queues) { + try { + for (const state of states) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const raw = await (queue as Queue).getJobs([state as any]); + for (const j of raw) { + jobs.push(this.toJobDto(queueName, j, state)); + } + } + } catch (err) { + this.logger.warn(`Failed to list jobs for queue "${queueName}": ${String(err)}`); + } + } + + return jobs; + } + + /** + * Retry a specific failed job by its BullMQ job ID (format: "queueName:id"). + * The caller passes "__" as the composite ID because BullMQ + * job IDs are not globally unique — they are scoped to their queue. + */ + async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> { + const sep = compositeId.lastIndexOf('__'); + if (sep === -1) { + return { ok: false, message: 'Invalid job id format. Expected "__".' }; + } + const queueName = compositeId.slice(0, sep); + const jobId = compositeId.slice(sep + 2); + + const queue = this.queues.get(queueName); + if (!queue) { + return { ok: false, message: `Queue "${queueName}" not found.` }; + } + + const job = await queue.getJob(jobId); + if (!job) { + return { ok: false, message: `Job "${jobId}" not found in queue "${queueName}".` }; + } + + const state = await job.getState(); + if (state !== 'failed') { + return { ok: false, message: `Job "${jobId}" is not in failed state (current: ${state}).` }; + } + + await job.retry('failed'); + await this.logJobEvent(queueName, job.name, jobId, 'retried', (job.attemptsMade ?? 0) + 1); + return { ok: true, message: `Job "${jobId}" on queue "${queueName}" queued for retry.` }; + } + + /** + * Pause a queue by name. + */ + async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> { + const queue = this.queues.get(name); + if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; + await queue.pause(); + this.logger.log(`Queue paused: ${name}`); + return { ok: true, message: `Queue "${name}" paused.` }; + } + + /** + * Resume a paused queue by name. + */ + async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> { + const queue = this.queues.get(name); + if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; + await queue.resume(); + this.logger.log(`Queue resumed: ${name}`); + return { ok: true, message: `Queue "${name}" resumed.` }; + } + + private toJobDto(queueName: string, job: Job, status: JobStatus): JobDto { + return { + id: `${queueName}__${job.id ?? 'unknown'}`, + name: job.name, + queue: queueName, + status, + attempts: job.attemptsMade, + maxAttempts: job.opts?.attempts ?? 1, + createdAt: job.timestamp ? new Date(job.timestamp).toISOString() : undefined, + processedAt: job.processedOn ? new Date(job.processedOn).toISOString() : undefined, + finishedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : undefined, + failedReason: job.failedReason, + data: (job.data as Record) ?? {}, + }; + } + + // ------------------------------------------------------------------------- + // Job event logging (M6-007) + // ------------------------------------------------------------------------- + + /** Write a log entry to agent_logs for BullMQ job lifecycle events. */ + private async logJobEvent( + queueName: string, + jobName: string, + jobId: string, + event: 'started' | 'completed' | 'retried' | 'failed', + attempts: number, + errorMessage?: string, + ): Promise { + if (!this.logService) return; + + const level = event === 'failed' ? ('error' as const) : ('info' as const); + const content = + event === 'failed' + ? `Job "${jobName}" (${jobId}) on queue "${queueName}" failed: ${errorMessage ?? 'unknown error'}` + : `Job "${jobName}" (${jobId}) on queue "${queueName}" ${event} (attempt ${attempts})`; + + try { + await this.logService.logs.ingest({ + sessionId: SYSTEM_SESSION_ID, + userId: 'system', + level, + category: 'general', + content, + metadata: { + jobId, + jobName, + queue: queueName, + event, + attempts, + ...(errorMessage ? { errorMessage } : {}), + }, + }); + } catch (err) { + // Log errors must never crash job execution + this.logger.warn(`Failed to write job event log for job ${jobId}: ${String(err)}`); + } + } + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + private async closeAll(): Promise { + const workerCloses = Array.from(this.workers.values()).map((w) => + w.close().catch((err) => this.logger.error(`Worker close error: ${err}`)), + ); + const queueCloses = Array.from(this.queues.values()).map((q) => + q.close().catch((err) => this.logger.error(`Queue close error: ${err}`)), + ); + await Promise.all([...workerCloses, ...queueCloses]); + this.workers.clear(); + this.queues.clear(); + this.logger.log('QueueService shut down'); + } +} diff --git a/apps/gateway/src/queue/queue.tokens.ts b/apps/gateway/src/queue/queue.tokens.ts new file mode 100644 index 0000000..de22543 --- /dev/null +++ b/apps/gateway/src/queue/queue.tokens.ts @@ -0,0 +1,2 @@ +export const QUEUE_REDIS = 'QUEUE_REDIS'; +export const QUEUE_SERVICE = 'QUEUE_SERVICE'; diff --git a/docs/architecture/channel-protocol.md b/docs/architecture/channel-protocol.md new file mode 100644 index 0000000..aa91ccb --- /dev/null +++ b/docs/architecture/channel-protocol.md @@ -0,0 +1,333 @@ +# Channel Protocol Architecture + +**Status:** Draft +**Authors:** Mosaic Core Team +**Last Updated:** 2026-03-22 +**Covers:** M7-001 (IChannelAdapter interface) and M7-002 (ChannelMessage protocol) + +--- + +## Overview + +The channel protocol defines a unified abstraction layer between Mosaic's core messaging infrastructure and the external communication channels it supports (Matrix, Discord, Telegram, TUI, WebUI, and future channels). + +The protocol consists of two main contracts: + +1. `IChannelAdapter` — the interface each channel driver must implement. +2. `ChannelMessage` — the canonical message format that flows through the system. + +All channel-specific translation logic lives inside the adapter implementation. The rest of Mosaic works exclusively with `ChannelMessage` objects. + +--- + +## M7-001: IChannelAdapter Interface + +```typescript +interface IChannelAdapter { + /** + * Stable, lowercase identifier for this channel (e.g. "matrix", "discord"). + * Used as a namespace key in registry lookups and log metadata. + */ + readonly name: string; + + /** + * Establish a connection to the external channel backend. + * Called once at application startup. Must be idempotent (safe to call + * when already connected). + */ + connect(): Promise; + + /** + * Gracefully disconnect from the channel backend. + * Must flush in-flight sends and release resources before resolving. + */ + disconnect(): Promise; + + /** + * Return the current health of the adapter connection. + * Used by the admin health endpoint and alerting. + * + * - "connected" — fully operational + * - "degraded" — partial connectivity (e.g. read-only, rate-limited) + * - "disconnected" — no connection to channel backend + */ + health(): Promise<{ status: 'connected' | 'degraded' | 'disconnected' }>; + + /** + * Register an inbound message handler. + * The adapter calls `handler` for every message received from the channel. + * Multiple calls replace the previous handler (last-write-wins). + * The handler is async; the adapter must not deliver new messages until + * the previous handler promise resolves (back-pressure). + */ + onMessage(handler: (msg: ChannelMessage) => Promise): void; + + /** + * Send a ChannelMessage to the given channel/room/conversation. + * `channelId` is the channel-native identifier (e.g. Matrix room ID, + * Discord channel snowflake, Telegram chat ID). + */ + sendMessage(channelId: string, msg: ChannelMessage): Promise; + + /** + * Map a channel-native user identifier to the Mosaic internal userId. + * Returns null when no matching Mosaic account exists for the given + * channelUserId (anonymous or unlinked user). + */ + mapIdentity(channelUserId: string): Promise; +} +``` + +### Adapter Registration + +Adapters are registered with the `ChannelRegistry` service at startup. The registry calls `connect()` on each adapter and monitors `health()` on a configurable interval (default: 30 s). + +``` +ChannelRegistry + └── register(adapter: IChannelAdapter): void + └── getAdapter(name: string): IChannelAdapter | null + └── listAdapters(): IChannelAdapter[] + └── healthAll(): Promise> +``` + +--- + +## M7-002: ChannelMessage Protocol + +### Canonical Message Format + +```typescript +interface ChannelMessage { + /** + * Globally unique message ID. + * Format: UUID v4. Generated by the adapter when receiving, or by Mosaic + * when sending. Channel-native IDs are stored in metadata.channelMessageId. + */ + id: string; + + /** + * Channel-native room/conversation/channel identifier. + * The adapter populates this from the inbound message. + * For outbound messages, the caller supplies the target channel. + */ + channelId: string; + + /** + * Channel-native identifier of the message sender. + * For Mosaic-originated messages this is the Mosaic userId or agentId. + */ + senderId: string; + + /** Sender classification. */ + senderType: 'user' | 'agent' | 'system'; + + /** + * Textual content of the message. + * For non-text content types (image, file) this may be an empty string + * or an alt-text description; the actual payload is in `attachments`. + */ + content: string; + + /** + * Hint for how `content` should be interpreted and rendered. + * - "text" — plain text, no special rendering + * - "markdown" — CommonMark markdown + * - "code" — code block (use metadata.language for the language tag) + * - "image" — binary image; content is empty, see attachments + * - "file" — binary file; content is empty, see attachments + */ + contentType: 'text' | 'markdown' | 'code' | 'image' | 'file'; + + /** + * Arbitrary key-value metadata for channel-specific extension fields. + * Examples: { channelMessageId, language, reactionEmoji, channelType }. + * Adapters should store channel-native IDs here so round-trip correlation + * is possible without altering the canonical fields. + */ + metadata: Record; + + /** + * Optional thread or reply-chain identifier. + * For threaded channels (Matrix, Discord threads, Telegram topics) this + * groups messages into a logical thread scoped to the same channelId. + */ + threadId?: string; + + /** + * The canonical message ID this message is a reply to. + * Maps to channel-native reply/quote mechanisms in each adapter. + */ + replyToId?: string; + + /** + * Binary or URI-referenced attachments. + * Each attachment carries its MIME type and a URL or base64 payload. + */ + attachments?: ChannelAttachment[]; + + /** Wall-clock timestamp when the message was sent/received. */ + timestamp: Date; +} + +interface ChannelAttachment { + /** Filename or identifier. */ + name: string; + + /** MIME type (e.g. "image/png", "application/pdf"). */ + mimeType: string; + + /** + * URL pointing to the attachment, OR a `data:` URI with base64 payload. + * Adapters that receive file uploads SHOULD store to object storage and + * populate a stable URL here rather than embedding the raw bytes. + */ + url: string; + + /** Size in bytes, if known. */ + sizeBytes?: number; +} +``` + +--- + +## Channel Translation Reference + +The following sections document how each supported channel maps its native message format to and from `ChannelMessage`. + +### Matrix + +| ChannelMessage field | Matrix equivalent | +| -------------------- | --------------------------------------------------------------------------------------------------------------------------------- | +| `id` | Generated UUID; `metadata.channelMessageId` = Matrix event ID (`$...`) | +| `channelId` | Matrix room ID (`!roomid:homeserver`) | +| `senderId` | Matrix user ID (`@user:homeserver`) | +| `senderType` | Always `"user"` for inbound; `"agent"` or `"system"` for outbound | +| `content` | `event.content.body` | +| `contentType` | `"markdown"` if `msgtype = m.text` and body contains markdown; `"text"` otherwise; `"image"` for `m.image`; `"file"` for `m.file` | +| `threadId` | `event.content['m.relates_to']['event_id']` when `rel_type = m.thread` | +| `replyToId` | Mosaic ID looked up from `event.content['m.relates_to']['m.in_reply_to']['event_id']` | +| `attachments` | Populated from `url` in `m.image` / `m.file` events | +| `timestamp` | `new Date(event.origin_server_ts)` | +| `metadata` | `{ channelMessageId, roomId, eventType, unsigned }` | + +**Outbound:** Adapter sends `m.room.message` with `msgtype = m.text` (or `m.notice` for system messages). Markdown content is sent with `format = org.matrix.custom.html` and a rendered HTML body. + +--- + +### Discord + +| ChannelMessage field | Discord equivalent | +| -------------------- | ----------------------------------------------------------------------- | +| `id` | Generated UUID; `metadata.channelMessageId` = Discord message snowflake | +| `channelId` | Discord channel ID (snowflake string) | +| `senderId` | Discord user ID (snowflake) | +| `senderType` | `"user"` for human members; `"agent"` for bot messages | +| `content` | `message.content` | +| `contentType` | `"markdown"` (Discord uses a markdown-like syntax natively) | +| `threadId` | `message.thread.id` when the message is inside a thread channel | +| `replyToId` | Mosaic ID looked up from `message.referenced_message.id` | +| `attachments` | `message.attachments` mapped to `ChannelAttachment` | +| `timestamp` | `new Date(message.timestamp)` | +| `metadata` | `{ channelMessageId, guildId, channelType, mentions, embeds }` | + +**Outbound:** Adapter calls Discord REST `POST /channels/{id}/messages`. Markdown content is sent as-is (Discord renders it). For `contentType = "code"` the adapter wraps in triple-backtick fences with the `metadata.language` tag. + +--- + +### Telegram + +| ChannelMessage field | Telegram equivalent | +| -------------------- | ------------------------------------------------------------------------------------------------------------- | +| `id` | Generated UUID; `metadata.channelMessageId` = Telegram `message_id` (integer) | +| `channelId` | Telegram `chat_id` (integer as string) | +| `senderId` | Telegram `from.id` (integer as string) | +| `senderType` | `"user"` for human senders; `"agent"` for bot-originated messages | +| `content` | `message.text` or `message.caption` | +| `contentType` | `"text"` for plain; `"markdown"` if `parse_mode = MarkdownV2`; `"image"` for `photo`; `"file"` for `document` | +| `threadId` | `message.message_thread_id` (for supergroup topics) | +| `replyToId` | Mosaic ID looked up from `message.reply_to_message.message_id` | +| `attachments` | `photo`, `document`, `video` fields mapped to `ChannelAttachment` | +| `timestamp` | `new Date(message.date * 1000)` | +| `metadata` | `{ channelMessageId, chatType, fromUsername, forwardFrom }` | + +**Outbound:** Adapter calls Telegram Bot API `sendMessage` with `parse_mode = MarkdownV2` for markdown content. For `contentType = "image"` or `"file"` it uses `sendPhoto` / `sendDocument`. + +--- + +### TUI (Terminal UI) + +The TUI adapter bridges Mosaic's terminal interface (`packages/cli`) to the channel protocol so that TUI sessions can be treated as a first-class channel. + +| ChannelMessage field | TUI equivalent | +| -------------------- | ------------------------------------------------------------------ | +| `id` | Generated UUID (TUI has no native message IDs) | +| `channelId` | `"tui:"` — the active conversation ID | +| `senderId` | Authenticated Mosaic `userId` | +| `senderType` | `"user"` for human input; `"agent"` for agent replies | +| `content` | Raw text from stdin / agent output | +| `contentType` | `"text"` for input; `"markdown"` for agent responses | +| `threadId` | Not used (TUI sessions are linear) | +| `replyToId` | Not used | +| `attachments` | File paths dragged/pasted into the TUI; resolved to `file://` URLs | +| `timestamp` | `new Date()` at the moment of send | +| `metadata` | `{ conversationId, sessionId, ttyWidth, colorSupport }` | + +**Outbound:** The adapter writes rendered content to stdout. Markdown is rendered via a terminal markdown renderer (e.g. `marked-terminal`). Code blocks are syntax-highlighted when `metadata.colorSupport = true`. + +--- + +### WebUI + +The WebUI adapter connects the Next.js frontend (`apps/web`) to the channel protocol over the existing Socket.IO gateway (`apps/gateway`). + +| ChannelMessage field | WebUI equivalent | +| -------------------- | ------------------------------------------------------------ | +| `id` | Generated UUID; echoed back in the WebSocket event | +| `channelId` | `"webui:"` | +| `senderId` | Authenticated Mosaic `userId` | +| `senderType` | `"user"` for browser input; `"agent"` for agent responses | +| `content` | Message text from the input field | +| `contentType` | `"text"` or `"markdown"` | +| `threadId` | Not used (conversation model handles threading) | +| `replyToId` | Message ID the user replied to (UI reply affordance) | +| `attachments` | Files uploaded via the file picker; stored to object storage | +| `timestamp` | `new Date()` at send, or server timestamp from event | +| `metadata` | `{ conversationId, sessionId, clientTimezone, userAgent }` | + +**Outbound:** Adapter emits a `chat:message` Socket.IO event. The WebUI React component receives it and appends to the conversation list. Markdown content is rendered client-side via the existing markdown renderer component. + +--- + +## Identity Mapping + +`mapIdentity(channelUserId)` resolves a channel-native user identifier to a Mosaic `userId`. This is required to attribute inbound messages to authenticated Mosaic accounts. + +The implementation must query a `channel_identities` table (or equivalent) keyed on `(channel_name, channel_user_id)`. When no mapping exists the method returns `null` and the message is treated as anonymous (no Mosaic session context). + +``` +channel_identities + channel_name TEXT -- e.g. "matrix", "discord" + channel_user_id TEXT -- channel-native user identifier + mosaic_user_id TEXT -- FK to users.id + linked_at TIMESTAMP + PRIMARY KEY (channel_name, channel_user_id) +``` + +Identity linking flows (OAuth dance, deep-link verification token, etc.) are out of scope for this document and will be specified in a separate identity-linking protocol document. + +--- + +## Error Handling Conventions + +- `connect()` must throw a structured error (subclass of `ChannelConnectError`) if the initial connection cannot be established within a reasonable timeout (default: 10 s). +- `sendMessage()` must throw `ChannelSendError` on terminal failures (auth revoked, channel not found). Transient failures (rate limit, network blip) should be retried internally with exponential backoff before throwing. +- `health()` must never throw — it returns `{ status: 'disconnected' }` on error. +- Adapters must emit structured logs with `{ channel: adapter.name, event, ... }` metadata for observability. + +--- + +## Versioning + +The `ChannelMessage` protocol follows semantic versioning. Non-breaking field additions (new optional fields) are minor version bumps. Breaking changes (type changes, required field additions) require a major version bump and a migration guide. + +Current version: **1.0.0** diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b375edb..8233df7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -131,6 +131,9 @@ importers: better-auth: specifier: ^1.5.5 version: 1.5.5(drizzle-kit@0.31.9)(drizzle-orm@0.45.1(@opentelemetry/api@1.9.0)(@types/pg@8.15.6)(kysely@0.28.11)(postgres@3.4.8))(mongodb@7.1.0(socks@2.8.7))(next@16.1.6(@opentelemetry/api@1.9.0)(@playwright/test@1.58.2)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react-dom@19.2.4(react@19.2.4))(react@19.2.4)(vitest@2.1.9(@types/node@22.19.15)(jsdom@29.0.0(@noble/hashes@2.0.1))(lightningcss@1.31.1)) + bullmq: + specifier: ^5.71.0 + version: 5.71.0 class-transformer: specifier: ^0.5.1 version: 0.5.1 @@ -1737,6 +1740,9 @@ packages: cpu: [x64] os: [win32] + '@ioredis/commands@1.5.0': + resolution: {integrity: sha512-eUgLqrMf8nJkZxT24JvVRrQya1vZkQh8BBeYNwGDqa5I0VUi8ACx7uFvAaLxintokpTenkK6DASvo/bvNbBGow==} + '@ioredis/commands@1.5.1': resolution: {integrity: sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==} @@ -1868,6 +1874,36 @@ packages: '@mongodb-js/saslprep@1.4.6': resolution: {integrity: sha512-y+x3H1xBZd38n10NZF/rEBlvDOOMQ6LKUTHqr8R9VkJ+mmQOYtJFxIlkkK8fZrtOiL6VixbOBWMbZGBdal3Z1g==} + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==} + cpu: [arm64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==} + cpu: [x64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==} + cpu: [arm64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==} + cpu: [arm] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==} + cpu: [x64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==} + cpu: [x64] + os: [win32] + '@nestjs/common@11.1.16': resolution: {integrity: sha512-JSIeW+USuMJkkcNbiOdcPkVCeI3TSnXstIVEPpp3HiaKnPRuSbUUKm9TY9o/XpIcPHWUOQItAtC5BiAwFdVITQ==} peerDependencies: @@ -3387,6 +3423,9 @@ packages: buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} + bullmq@5.71.0: + resolution: {integrity: sha512-aeNWh4drsafSKnAJeiNH/nZP/5O8ZdtdMbnOPZmpjXj7NZUP5YC901U3bIH41iZValm7d1i3c34ojv7q31m30w==} + bytes@3.1.2: resolution: {integrity: sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg==} engines: {node: '>= 0.8'} @@ -3553,6 +3592,10 @@ packages: resolution: {integrity: sha512-tJtZBBHA6vjIAaF6EnIaq6laBBP9aq/Y3ouVJjEfoHbRBcHBAHYcMh/w8LDrk2PvIMMq8gmopa5D4V8RmbrxGw==} engines: {node: '>= 0.10'} + cron-parser@4.9.0: + resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} + engines: {node: '>=12.0.0'} + cross-spawn@7.0.6: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} engines: {node: '>= 8'} @@ -4274,6 +4317,10 @@ packages: resolution: {integrity: sha512-HVBe9OFuqs+Z6n64q09PQvP1/R4Bm+30PAyyD4wIEqssh3v9L21QjCVk4kRLucMBcDokJTcLjsGeVRlq/nH6DA==} engines: {node: '>=12.22.0'} + ioredis@5.9.3: + resolution: {integrity: sha512-VI5tMCdeoxZWU5vjHWsiE/Su76JGhBvWF1MJnV9ZtGltHk9BmD48oDq8Tj8haZ85aceXZMxLNDQZRVo5QKNgXA==} + engines: {node: '>=12.22.0'} + ip-address@10.1.0: resolution: {integrity: sha512-XXADHxXmvT9+CRxhXg56LJovE+bmWnEWB78LB83VZTprKTmaC5QfruXocxzTZ2Kl0DNwKuBdlIhjL8LeY8Sf8Q==} engines: {node: '>= 12'} @@ -4566,6 +4613,10 @@ packages: resolution: {integrity: sha512-jumlc0BIUrS3qJGgIkWZsyfAM7NCWiBcCDhnd+3NNM5KbBmLTgHVfWBcg6W+rLUsIpzpERPsvwUP7CckAQSOoA==} engines: {node: '>=12'} + luxon@3.7.2: + resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==} + engines: {node: '>=12'} + magic-bytes.js@1.13.0: resolution: {integrity: sha512-afO2mnxW7GDTXMm5/AoN1WuOcdoKhtgXjIvHmobqTD1grNplhGdv3PFOyjCVmrnOZBIT/gD/koDKpYG+0mvHcg==} @@ -4773,6 +4824,13 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + msgpackr-extract@3.0.3: + resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==} + hasBin: true + + msgpackr@1.11.5: + resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} + mz@2.7.0: resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==} @@ -4821,6 +4879,9 @@ packages: sass: optional: true + node-abort-controller@3.1.1: + resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==} + node-cron@4.2.1: resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==} engines: {node: '>=6.0.0'} @@ -4843,6 +4904,10 @@ packages: resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + node-gyp-build-optional-packages@5.2.2: + resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} + hasBin: true + npm-run-path@5.3.0: resolution: {integrity: sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -7039,6 +7104,8 @@ snapshots: '@img/sharp-win32-x64@0.34.5': optional: true + '@ioredis/commands@1.5.0': {} + '@ioredis/commands@1.5.1': {} '@isaacs/cliui@8.0.2': @@ -7235,6 +7302,24 @@ snapshots: dependencies: sparse-bitfield: 3.0.3 + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + optional: true + '@nestjs/common@11.1.16(class-transformer@0.5.1)(class-validator@0.15.1)(reflect-metadata@0.2.2)(rxjs@7.8.2)': dependencies: file-type: 21.3.0 @@ -8977,6 +9062,18 @@ snapshots: buffer-from@1.1.2: {} + bullmq@5.71.0: + dependencies: + cron-parser: 4.9.0 + ioredis: 5.9.3 + msgpackr: 1.11.5 + node-abort-controller: 3.1.1 + semver: 7.7.4 + tslib: 2.8.1 + uuid: 11.1.0 + transitivePeerDependencies: + - supports-color + bytes@3.1.2: {} cac@6.7.14: {} @@ -9115,6 +9212,10 @@ snapshots: object-assign: 4.1.1 vary: 1.1.2 + cron-parser@4.9.0: + dependencies: + luxon: 3.7.2 + cross-spawn@7.0.6: dependencies: path-key: 3.1.1 @@ -9997,6 +10098,20 @@ snapshots: transitivePeerDependencies: - supports-color + ioredis@5.9.3: + dependencies: + '@ioredis/commands': 1.5.0 + cluster-key-slot: 1.1.2 + debug: 4.4.3 + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + ip-address@10.1.0: {} ipaddr.js@1.9.1: {} @@ -10261,6 +10376,8 @@ snapshots: lru-cache@7.18.3: {} + luxon@3.7.2: {} + magic-bytes.js@1.13.0: {} magic-string@0.30.21: @@ -10559,6 +10676,22 @@ snapshots: ms@2.1.3: {} + msgpackr-extract@3.0.3: + dependencies: + node-gyp-build-optional-packages: 5.2.2 + optionalDependencies: + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3 + optional: true + + msgpackr@1.11.5: + optionalDependencies: + msgpackr-extract: 3.0.3 + mz@2.7.0: dependencies: any-promise: 1.3.0 @@ -10603,6 +10736,8 @@ snapshots: - '@babel/core' - babel-plugin-macros + node-abort-controller@3.1.1: {} + node-cron@4.2.1: {} node-domexception@1.0.0: {} @@ -10617,6 +10752,11 @@ snapshots: fetch-blob: 3.2.0 formdata-polyfill: 4.0.10 + node-gyp-build-optional-packages@5.2.2: + dependencies: + detect-libc: 2.1.2 + optional: true + npm-run-path@5.3.0: dependencies: path-key: 4.0.0