/**
 * Matrix connector (F4 Phase 2) — speaks the Matrix client-server API directly
 * over HTTPS so it is homeserver-agnostic (Conduit default, Synapse alt). No
 * SDK: a small injectable fetch keeps it dependency-light and unit-testable.
 *
 * The access token is supplied by the caller (from the environment —
 * MATRIX_ACCESS_TOKEN — per the gateway secret pattern), never the roster.
 */
import {
  type OrchestratorConnector,
  type OutboundMessage,
  type InboundMessage,
  type SendResult,
  type ConnectorHealth,
  type MatrixConnectorConfig,
  type Unsubscribe,
} from './types.js';
import { registerConnector } from './registry.js';

/** Default long-poll timeout for /sync, ms. */
const DEFAULT_SYNC_TIMEOUT_MS = 30_000;

/** Pause between /sync attempts after a failed request, ms. */
const SYNC_RETRY_DELAY_MS = 2_000;

/** Minimal fetch surface — avoids a lib.dom dependency and is trivial to mock. */
export interface FetchLike {
  (
    url: string,
    init?: { method?: string; headers?: Record<string, string>; body?: string },
  ): Promise<{
    ok: boolean;
    status: number;
    json: () => Promise<unknown>;
    text: () => Promise<string>;
  }>;
}

export interface MatrixConnectorOptions {
  accessToken: string;
  /** Injectable fetch (defaults to global fetch). */
  fetchImpl?: FetchLike;
  /** Long-poll timeout for /sync, ms. */
  syncTimeoutMs?: number;
}

/**
 * Build the `m.room.message` event content, threading when a threadId is set.
 *
 * @param message Outbound message; `text` becomes the body and a truthy
 *   `threadId` is attached as an `m.thread` relation.
 * @returns The JSON-serialisable event content.
 */
export function buildMessageBody(message: OutboundMessage): Record<string, unknown> {
  const content: Record<string, unknown> = {
    msgtype: 'm.text',
    body: message.text,
  };
  if (message.threadId) {
    content['m.relates_to'] = { rel_type: 'm.thread', event_id: message.threadId };
  }
  return content;
}

/** Shape of the bits of a /sync response we consume. */
interface SyncResponse {
  next_batch?: string;
  rooms?: {
    join?: Record<
      string,
      {
        timeline?: {
          events?: Array<{
            type?: string;
            sender?: string;
            origin_server_ts?: number;
            content?: {
              body?: string;
              ['m.relates_to']?: { rel_type?: string; event_id?: string };
            };
          }>;
        };
      }
    >;
  };
}

/**
 * Convert an `origin_server_ts` value to an ISO-8601 string.
 *
 * Missing, non-numeric, or out-of-range values fall back to the Unix epoch so
 * that one malformed event cannot make `toISOString()` throw.
 */
function toIsoTimestamp(ms: unknown): string {
  const date = new Date(typeof ms === 'number' ? ms : 0);
  return Number.isNaN(date.getTime()) ? new Date(0).toISOString() : date.toISOString();
}

/**
 * Extract inbound operator messages from a /sync response for one room,
 * skipping the orchestrator's own echoes. Pure — the testable core of receive.
 *
 * Never throws on malformed input: a missing/`null` payload, a room without a
 * timeline, or individual malformed events are skipped.
 *
 * @param data Parsed JSON body of a /sync response (untrusted shape).
 * @param roomId Room whose joined timeline is inspected.
 * @param selfUserId Sender id to filter out (our own messages).
 * @returns One entry per `m.room.message` event with a string body, in
 *   timeline order.
 */
export function parseSyncResponse(
  data: unknown,
  roomId: string,
  selfUserId: string,
): InboundMessage[] {
  // `data` comes straight from JSON; it may legitimately be null.
  const sync = (data ?? {}) as SyncResponse;
  const events = sync.rooms?.join?.[roomId]?.timeline?.events;
  if (!Array.isArray(events)) return [];

  const out: InboundMessage[] = [];
  for (const ev of events) {
    if (!ev || ev.type !== 'm.room.message') continue;
    if (!ev.sender || ev.sender === selfUserId) continue; // skip our own messages
    const body = ev.content?.body;
    if (typeof body !== 'string') continue;
    const rel = ev.content?.['m.relates_to'];
    out.push({
      text: body,
      sender: ev.sender,
      ts: toIsoTimestamp(ev.origin_server_ts),
      ...(rel?.rel_type === 'm.thread' && rel.event_id ? { threadId: rel.event_id } : {}),
    });
  }
  return out;
}

/**
 * Connector that sends to, and long-polls from, a single Matrix room using
 * the client-server HTTP API through an injectable fetch.
 */
export class MatrixConnector implements OrchestratorConnector {
  readonly kind = 'matrix' as const;
  private readonly fetchImpl: FetchLike;
  private readonly token: string;
  private readonly syncTimeoutMs: number;
  private txnCounter = 0;

  /**
   * @param config Homeserver URL, room id and user id for this connector.
   * @param opts Access token plus optional fetch / sync-timeout overrides.
   * @throws Error when `opts.accessToken` is empty.
   */
  constructor(
    private readonly config: MatrixConnectorConfig,
    opts: MatrixConnectorOptions,
  ) {
    this.token = opts.accessToken;
    this.fetchImpl = opts.fetchImpl ?? (globalThis.fetch as unknown as FetchLike);
    this.syncTimeoutMs = opts.syncTimeoutMs ?? DEFAULT_SYNC_TIMEOUT_MS;
    if (!this.token) {
      throw new Error('MatrixConnector requires an access token (set MATRIX_ACCESS_TOKEN).');
    }
  }

  /** Join the configured homeserver base URL and an API path, dropping any trailing slashes from the base. */
  private url(path: string): string {
    return `${this.config.homeserverUrl.replace(/\/+$/, '')}${path}`;
  }

  /** Headers for authenticated JSON requests. */
  private authHeaders(): Record<string, string> {
    return { Authorization: `Bearer ${this.token}`, 'Content-Type': 'application/json' };
  }

  /**
   * Monotonic, unique-per-instance transaction id for idempotent sends.
   * A fresh id is produced on every call, so two separate `send()` calls are
   * never treated as the same transaction.
   */
  private nextTxnId(nowMs: number): string {
    this.txnCounter += 1;
    return `mosaic-${nowMs}-${this.txnCounter}`;
  }

  /**
   * Send one text message to the configured room.
   *
   * Never rejects: transport errors and non-OK responses are reported through
   * the returned `SendResult`.
   *
   * @param message Message to deliver.
   * @param nowMs Clock value used in the transaction id (injectable for tests).
   * @returns `delivered: true` (with `messageId` when the server returned an
   *   event id) or `delivered: false` with an `error` description.
   */
  async send(message: OutboundMessage, nowMs = Date.now()): Promise<SendResult> {
    const txnId = this.nextTxnId(nowMs);
    const path = `/_matrix/client/v3/rooms/${encodeURIComponent(
      this.config.roomId,
    )}/send/m.room.message/${encodeURIComponent(txnId)}`;
    try {
      const res = await this.fetchImpl(this.url(path), {
        method: 'PUT',
        headers: this.authHeaders(),
        body: JSON.stringify(buildMessageBody(message)),
      });
      if (!res.ok) {
        return { delivered: false, error: `Matrix send failed: HTTP ${res.status}` };
      }

      // The OK status already confirms the send; the event id is a bonus. A
      // missing or unparseable body must not flip the result to "not
      // delivered", which would invite a duplicate resend.
      let eventId: string | undefined;
      try {
        const json = (await res.json()) as { event_id?: unknown } | null;
        if (typeof json?.event_id === 'string' && json.event_id) {
          eventId = json.event_id;
        }
      } catch {
        eventId = undefined;
      }
      return { delivered: true, ...(eventId ? { messageId: eventId } : {}) };
    } catch (err) {
      return { delivered: false, error: err instanceof Error ? err.message : String(err) };
    }
  }

  /**
   * Start a background /sync long-poll loop and forward each inbound room
   * message to `handler`.
   *
   * Each subscription owns its own cancellation flag, so unsubscribing one
   * never affects another, and a loop that is still waiting on a request when
   * it is cancelled cannot be revived by a later `subscribe()` call.
   *
   * NOTE(review): the first request carries no `since` token, so whatever
   * timeline the homeserver includes in an initial sync is delivered to the
   * handler — confirm that replay is intended.
   *
   * @param handler Called once per inbound message. Exceptions it throws are
   *   contained so the remaining messages of the batch are still delivered.
   * @returns Function that stops this subscription; the handler is not called
   *   again after it runs.
   */
  subscribe(handler: (message: InboundMessage) => void): Unsubscribe {
    let active = true;
    let since: string | undefined;

    const loop = async (): Promise<void> => {
      while (active) {
        try {
          const q = new URLSearchParams({ timeout: String(this.syncTimeoutMs) });
          if (since) q.set('since', since);
          const res = await this.fetchImpl(this.url(`/_matrix/client/v3/sync?${q.toString()}`), {
            method: 'GET',
            headers: this.authHeaders(),
          });
          // The long-poll may have outlived the subscription.
          if (!active) return;
          if (!res.ok) {
            await this.backoff();
            continue;
          }
          const data = await res.json();
          if (!active) return;

          const next = (data as SyncResponse | null)?.next_batch;
          if (typeof next === 'string' && next) since = next;

          for (const msg of parseSyncResponse(data, this.config.roomId, this.config.userId)) {
            if (!active) return;
            try {
              handler(msg);
            } catch {
              // `since` has already advanced past this batch; aborting here
              // would lose the remaining messages, so keep dispatching.
            }
          }
        } catch {
          // Transport or JSON failure: wait, then retry from the same token.
          await this.backoff();
        }
      }
    };

    void loop();
    return () => {
      active = false;
    };
  }

  /** Resolve after the fixed retry delay. */
  private backoff(): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, SYNC_RETRY_DELAY_MS));
  }

  /**
   * Probe the homeserver: `/versions` for reachability, then `/whoami` to
   * confirm the token belongs to the configured user.
   *
   * Never rejects; failures are described in the returned `detail`.
   */
  async health(): Promise<ConnectorHealth> {
    try {
      const versions = await this.fetchImpl(this.url('/_matrix/client/versions'), {
        method: 'GET',
      });
      if (!versions.ok) {
        return {
          reachable: false,
          authenticated: false,
          detail: `versions HTTP ${versions.status}`,
        };
      }

      const who = await this.fetchImpl(this.url('/_matrix/client/v3/account/whoami'), {
        method: 'GET',
        headers: this.authHeaders(),
      });
      if (!who.ok) {
        return { reachable: true, authenticated: false, detail: `whoami HTTP ${who.status}` };
      }

      // A null body is "reachable but not authenticated", not a transport failure.
      const json = (await who.json()) as { user_id?: string } | null;
      const userId = json?.user_id;
      const authenticated = userId === this.config.userId;
      return {
        reachable: true,
        authenticated,
        lastSeen: new Date().toISOString(),
        ...(authenticated ? {} : { detail: `whoami user ${userId} != ${this.config.userId}` }),
      };
    } catch (err) {
      return {
        reachable: false,
        authenticated: false,
        detail: err instanceof Error ? err.message : String(err),
      };
    }
  }
}

/**
 * Register the Matrix connector factory. The token is read from the environment
 * (MATRIX_ACCESS_TOKEN) at build time, never the roster.
 *
 * @param env Environment to read the token from (defaults to `process.env`).
 */
export function registerMatrixConnector(env: NodeJS.ProcessEnv = process.env): void {
  registerConnector('matrix', (config) => {
    if (!config.matrix) {
      throw new Error('Matrix connector config missing the `matrix` block (homeserver/user/room).');
    }
    return new MatrixConnector(config.matrix, { accessToken: env['MATRIX_ACCESS_TOKEN'] ?? '' });
  });
}