diff --git a/apps/appservice/package.json b/apps/appservice/package.json new file mode 100644 index 0000000..3298699 --- /dev/null +++ b/apps/appservice/package.json @@ -0,0 +1,35 @@ +{ + "name": "@mosaicstack/mosaic-as", + "version": "0.0.1", + "type": "module", + "private": true, + "repository": { + "type": "git", + "url": "https://git.mosaicstack.dev/mosaicstack/stack.git", + "directory": "apps/appservice" + }, + "main": "dist/main.js", + "bin": { + "mosaic-as": "dist/main.js", + "mosaic-as-registration": "dist/registration-main.js" + }, + "scripts": { + "build": "tsc", + "lint": "eslint src", + "typecheck": "tsc --noEmit", + "test": "vitest run --passWithNoTests", + "dev": "tsx watch src/main.ts" + }, + "dependencies": { + "@mosaicstack/appservice": "workspace:*" + }, + "devDependencies": { + "@types/node": "^22.0.0", + "tsx": "^4.19.0", + "typescript": "^5.8.0", + "vitest": "^2.0.0" + }, + "files": [ + "dist" + ] +} diff --git a/apps/appservice/src/__tests__/server.test.ts b/apps/appservice/src/__tests__/server.test.ts new file mode 100644 index 0000000..6a2658a --- /dev/null +++ b/apps/appservice/src/__tests__/server.test.ts @@ -0,0 +1,152 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { AppserviceDaemon } from '../server.js'; +import type { DaemonConfig, DaemonRequest } from '../server.js'; + +const cfg: DaemonConfig = { + homeserverUrl: 'https://hs.example', + domain: 'hs.example', + asToken: 'as-secret', + hsToken: 'hs-secret', + bridgeTokens: ['bridge-secret'], +}; + +const jsonResponse = (status: number, body: unknown): Response => + new Response(JSON.stringify(body), { status, headers: { 'Content-Type': 'application/json' } }); + +const request = (overrides: Partial): DaemonRequest => ({ + method: 'GET', + path: '/', + searchParams: new URLSearchParams(), + body: undefined, + ...overrides, +}); + +const makeDaemon = () => { + const fetchMock = vi.fn(async (_input: URL | string) => jsonResponse(200, { event_id: '$sent' })); + const daemon = new AppserviceDaemon(cfg, fetchMock as unknown as typeof fetch, () => {}); + return { daemon, fetchMock }; +}; + +describe('AppserviceDaemon routing', () => { + it('serves health unauthenticated', async () => { + const { daemon } = makeDaemon(); + expect((await daemon.handle(request({ path: '/health' }))).status).toBe(200); + }); + + it('404s unknown paths', async () => { + const { daemon } = makeDaemon(); + expect((await daemon.handle(request({ path: '/nope' }))).status).toBe(404); + }); + + it('transactions require the hs_token', async () => { + const { daemon } = makeDaemon(); + const bad = await daemon.handle( + request({ + method: 'PUT', + path: '/_matrix/app/v1/transactions/t1', + authorizationHeader: 'Bearer wrong', + body: { events: [] }, + }), + ); + expect(bad.status).toBe(403); + const ok = await daemon.handle( + request({ + method: 'PUT', + path: '/_matrix/app/v1/transactions/t1', + authorizationHeader: 'Bearer hs-secret', + body: { events: [{ type: 'm.room.message', event_id: '$e' }] }, + }), + ); + expect(ok.status).toBe(200); + }); + + it('bridge requires a bridge token (hs/as tokens do not work)', async () => { + const { daemon } = makeDaemon(); + for (const token of [undefined, 'Bearer hs-secret', 'Bearer as-secret', 'Bearer nope']) { + const res = await daemon.handle( + request({ + method: 'POST', + path: '/bridge/v1/messages', + authorizationHeader: token, + body: {}, + }), + ); + expect(res.status).toBe(403); + } + }); + + it('bridge message sends as the agent and returns the event id', async () => { + const { daemon, fetchMock } = makeDaemon(); + const res = await daemon.handle( + request({ + method: 'POST', + path: '/bridge/v1/messages', + authorizationHeader: 'Bearer bridge-secret', + body: { room_id: '!r:hs.example', agent: 'pi0-web1', body: 'hi', thread_root: '$req' }, + }), + ); + expect(res.status).toBe(200); + expect(res.body.event_id).toBe('$sent'); + const sendCall = fetchMock.mock.calls + .map((c) => new URL(String(c[0]))) + .find((u) => u.pathname.includes('/send/m.room.message/')); + expect(sendCall).toBeDefined(); + expect(sendCall!.searchParams.get('user_id')).toBe('@agent-pi0-web1:hs.example'); + }); + + it('bridge rejects invalid payloads with 400', async () => { + const { daemon } = makeDaemon(); + const res = await daemon.handle( + request({ + method: 'POST', + path: '/bridge/v1/messages', + authorizationHeader: 'Bearer bridge-secret', + body: { room_id: 'bad', agent: 'pi0', body: 'x' }, + }), + ); + expect(res.status).toBe(400); + }); + + it('bridge typing endpoint works', async () => { + const { daemon, fetchMock } = makeDaemon(); + const res = await daemon.handle( + request({ + method: 'POST', + path: '/bridge/v1/typing', + authorizationHeader: 'Bearer bridge-secret', + body: { room_id: '!r:hs.example', agent: 'pi0-web1', typing: true }, + }), + ); + expect(res.status).toBe(200); + const typingCall = fetchMock.mock.calls + .map((c) => new URL(String(c[0]))) + .find((u) => u.pathname.includes('/typing/')); + expect(typingCall).toBeDefined(); + }); + + it('authenticated unknown bridge sub-paths return 405, never fall through', async () => { + const { daemon } = makeDaemon(); + const res = await daemon.handle( + request({ + method: 'GET', + path: '/bridge/v1/unknown', + authorizationHeader: 'Bearer bridge-secret', + }), + ); + expect(res.status).toBe(405); + }); + + it('empty bridge token list denies everything', async () => { + const daemon = new AppserviceDaemon({ ...cfg, bridgeTokens: [] }, undefined, () => {}); + const res = await daemon.handle( + request({ + method: 'POST', + path: '/bridge/v1/typing', + authorizationHeader: 'Bearer bridge-secret', + body: {}, + }), + ); + expect(res.status).toBe(403); + }); +}); diff --git a/apps/appservice/src/config.ts b/apps/appservice/src/config.ts new file mode 100644 index 0000000..ae65894 --- /dev/null +++ b/apps/appservice/src/config.ts @@ -0,0 +1,23 @@ +import type { DaemonConfig } from './server.js'; + +const required = (name: string): string => { + const value = process.env[name]; + if (!value) throw new Error(`missing required env var ${name}`); + return value; +}; + +export function configFromEnv(): DaemonConfig & { port: number } { + return { + homeserverUrl: required('MOSAIC_AS_HOMESERVER_URL'), + domain: required('MOSAIC_AS_DOMAIN'), + asToken: required('MOSAIC_AS_TOKEN'), + hsToken: required('MOSAIC_HS_TOKEN'), + userPrefix: process.env.MOSAIC_AS_USER_PREFIX ?? 'agent-', + senderLocalpart: process.env.MOSAIC_AS_SENDER_LOCALPART ?? 'mosaic-as', + bridgeTokens: (process.env.MOSAIC_AS_BRIDGE_TOKENS ?? '') + .split(',') + .map((t) => t.trim()) + .filter(Boolean), + port: Number(process.env.MOSAIC_AS_PORT ?? 8008), + }; +} diff --git a/apps/appservice/src/main.ts b/apps/appservice/src/main.ts new file mode 100644 index 0000000..5258d88 --- /dev/null +++ b/apps/appservice/src/main.ts @@ -0,0 +1,67 @@ +import http from 'node:http'; + +import { configFromEnv } from './config.js'; +import { AppserviceDaemon } from './server.js'; + +const cfg = configFromEnv(); +const daemon = new AppserviceDaemon(cfg); + +const MAX_BODY_BYTES = 1024 * 1024; + +const server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + let received = 0; + let rejected = false; + req.on('data', (chunk: Buffer) => { + received += chunk.length; + if (received > MAX_BODY_BYTES) { + rejected = true; + res.writeHead(413, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ errcode: 'M_TOO_LARGE', error: 'request body too large' })); + req.destroy(); + return; + } + chunks.push(chunk); + }); + req.on('end', () => { + if (rejected) return; + void (async () => { + const url = new URL(req.url ?? '/', 'http://localhost'); + let body: unknown; + try { + const raw = Buffer.concat(chunks).toString(); + body = raw ? JSON.parse(raw) : undefined; + } catch { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ errcode: 'M_NOT_JSON', error: 'invalid json' })); + return; + } + const result = await daemon.handle({ + method: req.method ?? 'GET', + path: url.pathname, + searchParams: url.searchParams, + authorizationHeader: req.headers.authorization, + body, + }); + res.writeHead(result.status, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(result.body)); + })().catch((error: unknown) => { + console.error('request failed:', error); + if (res.headersSent) { + res.destroy(); + return; + } + res.writeHead(500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'internal error' })); + }); + }); +}); + +server.listen(cfg.port, () => { + console.log( + `mosaic-as listening on :${cfg.port} (homeserver ${cfg.homeserverUrl}, domain ${cfg.domain})`, + ); + if (cfg.bridgeTokens.length === 0) { + console.warn('WARNING: MOSAIC_AS_BRIDGE_TOKENS is empty — bridge API will deny all requests'); + } +}); diff --git a/apps/appservice/src/registration-main.ts b/apps/appservice/src/registration-main.ts new file mode 100644 index 0000000..bf7ec8c --- /dev/null +++ b/apps/appservice/src/registration-main.ts @@ -0,0 +1,10 @@ +import { buildRegistration, registrationToYaml } from '@mosaicstack/appservice'; + +import { configFromEnv } from './config.js'; + +// Prints the Synapse registration YAML (mosaic-as.yaml) for the current env. +// Usage: MOSAIC_AS_URL=http://mosaic-as:8008 mosaic-as-registration > mosaic-as.yaml +const cfg = configFromEnv(); +const url = process.env.MOSAIC_AS_URL; +if (!url) throw new Error('missing required env var MOSAIC_AS_URL'); +process.stdout.write(registrationToYaml(buildRegistration(cfg, { url }))); diff --git a/apps/appservice/src/server.ts b/apps/appservice/src/server.ts new file mode 100644 index 0000000..cff0e6c --- /dev/null +++ b/apps/appservice/src/server.ts @@ -0,0 +1,124 @@ +import { createHmac, randomBytes, timingSafeEqual } from 'node:crypto'; + +import { + AppserviceIntent, + TransactionHandler, + validateBridgeMessage, + validateBridgeTyping, +} from '@mosaicstack/appservice'; +import type { AppserviceConfig, MatrixEvent } from '@mosaicstack/appservice'; + +export interface DaemonConfig extends AppserviceConfig { + /** Bearer tokens accepted on /bridge/v1/* (one per agent-comms host daemon). */ + bridgeTokens: string[]; +} + +export interface DaemonRequest { + method: string; + /** URL path without query string. */ + path: string; + searchParams: URLSearchParams; + authorizationHeader?: string; + body: unknown; +} + +export interface DaemonResponse { + status: number; + body: Record; +} + +// Compare equal-length HMAC digests so neither content nor LENGTH of the +// stored secret is observable through timing. +const HMAC_KEY = randomBytes(32); +const digest = (value: string): Buffer => createHmac('sha256', HMAC_KEY).update(value).digest(); + +const safeEqual = (a: string, b: string): boolean => timingSafeEqual(digest(a), digest(b)); + +const TXN_PATH = /^\/_matrix\/app\/v1\/transactions\/([^/]+)$/; + +/** + * HTTP-framework-agnostic request router for the mosaic-as daemon: the + * Application Service transactions endpoint (Synapse-facing) plus the + * internal bridge API v1 (agent-comms daemon-facing). main.ts binds this to + * node:http; tests drive it directly. + */ +export class AppserviceDaemon { + readonly intent: AppserviceIntent; + private readonly transactions: TransactionHandler; + + constructor( + private readonly cfg: DaemonConfig, + fetchImpl?: typeof fetch, + private readonly log: (line: string) => void = (line) => console.log(line), + ) { + this.intent = new AppserviceIntent(cfg, fetchImpl); + this.transactions = new TransactionHandler({ + hsToken: cfg.hsToken, + onEvent: (event) => this.onEvent(event), + onError: (error, txnId) => this.log(`txn ${txnId} handler error: ${String(error)}`), + }); + } + + /** v1: the daemon only observes; room logic lives in the agent-comms daemons. */ + private onEvent(event: MatrixEvent): void { + if (event.type === 'm.room.message') { + this.log( + `event ${event.event_id ?? '?'} in ${event.room_id ?? '?'} from ${event.sender ?? '?'}`, + ); + } + } + + private bridgeAuthorized(authorizationHeader: string | undefined): boolean { + if (!authorizationHeader?.startsWith('Bearer ')) return false; + const presented = authorizationHeader.slice('Bearer '.length); + return this.cfg.bridgeTokens.some((token) => safeEqual(presented, token)); + } + + async handle(req: DaemonRequest): Promise { + if (req.method === 'GET' && req.path === '/health') { + return { status: 200, body: { ok: true } }; + } + + const txnMatch = req.method === 'PUT' ? TXN_PATH.exec(req.path) : null; + if (txnMatch?.[1] !== undefined) { + return this.transactions.handle(txnMatch[1], req.body, { + authorizationHeader: req.authorizationHeader, + accessTokenParam: req.searchParams.get('access_token') ?? undefined, + }); + } + + if (req.path.startsWith('/bridge/v1/')) { + if (!this.bridgeAuthorized(req.authorizationHeader)) { + return { status: 403, body: { errcode: 'M_FORBIDDEN', error: 'bad bridge token' } }; + } + try { + if (req.method === 'POST' && req.path === '/bridge/v1/messages') { + validateBridgeMessage(req.body); + const eventId = await this.intent.sendAsAgent({ + roomId: req.body.room_id, + agent: req.body.agent, + body: req.body.body, + threadRoot: req.body.thread_root, + msgtype: req.body.msgtype, + extraContent: req.body.extra_content, + }); + return { status: 200, body: { event_id: eventId ?? null } }; + } + if (req.method === 'POST' && req.path === '/bridge/v1/typing') { + validateBridgeTyping(req.body); + await this.intent.setTyping(req.body.room_id, req.body.agent, req.body.typing); + return { status: 200, body: {} }; + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.log(`bridge error ${req.method} ${req.path}: ${message}`); + return { status: 400, body: { error: message } }; + } + // Explicit: never fall out of the authenticated bridge block, so future + // sub-paths cannot accidentally route around the auth guard above. + return { status: 405, body: { error: 'unsupported bridge method/path' } }; + } + + return { status: 404, body: { error: 'not found' } }; + } +} diff --git a/apps/appservice/tsconfig.json b/apps/appservice/tsconfig.json new file mode 100644 index 0000000..c973386 --- /dev/null +++ b/apps/appservice/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +} diff --git a/docker/appservice.Dockerfile b/docker/appservice.Dockerfile new file mode 100644 index 0000000..a92e252 --- /dev/null +++ b/docker/appservice.Dockerfile @@ -0,0 +1,28 @@ +FROM node:22-alpine AS base +ENV PNPM_HOME="/pnpm" +ENV PATH="$PNPM_HOME:$PATH" +RUN corepack enable + +FROM base AS builder +WORKDIR /app +# Copy workspace manifests first for layer-cached install +COPY pnpm-workspace.yaml pnpm-lock.yaml package.json ./ +COPY apps/appservice/package.json ./apps/appservice/ +COPY packages/ ./packages/ +COPY plugins/ ./plugins/ +RUN pnpm install --frozen-lockfile +COPY . . +RUN pnpm turbo run build --filter @mosaicstack/mosaic-as... +RUN pnpm --filter @mosaicstack/mosaic-as --prod deploy --legacy /deploy + +FROM base AS runner +WORKDIR /app +ENV NODE_ENV=production +COPY --from=builder /deploy/node_modules ./node_modules +COPY --from=builder /deploy/package.json ./package.json +COPY --from=builder /app/apps/appservice/dist ./dist +USER node +EXPOSE 8008 +HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=5 \ + CMD ["node", "-e", "require('http').get('http://127.0.0.1:8008/health',r=>process.exit(r.statusCode===200?0:1)).on('error',()=>process.exit(1))"] +CMD ["node", "dist/main.js"] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index acab223..6132a4d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -39,6 +39,25 @@ importers: specifier: ^2.0.0 version: 2.1.9(@types/node@24.12.0)(jsdom@29.0.0(@noble/hashes@2.0.1))(lightningcss@1.31.1) + apps/appservice: + dependencies: + '@mosaicstack/appservice': + specifier: workspace:* + version: link:../../packages/appservice + devDependencies: + '@types/node': + specifier: ^22.0.0 + version: 22.19.15 + tsx: + specifier: ^4.19.0 + version: 4.21.0 + typescript: + specifier: ^5.8.0 + version: 5.9.3 + vitest: + specifier: ^2.0.0 + version: 2.1.9(@types/node@22.19.15)(jsdom@29.0.0(@noble/hashes@2.0.1))(lightningcss@1.31.1) + apps/gateway: dependencies: '@anthropic-ai/sdk':