feat(fleet): F4 Phase 2a — Matrix CS-API connector client + factory (#618)
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #618.
This commit is contained in:
184
packages/mosaic/src/fleet/connectors/matrix.spec.ts
Normal file
184
packages/mosaic/src/fleet/connectors/matrix.spec.ts
Normal file
@@ -0,0 +1,184 @@
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import {
|
||||
MatrixConnector,
|
||||
buildMessageBody,
|
||||
parseSyncResponse,
|
||||
registerMatrixConnector,
|
||||
type FetchLike,
|
||||
} from './matrix.js';
|
||||
import { createConnector, _resetConnectorRegistry } from './registry.js';
|
||||
import type { MatrixConnectorConfig } from './types.js';
|
||||
|
||||
const CONFIG: MatrixConnectorConfig = {
|
||||
homeserverUrl: 'https://matrix.internal/',
|
||||
userId: '@mos:internal',
|
||||
roomId: '!room:internal',
|
||||
};
|
||||
|
||||
/** A fetch mock that returns queued responses and records calls. */
|
||||
function mockFetch(responses: Array<{ ok?: boolean; status?: number; body?: unknown }>): {
|
||||
fetchImpl: FetchLike;
|
||||
calls: Array<{ url: string; method?: string; body?: string }>;
|
||||
} {
|
||||
const calls: Array<{ url: string; method?: string; body?: string }> = [];
|
||||
let i = 0;
|
||||
const fetchImpl: FetchLike = async (url, init) => {
|
||||
calls.push({ url, method: init?.method, body: init?.body });
|
||||
const r = responses[Math.min(i, responses.length - 1)] ?? {};
|
||||
i += 1;
|
||||
return {
|
||||
ok: r.ok ?? true,
|
||||
status: r.status ?? 200,
|
||||
json: async () => r.body ?? {},
|
||||
text: async () => JSON.stringify(r.body ?? {}),
|
||||
};
|
||||
};
|
||||
return { fetchImpl, calls };
|
||||
}
|
||||
|
||||
describe('buildMessageBody', () => {
|
||||
it('builds an m.text event', () => {
|
||||
expect(buildMessageBody({ text: 'hi' })).toEqual({ msgtype: 'm.text', body: 'hi' });
|
||||
});
|
||||
it('adds an m.thread relation when threadId is set', () => {
|
||||
expect(buildMessageBody({ text: 'hi', threadId: '$evt' })).toEqual({
|
||||
msgtype: 'm.text',
|
||||
body: 'hi',
|
||||
'm.relates_to': { rel_type: 'm.thread', event_id: '$evt' },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('parseSyncResponse', () => {
|
||||
it('extracts operator messages and skips the orchestrator’s own echoes', () => {
|
||||
const data = {
|
||||
next_batch: 's2',
|
||||
rooms: {
|
||||
join: {
|
||||
'!room:internal': {
|
||||
timeline: {
|
||||
events: [
|
||||
{
|
||||
type: 'm.room.message',
|
||||
sender: '@jason:internal',
|
||||
origin_server_ts: 1_700_000_000_000,
|
||||
content: { body: 'status?' },
|
||||
},
|
||||
{
|
||||
type: 'm.room.message',
|
||||
sender: '@mos:internal', // self — skipped
|
||||
origin_server_ts: 1_700_000_001_000,
|
||||
content: { body: 'working on it' },
|
||||
},
|
||||
{ type: 'm.reaction', sender: '@jason:internal', content: {} }, // non-message
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const msgs = parseSyncResponse(data, '!room:internal', '@mos:internal');
|
||||
expect(msgs).toHaveLength(1);
|
||||
expect(msgs[0]).toMatchObject({ text: 'status?', sender: '@jason:internal' });
|
||||
expect(msgs[0]!.ts).toBe(new Date(1_700_000_000_000).toISOString());
|
||||
});
|
||||
|
||||
it('carries threadId through thread-relments', () => {
|
||||
const data = {
|
||||
rooms: {
|
||||
join: {
|
||||
'!room:internal': {
|
||||
timeline: {
|
||||
events: [
|
||||
{
|
||||
type: 'm.room.message',
|
||||
sender: '@jason:internal',
|
||||
origin_server_ts: 1,
|
||||
content: {
|
||||
body: 'in thread',
|
||||
'm.relates_to': { rel_type: 'm.thread', event_id: '$root' },
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
expect(parseSyncResponse(data, '!room:internal', '@mos:internal')[0]!.threadId).toBe('$root');
|
||||
});
|
||||
|
||||
it('returns [] for an empty/foreign sync', () => {
|
||||
expect(parseSyncResponse({}, '!room:internal', '@mos:internal')).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('MatrixConnector', () => {
|
||||
it('throws without an access token', () => {
|
||||
expect(() => new MatrixConnector(CONFIG, { accessToken: '' })).toThrow(/access token/i);
|
||||
});
|
||||
|
||||
it('send PUTs an m.text event and returns the event id', async () => {
|
||||
const { fetchImpl, calls } = mockFetch([{ body: { event_id: '$abc' } }]);
|
||||
const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl });
|
||||
const res = await c.send({ text: 'pong' }, 1234);
|
||||
expect(res).toEqual({ delivered: true, messageId: '$abc' });
|
||||
expect(calls[0]!.method).toBe('PUT');
|
||||
expect(calls[0]!.url).toContain(
|
||||
'/_matrix/client/v3/rooms/!room%3Ainternal/send/m.room.message/mosaic-1234-1',
|
||||
);
|
||||
expect(JSON.parse(calls[0]!.body!)).toEqual({ msgtype: 'm.text', body: 'pong' });
|
||||
});
|
||||
|
||||
it('send reports not-delivered on a non-2xx', async () => {
|
||||
const { fetchImpl } = mockFetch([{ ok: false, status: 403 }]);
|
||||
const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl });
|
||||
const res = await c.send({ text: 'x' });
|
||||
expect(res.delivered).toBe(false);
|
||||
expect(res.error).toContain('403');
|
||||
});
|
||||
|
||||
it('health reports reachable + authenticated when whoami matches', async () => {
|
||||
const { fetchImpl } = mockFetch([
|
||||
{ body: { versions: ['v1.11'] } }, // /versions
|
||||
{ body: { user_id: '@mos:internal' } }, // /whoami
|
||||
]);
|
||||
const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl });
|
||||
const h = await c.health();
|
||||
expect(h.reachable).toBe(true);
|
||||
expect(h.authenticated).toBe(true);
|
||||
});
|
||||
|
||||
it('health flags auth mismatch', async () => {
|
||||
const { fetchImpl } = mockFetch([
|
||||
{ body: {} },
|
||||
{ body: { user_id: '@someone-else:internal' } },
|
||||
]);
|
||||
const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl });
|
||||
const h = await c.health();
|
||||
expect(h.reachable).toBe(true);
|
||||
expect(h.authenticated).toBe(false);
|
||||
});
|
||||
|
||||
it('health reports unreachable when /versions fails', async () => {
|
||||
const { fetchImpl } = mockFetch([{ ok: false, status: 502 }]);
|
||||
const c = new MatrixConnector(CONFIG, { accessToken: 'tok', fetchImpl });
|
||||
const h = await c.health();
|
||||
expect(h.reachable).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('registerMatrixConnector', () => {
|
||||
beforeEach(() => _resetConnectorRegistry());
|
||||
|
||||
it('registers a matrix factory createConnector can build', () => {
|
||||
registerMatrixConnector({ MATRIX_ACCESS_TOKEN: 'tok' } as NodeJS.ProcessEnv);
|
||||
const c = createConnector({ kind: 'matrix', matrix: CONFIG });
|
||||
expect(c.kind).toBe('matrix');
|
||||
});
|
||||
|
||||
it('the factory rejects config missing the matrix block', () => {
|
||||
registerMatrixConnector({ MATRIX_ACCESS_TOKEN: 'tok' } as NodeJS.ProcessEnv);
|
||||
expect(() => createConnector({ kind: 'matrix' })).toThrow(/missing the .matrix. block/i);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user