From fabc41340790584e9d982f120d1e88e85145e76c Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Mon, 22 Jun 2026 16:48:17 +0000 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20F4=20Phase=202a=20=E2=80=94=20Ma?= =?UTF-8?q?trix=20CS-API=20connector=20client=20+=20factory=20(#618)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- docs/TASKS.md | 2 +- docs/scratchpads/f4-matrix-connector.md | 11 + .../src/fleet/connectors/matrix.spec.ts | 184 +++++++++++++ .../mosaic/src/fleet/connectors/matrix.ts | 246 ++++++++++++++++++ 4 files changed, 442 insertions(+), 1 deletion(-) create mode 100644 packages/mosaic/src/fleet/connectors/matrix.spec.ts create mode 100644 packages/mosaic/src/fleet/connectors/matrix.ts diff --git a/docs/TASKS.md b/docs/TASKS.md index 83144b5..bc4abf6 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -69,4 +69,4 @@ Active workstream is **W1 — Federation v1**. Workers should: ## F4 — Orchestrator chat connector + Matrix (#616) — feat/f4-matrix-connector -- Status: Phase 1 done (abstraction + scaffold). Connector interface (send/subscribe/health) + registry + roster connector schema + design doc; tmux default/back-compat; matrix/discord factories are Phase 2. 7 tests green; no fleet.ts changes. Detail: scratchpads/f4-matrix-connector.md. +- Status: Phase 1 MERGED (#617: connector interface send/subscribe/health + registry + roster schema + design). Phase 2a (#618): Matrix CS-API client + factory. 20 connector tests green; no fleet.ts changes. Remaining Phase 2: init/configure connector-selection UX + roster wiring, systemd launch wiring, Conduit deploy guide. Detail: scratchpads/f4-matrix-connector.md. diff --git a/docs/scratchpads/f4-matrix-connector.md b/docs/scratchpads/f4-matrix-connector.md index 290875e..cf5b73f 100644 --- a/docs/scratchpads/f4-matrix-connector.md +++ b/docs/scratchpads/f4-matrix-connector.md @@ -17,3 +17,14 @@ ## Phase 2+ (follow-ups, in the doc) Matrix CS-API client (fetch send/sync/health) + factory; init/configure connector-selection UX + roster-parse wiring; systemd launch wiring; Conduit deploy guide; first-party Mosaic Discord (threads) as a connector. + +## Phase 2a (feat/f4-matrix-client, stacked on #617) — Matrix CS-API client + +- `src/fleet/connectors/matrix.ts`: `MatrixConnector implements OrchestratorConnector` over the Matrix + client-server API (injectable fetch, no SDK). `send` → PUT m.room.message (thread-aware); `subscribe` + → /sync long-poll loop using the pure `parseSyncResponse`; `health` → /versions + /whoami. + `registerMatrixConnector(env)` registers the factory (token from MATRIX_ACCESS_TOKEN, never roster). +- Pure helpers `buildMessageBody` + `parseSyncResponse` make send/receive unit-testable. +- 13 Matrix tests + 7 registry = 20 connector tests green; tsc/eslint/prettier clean. +- Remaining Phase 2: init/configure connector-selection UX + roster-parse wiring (touches fleet.ts — + after #615); systemd launch wiring; Conduit deploy guide. diff --git a/packages/mosaic/src/fleet/connectors/matrix.spec.ts b/packages/mosaic/src/fleet/connectors/matrix.spec.ts new file mode 100644 index 0000000..d4edd1b --- /dev/null +++ b/packages/mosaic/src/fleet/connectors/matrix.spec.ts @@ -0,0 +1,184 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { + MatrixConnector, + buildMessageBody, + parseSyncResponse, + registerMatrixConnector, + type FetchLike, +} from './matrix.js'; +import { createConnector, _resetConnectorRegistry } from './registry.js'; +import type { MatrixConnectorConfig } from './types.js'; + +const CONFIG: MatrixConnectorConfig = { + homeserverUrl: 'https://matrix.internal/', + userId: '@mos:internal', + roomId: '!room:internal', +}; + +/** A fetch mock that returns queued responses and records calls. */ +function mockFetch(responses: Array<{ ok?: boolean; status?: number; body?: unknown }>): { + fetchImpl: FetchLike; + calls: Array<{ url: string; method?: string; body?: string }>; +} { + const calls: Array<{ url: string; method?: string; body?: string }> = []; + let i = 0; + const fetchImpl: FetchLike = async (url, init) => { + calls.push({ url, method: init?.method, body: init?.body }); + const r = responses[Math.min(i, responses.length - 1)] ?? {}; + i += 1; + return { + ok: r.ok ?? true, + status: r.status ?? 200, + json: async () => r.body ?? {}, + text: async () => JSON.stringify(r.body ?? {}), + }; + }; + return { fetchImpl, calls }; +} + +describe('buildMessageBody', () => { + it('builds an m.text event', () => { + expect(buildMessageBody({ text: 'hi' })).toEqual({ msgtype: 'm.text', body: 'hi' }); + }); + it('adds an m.thread relation when threadId is set', () => { + expect(buildMessageBody({ text: 'hi', threadId: '$evt' })).toEqual({ + msgtype: 'm.text', + body: 'hi', + 'm.relates_to': { rel_type: 'm.thread', event_id: '$evt' }, + }); + }); +}); + +describe('parseSyncResponse', () => { + it('extracts operator messages and skips the orchestrator’s own echoes', () => { + const data = { + next_batch: 's2', + rooms: { + join: { + '!room:internal': { + timeline: { + events: [ + { + type: 'm.room.message', + sender: '@jason:internal', + origin_server_ts: 1_700_000_000_000, + content: { body: 'status?' }, + }, + { + type: 'm.room.message', + sender: '@mos:internal', // self — skipped + origin_server_ts: 1_700_000_001_000, + content: { body: 'working on it' }, + }, + { type: 'm.reaction', sender: '@jason:internal', content: {} }, // non-message + ], + }, + }, + }, + }, + }; + const msgs = parseSyncResponse(data, '!room:internal', '@mos:internal'); + expect(msgs).toHaveLength(1); + expect(msgs[0]).toMatchObject({ text: 'status?', sender: '@jason:internal' }); + expect(msgs[0]!.ts).toBe(new Date(1_700_000_000_000).toISOString()); + }); + + it('carries threadId through thread-relments', () => { + const data = { + rooms: { + join: { + '!room:internal': { + timeline: { + events: [ + { + type: 'm.room.message', + sender: '@jason:internal', + origin_server_ts: 1, + content: { + body: 'in thread', + 'm.relates_to': { rel_type: 'm.thread', event_id: '$root' }, + }, + }, + ], + }, + }, + }, + }, + }; + expect(parseSyncResponse(data, '!room:internal', '@mos:internal')[0]!.threadId).toBe('$root'); + }); + + it('returns [] for an empty/foreign sync', () => { + expect(parseSyncResponse({}, '!room:internal', '@mos:internal')).toEqual([]); + }); +}); + +describe('MatrixConnector', () => { + it('throws without an access token', () => { + expect(() => new MatrixConnector(CONFIG, { accessToken: '' })).toThrow(/access token/i); + }); + + it('send PUTs an m.text event and returns the event id', async () => { + const { fetchImpl, calls } = mockFetch([{ body: { event_id: '$abc' } }]); + const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl }); + const res = await c.send({ text: 'pong' }, 1234); + expect(res).toEqual({ delivered: true, messageId: '$abc' }); + expect(calls[0]!.method).toBe('PUT'); + expect(calls[0]!.url).toContain( + '/_matrix/client/v3/rooms/!room%3Ainternal/send/m.room.message/mosaic-1234-1', + ); + expect(JSON.parse(calls[0]!.body!)).toEqual({ msgtype: 'm.text', body: 'pong' }); + }); + + it('send reports not-delivered on a non-2xx', async () => { + const { fetchImpl } = mockFetch([{ ok: false, status: 403 }]); + const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl }); + const res = await c.send({ text: 'x' }); + expect(res.delivered).toBe(false); + expect(res.error).toContain('403'); + }); + + it('health reports reachable + authenticated when whoami matches', async () => { + const { fetchImpl } = mockFetch([ + { body: { versions: ['v1.11'] } }, // /versions + { body: { user_id: '@mos:internal' } }, // /whoami + ]); + const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl }); + const h = await c.health(); + expect(h.reachable).toBe(true); + expect(h.authenticated).toBe(true); + }); + + it('health flags auth mismatch', async () => { + const { fetchImpl } = mockFetch([ + { body: {} }, + { body: { user_id: '@someone-else:internal' } }, + ]); + const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl }); + const h = await c.health(); + expect(h.reachable).toBe(true); + expect(h.authenticated).toBe(false); + }); + + it('health reports unreachable when /versions fails', async () => { + const { fetchImpl } = mockFetch([{ ok: false, status: 502 }]); + const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl }); + const h = await c.health(); + expect(h.reachable).toBe(false); + }); +}); + +describe('registerMatrixConnector', () => { + beforeEach(() => _resetConnectorRegistry()); + + it('registers a matrix factory createConnector can build', () => { + registerMatrixConnector({ MATRIX_ACCESS_TOKEN: 'tok' } as NodeJS.ProcessEnv); + const c = createConnector({ kind: 'matrix', matrix: CONFIG }); + expect(c.kind).toBe('matrix'); + }); + + it('the factory rejects config missing the matrix block', () => { + registerMatrixConnector({ MATRIX_ACCESS_TOKEN: 'tok' } as NodeJS.ProcessEnv); + expect(() => createConnector({ kind: 'matrix' })).toThrow(/missing the .matrix. block/i); + }); +}); diff --git a/packages/mosaic/src/fleet/connectors/matrix.ts b/packages/mosaic/src/fleet/connectors/matrix.ts new file mode 100644 index 0000000..00a08a0 --- /dev/null +++ b/packages/mosaic/src/fleet/connectors/matrix.ts @@ -0,0 +1,246 @@ +/** + * Matrix connector (F4 Phase 2) — speaks the Matrix client-server API directly + * over HTTPS so it is homeserver-agnostic (Conduit default, Synapse alt). No + * SDK: a small injectable fetch keeps it dependency-light and unit-testable. + * + * The access token is supplied by the caller (from the environment — + * MATRIX_ACCESS_TOKEN — per the gateway secret pattern), never the roster. + */ + +import { + type OrchestratorConnector, + type OutboundMessage, + type InboundMessage, + type SendResult, + type ConnectorHealth, + type MatrixConnectorConfig, + type Unsubscribe, +} from './types.js'; +import { registerConnector } from './registry.js'; + +/** Minimal fetch surface — avoids a lib.dom dependency and is trivial to mock. */ +export interface FetchLike { + ( + url: string, + init?: { method?: string; headers?: Record; body?: string }, + ): Promise<{ + ok: boolean; + status: number; + json: () => Promise; + text: () => Promise; + }>; +} + +export interface MatrixConnectorOptions { + accessToken: string; + /** Injectable fetch (defaults to global fetch). */ + fetchImpl?: FetchLike; + /** Long-poll timeout for /sync, ms. */ + syncTimeoutMs?: number; +} + +/** Build the `m.room.message` event content, threading when a threadId is set. */ +export function buildMessageBody(message: OutboundMessage): Record { + const content: Record = { + msgtype: 'm.text', + body: message.text, + }; + if (message.threadId) { + content['m.relates_to'] = { rel_type: 'm.thread', event_id: message.threadId }; + } + return content; +} + +/** Shape of the bits of a /sync response we consume. */ +interface SyncResponse { + next_batch?: string; + rooms?: { + join?: Record< + string, + { + timeline?: { + events?: Array<{ + type?: string; + sender?: string; + origin_server_ts?: number; + content?: { + body?: string; + ['m.relates_to']?: { rel_type?: string; event_id?: string }; + }; + }>; + }; + } + >; + }; +} + +/** + * Extract inbound operator messages from a /sync response for one room, + * skipping the orchestrator's own echoes. Pure — the testable core of receive. + */ +export function parseSyncResponse( + data: unknown, + roomId: string, + selfUserId: string, +): InboundMessage[] { + const sync = data as SyncResponse; + const events = sync.rooms?.join?.[roomId]?.timeline?.events ?? []; + const out: InboundMessage[] = []; + for (const ev of events) { + if (ev.type !== 'm.room.message') continue; + if (!ev.sender || ev.sender === selfUserId) continue; // skip our own messages + const body = ev.content?.body; + if (typeof body !== 'string') continue; + const rel = ev.content?.['m.relates_to']; + out.push({ + text: body, + sender: ev.sender, + ts: new Date(ev.origin_server_ts ?? 0).toISOString(), + ...(rel?.rel_type === 'm.thread' && rel.event_id ? { threadId: rel.event_id } : {}), + }); + } + return out; +} + +export class MatrixConnector implements OrchestratorConnector { + readonly kind = 'matrix' as const; + private readonly fetchImpl: FetchLike; + private readonly token: string; + private readonly syncTimeoutMs: number; + private txnCounter = 0; + private stopped = false; + + constructor( + private readonly config: MatrixConnectorConfig, + opts: MatrixConnectorOptions, + ) { + this.token = opts.accessToken; + this.fetchImpl = opts.fetchImpl ?? (globalThis.fetch as unknown as FetchLike); + this.syncTimeoutMs = opts.syncTimeoutMs ?? 30_000; + if (!this.token) { + throw new Error('MatrixConnector requires an access token (set MATRIX_ACCESS_TOKEN).'); + } + } + + private url(path: string): string { + return `${this.config.homeserverUrl.replace(/\/$/, '')}${path}`; + } + + private authHeaders(): Record { + return { Authorization: `Bearer ${this.token}`, 'Content-Type': 'application/json' }; + } + + /** Monotonic, unique-per-instance transaction id for idempotent sends. */ + private nextTxnId(nowMs: number): string { + this.txnCounter += 1; + return `mosaic-${nowMs}-${this.txnCounter}`; + } + + async send(message: OutboundMessage, nowMs = Date.now()): Promise { + const txnId = this.nextTxnId(nowMs); + const path = `/_matrix/client/v3/rooms/${encodeURIComponent( + this.config.roomId, + )}/send/m.room.message/${encodeURIComponent(txnId)}`; + try { + const res = await this.fetchImpl(this.url(path), { + method: 'PUT', + headers: this.authHeaders(), + body: JSON.stringify(buildMessageBody(message)), + }); + if (!res.ok) { + return { delivered: false, error: `Matrix send failed: HTTP ${res.status}` }; + } + const json = (await res.json()) as { event_id?: string }; + return { delivered: true, ...(json.event_id ? { messageId: json.event_id } : {}) }; + } catch (err) { + return { delivered: false, error: err instanceof Error ? err.message : String(err) }; + } + } + + subscribe(handler: (message: InboundMessage) => void): Unsubscribe { + this.stopped = false; + let since: string | undefined; + const loop = async (): Promise => { + while (!this.stopped) { + try { + const q = new URLSearchParams({ timeout: String(this.syncTimeoutMs) }); + if (since) q.set('since', since); + const res = await this.fetchImpl(this.url(`/_matrix/client/v3/sync?${q.toString()}`), { + method: 'GET', + headers: this.authHeaders(), + }); + if (!res.ok) { + await this.backoff(); + continue; + } + const data = await res.json(); + since = (data as SyncResponse).next_batch ?? since; + for (const msg of parseSyncResponse(data, this.config.roomId, this.config.userId)) { + handler(msg); + } + } catch { + await this.backoff(); + } + } + }; + void loop(); + return () => { + this.stopped = true; + }; + } + + private backoff(): Promise { + return new Promise((resolve) => setTimeout(resolve, 2_000)); + } + + async health(): Promise { + try { + const versions = await this.fetchImpl(this.url('/_matrix/client/versions'), { + method: 'GET', + }); + if (!versions.ok) { + return { + reachable: false, + authenticated: false, + detail: `versions HTTP ${versions.status}`, + }; + } + const who = await this.fetchImpl(this.url('/_matrix/client/v3/account/whoami'), { + method: 'GET', + headers: this.authHeaders(), + }); + if (!who.ok) { + return { reachable: true, authenticated: false, detail: `whoami HTTP ${who.status}` }; + } + const json = (await who.json()) as { user_id?: string }; + const authenticated = json.user_id === this.config.userId; + return { + reachable: true, + authenticated, + lastSeen: new Date().toISOString(), + ...(authenticated + ? {} + : { detail: `whoami user ${json.user_id} != ${this.config.userId}` }), + }; + } catch (err) { + return { + reachable: false, + authenticated: false, + detail: err instanceof Error ? err.message : String(err), + }; + } + } +} + +/** + * Register the Matrix connector factory. The token is read from the environment + * (MATRIX_ACCESS_TOKEN) at build time, never the roster. + */ +export function registerMatrixConnector(env: NodeJS.ProcessEnv = process.env): void { + registerConnector('matrix', (config) => { + if (!config.matrix) { + throw new Error('Matrix connector config missing the `matrix` block (homeserver/user/room).'); + } + return new MatrixConnector(config.matrix, { accessToken: env['MATRIX_ACCESS_TOKEN'] ?? '' }); + }); +}