Compare commits

...

139 Commits

Author SHA1 Message Date
51d6302401 Merge pull request 'style(web): fix prettier formatting in AuditLogDrawer' (#743) from fix/audit-drawer-format into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 01:56:03 +00:00
cf490510bf style(web): fix prettier formatting in AuditLogDrawer
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 19:54:08 -06:00
3d91334df7 Merge pull request 'fix(mission-control): increase rate limit for events/recent, add error handling' (#742) from fix/mission-control-ratelimit into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-08 00:44:31 +00:00
e80b624ca6 fix(mission-control): increase rate limit for events/recent, add error handling
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 18:42:50 -06:00
65536fcb75 Merge pull request 'fix(orchestrator): add missing module import for OrchestratorApiKeyGuard' (#741) from fix/orchestrator-guard-import into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 23:36:55 +00:00
53915dc621 fix(orchestrator): add missing module import for OrchestratorApiKeyGuard
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 17:35:57 -06:00
398ee06920 Merge pull request 'chore: release v0.0.23 — Mission Control Dashboard' (#740) from chore/ms23-v0.0.23-release into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 23:01:19 +00:00
2182717f59 chore: release v0.0.23 — Mission Control Dashboard
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 17:00:15 -06:00
fe55363f38 Merge pull request 'chore: MS23-P4-001 QA gate — lint/typecheck/test all green' (#739) from chore/ms23-p4-qa into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 22:57:06 +00:00
d60165572a fix(orchestrator): encrypt OpenClaw provider tokens at rest
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 16:55:51 -06:00
ff73fbd391 Merge pull request 'test(orchestrator): MS23-P3-004 OpenClaw provider E2E — Phase 3 gate' (#738) from test/ms23-p3 into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 22:46:21 +00:00
95ec63a868 test(orchestrator): MS23-P3-004 OpenClaw provider E2E — Phase 3 gate
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 16:44:57 -06:00
2ab736b68b Merge pull request 'feat(web): MS23-P3-003 OpenClaw provider config UI' (#736) from feat/ms23-p3-config-ui into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 22:33:22 +00:00
30e0168983 Merge pull request 'feat(orchestrator): MS23-P3-002 OpenClaw SSE bridge' (#737) from feat/ms23-p3-openclaw-bridge into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 22:33:13 +00:00
495d78115e feat(orchestrator): add OpenClaw SSE bridge streaming
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 16:20:27 -06:00
54ee5cf945 feat(web): MS23-P3-003 OpenClaw provider config UI
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 16:18:21 -06:00
563d59ad5d Merge pull request 'feat(orchestrator): MS23-P3-001 OpenClawProvider' (#735) from feat/ms23-p3-openclaw-provider into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 22:06:31 +00:00
da6e055113 feat(orchestrator): MS23-P3-001 OpenClawProvider
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 15:58:56 -06:00
0441d44f42 Merge pull request 'test(web): MS23-P2-009 Mission Control frontend tests' (#733) from test/ms23-p2 into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 21:38:36 +00:00
7147dc3503 test(web): MS23-P2-009 Mission Control frontend tests
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 15:27:36 -06:00
f0aa3b5a75 test(web): add Mission Control frontend tests for phase 2 gate
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 15:25:39 -06:00
11d64341b1 Merge pull request 'fix(web): MS23-P2 restore main branch CI' (#732) from fix/ms23-p2-main into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 21:08:54 +00:00
90d2fa7563 fix(web): MS23-P2 fix broken lint/typecheck on main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 14:55:04 -06:00
31af6c26ec Merge pull request 'feat(web): MS23-P2-008 Panel grid responsive layout' (#731) from feat/ms23-p2-grid into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 20:48:36 +00:00
e4f942dde7 feat(web): MS23-P2-008 panel grid responsive layout
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 14:47:39 -06:00
4ea31c5749 Merge pull request 'feat: MS23-P2-007 AuditLogDrawer + audit log endpoint' (#730) from feat/ms23-p2-audit into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 20:40:21 +00:00
4792f7b70a feat: MS23-P2-007 AuditLogDrawer + audit log endpoint
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 14:40:06 -06:00
571094a099 Merge pull request 'feat(web): MS23-P2-004 Panel operator controls' (#729) from feat/ms23-p2-controls into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 20:35:53 +00:00
adef5bdbb2 feat(web): MS23-P2-004 panel operator controls
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 14:34:31 -06:00
eb771d795a Merge pull request 'feat(web): MS23-P2-003 BargeInInput component' (#728) from feat/ms23-p2-barge into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 20:33:39 +00:00
c9aff531ea Merge pull request 'feat(web): MS23-P2-006 KillAllDialog confirmation modal' (#727) from feat/ms23-p2-killall into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 20:33:03 +00:00
b2c751caca feat(web): MS23-P2-003 BargeInInput component
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 14:32:12 -06:00
cd28428cf2 feat(web): MS23-P2-006 KillAllDialog confirmation modal
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 14:31:38 -06:00
2c36569f85 Merge pull request 'feat(web): MS23-P2-005 GlobalAgentRoster sidebar tree' (#726) from feat/ms23-p2-roster into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 20:20:17 +00:00
7c086db7e4 feat(web): MS23-P2-005 GlobalAgentRoster sidebar tree
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-07 14:19:05 -06:00
577e6141e0 Merge pull request 'feat(web): MS23-P2-002 OrchestratorPanel SSE stream component' (#725) from feat/ms23-p2-panel into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 20:17:56 +00:00
631ba499e3 feat(web): MS23-P2-002 OrchestratorPanel SSE stream component
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 14:14:57 -06:00
a61106c24a Merge pull request 'feat(web): MS23-P2-001 Mission Control page route + layout shell' (#724) from feat/ms23-p2-page into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 20:00:55 +00:00
487aac6903 feat(web): add Mission Control page layout shell
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 13:59:23 -06:00
544e828e58 Merge pull request 'test(orchestrator): MS23-P1-006 Phase 1 provider system unit tests' (#723) from test/ms23-p1 into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 19:51:09 +00:00
9489bc63f8 test(orchestrator): add mission-control phase 1 gate coverage
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 13:47:49 -06:00
ad644799aa feat(orchestrator): MS23-P1-005 Mission Control proxy API (#722)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 19:43:34 +00:00
81bf349270 Merge pull request 'feat(orchestrator): MS23-P1-004 AgentProviderConfig CRUD API' (#721) from feat/ms23-p1-provider-api into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 19:26:24 +00:00
bcada71e88 feat(orchestrator): add AgentProviderConfig CRUD API
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-07 13:22:56 -06:00
9cc82e7fcf feat(orchestrator): MS23-P1-003 AgentProviderRegistry (#720)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 19:15:59 +00:00
4b135ae1f0 feat(orchestrator): MS23-P1-002 InternalAgentProvider (#719)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 19:06:36 +00:00
364619b332 feat(shared): MS23-P1-001 IAgentProvider interface + agent types (#718)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:29:18 +00:00
18ed3a5411 chore(tasks): MS23 Phase 0 complete — all 6 P0 tasks done (#717)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:13:12 +00:00
79ff3a921f test(orchestrator): MS23-P0-006 service unit tests for AgentMessages/Control/Tree (#716)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 18:12:47 +00:00
76f06d0291 chore(tasks): MS23-P0-005 done, P0-006 in-progress (#715)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:58:20 +00:00
03dd25f028 feat(orchestrator): MS23-P0-005 subagent tree endpoint (#714)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:57:55 +00:00
f3726de54e chore(tasks): MS23-P0-004 done, P0-005 in-progress (#713)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:43:35 +00:00
d0c6622de5 feat(orchestrator): MS23-P0-004 operator inject/pause/resume endpoints (#712)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:43:11 +00:00
4749f52668 chore(tasks): MS23-P0-002 done, P0-003 done, P0-004 in-progress (#711)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:15:57 +00:00
e0b28c91c3 feat(orchestrator): MS23 per-agent message history and SSE stream (#702)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:15:30 +00:00
fa7837af3e fix(orchestrator): rm symlink before schema COPY in Docker builder (#710)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 17:00:38 +00:00
123cbce5cd fix(orchestrator): copy schema to overwrite dangling symlink in Docker (#709)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:45:54 +00:00
7d47e5ff99 fix(orchestrator): use symlink path ./prisma/schema.prisma in generate script (#708)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:31:49 +00:00
ef674206e7 fix(orchestrator): symlink prisma/schema.prisma to resolve Docker build root detection (#707)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:17:20 +00:00
977747599f fix(orchestrator): local prisma schema copy for Docker generate (#706)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 16:00:07 +00:00
fc4699ca51 fix(orchestrator): copy apps/api/package.json for prisma generate in Dockerfile (#705)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 15:38:46 +00:00
b61554800b fix(orchestrator): add prisma CLI devDependency for prisma:generate (#704)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 06:56:42 +00:00
98e892f23c fix(orchestrator): Dockerfile prisma generate + vitest reflect-metadata setup (#703)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:45:17 +00:00
de6faf659e feat(orchestrator): MS23 agent lifecycle ingestion service (#701)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 04:21:26 +00:00
49fa958444 chore(orchestrator): MS23 P0-001 done, P0-002 in-progress (#700)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 02:10:59 +00:00
8d6abd72bb feat(api): MS23 mission control Prisma schema (#699)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 01:18:06 +00:00
1bed5b3573 chore(orchestrator): Bootstrap MS23 mission-control TASKS.md + PRD (#698)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 00:27:24 +00:00
c644d1044b Merge pull request 'fix(api): add AuthModule import to UserAgentModule' (#691) from fix/user-agent-auth-module into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-05 22:22:18 +00:00
bf5779fb73 fix(api): add AuthModule import to UserAgentModule
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-05 16:21:38 -06:00
08d7a6b708 chore: trigger CI rebuild for hotfix 2026-03-05 16:15:32 -06:00
570edef4e5 Merge pull request 'fix(api): add AuthModule import to AgentTemplateModule' (#690) from fix/agent-template-auth-module into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-05 22:10:36 +00:00
d220be6b58 fix(api): add AuthModule import to AgentTemplateModule
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
The AgentTemplateController uses AuthGuard which requires AuthService,
but AgentTemplateModule was not importing AuthModule. This caused the
API to crash during dependency resolution.
2026-03-05 09:46:22 -06:00
ade9e968ca chore(ms22-p2): mission complete — all 10 tasks done (#689)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 15:41:43 +00:00
413ecdb63b feat(ms22-p2): add Discord channel → agent routing (#688)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 14:04:06 +00:00
e85fb11f03 test(ms22-p2): add unit tests for agent services (#687)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 03:40:35 +00:00
0869a3dcb6 chore(ms22-p2): update mission docs — P2-008 complete (#686)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 03:32:36 +00:00
a70f149886 feat(ms22-p2): add agent selector UI in WebUI (#685)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 03:29:02 +00:00
2f1ee53c8d feat(ms22-p2): add agent status endpoints and chat routing (#684)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 02:56:36 +00:00
b52c4e7ff9 chore(ms22-p2): update docs after P2-004 completion (#683)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 02:46:24 +00:00
af56684e84 feat(ms22-p2): add UserAgent CRUD endpoints (#682)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 02:44:19 +00:00
ee4d6fa12b feat(ms22-p2): add AgentTemplate admin CRUD endpoints (#678)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 02:32:17 +00:00
5bd08b0d0b fix(deps): update multer override to >=2.1.1 (#681)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-05 02:27:08 +00:00
1eb581553a Merge pull request 'docs(ms22-p2): validated PRD — 15/15 mosaic prdy validate' (#680) from docs/ms22-p2-prd-validated into main 2026-03-05 01:59:25 +00:00
da62b9bb73 docs(ms22-p2): validated PRD with FR/US/AC items — 15/15 mosaic prdy validate 2026-03-04 19:59:20 -06:00
62fc76fea6 Merge pull request 'chore(ms22-p2): initialize mission, update manifest and TASKS' (#679) from chore/ms22-p2-mission-init into main 2026-03-05 01:54:14 +00:00
8b38026fed chore(ms22-p2): initialize mission, update manifest and TASKS 2026-03-04 19:53:57 -06:00
82b1b4cb41 Merge pull request 'feat(ms22-p2): seed default agent templates' (#677) from feat/ms22-p2-agent-seed into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-05 01:42:05 +00:00
22e08e4ef2 feat(ms22-p2): seed default agent templates (jarvis, builder, medic)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-04 19:41:25 -06:00
29cc37f8df Merge pull request 'ci: mark deploy-swarm as failure:ignore' (#676) from fix/ci-disable-deploy-swarm into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-05 01:02:40 +00:00
091fb54f77 ci: mark deploy-swarm as failure:ignore so CI passes independently of deploy
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-04 19:02:25 -06:00
939479ac7e Merge pull request 'feat(ms22-p2): add AgentTemplate and UserAgent schema' (#675) from feat/ms22-p2-agent-schema into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-05 00:49:44 +00:00
9031509bbd Merge pull request 'test(web): update useChat tests for streaming-only implementation' (#674) from fix/usechat-tests into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-05 00:49:38 +00:00
f11a005538 feat(ms22-p2): add AgentTemplate and UserAgent prisma schema
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-04 18:49:25 -06:00
8484e060d7 test(web): update useChat tests for streaming-only implementation
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-04 18:14:14 -06:00
673ca32d5a Merge pull request 'docs(ms22): add Phase 2 PRD and TASKS for Named Agent Fleet' (#673) from docs/ms22-p2-agent-fleet-prd into main 2026-03-04 20:18:38 +00:00
a777f1f695 docs(ms22): add Phase 2 PRD and TASKS for Named Agent Fleet 2026-03-04 14:17:57 -06:00
d7d8c3c88d Merge pull request 'fix(chat): restrict to authenticated users only, fix overlay transparency' (#672) from fix/chat-auth-only into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-04 20:15:13 +00:00
aec8085f60 chore: mark orchestrator session as completed 2026-03-04 14:12:58 -06:00
44da50d0b3 fix(chat): restrict to authenticated users only, fix overlay transparency
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-04 11:33:32 -06:00
44fb402ef2 Merge pull request 'ci: use Portainer API for Docker Swarm deploy' (#671) from ci/portainer-v2 into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-03 19:01:31 +00:00
f42c47e314 ci: use Portainer API for Docker Swarm deploy
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-03 13:00:59 -06:00
8069aeadb5 Merge pull request 'fix(chat): ConfigModule import + CSRF skip for guest endpoint' (#670) from fix/chat-complete into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-03 19:00:06 +00:00
1f883c4c04 chore: remove stray file 2026-03-03 12:58:00 -06:00
5207d8c0c9 fix(chat): skip CSRF for guest endpoint
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-03 12:36:01 -06:00
d1c9a747b9 fix(chat): import ConfigModule in ChatProxyModule
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-03 12:28:50 -06:00
3d669713d7 Merge pull request 'feat(chat): add guest chat mode for unauthenticated users' (#667) from feature/chat-guest-mode into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-03 17:52:08 +00:00
1a6cf113c8 fix(lint): resolve prettier formatting in useChat.ts
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-03 11:46:05 -06:00
48d734516a fix(lint): resolve prettier and dot-notation errors
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-03 11:40:38 -06:00
83477165d4 fix(chat): correct indentation in useChat guest fallback
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-03 11:22:18 -06:00
c45cec3bba feat(chat): add guest chat mode for unauthenticated users
Some checks failed
ci/woodpecker/push/ci Pipeline failed
- Add POST /api/chat/guest endpoint (no auth required)
- Add proxyGuestChat() method using configurable LLM endpoint
- Add streamGuestChat() function to frontend chat API
- Modify useChat to fall back to guest mode on auth errors (403/401)
- Remove !user check from ChatInput disabled prop
- Configure guest LLM via env vars: GUEST_LLM_URL, GUEST_LLM_API_KEY, GUEST_LLM_MODEL
- Default guest LLM: http://10.1.1.42:11434/v1 (Ollama) with llama3.2 model
2026-03-03 11:16:23 -06:00
b1baa70e00 fix(db): add missing MS21 user auth fields migration (#666)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 04:10:10 +00:00
55340dc661 fix(infra): install pgvector + uuid-ossp extensions in mosaic-db-init (#665)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 03:55:25 +00:00
a8d426e3c0 infra: migrate postgres to shared openbrain_brain-db (#664)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 03:45:46 +00:00
40e12214cf fix(test): make queue completion test more robust (#663)
Some checks failed
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/push/coordinator Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 02:36:36 +00:00
892ffd637f ci: fix deploy service names (#662)
Some checks failed
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline failed
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/manual/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 02:06:11 +00:00
394a46bef2 ci: fix deploy - use docker service update (#661)
Some checks failed
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/manual/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-03 01:23:01 +00:00
29a78890c9 ci: use localadmin for deploy (#660)
Some checks failed
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 18:06:05 +00:00
0c88010123 ci: add auto-deploy to Docker Swarm (#658)
Some checks failed
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 17:42:06 +00:00
7f94ecdc7a fix: add missing orchestrator endpoints + fix AgentStatusWidget (#657)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 16:43:51 +00:00
5b77774d91 fix(web): remove mock data from dashboard telemetry/tasks/calendar (#656)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 14:19:27 +00:00
a16371c6f9 fix(ci): use node:24-slim (glibc) instead of Alpine (musl) (#655)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 01:40:37 +00:00
51d46b2e4a fix(ci): copy .npmrc before pnpm install in all Dockerfiles (#654)
Some checks failed
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 01:09:22 +00:00
6582785ddd fix: matrix native binary + Dockerfile audit (#653)
All checks were successful
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 00:19:41 +00:00
ae0bebe2e0 ci: enable Kaniko layer caching (#652)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-02 00:08:15 +00:00
173b429c62 fix(ci): Kaniko for base image build (#651)
All checks were successful
ci/woodpecker/manual/base-image Pipeline was successful
ci/woodpecker/manual/infra Pipeline was successful
ci/woodpecker/manual/coordinator Pipeline was successful
ci/woodpecker/manual/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 23:41:46 +00:00
7d505e75f8 feat: custom node base image (#649)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 23:39:41 +00:00
cd1c52c506 ci: pnpm store cache (#648)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 23:26:51 +00:00
a00f1e1fd7 fix(api): activity interceptor tests (#647)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 23:15:16 +00:00
9305cacd4a fix(web): kanban add-task tests (#645)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 23:03:21 +00:00
0d5aa5c3ae feat: wire chat to backend (#644)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:54:48 +00:00
eb34eb8104 feat: compact usage widget in header (#643)
Some checks failed
ci/woodpecker/push/infra Pipeline was successful
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/push/coordinator Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:53:31 +00:00
5165a30fad feat: compact usage widget in header (#642)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:51:50 +00:00
6eb91c9eba fix(api): security hardening — helmet + auth rate limiting (#641)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:43:10 +00:00
e7da4ca25e fix: attach domain to project (#640)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:33:49 +00:00
e1e265804a feat: inline add-task in Kanban (#638)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:33:07 +00:00
d361d00674 fix: Logs page — activity_logs, optional workspaceId, autoRefresh on (#637)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 22:10:16 +00:00
78ff8f8e70 fix: GET workspace members endpoint (#635)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 21:53:51 +00:00
2463b7b8ba test(glm47): workspace stats endpoint (#633)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 21:46:48 +00:00
5b235a668f fix(web): CI lint failures from PR #632 (#634)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 21:41:29 +00:00
178 changed files with 16157 additions and 1728 deletions

View File

@@ -343,6 +343,11 @@ RATE_LIMIT_STORAGE=redis
# DISCORD_CONTROL_CHANNEL_ID=channel-id-for-commands # DISCORD_CONTROL_CHANNEL_ID=channel-id-for-commands
# DISCORD_WORKSPACE_ID=your-workspace-uuid # DISCORD_WORKSPACE_ID=your-workspace-uuid
# #
# Agent channel routing: Maps Discord channels to specific agents.
# Format: <channelId>:<agentName>,<channelId>:<agentName>
# Example: 123456789:jarvis,987654321:builder
# DISCORD_AGENT_CHANNELS=
#
# SECURITY: DISCORD_WORKSPACE_ID must be a valid workspace UUID from your database. # SECURITY: DISCORD_WORKSPACE_ID must be a valid workspace UUID from your database.
# All Discord commands will execute within this workspace context for proper # All Discord commands will execute within this workspace context for proper
# multi-tenant isolation. Each Discord bot instance should be configured for # multi-tenant isolation. Each Discord bot instance should be configured for

View File

@@ -1,56 +1,56 @@
{ {
"schema_version": 1, "schema_version": 1,
"mission_id": "ms21-multi-tenant-rbac-data-migration-20260228", "mission_id": "ms22-p2-named-agent-fleet-20260304",
"name": "MS21 Multi-Tenant RBAC Data Migration", "name": "MS22-P2 Named Agent Fleet",
"description": "Build multi-tenant user/workspace/team management, break-glass auth, RBAC UI enforcement, and migrate jarvis-brain data into Mosaic Stack", "description": "",
"project_path": "/home/jwoltje/src/mosaic-stack", "project_path": "/home/jwoltje/src/mosaic-stack",
"created_at": "2026-02-28T17:10:22Z", "created_at": "2026-03-05T01:53:28Z",
"status": "active", "status": "active",
"task_prefix": "MS21", "task_prefix": "",
"quality_gates": "pnpm lint && pnpm build && pnpm test", "quality_gates": "",
"milestone_version": "0.0.21", "milestone_version": "0.0.1",
"milestones": [ "milestones": [
{ {
"id": "phase-1", "id": "phase-1",
"name": "Schema and Admin API", "name": "Schema+Seed",
"status": "pending", "status": "pending",
"branch": "schema-and-admin-api", "branch": "schema-seed",
"issue_ref": "", "issue_ref": "",
"started_at": "", "started_at": "",
"completed_at": "" "completed_at": ""
}, },
{ {
"id": "phase-2", "id": "phase-2",
"name": "Break-Glass Authentication", "name": "Admin CRUD",
"status": "pending", "status": "pending",
"branch": "break-glass-authentication", "branch": "admin-crud",
"issue_ref": "", "issue_ref": "",
"started_at": "", "started_at": "",
"completed_at": "" "completed_at": ""
}, },
{ {
"id": "phase-3", "id": "phase-3",
"name": "Data Migration", "name": "User CRUD",
"status": "pending", "status": "pending",
"branch": "data-migration", "branch": "user-crud",
"issue_ref": "", "issue_ref": "",
"started_at": "", "started_at": "",
"completed_at": "" "completed_at": ""
}, },
{ {
"id": "phase-4", "id": "phase-4",
"name": "Admin UI", "name": "Agent Routing",
"status": "pending", "status": "pending",
"branch": "admin-ui", "branch": "agent-routing",
"issue_ref": "", "issue_ref": "",
"started_at": "", "started_at": "",
"completed_at": "" "completed_at": ""
}, },
{ {
"id": "phase-5", "id": "phase-5",
"name": "RBAC UI Enforcement", "name": "Discord+UI",
"status": "pending", "status": "pending",
"branch": "rbac-ui-enforcement", "branch": "discord-ui",
"issue_ref": "", "issue_ref": "",
"started_at": "", "started_at": "",
"completed_at": "" "completed_at": ""
@@ -65,26 +65,5 @@
"completed_at": "" "completed_at": ""
} }
], ],
"sessions": [ "sessions": []
{
"session_id": "sess-001",
"runtime": "unknown",
"started_at": "2026-02-28T17:48:51Z",
"ended_at": "",
"ended_reason": "",
"milestone_at_end": "",
"tasks_completed": [],
"last_task_id": ""
},
{
"session_id": "sess-002",
"runtime": "unknown",
"started_at": "2026-02-28T20:30:13Z",
"ended_at": "",
"ended_reason": "",
"milestone_at_end": "",
"tasks_completed": [],
"last_task_id": ""
}
]
} }

View File

@@ -1,8 +0,0 @@
{
"session_id": "sess-002",
"runtime": "unknown",
"pid": 3178395,
"started_at": "2026-02-28T20:30:13Z",
"project_path": "/tmp/ms21-ui-001",
"milestone_id": ""
}

2
.npmrc
View File

@@ -1 +1,3 @@
@mosaicstack:registry=https://git.mosaicstack.dev/api/packages/mosaic/npm/ @mosaicstack:registry=https://git.mosaicstack.dev/api/packages/mosaic/npm/
supportedArchitectures[libc][]=glibc
supportedArchitectures[cpu][]=x64

View File

@@ -0,0 +1,27 @@
when:
- event: manual
- event: cron
cron: weekly-base-image
variables:
- &kaniko_setup |
mkdir -p /kaniko/.docker
echo "{\"auths\":{\"git.mosaicstack.dev\":{\"username\":\"$GITEA_USER\",\"password\":\"$GITEA_TOKEN\"}}}" > /kaniko/.docker/config.json
steps:
build-base:
image: gcr.io/kaniko-project/executor:debug
environment:
GITEA_USER:
from_secret: gitea_username
GITEA_TOKEN:
from_secret: gitea_token
commands:
- *kaniko_setup
- /kaniko/executor
--context .
--dockerfile docker/base.Dockerfile
--destination git.mosaicstack.dev/mosaic/node-base:24-slim
--destination git.mosaicstack.dev/mosaic/node-base:latest
--cache=true
--cache-repo git.mosaicstack.dev/mosaic/node-base/cache

View File

@@ -29,9 +29,11 @@ when:
- ".trivyignore" - ".trivyignore"
variables: variables:
- &node_image "node:24-alpine" - &node_image "node:24-slim"
- &install_deps | - &install_deps |
corepack enable corepack enable
apt-get update && apt-get install -y --no-install-recommends python3 make g++
pnpm config set store-dir /root/.local/share/pnpm/store
pnpm install --frozen-lockfile pnpm install --frozen-lockfile
- &use_deps | - &use_deps |
corepack enable corepack enable
@@ -168,7 +170,7 @@ steps:
elif [ "$CI_COMMIT_BRANCH" = "main" ]; then elif [ "$CI_COMMIT_BRANCH" = "main" ]; then
DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-api:latest" DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-api:latest"
fi fi
/kaniko/executor --context . --dockerfile apps/api/Dockerfile --snapshot-mode=redo $DESTINATIONS /kaniko/executor --context . --dockerfile apps/api/Dockerfile --snapshot-mode=redo --cache=true --cache-repo git.mosaicstack.dev/mosaic/stack-api/cache $DESTINATIONS
when: when:
- branch: [main] - branch: [main]
event: [push, manual, tag] event: [push, manual, tag]
@@ -193,7 +195,7 @@ steps:
elif [ "$CI_COMMIT_BRANCH" = "main" ]; then elif [ "$CI_COMMIT_BRANCH" = "main" ]; then
DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-orchestrator:latest" DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-orchestrator:latest"
fi fi
/kaniko/executor --context . --dockerfile apps/orchestrator/Dockerfile --snapshot-mode=redo $DESTINATIONS /kaniko/executor --context . --dockerfile apps/orchestrator/Dockerfile --snapshot-mode=redo --cache=true --cache-repo git.mosaicstack.dev/mosaic/stack-orchestrator/cache $DESTINATIONS
when: when:
- branch: [main] - branch: [main]
event: [push, manual, tag] event: [push, manual, tag]
@@ -218,7 +220,7 @@ steps:
elif [ "$CI_COMMIT_BRANCH" = "main" ]; then elif [ "$CI_COMMIT_BRANCH" = "main" ]; then
DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-web:latest" DESTINATIONS="--destination git.mosaicstack.dev/mosaic/stack-web:latest"
fi fi
/kaniko/executor --context . --dockerfile apps/web/Dockerfile --snapshot-mode=redo --build-arg NEXT_PUBLIC_API_URL=https://api.mosaicstack.dev $DESTINATIONS /kaniko/executor --context . --dockerfile apps/web/Dockerfile --snapshot-mode=redo --cache=true --cache-repo git.mosaicstack.dev/mosaic/stack-web/cache --build-arg NEXT_PUBLIC_API_URL=https://api.mosaicstack.dev $DESTINATIONS
when: when:
- branch: [main] - branch: [main]
event: [push, manual, tag] event: [push, manual, tag]
@@ -335,3 +337,47 @@ steps:
- security-trivy-api - security-trivy-api
- security-trivy-orchestrator - security-trivy-orchestrator
- security-trivy-web - security-trivy-web
# ─── Deploy to Docker Swarm via Portainer API (main only) ─────────────────────
deploy-swarm:
image: alpine:3
failure: ignore
environment:
PORTAINER_URL:
from_secret: portainer_url
PORTAINER_API_KEY:
from_secret: portainer_api_key
PORTAINER_STACK_ID: "121"
commands:
- apk add --no-cache curl
- |
set -e
echo "🚀 Deploying to Docker Swarm via Portainer API..."
# Use Portainer API to update the stack (forces pull of new images)
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "X-API-Key: $PORTAINER_API_KEY" \
-H "Content-Type: application/json" \
"$PORTAINER_URL/api/stacks/$PORTAINER_STACK_ID/git/redeploy")
HTTP_CODE=$(echo "$RESPONSE" | tail -1)
BODY=$(echo "$RESPONSE" | head -n -1)
if [ "$HTTP_CODE" = "200" ] || [ "$HTTP_CODE" = "202" ]; then
echo "✅ Stack update triggered successfully"
else
echo "❌ Stack update failed (HTTP $HTTP_CODE)"
echo "$BODY"
exit 1
fi
# Wait for services to converge
echo "⏳ Waiting for services to converge..."
sleep 30
echo "✅ Deploy complete"
when:
- branch: [main]
event: [push, manual, tag]
depends_on:
- link-packages

View File

@@ -1,7 +1,7 @@
# Base image for all stages # Base image for all stages
# Uses Debian slim (glibc) instead of Alpine (musl) because native Node.js addons # Uses Debian slim (glibc) instead of Alpine (musl) because native Node.js addons
# (matrix-sdk-crypto-nodejs, Prisma engines) require glibc-compatible binaries. # (matrix-sdk-crypto-nodejs, Prisma engines) require glibc-compatible binaries.
FROM node:24-slim AS base FROM git.mosaicstack.dev/mosaic/node-base:24-slim AS base
# Install pnpm globally # Install pnpm globally
RUN corepack enable && corepack prepare pnpm@10.27.0 --activate RUN corepack enable && corepack prepare pnpm@10.27.0 --activate
@@ -19,9 +19,9 @@ COPY turbo.json ./
FROM base AS deps FROM base AS deps
# Install build tools for native addons (node-pty requires node-gyp compilation) # Install build tools for native addons (node-pty requires node-gyp compilation)
# and OpenSSL for Prisma engine detection # Note: openssl and ca-certificates pre-installed in base image
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
python3 make g++ openssl \ python3 make g++ \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Copy all package.json files for workspace resolution # Copy all package.json files for workspace resolution
@@ -30,6 +30,9 @@ COPY packages/ui/package.json ./packages/ui/
COPY packages/config/package.json ./packages/config/ COPY packages/config/package.json ./packages/config/
COPY apps/api/package.json ./apps/api/ COPY apps/api/package.json ./apps/api/
# Copy npm configuration for native binary architecture hints
COPY .npmrc ./
# Install dependencies (no cache mount — Kaniko builds are ephemeral in CI) # Install dependencies (no cache mount — Kaniko builds are ephemeral in CI)
# Then explicitly rebuild node-pty from source since pnpm may skip postinstall # Then explicitly rebuild node-pty from source since pnpm may skip postinstall
# scripts or fail to find prebuilt binaries for this Node.js version # scripts or fail to find prebuilt binaries for this Node.js version
@@ -61,19 +64,14 @@ RUN pnpm turbo build --filter=@mosaic/api --force
# ====================== # ======================
# Production stage # Production stage
# ====================== # ======================
FROM node:24-slim AS production FROM git.mosaicstack.dev/mosaic/node-base:24-slim AS production
# Install dumb-init for proper signal handling (static binary from GitHub, # dumb-init, openssl, ca-certificates pre-installed in base image
# avoids apt-get which fails under Kaniko with bookworm GPG signature errors)
ADD https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 /usr/local/bin/dumb-init
# Single RUN to minimize Kaniko filesystem snapshots (each RUN = full snapshot) # Single RUN to minimize Kaniko filesystem snapshots (each RUN = full snapshot)
# - openssl: Prisma engine detection requires libssl # - Remove npm/npx to reduce image size (not used in production)
# - No build tools needed here — native addons are compiled in the deps stage # - Create non-root user
RUN apt-get update && apt-get install -y --no-install-recommends openssl \ RUN rm -rf /usr/local/lib/node_modules/npm /usr/local/bin/npm /usr/local/bin/npx \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf /usr/local/lib/node_modules/npm /usr/local/bin/npm /usr/local/bin/npx \
&& chmod 755 /usr/local/bin/dumb-init \
&& groupadd -g 1001 nodejs && useradd -m -u 1001 -g nodejs nestjs && groupadd -g 1001 nodejs && useradd -m -u 1001 -g nodejs nestjs
WORKDIR /app WORKDIR /app

View File

@@ -62,6 +62,7 @@
"discord.js": "^14.25.1", "discord.js": "^14.25.1",
"dockerode": "^4.0.9", "dockerode": "^4.0.9",
"gray-matter": "^4.0.3", "gray-matter": "^4.0.3",
"helmet": "^8.1.0",
"highlight.js": "^11.11.1", "highlight.js": "^11.11.1",
"ioredis": "^5.9.2", "ioredis": "^5.9.2",
"jose": "^6.1.3", "jose": "^6.1.3",

View File

@@ -0,0 +1,13 @@
-- MS21: Add admin, local auth, and invitation fields to users table
-- These columns were added to schema.prisma but never captured in a migration.
ALTER TABLE "users"
ADD COLUMN IF NOT EXISTS "deactivated_at" TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS "is_local_auth" BOOLEAN NOT NULL DEFAULT false,
ADD COLUMN IF NOT EXISTS "password_hash" TEXT,
ADD COLUMN IF NOT EXISTS "invited_by" UUID,
ADD COLUMN IF NOT EXISTS "invitation_token" TEXT,
ADD COLUMN IF NOT EXISTS "invited_at" TIMESTAMPTZ;
-- CreateIndex
CREATE UNIQUE INDEX IF NOT EXISTS "users_invitation_token_key" ON "users"("invitation_token");

View File

@@ -0,0 +1,83 @@
-- CreateTable
CREATE TABLE "AgentConversationMessage" (
"id" TEXT NOT NULL,
"sessionId" TEXT NOT NULL,
"provider" TEXT NOT NULL DEFAULT 'internal',
"role" TEXT NOT NULL,
"content" TEXT NOT NULL,
"timestamp" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "AgentConversationMessage_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "AgentSessionTree" (
"id" TEXT NOT NULL,
"sessionId" TEXT NOT NULL,
"parentSessionId" TEXT,
"provider" TEXT NOT NULL DEFAULT 'internal',
"missionId" TEXT,
"taskId" TEXT,
"taskSource" TEXT DEFAULT 'internal',
"agentType" TEXT,
"status" TEXT NOT NULL DEFAULT 'spawning',
"spawnedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"completedAt" TIMESTAMP(3),
"metadata" JSONB NOT NULL DEFAULT '{}',
CONSTRAINT "AgentSessionTree_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "AgentProviderConfig" (
"id" TEXT NOT NULL,
"workspaceId" TEXT NOT NULL,
"name" TEXT NOT NULL,
"provider" TEXT NOT NULL,
"gatewayUrl" TEXT NOT NULL,
"credentials" JSONB NOT NULL DEFAULT '{}',
"isActive" BOOLEAN NOT NULL DEFAULT true,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "AgentProviderConfig_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "OperatorAuditLog" (
"id" TEXT NOT NULL,
"userId" TEXT NOT NULL,
"sessionId" TEXT NOT NULL,
"provider" TEXT NOT NULL,
"action" TEXT NOT NULL,
"content" TEXT,
"metadata" JSONB NOT NULL DEFAULT '{}',
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "OperatorAuditLog_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "AgentConversationMessage_sessionId_timestamp_idx" ON "AgentConversationMessage"("sessionId", "timestamp");
-- CreateIndex
CREATE UNIQUE INDEX "AgentSessionTree_sessionId_key" ON "AgentSessionTree"("sessionId");
-- CreateIndex
CREATE INDEX "AgentSessionTree_parentSessionId_idx" ON "AgentSessionTree"("parentSessionId");
-- CreateIndex
CREATE INDEX "AgentSessionTree_missionId_idx" ON "AgentSessionTree"("missionId");
-- CreateIndex
CREATE UNIQUE INDEX "AgentProviderConfig_workspaceId_name_key" ON "AgentProviderConfig"("workspaceId", "name");
-- CreateIndex
CREATE INDEX "OperatorAuditLog_sessionId_idx" ON "OperatorAuditLog"("sessionId");
-- CreateIndex
CREATE INDEX "OperatorAuditLog_userId_idx" ON "OperatorAuditLog"("userId");
-- CreateIndex
CREATE INDEX "OperatorAuditLog_createdAt_idx" ON "OperatorAuditLog"("createdAt");

View File

@@ -1703,3 +1703,102 @@ model UserAgentConfig {
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
} }
model AgentTemplate {
id String @id @default(cuid())
name String @unique // "jarvis", "builder", "medic"
displayName String // "Jarvis", "Builder", "Medic"
role String // "orchestrator" | "coding" | "monitoring"
personality String // SOUL.md content (markdown)
primaryModel String // "opus", "codex", "haiku"
fallbackModels Json @default("[]") // ["sonnet", "haiku"]
toolPermissions Json @default("[]") // ["exec", "read", "write", ...]
discordChannel String? // "jarvis", "builder", "medic-alerts"
isActive Boolean @default(true)
isDefault Boolean @default(false) // Include in new user provisioning
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
model UserAgent {
id String @id @default(cuid())
userId String
templateId String? // null = custom agent
name String // "jarvis", "builder", "medic" or custom
displayName String
role String
personality String // User can customize
primaryModel String?
fallbackModels Json @default("[]")
toolPermissions Json @default("[]")
discordChannel String?
isActive Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, name])
@@index([userId])
}
// MS23: Agent conversation messages for Mission Control streaming
model AgentConversationMessage {
id String @id @default(cuid())
sessionId String
provider String @default("internal")
role String
content String
timestamp DateTime @default(now())
metadata Json @default("{}")
@@index([sessionId, timestamp])
}
// MS23: Agent session tree for parent/child relationships
model AgentSessionTree {
id String @id @default(cuid())
sessionId String @unique
parentSessionId String?
provider String @default("internal")
missionId String?
taskId String?
taskSource String? @default("internal")
agentType String?
status String @default("spawning")
spawnedAt DateTime @default(now())
completedAt DateTime?
metadata Json @default("{}")
@@index([parentSessionId])
@@index([missionId])
}
// MS23: External agent provider configuration per workspace
model AgentProviderConfig {
id String @id @default(cuid())
workspaceId String
name String
provider String
gatewayUrl String
credentials Json @default("{}")
isActive Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([workspaceId, name])
}
// MS23: Audit log for operator interventions
model OperatorAuditLog {
id String @id @default(cuid())
userId String
sessionId String
provider String
action String
content String?
metadata Json @default("{}")
createdAt DateTime @default(now())
@@index([sessionId])
@@index([userId])
@@index([createdAt])
}

View File

@@ -7,6 +7,7 @@ import {
EntryStatus, EntryStatus,
Visibility, Visibility,
} from "@prisma/client"; } from "@prisma/client";
import { seedAgentTemplates } from "../src/seed/agent-templates.seed";
const prisma = new PrismaClient(); const prisma = new PrismaClient();
@@ -586,6 +587,9 @@ This is a draft document. See [[architecture-overview]] for current state.`,
console.log(`Created ${links.length} knowledge links`); console.log(`Created ${links.length} knowledge links`);
}); });
// Seed default agent templates (idempotent)
await seedAgentTemplates(prisma);
console.log("Seeding completed successfully!"); console.log("Seeding completed successfully!");
} }

View File

@@ -117,12 +117,13 @@ export class ActivityService {
/** /**
* Get a single activity log by ID * Get a single activity log by ID
*/ */
async findOne(id: string, workspaceId: string): Promise<ActivityLogResult | null> { async findOne(id: string, workspaceId?: string): Promise<ActivityLogResult | null> {
const where: Prisma.ActivityLogWhereUniqueInput = { id };
if (workspaceId) {
where.workspaceId = workspaceId;
}
return await this.prisma.activityLog.findUnique({ return await this.prisma.activityLog.findUnique({
where: { where,
id,
workspaceId,
},
include: { include: {
user: { user: {
select: { select: {

View File

@@ -384,10 +384,18 @@ describe("ActivityLoggingInterceptor", () => {
const context = createMockExecutionContext("POST", {}, body, user); const context = createMockExecutionContext("POST", {}, body, user);
const next = createMockCallHandler(result); const next = createMockCallHandler(result);
mockActivityService.logActivity.mockResolvedValue({
id: "activity-123",
});
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
interceptor.intercept(context, next).subscribe(() => { interceptor.intercept(context, next).subscribe(() => {
// Should not call logActivity when workspaceId is missing // workspaceId is now optional, so logActivity should be called without it
expect(mockActivityService.logActivity).not.toHaveBeenCalled(); expect(mockActivityService.logActivity).toHaveBeenCalled();
const callArgs = mockActivityService.logActivity.mock.calls[0][0];
expect(callArgs.userId).toBe("user-123");
expect(callArgs.entityId).toBe("task-123");
expect(callArgs.workspaceId).toBeUndefined();
resolve(); resolve();
}); });
}); });
@@ -412,10 +420,18 @@ describe("ActivityLoggingInterceptor", () => {
const context = createMockExecutionContext("POST", {}, body, user); const context = createMockExecutionContext("POST", {}, body, user);
const next = createMockCallHandler(result); const next = createMockCallHandler(result);
mockActivityService.logActivity.mockResolvedValue({
id: "activity-123",
});
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
interceptor.intercept(context, next).subscribe(() => { interceptor.intercept(context, next).subscribe(() => {
// Should not call logActivity when workspaceId is missing // workspaceId is now optional, so logActivity should be called without it
expect(mockActivityService.logActivity).not.toHaveBeenCalled(); expect(mockActivityService.logActivity).toHaveBeenCalled();
const callArgs = mockActivityService.logActivity.mock.calls[0][0];
expect(callArgs.userId).toBe("user-123");
expect(callArgs.entityId).toBe("task-123");
expect(callArgs.workspaceId).toBeUndefined();
resolve(); resolve();
}); });
}); });

View File

@@ -4,6 +4,7 @@ import { tap } from "rxjs/operators";
import { ActivityService } from "../activity.service"; import { ActivityService } from "../activity.service";
import { ActivityAction, EntityType } from "@prisma/client"; import { ActivityAction, EntityType } from "@prisma/client";
import type { Prisma } from "@prisma/client"; import type { Prisma } from "@prisma/client";
import type { CreateActivityLogInput } from "../interfaces/activity.interface";
import type { AuthenticatedRequest } from "../../common/types/user.types"; import type { AuthenticatedRequest } from "../../common/types/user.types";
/** /**
@@ -61,10 +62,13 @@ export class ActivityLoggingInterceptor implements NestInterceptor {
// Extract entity information // Extract entity information
const resultObj = result as Record<string, unknown> | undefined; const resultObj = result as Record<string, unknown> | undefined;
const entityId = params.id ?? (resultObj?.id as string | undefined); const entityId = params.id ?? (resultObj?.id as string | undefined);
// workspaceId is now optional - log events even when missing
const workspaceId = user.workspaceId ?? (body.workspaceId as string | undefined); const workspaceId = user.workspaceId ?? (body.workspaceId as string | undefined);
if (!entityId || !workspaceId) { // Log with warning if entityId is missing, but still proceed with logging if workspaceId exists
this.logger.warn("Cannot log activity: missing entityId or workspaceId"); if (!entityId) {
this.logger.warn("Cannot log activity: missing entityId");
return; return;
} }
@@ -92,9 +96,8 @@ export class ActivityLoggingInterceptor implements NestInterceptor {
const userAgent = const userAgent =
typeof userAgentHeader === "string" ? userAgentHeader : userAgentHeader?.[0]; typeof userAgentHeader === "string" ? userAgentHeader : userAgentHeader?.[0];
// Log the activity // Log the activity — workspaceId is optional
await this.activityService.logActivity({ const activityInput: CreateActivityLogInput = {
workspaceId,
userId: user.id, userId: user.id,
action, action,
entityType, entityType,
@@ -102,7 +105,11 @@ export class ActivityLoggingInterceptor implements NestInterceptor {
details, details,
ipAddress: ip ?? undefined, ipAddress: ip ?? undefined,
userAgent: userAgent ?? undefined, userAgent: userAgent ?? undefined,
}); };
if (workspaceId) {
activityInput.workspaceId = workspaceId;
}
await this.activityService.logActivity(activityInput);
} catch (error) { } catch (error) {
// Don't fail the request if activity logging fails // Don't fail the request if activity logging fails
this.logger.error( this.logger.error(

View File

@@ -2,9 +2,10 @@ import type { ActivityAction, EntityType, Prisma } from "@prisma/client";
/** /**
* Interface for creating a new activity log entry * Interface for creating a new activity log entry
* workspaceId is optional - allows logging events without workspace context
*/ */
export interface CreateActivityLogInput { export interface CreateActivityLogInput {
workspaceId: string; workspaceId?: string | null;
userId: string; userId: string;
action: ActivityAction; action: ActivityAction;
entityType: EntityType; entityType: EntityType;

View File

@@ -0,0 +1,47 @@
import {
Controller,
Get,
Post,
Patch,
Delete,
Body,
Param,
UseGuards,
ParseUUIDPipe,
} from "@nestjs/common";
import { AgentTemplateService } from "./agent-template.service";
import { CreateAgentTemplateDto } from "./dto/create-agent-template.dto";
import { UpdateAgentTemplateDto } from "./dto/update-agent-template.dto";
import { AuthGuard } from "../auth/guards/auth.guard";
import { AdminGuard } from "../auth/guards/admin.guard";
@Controller("admin/agent-templates")
@UseGuards(AuthGuard, AdminGuard)
export class AgentTemplateController {
constructor(private readonly agentTemplateService: AgentTemplateService) {}
@Get()
findAll() {
return this.agentTemplateService.findAll();
}
@Get(":id")
findOne(@Param("id", ParseUUIDPipe) id: string) {
return this.agentTemplateService.findOne(id);
}
@Post()
create(@Body() dto: CreateAgentTemplateDto) {
return this.agentTemplateService.create(dto);
}
@Patch(":id")
update(@Param("id", ParseUUIDPipe) id: string, @Body() dto: UpdateAgentTemplateDto) {
return this.agentTemplateService.update(id, dto);
}
@Delete(":id")
remove(@Param("id", ParseUUIDPipe) id: string) {
return this.agentTemplateService.remove(id);
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { AgentTemplateService } from "./agent-template.service";
import { AgentTemplateController } from "./agent-template.controller";
import { PrismaModule } from "../prisma/prisma.module";
import { AuthModule } from "../auth/auth.module";
@Module({
imports: [PrismaModule, AuthModule],
controllers: [AgentTemplateController],
providers: [AgentTemplateService],
exports: [AgentTemplateService],
})
export class AgentTemplateModule {}

View File

@@ -0,0 +1,57 @@
import { Injectable, NotFoundException, ConflictException } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { CreateAgentTemplateDto } from "./dto/create-agent-template.dto";
import { UpdateAgentTemplateDto } from "./dto/update-agent-template.dto";
@Injectable()
export class AgentTemplateService {
constructor(private readonly prisma: PrismaService) {}
async findAll() {
return this.prisma.agentTemplate.findMany({
orderBy: { createdAt: "asc" },
});
}
async findOne(id: string) {
const template = await this.prisma.agentTemplate.findUnique({ where: { id } });
if (!template) throw new NotFoundException(`AgentTemplate ${id} not found`);
return template;
}
async findByName(name: string) {
const template = await this.prisma.agentTemplate.findUnique({ where: { name } });
if (!template) throw new NotFoundException(`AgentTemplate "${name}" not found`);
return template;
}
async create(dto: CreateAgentTemplateDto) {
const existing = await this.prisma.agentTemplate.findUnique({ where: { name: dto.name } });
if (existing) throw new ConflictException(`AgentTemplate "${dto.name}" already exists`);
return this.prisma.agentTemplate.create({
data: {
name: dto.name,
displayName: dto.displayName,
role: dto.role,
personality: dto.personality,
primaryModel: dto.primaryModel,
fallbackModels: dto.fallbackModels ?? ([] as string[]),
toolPermissions: dto.toolPermissions ?? ([] as string[]),
...(dto.discordChannel !== undefined && { discordChannel: dto.discordChannel }),
isActive: dto.isActive ?? true,
isDefault: dto.isDefault ?? false,
},
});
}
async update(id: string, dto: UpdateAgentTemplateDto) {
await this.findOne(id);
return this.prisma.agentTemplate.update({ where: { id }, data: dto });
}
async remove(id: string) {
await this.findOne(id);
return this.prisma.agentTemplate.delete({ where: { id } });
}
}

View File

@@ -0,0 +1,43 @@
import { IsString, IsBoolean, IsOptional, IsArray, MinLength } from "class-validator";
export class CreateAgentTemplateDto {
@IsString()
@MinLength(1)
name!: string;
@IsString()
@MinLength(1)
displayName!: string;
@IsString()
@MinLength(1)
role!: string;
@IsString()
@MinLength(1)
personality!: string;
@IsString()
@MinLength(1)
primaryModel!: string;
@IsArray()
@IsOptional()
fallbackModels?: string[];
@IsArray()
@IsOptional()
toolPermissions?: string[];
@IsString()
@IsOptional()
discordChannel?: string;
@IsBoolean()
@IsOptional()
isActive?: boolean;
@IsBoolean()
@IsOptional()
isDefault?: boolean;
}

View File

@@ -0,0 +1,4 @@
import { PartialType } from "@nestjs/mapped-types";
import { CreateAgentTemplateDto } from "./create-agent-template.dto";
export class UpdateAgentTemplateDto extends PartialType(CreateAgentTemplateDto) {}

View File

@@ -48,6 +48,8 @@ import { TerminalModule } from "./terminal/terminal.module";
import { PersonalitiesModule } from "./personalities/personalities.module"; import { PersonalitiesModule } from "./personalities/personalities.module";
import { WorkspacesModule } from "./workspaces/workspaces.module"; import { WorkspacesModule } from "./workspaces/workspaces.module";
import { AdminModule } from "./admin/admin.module"; import { AdminModule } from "./admin/admin.module";
import { AgentTemplateModule } from "./agent-template/agent-template.module";
import { UserAgentModule } from "./user-agent/user-agent.module";
import { TeamsModule } from "./teams/teams.module"; import { TeamsModule } from "./teams/teams.module";
import { ImportModule } from "./import/import.module"; import { ImportModule } from "./import/import.module";
import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module"; import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module";
@@ -129,6 +131,8 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
PersonalitiesModule, PersonalitiesModule,
WorkspacesModule, WorkspacesModule,
AdminModule, AdminModule,
AgentTemplateModule,
UserAgentModule,
TeamsModule, TeamsModule,
ImportModule, ImportModule,
ConversationArchiveModule, ConversationArchiveModule,

View File

@@ -106,7 +106,7 @@ export class AuthController {
// @SkipCsrf avoids double-protection conflicts. // @SkipCsrf avoids double-protection conflicts.
// See: https://www.better-auth.com/docs/reference/security // See: https://www.better-auth.com/docs/reference/security
@SkipCsrf() @SkipCsrf()
@Throttle({ strict: { limit: 10, ttl: 60000 } }) @Throttle({ default: { ttl: 60_000, limit: 5 } })
async handleAuth(@Req() req: ExpressRequest, @Res() res: ExpressResponse): Promise<void> { async handleAuth(@Req() req: ExpressRequest, @Res() res: ExpressResponse): Promise<void> {
// Extract client IP for logging // Extract client IP for logging
const clientIp = this.getClientIp(req); const clientIp = this.getClientIp(req);

View File

@@ -5,6 +5,7 @@ import { MatrixService } from "./matrix/matrix.service";
import { StitcherService } from "../stitcher/stitcher.service"; import { StitcherService } from "../stitcher/stitcher.service";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service"; import { BullMqService } from "../bullmq/bullmq.service";
import { ChatProxyService } from "../chat-proxy/chat-proxy.service";
import { CHAT_PROVIDERS } from "./bridge.constants"; import { CHAT_PROVIDERS } from "./bridge.constants";
import type { IChatProvider } from "./interfaces"; import type { IChatProvider } from "./interfaces";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
@@ -89,6 +90,7 @@ interface SavedEnvVars {
MATRIX_CONTROL_ROOM_ID?: string; MATRIX_CONTROL_ROOM_ID?: string;
MATRIX_WORKSPACE_ID?: string; MATRIX_WORKSPACE_ID?: string;
ENCRYPTION_KEY?: string; ENCRYPTION_KEY?: string;
MOSAIC_SECRET_KEY?: string;
} }
describe("BridgeModule", () => { describe("BridgeModule", () => {
@@ -106,6 +108,7 @@ describe("BridgeModule", () => {
MATRIX_CONTROL_ROOM_ID: process.env.MATRIX_CONTROL_ROOM_ID, MATRIX_CONTROL_ROOM_ID: process.env.MATRIX_CONTROL_ROOM_ID,
MATRIX_WORKSPACE_ID: process.env.MATRIX_WORKSPACE_ID, MATRIX_WORKSPACE_ID: process.env.MATRIX_WORKSPACE_ID,
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY, ENCRYPTION_KEY: process.env.ENCRYPTION_KEY,
MOSAIC_SECRET_KEY: process.env.MOSAIC_SECRET_KEY,
}; };
// Clear all bridge env vars // Clear all bridge env vars
@@ -120,6 +123,8 @@ describe("BridgeModule", () => {
// Set encryption key (needed by StitcherService) // Set encryption key (needed by StitcherService)
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
// Set MOSAIC_SECRET_KEY (needed by CryptoService via ChatProxyModule)
process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long";
// Clear ready callbacks // Clear ready callbacks
mockReadyCallbacks.length = 0; mockReadyCallbacks.length = 0;
@@ -149,6 +154,10 @@ describe("BridgeModule", () => {
.useValue({}) .useValue({})
.overrideProvider(BullMqService) .overrideProvider(BullMqService)
.useValue({}) .useValue({})
.overrideProvider(ChatProxyService)
.useValue({
proxyChat: vi.fn().mockResolvedValue(new Response()),
})
.compile(); .compile();
} }

View File

@@ -5,6 +5,8 @@ import { MatrixRoomService } from "./matrix/matrix-room.service";
import { MatrixStreamingService } from "./matrix/matrix-streaming.service"; import { MatrixStreamingService } from "./matrix/matrix-streaming.service";
import { CommandParserService } from "./parser/command-parser.service"; import { CommandParserService } from "./parser/command-parser.service";
import { StitcherModule } from "../stitcher/stitcher.module"; import { StitcherModule } from "../stitcher/stitcher.module";
import { ChatProxyModule } from "../chat-proxy/chat-proxy.module";
import { PrismaModule } from "../prisma/prisma.module";
import { CHAT_PROVIDERS } from "./bridge.constants"; import { CHAT_PROVIDERS } from "./bridge.constants";
import type { IChatProvider } from "./interfaces"; import type { IChatProvider } from "./interfaces";
@@ -28,7 +30,7 @@ const logger = new Logger("BridgeModule");
* MatrixRoomService handles workspace-to-Matrix-room mapping. * MatrixRoomService handles workspace-to-Matrix-room mapping.
*/ */
@Module({ @Module({
imports: [StitcherModule], imports: [StitcherModule, ChatProxyModule, PrismaModule],
providers: [ providers: [
CommandParserService, CommandParserService,
MatrixRoomService, MatrixRoomService,

View File

@@ -1,6 +1,8 @@
import { Test, TestingModule } from "@nestjs/testing"; import { Test, TestingModule } from "@nestjs/testing";
import { DiscordService } from "./discord.service"; import { DiscordService } from "./discord.service";
import { StitcherService } from "../../stitcher/stitcher.service"; import { StitcherService } from "../../stitcher/stitcher.service";
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
import { PrismaService } from "../../prisma/prisma.service";
import { Client, Events, GatewayIntentBits, Message } from "discord.js"; import { Client, Events, GatewayIntentBits, Message } from "discord.js";
import { vi, describe, it, expect, beforeEach } from "vitest"; import { vi, describe, it, expect, beforeEach } from "vitest";
import type { ChatMessage, ChatCommand } from "../interfaces"; import type { ChatMessage, ChatCommand } from "../interfaces";
@@ -61,6 +63,8 @@ vi.mock("discord.js", () => {
describe("DiscordService", () => { describe("DiscordService", () => {
let service: DiscordService; let service: DiscordService;
let stitcherService: StitcherService; let stitcherService: StitcherService;
let chatProxyService: ChatProxyService;
let prismaService: PrismaService;
const mockStitcherService = { const mockStitcherService = {
dispatchJob: vi.fn().mockResolvedValue({ dispatchJob: vi.fn().mockResolvedValue({
@@ -71,12 +75,29 @@ describe("DiscordService", () => {
trackJobEvent: vi.fn().mockResolvedValue(undefined), trackJobEvent: vi.fn().mockResolvedValue(undefined),
}; };
const mockChatProxyService = {
proxyChat: vi.fn().mockResolvedValue(
new Response('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\ndata: [DONE]\n\n', {
headers: { "Content-Type": "text/event-stream" },
})
),
};
const mockPrismaService = {
workspace: {
findUnique: vi.fn().mockResolvedValue({
ownerId: "owner-user-id",
}),
},
};
beforeEach(async () => { beforeEach(async () => {
// Set environment variables for testing // Set environment variables for testing
process.env.DISCORD_BOT_TOKEN = "test-token"; process.env.DISCORD_BOT_TOKEN = "test-token";
process.env.DISCORD_GUILD_ID = "test-guild-id"; process.env.DISCORD_GUILD_ID = "test-guild-id";
process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id"; process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id";
process.env.DISCORD_WORKSPACE_ID = "test-workspace-id"; process.env.DISCORD_WORKSPACE_ID = "test-workspace-id";
process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder";
// Clear callbacks // Clear callbacks
mockReadyCallbacks.length = 0; mockReadyCallbacks.length = 0;
@@ -89,11 +110,21 @@ describe("DiscordService", () => {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: ChatProxyService,
useValue: mockChatProxyService,
},
{
provide: PrismaService,
useValue: mockPrismaService,
},
], ],
}).compile(); }).compile();
service = module.get<DiscordService>(DiscordService); service = module.get<DiscordService>(DiscordService);
stitcherService = module.get<StitcherService>(StitcherService); stitcherService = module.get<StitcherService>(StitcherService);
chatProxyService = module.get<ChatProxyService>(ChatProxyService);
prismaService = module.get<PrismaService>(PrismaService);
// Clear all mocks // Clear all mocks
vi.clearAllMocks(); vi.clearAllMocks();
@@ -449,6 +480,14 @@ describe("DiscordService", () => {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: ChatProxyService,
useValue: mockChatProxyService,
},
{
provide: PrismaService,
useValue: mockPrismaService,
},
], ],
}).compile(); }).compile();
@@ -470,6 +509,14 @@ describe("DiscordService", () => {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: ChatProxyService,
useValue: mockChatProxyService,
},
{
provide: PrismaService,
useValue: mockPrismaService,
},
], ],
}).compile(); }).compile();
@@ -492,6 +539,14 @@ describe("DiscordService", () => {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: ChatProxyService,
useValue: mockChatProxyService,
},
{
provide: PrismaService,
useValue: mockPrismaService,
},
], ],
}).compile(); }).compile();
@@ -654,4 +709,150 @@ describe("DiscordService", () => {
expect(loggedError.statusCode).toBe(408); expect(loggedError.statusCode).toBe(408);
}); });
}); });
describe("Agent Channel Routing", () => {
it("should load agent channel mappings from environment", () => {
// The service should have loaded the agent channels from DISCORD_AGENT_CHANNELS
expect((service as any).agentChannels.size).toBe(2);
expect((service as any).agentChannels.get("jarvis-channel")).toBe("jarvis");
expect((service as any).agentChannels.get("builder-channel")).toBe("builder");
});
it("should handle empty agent channels config", async () => {
delete process.env.DISCORD_AGENT_CHANNELS;
const module: TestingModule = await Test.createTestingModule({
providers: [
DiscordService,
{
provide: StitcherService,
useValue: mockStitcherService,
},
{
provide: ChatProxyService,
useValue: mockChatProxyService,
},
{
provide: PrismaService,
useValue: mockPrismaService,
},
],
}).compile();
const newService = module.get<DiscordService>(DiscordService);
expect((newService as any).agentChannels.size).toBe(0);
// Restore for other tests
process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder";
});
it("should route messages in agent channels to ChatProxyService", async () => {
const mockChannel = {
send: vi.fn().mockResolvedValue({}),
isTextBased: () => true,
sendTyping: vi.fn(),
};
(mockClient.channels.fetch as any).mockResolvedValue(mockChannel);
// Create a mock streaming response
const mockStreamResponse = new Response(
'data: {"choices":[{"delta":{"content":"Test response"}}]}\n\ndata: [DONE]\n\n',
{ headers: { "Content-Type": "text/event-stream" } }
);
mockChatProxyService.proxyChat.mockResolvedValue(mockStreamResponse);
await service.connect();
// Simulate a message in the jarvis channel
const message: ChatMessage = {
id: "msg-agent-1",
channelId: "jarvis-channel",
authorId: "user-1",
authorName: "TestUser",
content: "Hello Jarvis!",
timestamp: new Date(),
};
// Call handleAgentChat directly
await (service as any).handleAgentChat(message, "jarvis");
// Verify ChatProxyService was called with workspace owner's ID and agent name
expect(mockChatProxyService.proxyChat).toHaveBeenCalledWith(
"owner-user-id",
[{ role: "user", content: "Hello Jarvis!" }],
undefined,
"jarvis"
);
// Verify response was sent to channel
expect(mockChannel.send).toHaveBeenCalled();
});
it("should not route empty messages", async () => {
const message: ChatMessage = {
id: "msg-empty",
channelId: "jarvis-channel",
authorId: "user-1",
authorName: "TestUser",
content: " ",
timestamp: new Date(),
};
await (service as any).handleAgentChat(message, "jarvis");
expect(mockChatProxyService.proxyChat).not.toHaveBeenCalled();
});
it("should handle ChatProxyService errors gracefully", async () => {
const mockChannel = {
send: vi.fn().mockResolvedValue({}),
isTextBased: () => true,
sendTyping: vi.fn(),
};
(mockClient.channels.fetch as any).mockResolvedValue(mockChannel);
mockChatProxyService.proxyChat.mockRejectedValue(new Error("Agent not found"));
await service.connect();
const message: ChatMessage = {
id: "msg-error",
channelId: "jarvis-channel",
authorId: "user-1",
authorName: "TestUser",
content: "Hello",
timestamp: new Date(),
};
await (service as any).handleAgentChat(message, "jarvis");
// Should send error message to channel
expect(mockChannel.send).toHaveBeenCalledWith(
expect.stringContaining("Failed to get response from jarvis")
);
});
it("should split long messages for Discord", () => {
const longContent = "A".repeat(5000);
const chunks = (service as any).splitMessageForDiscord(longContent);
// Should split into chunks of 2000 or less
expect(chunks.length).toBeGreaterThan(1);
for (const chunk of chunks) {
expect(chunk.length).toBeLessThanOrEqual(2000);
}
// Reassembled content should match original
expect(chunks.join("")).toBe(longContent.trim());
});
it("should prefer paragraph breaks when splitting messages", () => {
const content = "A".repeat(1500) + "\n\n" + "B".repeat(1500);
const chunks = (service as any).splitMessageForDiscord(content);
expect(chunks.length).toBe(2);
expect(chunks[0]).toContain("A");
expect(chunks[1]).toContain("B");
});
});
}); });

View File

@@ -1,6 +1,8 @@
import { Injectable, Logger } from "@nestjs/common"; import { Injectable, Logger } from "@nestjs/common";
import { Client, Events, GatewayIntentBits, TextChannel, ThreadChannel } from "discord.js"; import { Client, Events, GatewayIntentBits, TextChannel, ThreadChannel } from "discord.js";
import { StitcherService } from "../../stitcher/stitcher.service"; import { StitcherService } from "../../stitcher/stitcher.service";
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
import { PrismaService } from "../../prisma/prisma.service";
import { sanitizeForLogging } from "../../common/utils"; import { sanitizeForLogging } from "../../common/utils";
import type { import type {
IChatProvider, IChatProvider,
@@ -17,6 +19,7 @@ import type {
* - Connect to Discord via bot token * - Connect to Discord via bot token
* - Listen for commands in designated channels * - Listen for commands in designated channels
* - Forward commands to stitcher * - Forward commands to stitcher
* - Route messages in agent channels to specific agents via ChatProxyService
* - Receive status updates from herald * - Receive status updates from herald
* - Post updates to threads * - Post updates to threads
*/ */
@@ -28,12 +31,21 @@ export class DiscordService implements IChatProvider {
private readonly botToken: string; private readonly botToken: string;
private readonly controlChannelId: string; private readonly controlChannelId: string;
private readonly workspaceId: string; private readonly workspaceId: string;
private readonly agentChannels = new Map<string, string>();
private workspaceOwnerId: string | null = null;
constructor(private readonly stitcherService: StitcherService) { constructor(
private readonly stitcherService: StitcherService,
private readonly chatProxyService: ChatProxyService,
private readonly prisma: PrismaService
) {
this.botToken = process.env.DISCORD_BOT_TOKEN ?? ""; this.botToken = process.env.DISCORD_BOT_TOKEN ?? "";
this.controlChannelId = process.env.DISCORD_CONTROL_CHANNEL_ID ?? ""; this.controlChannelId = process.env.DISCORD_CONTROL_CHANNEL_ID ?? "";
this.workspaceId = process.env.DISCORD_WORKSPACE_ID ?? ""; this.workspaceId = process.env.DISCORD_WORKSPACE_ID ?? "";
// Load agent channel mappings from environment
this.loadAgentChannels();
// Initialize Discord client with required intents // Initialize Discord client with required intents
this.client = new Client({ this.client = new Client({
intents: [ intents: [
@@ -46,6 +58,51 @@ export class DiscordService implements IChatProvider {
this.setupEventHandlers(); this.setupEventHandlers();
} }
/**
* Load agent channel mappings from environment variables.
* Format: DISCORD_AGENT_CHANNELS=<channelId>:<agentName>,<channelId>:<agentName>
* Example: DISCORD_AGENT_CHANNELS=123456:jarvis,789012:builder
*/
private loadAgentChannels(): void {
const channelsConfig = process.env.DISCORD_AGENT_CHANNELS ?? "";
if (!channelsConfig) {
this.logger.debug("No agent channels configured (DISCORD_AGENT_CHANNELS not set)");
return;
}
const channels = channelsConfig.split(",").map((pair) => pair.trim());
for (const channel of channels) {
const [channelId, agentName] = channel.split(":");
if (channelId && agentName) {
this.agentChannels.set(channelId.trim(), agentName.trim());
this.logger.log(`Agent channel mapped: ${channelId.trim()}${agentName.trim()}`);
}
}
}
/**
* Get the workspace owner's user ID for chat proxy routing.
* Caches the result after first lookup.
*/
private async getWorkspaceOwnerId(): Promise<string> {
if (this.workspaceOwnerId) {
return this.workspaceOwnerId;
}
const workspace = await this.prisma.workspace.findUnique({
where: { id: this.workspaceId },
select: { ownerId: true },
});
if (!workspace) {
throw new Error(`Workspace not found: ${this.workspaceId}`);
}
this.workspaceOwnerId = workspace.ownerId;
this.logger.debug(`Workspace owner resolved: ${workspace.ownerId}`);
return workspace.ownerId;
}
/** /**
* Setup event handlers for Discord client * Setup event handlers for Discord client
*/ */
@@ -60,9 +117,6 @@ export class DiscordService implements IChatProvider {
// Ignore bot messages // Ignore bot messages
if (message.author.bot) return; if (message.author.bot) return;
// Check if message is in control channel
if (message.channelId !== this.controlChannelId) return;
// Parse message into ChatMessage format // Parse message into ChatMessage format
const chatMessage: ChatMessage = { const chatMessage: ChatMessage = {
id: message.id, id: message.id,
@@ -74,6 +128,16 @@ export class DiscordService implements IChatProvider {
...(message.channel.isThread() && { threadId: message.channelId }), ...(message.channel.isThread() && { threadId: message.channelId }),
}; };
// Check if message is in an agent channel
const agentName = this.agentChannels.get(message.channelId);
if (agentName) {
void this.handleAgentChat(chatMessage, agentName);
return;
}
// Check if message is in control channel for commands
if (message.channelId !== this.controlChannelId) return;
// Parse command // Parse command
const command = this.parseCommand(chatMessage); const command = this.parseCommand(chatMessage);
if (command) { if (command) {
@@ -394,4 +458,150 @@ export class DiscordService implements IChatProvider {
await this.sendMessage(message.channelId, helpMessage); await this.sendMessage(message.channelId, helpMessage);
} }
/**
* Handle agent chat - Route message to specific agent via ChatProxyService
* Messages in agent channels are sent directly to the agent without requiring @mosaic prefix.
*/
private async handleAgentChat(message: ChatMessage, agentName: string): Promise<void> {
this.logger.log(
`Routing message from ${message.authorName} to agent "${agentName}" in channel ${message.channelId}`
);
// Ignore empty messages
if (!message.content.trim()) {
return;
}
try {
// Get workspace owner ID for routing
const userId = await this.getWorkspaceOwnerId();
// Build message history (just the user's message for now)
const messages = [{ role: "user" as const, content: message.content }];
// Send typing indicator while waiting for response
const channel = await this.client.channels.fetch(message.channelId);
if (channel?.isTextBased()) {
void (channel as TextChannel).sendTyping();
}
// Proxy to agent
const response = await this.chatProxyService.proxyChat(
userId,
messages,
undefined,
agentName
);
// Stream the response to channel
await this.streamResponseToChannel(message.channelId, response);
this.logger.debug(`Agent "${agentName}" response sent to channel ${message.channelId}`);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to route message to agent "${agentName}": ${errorMessage}`);
await this.sendMessage(
message.channelId,
`Failed to get response from ${agentName}. Please try again later.`
);
}
}
/**
* Stream SSE response from chat proxy and send to Discord channel.
* Collects the full response and sends as a single message for reliability.
*/
private async streamResponseToChannel(channelId: string, response: Response): Promise<string> {
const reader = response.body?.getReader();
if (!reader) {
throw new Error("Response body is not readable");
}
const decoder = new TextDecoder();
let fullContent = "";
let buffer = "";
try {
let readResult = await reader.read();
while (!readResult.done) {
const { value } = readResult;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") continue;
try {
const parsed = JSON.parse(data) as {
choices?: { delta?: { content?: string } }[];
};
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
fullContent += content;
}
} catch {
// Skip invalid JSON
}
}
}
readResult = await reader.read();
}
// Send the full response to Discord
if (fullContent.trim()) {
// Discord has a 2000 character limit, split if needed
const chunks = this.splitMessageForDiscord(fullContent);
for (const chunk of chunks) {
await this.sendMessage(channelId, chunk);
}
}
return fullContent;
} finally {
reader.releaseLock();
}
}
/**
* Split a message into chunks that fit within Discord's 2000 character limit.
* Tries to split on paragraph or sentence boundaries when possible.
*/
private splitMessageForDiscord(content: string, maxLength = 2000): string[] {
if (content.length <= maxLength) {
return [content];
}
const chunks: string[] = [];
let remaining = content;
while (remaining.length > maxLength) {
// Try to find a good break point
let breakPoint = remaining.lastIndexOf("\n\n", maxLength);
if (breakPoint < maxLength * 0.5) {
breakPoint = remaining.lastIndexOf("\n", maxLength);
}
if (breakPoint < maxLength * 0.5) {
breakPoint = remaining.lastIndexOf(". ", maxLength);
}
if (breakPoint < maxLength * 0.5) {
breakPoint = remaining.lastIndexOf(" ", maxLength);
}
if (breakPoint < maxLength * 0.5) {
breakPoint = maxLength - 1;
}
chunks.push(remaining.slice(0, breakPoint + 1).trim());
remaining = remaining.slice(breakPoint + 1).trim();
}
if (remaining.length > 0) {
chunks.push(remaining);
}
return chunks;
}
} }

View File

@@ -28,6 +28,7 @@ import { StitcherService } from "../../stitcher/stitcher.service";
import { HeraldService } from "../../herald/herald.service"; import { HeraldService } from "../../herald/herald.service";
import { PrismaService } from "../../prisma/prisma.service"; import { PrismaService } from "../../prisma/prisma.service";
import { BullMqService } from "../../bullmq/bullmq.service"; import { BullMqService } from "../../bullmq/bullmq.service";
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
import type { IChatProvider } from "../interfaces"; import type { IChatProvider } from "../interfaces";
import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types"; import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types";
@@ -192,6 +193,7 @@ function setDiscordEnv(): void {
function setEncryptionKey(): void { function setEncryptionKey(): void {
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long";
} }
/** /**
@@ -205,6 +207,10 @@ async function compileBridgeModule(): Promise<TestingModule> {
.useValue({}) .useValue({})
.overrideProvider(BullMqService) .overrideProvider(BullMqService)
.useValue({}) .useValue({})
.overrideProvider(ChatProxyService)
.useValue({
proxyChat: vi.fn().mockResolvedValue(new Response()),
})
.compile(); .compile();
} }

View File

@@ -1,31 +1,79 @@
import { import { Body, Controller, HttpException, Logger, Post, Req, Res, UseGuards } from "@nestjs/common";
Body,
Controller,
HttpException,
Logger,
Post,
Req,
Res,
UnauthorizedException,
UseGuards,
} from "@nestjs/common";
import type { Response } from "express"; import type { Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard"; import { AuthGuard } from "../auth/guards/auth.guard";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface"; import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface";
import { ChatStreamDto } from "./chat-proxy.dto"; import { ChatStreamDto } from "./chat-proxy.dto";
import { ChatProxyService } from "./chat-proxy.service"; import { ChatProxyService } from "./chat-proxy.service";
@Controller("chat") @Controller("chat")
@UseGuards(AuthGuard)
export class ChatProxyController { export class ChatProxyController {
private readonly logger = new Logger(ChatProxyController.name); private readonly logger = new Logger(ChatProxyController.name);
constructor(private readonly chatProxyService: ChatProxyService) {} constructor(private readonly chatProxyService: ChatProxyService) {}
// POST /api/chat/guest
// Guest chat endpoint - no authentication required
// Uses a shared LLM configuration for unauthenticated users
@SkipCsrf()
@Post("guest")
async guestChat(
@Body() body: ChatStreamDto,
@Req() req: MaybeAuthenticatedRequest,
@Res() res: Response
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
try {
const upstreamResponse = await this.chatProxyService.proxyGuestChat(
body.messages,
abortController.signal
);
const upstreamContentType = upstreamResponse.headers.get("content-type");
if (upstreamContentType) {
res.setHeader("Content-Type", upstreamContentType);
}
if (!upstreamResponse.body) {
throw new Error("LLM response did not include a stream body");
}
for await (const chunk of upstreamResponse.body as unknown as AsyncIterable<Uint8Array>) {
if (res.writableEnded || res.destroyed) {
break;
}
res.write(Buffer.from(chunk));
}
} catch (error: unknown) {
this.logStreamError(error);
if (!res.writableEnded && !res.destroyed) {
res.write("event: error\n");
res.write(`data: ${JSON.stringify({ error: this.toSafeClientMessage(error) })}\n\n`);
}
} finally {
if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
}
// POST /api/chat/stream // POST /api/chat/stream
// Request: { messages: Array<{role, content}> } // Request: { messages: Array<{role, content}> }
// Response: SSE stream of chat completion events // Response: SSE stream of chat completion events
// Requires authentication - uses user's personal OpenClaw container
@Post("stream") @Post("stream")
@UseGuards(AuthGuard)
async streamChat( async streamChat(
@Body() body: ChatStreamDto, @Body() body: ChatStreamDto,
@Req() req: MaybeAuthenticatedRequest, @Req() req: MaybeAuthenticatedRequest,
@@ -33,7 +81,8 @@ export class ChatProxyController {
): Promise<void> { ): Promise<void> {
const userId = req.user?.id; const userId = req.user?.id;
if (!userId) { if (!userId) {
throw new UnauthorizedException("No authenticated user found on request"); this.logger.warn("streamChat called without user ID after AuthGuard");
throw new HttpException("Authentication required", 401);
} }
const abortController = new AbortController(); const abortController = new AbortController();
@@ -50,7 +99,8 @@ export class ChatProxyController {
const upstreamResponse = await this.chatProxyService.proxyChat( const upstreamResponse = await this.chatProxyService.proxyChat(
userId, userId,
body.messages, body.messages,
abortController.signal abortController.signal,
body.agent
); );
const upstreamContentType = upstreamResponse.headers.get("content-type"); const upstreamContentType = upstreamResponse.headers.get("content-type");

View File

@@ -1,5 +1,12 @@
import { Type } from "class-transformer"; import { Type } from "class-transformer";
import { ArrayMinSize, IsArray, IsNotEmpty, IsString, ValidateNested } from "class-validator"; import {
ArrayMinSize,
IsArray,
IsNotEmpty,
IsOptional,
IsString,
ValidateNested,
} from "class-validator";
export interface ChatMessage { export interface ChatMessage {
role: string; role: string;
@@ -22,4 +29,8 @@ export class ChatStreamDto {
@ValidateNested({ each: true }) @ValidateNested({ each: true })
@Type(() => ChatMessageDto) @Type(() => ChatMessageDto)
messages!: ChatMessageDto[]; messages!: ChatMessageDto[];
@IsString({ message: "agent must be a string" })
@IsOptional()
agent?: string;
} }

View File

@@ -1,4 +1,5 @@
import { Module } from "@nestjs/common"; import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AuthModule } from "../auth/auth.module"; import { AuthModule } from "../auth/auth.module";
import { AgentConfigModule } from "../agent-config/agent-config.module"; import { AgentConfigModule } from "../agent-config/agent-config.module";
import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module"; import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module";
@@ -7,7 +8,7 @@ import { ChatProxyController } from "./chat-proxy.controller";
import { ChatProxyService } from "./chat-proxy.service"; import { ChatProxyService } from "./chat-proxy.service";
@Module({ @Module({
imports: [AuthModule, PrismaModule, ContainerLifecycleModule, AgentConfigModule], imports: [AuthModule, PrismaModule, ContainerLifecycleModule, AgentConfigModule, ConfigModule],
controllers: [ChatProxyController], controllers: [ChatProxyController],
providers: [ChatProxyService], providers: [ChatProxyService],
exports: [ChatProxyService], exports: [ChatProxyService],

View File

@@ -1,4 +1,8 @@
import { ServiceUnavailableException } from "@nestjs/common"; import {
ServiceUnavailableException,
NotFoundException,
BadGatewayException,
} from "@nestjs/common";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ChatProxyService } from "./chat-proxy.service"; import { ChatProxyService } from "./chat-proxy.service";
@@ -9,6 +13,9 @@ describe("ChatProxyService", () => {
userAgentConfig: { userAgentConfig: {
findUnique: vi.fn(), findUnique: vi.fn(),
}, },
userAgent: {
findUnique: vi.fn(),
},
}; };
const containerLifecycle = { const containerLifecycle = {
@@ -16,13 +23,17 @@ describe("ChatProxyService", () => {
touch: vi.fn(), touch: vi.fn(),
}; };
const config = {
get: vi.fn(),
};
let service: ChatProxyService; let service: ChatProxyService;
let fetchMock: ReturnType<typeof vi.fn>; let fetchMock: ReturnType<typeof vi.fn>;
beforeEach(() => { beforeEach(() => {
fetchMock = vi.fn(); fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock); vi.stubGlobal("fetch", fetchMock);
service = new ChatProxyService(prisma as never, containerLifecycle as never); service = new ChatProxyService(prisma as never, containerLifecycle as never, config as never);
}); });
afterEach(() => { afterEach(() => {
@@ -105,4 +116,135 @@ describe("ChatProxyService", () => {
); );
}); });
}); });
describe("proxyChat with agent routing", () => {
it("includes agent config when agentName is specified", async () => {
const mockAgent = {
name: "jarvis",
displayName: "Jarvis",
personality: "Capable, direct, proactive.",
primaryModel: "opus",
isActive: true,
};
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
prisma.userAgent.findUnique.mockResolvedValue(mockAgent);
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
const messages = [{ role: "user", content: "Hello Jarvis" }];
await service.proxyChat(userId, messages, undefined, "jarvis");
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
const parsedBody = JSON.parse(String(request.body));
expect(parsedBody).toEqual({
messages,
model: "opus",
stream: true,
agent: "jarvis",
agent_personality: "Capable, direct, proactive.",
});
});
it("throws NotFoundException when agent not found", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
prisma.userAgent.findUnique.mockResolvedValue(null);
const messages = [{ role: "user", content: "Hello" }];
await expect(service.proxyChat(userId, messages, undefined, "nonexistent")).rejects.toThrow(
NotFoundException
);
});
it("throws NotFoundException when agent is not active", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
prisma.userAgent.findUnique.mockResolvedValue({
name: "inactive-agent",
displayName: "Inactive",
personality: "...",
primaryModel: null,
isActive: false,
});
const messages = [{ role: "user", content: "Hello" }];
await expect(
service.proxyChat(userId, messages, undefined, "inactive-agent")
).rejects.toThrow(NotFoundException);
});
it("falls back to default model when agent has no primaryModel", async () => {
const mockAgent = {
name: "jarvis",
displayName: "Jarvis",
personality: "Capable, direct, proactive.",
primaryModel: null,
isActive: true,
};
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
prisma.userAgent.findUnique.mockResolvedValue(mockAgent);
prisma.userAgentConfig.findUnique.mockResolvedValue(null);
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
const messages = [{ role: "user", content: "Hello" }];
await service.proxyChat(userId, messages, undefined, "jarvis");
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
const parsedBody = JSON.parse(String(request.body));
expect(parsedBody.model).toBe("openclaw:default");
});
});
describe("proxyGuestChat", () => {
it("uses environment variables for guest LLM configuration", async () => {
config.get.mockImplementation((key: string) => {
if (key === "GUEST_LLM_URL") return "http://10.1.1.42:11434/v1";
if (key === "GUEST_LLM_MODEL") return "llama3.2";
return undefined;
});
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
const messages = [{ role: "user", content: "Hello" }];
await service.proxyGuestChat(messages);
expect(fetchMock).toHaveBeenCalledWith(
"http://10.1.1.42:11434/v1/chat/completions",
expect.objectContaining({
method: "POST",
headers: {
"Content-Type": "application/json",
},
})
);
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
const parsedBody = JSON.parse(String(request.body));
expect(parsedBody.model).toBe("llama3.2");
});
it("throws BadGatewayException on guest LLM errors", async () => {
config.get.mockReturnValue(undefined);
fetchMock.mockResolvedValue(new Response("Internal Server Error", { status: 500 }));
const messages = [{ role: "user", content: "Hello" }];
await expect(service.proxyGuestChat(messages)).rejects.toThrow(BadGatewayException);
});
});
}); });

View File

@@ -2,26 +2,38 @@ import {
BadGatewayException, BadGatewayException,
Injectable, Injectable,
Logger, Logger,
NotFoundException,
ServiceUnavailableException, ServiceUnavailableException,
} from "@nestjs/common"; } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service"; import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import type { ChatMessage } from "./chat-proxy.dto"; import type { ChatMessage } from "./chat-proxy.dto";
const DEFAULT_OPENCLAW_MODEL = "openclaw:default"; const DEFAULT_OPENCLAW_MODEL = "openclaw:default";
const DEFAULT_GUEST_LLM_URL = "http://10.1.1.42:11434/v1";
const DEFAULT_GUEST_LLM_MODEL = "llama3.2";
interface ContainerConnection { interface ContainerConnection {
url: string; url: string;
token: string; token: string;
} }
interface AgentConfig {
name: string;
displayName: string;
personality: string;
primaryModel: string | null;
}
@Injectable() @Injectable()
export class ChatProxyService { export class ChatProxyService {
private readonly logger = new Logger(ChatProxyService.name); private readonly logger = new Logger(ChatProxyService.name);
constructor( constructor(
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly containerLifecycle: ContainerLifecycleService private readonly containerLifecycle: ContainerLifecycleService,
private readonly config: ConfigService
) {} ) {}
// Get the user's OpenClaw container URL and mark it active. // Get the user's OpenClaw container URL and mark it active.
@@ -34,21 +46,38 @@ export class ChatProxyService {
async proxyChat( async proxyChat(
userId: string, userId: string,
messages: ChatMessage[], messages: ChatMessage[],
signal?: AbortSignal signal?: AbortSignal,
agentName?: string
): Promise<Response> { ): Promise<Response> {
const { url: containerUrl, token: gatewayToken } = await this.getContainerConnection(userId); const { url: containerUrl, token: gatewayToken } = await this.getContainerConnection(userId);
const model = await this.getPreferredModel(userId);
// Get agent config if specified
let agentConfig: AgentConfig | null = null;
if (agentName) {
agentConfig = await this.getAgentConfig(userId, agentName);
}
const model = agentConfig?.primaryModel ?? (await this.getPreferredModel(userId));
const requestBody: Record<string, unknown> = {
messages,
model,
stream: true,
};
// Add agent config if available
if (agentConfig) {
requestBody.agent = agentConfig.name;
requestBody.agent_personality = agentConfig.personality;
}
const requestInit: RequestInit = { const requestInit: RequestInit = {
method: "POST", method: "POST",
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
Authorization: `Bearer ${gatewayToken}`, Authorization: `Bearer ${gatewayToken}`,
}, },
body: JSON.stringify({ body: JSON.stringify(requestBody),
messages,
model,
stream: true,
}),
}; };
if (signal) { if (signal) {
@@ -79,6 +108,65 @@ export class ChatProxyService {
} }
} }
/**
* Proxy guest chat request to configured LLM endpoint.
* Uses environment variables for configuration:
* - GUEST_LLM_URL: OpenAI-compatible endpoint URL
* - GUEST_LLM_API_KEY: API key (optional, for cloud providers)
* - GUEST_LLM_MODEL: Model name to use
*/
async proxyGuestChat(messages: ChatMessage[], signal?: AbortSignal): Promise<Response> {
const llmUrl = this.config.get<string>("GUEST_LLM_URL") ?? DEFAULT_GUEST_LLM_URL;
const llmApiKey = this.config.get<string>("GUEST_LLM_API_KEY");
const llmModel = this.config.get<string>("GUEST_LLM_MODEL") ?? DEFAULT_GUEST_LLM_MODEL;
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (llmApiKey) {
headers.Authorization = `Bearer ${llmApiKey}`;
}
const requestInit: RequestInit = {
method: "POST",
headers,
body: JSON.stringify({
messages,
model: llmModel,
stream: true,
}),
};
if (signal) {
requestInit.signal = signal;
}
try {
this.logger.debug(`Guest chat proxying to ${llmUrl} with model ${llmModel}`);
const response = await fetch(`${llmUrl}/chat/completions`, requestInit);
if (!response.ok) {
const detail = await this.readResponseText(response);
const status = `${String(response.status)} ${response.statusText}`.trim();
this.logger.warn(
detail ? `Guest LLM returned ${status}: ${detail}` : `Guest LLM returned ${status}`
);
throw new BadGatewayException(`Guest LLM returned ${status}`);
}
return response;
} catch (error: unknown) {
if (error instanceof BadGatewayException) {
throw error;
}
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Failed to proxy guest chat request: ${message}`);
throw new ServiceUnavailableException("Failed to proxy guest chat to LLM");
}
}
private async getContainerConnection(userId: string): Promise<ContainerConnection> { private async getContainerConnection(userId: string): Promise<ContainerConnection> {
const connection = await this.containerLifecycle.ensureRunning(userId); const connection = await this.containerLifecycle.ensureRunning(userId);
await this.containerLifecycle.touch(userId); await this.containerLifecycle.touch(userId);
@@ -107,4 +195,32 @@ export class ChatProxyService {
return null; return null;
} }
} }
private async getAgentConfig(userId: string, agentName: string): Promise<AgentConfig> {
const agent = await this.prisma.userAgent.findUnique({
where: { userId_name: { userId, name: agentName } },
select: {
name: true,
displayName: true,
personality: true,
primaryModel: true,
isActive: true,
},
});
if (!agent) {
throw new NotFoundException(`Agent "${agentName}" not found for user`);
}
if (!agent.isActive) {
throw new NotFoundException(`Agent "${agentName}" is not active`);
}
return {
name: agent.name,
displayName: agent.displayName,
personality: agent.personality,
primaryModel: agent.primaryModel,
};
}
} }

View File

@@ -1,6 +1,7 @@
import { NestFactory } from "@nestjs/core"; import { NestFactory } from "@nestjs/core";
import { RequestMethod, ValidationPipe } from "@nestjs/common"; import { RequestMethod, ValidationPipe } from "@nestjs/common";
import cookieParser from "cookie-parser"; import cookieParser from "cookie-parser";
import helmet from "helmet";
import { AppModule } from "./app.module"; import { AppModule } from "./app.module";
import { getTrustedOrigins } from "./auth/auth.config"; import { getTrustedOrigins } from "./auth/auth.config";
import { GlobalExceptionFilter } from "./filters/global-exception.filter"; import { GlobalExceptionFilter } from "./filters/global-exception.filter";
@@ -33,6 +34,14 @@ async function bootstrap() {
// Enable cookie parser for session handling // Enable cookie parser for session handling
app.use(cookieParser()); app.use(cookieParser());
// Enable helmet security headers
app.use(
helmet({
contentSecurityPolicy: false, // Let Next.js handle CSP
crossOriginEmbedderPolicy: false,
})
);
// Enable global validation pipe with transformation // Enable global validation pipe with transformation
app.useGlobalPipes( app.useGlobalPipes(
new ValidationPipe({ new ValidationPipe({

View File

@@ -1,4 +1,4 @@
import { Controller, Get, Res, UseGuards } from "@nestjs/common"; import { Controller, Get, Query, Res, UseGuards } from "@nestjs/common";
import { AgentStatus } from "@prisma/client"; import { AgentStatus } from "@prisma/client";
import type { Response } from "express"; import type { Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard"; import { AuthGuard } from "../auth/guards/auth.guard";
@@ -6,6 +6,7 @@ import { PrismaService } from "../prisma/prisma.service";
const AGENT_POLL_INTERVAL_MS = 5_000; const AGENT_POLL_INTERVAL_MS = 5_000;
const SSE_HEARTBEAT_MS = 15_000; const SSE_HEARTBEAT_MS = 15_000;
const DEFAULT_EVENTS_LIMIT = 25;
interface OrchestratorAgentDto { interface OrchestratorAgentDto {
id: string; id: string;
@@ -15,6 +16,26 @@ interface OrchestratorAgentDto {
createdAt: Date; createdAt: Date;
} }
interface OrchestratorEventDto {
type: string;
timestamp: string;
agentId?: string;
taskId?: string;
data?: Record<string, unknown>;
}
interface OrchestratorHealthDto {
status: "healthy" | "degraded" | "unhealthy";
database: "connected" | "disconnected";
agents: {
total: number;
working: number;
idle: number;
errored: number;
};
timestamp: string;
}
@Controller("orchestrator") @Controller("orchestrator")
@UseGuards(AuthGuard) @UseGuards(AuthGuard)
export class OrchestratorController { export class OrchestratorController {
@@ -25,6 +46,81 @@ export class OrchestratorController {
return this.fetchActiveAgents(); return this.fetchActiveAgents();
} }
@Get("events/recent")
async getRecentEvents(
@Query("limit") limit?: string
): Promise<{ events: OrchestratorEventDto[] }> {
const eventsLimit = limit ? parseInt(limit, 10) : DEFAULT_EVENTS_LIMIT;
const safeLimit = Math.min(Math.max(eventsLimit, 1), 100);
// Fetch recent agent activity to derive events
const agents = await this.prisma.agent.findMany({
where: {
status: {
not: AgentStatus.TERMINATED,
},
},
orderBy: {
createdAt: "desc",
},
take: safeLimit,
});
// Derive events from agent status changes
const events: OrchestratorEventDto[] = agents.map((agent) => ({
type: `agent:${agent.status.toLowerCase()}`,
timestamp: agent.createdAt.toISOString(),
agentId: agent.id,
data: {
name: agent.name,
role: agent.role,
model: agent.model,
},
}));
return { events };
}
@Get("health")
async getHealth(): Promise<OrchestratorHealthDto> {
let databaseConnected = false;
let agents: OrchestratorAgentDto[] = [];
try {
// Check database connectivity
await this.prisma.$queryRaw`SELECT 1`;
databaseConnected = true;
// Get agent counts
agents = await this.fetchActiveAgents();
} catch {
databaseConnected = false;
}
const working = agents.filter((a) => a.status === AgentStatus.WORKING).length;
const idle = agents.filter((a) => a.status === AgentStatus.IDLE).length;
const errored = agents.filter((a) => a.status === AgentStatus.ERROR).length;
let status: OrchestratorHealthDto["status"] = "healthy";
if (!databaseConnected) {
status = "unhealthy";
} else if (errored > 0) {
status = "degraded";
}
return {
status,
database: databaseConnected ? "connected" : "disconnected",
agents: {
total: agents.length,
working,
idle,
errored,
},
timestamp: new Date().toISOString(),
};
}
@Get("events") @Get("events")
async streamEvents(@Res() res: Response): Promise<void> { async streamEvents(@Res() res: Response): Promise<void> {
res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Content-Type", "text/event-stream");

View File

@@ -8,6 +8,7 @@ import {
MinLength, MinLength,
MaxLength, MaxLength,
Matches, Matches,
IsUUID,
} from "class-validator"; } from "class-validator";
/** /**
@@ -43,6 +44,10 @@ export class CreateProjectDto {
}) })
color?: string; color?: string;
@IsOptional()
@IsUUID("4", { message: "domainId must be a valid UUID" })
domainId?: string;
@IsOptional() @IsOptional()
@IsObject({ message: "metadata must be an object" }) @IsObject({ message: "metadata must be an object" })
metadata?: Record<string, unknown>; metadata?: Record<string, unknown>;

View File

@@ -8,6 +8,7 @@ import {
MinLength, MinLength,
MaxLength, MaxLength,
Matches, Matches,
IsUUID,
} from "class-validator"; } from "class-validator";
/** /**
@@ -45,6 +46,10 @@ export class UpdateProjectDto {
}) })
color?: string | null; color?: string | null;
@IsOptional()
@IsUUID("4", { message: "domainId must be a valid UUID" })
domainId?: string | null;
@IsOptional() @IsOptional()
@IsObject({ message: "metadata must be an object" }) @IsObject({ message: "metadata must be an object" })
metadata?: Record<string, unknown>; metadata?: Record<string, unknown>;

View File

@@ -47,6 +47,9 @@ export class ProjectsService {
createProjectDto: CreateProjectDto createProjectDto: CreateProjectDto
): Promise<ProjectWithRelations> { ): Promise<ProjectWithRelations> {
const data: Prisma.ProjectCreateInput = { const data: Prisma.ProjectCreateInput = {
...(createProjectDto.domainId
? { domain: { connect: { id: createProjectDto.domainId } } }
: {}),
name: createProjectDto.name, name: createProjectDto.name,
description: createProjectDto.description ?? null, description: createProjectDto.description ?? null,
color: createProjectDto.color ?? null, color: createProjectDto.color ?? null,
@@ -221,6 +224,18 @@ export class ProjectsService {
if (updateProjectDto.startDate !== undefined) updateData.startDate = updateProjectDto.startDate; if (updateProjectDto.startDate !== undefined) updateData.startDate = updateProjectDto.startDate;
if (updateProjectDto.endDate !== undefined) updateData.endDate = updateProjectDto.endDate; if (updateProjectDto.endDate !== undefined) updateData.endDate = updateProjectDto.endDate;
if (updateProjectDto.color !== undefined) updateData.color = updateProjectDto.color; if (updateProjectDto.color !== undefined) updateData.color = updateProjectDto.color;
if (updateProjectDto.domainId !== undefined)
updateData.domain = updateProjectDto.domainId
? { connect: { id: updateProjectDto.domainId } }
: { disconnect: true };
if (updateProjectDto.domainId !== undefined)
updateData.domain = updateProjectDto.domainId
? {
connect: {
id: updateProjectDto.domainId,
},
}
: { disconnect: true };
if (updateProjectDto.metadata !== undefined) { if (updateProjectDto.metadata !== undefined) {
updateData.metadata = updateProjectDto.metadata as unknown as Prisma.InputJsonValue; updateData.metadata = updateProjectDto.metadata as unknown as Prisma.InputJsonValue;
} }

View File

@@ -0,0 +1,62 @@
import type { PrismaClient } from "@prisma/client";
const AGENT_TEMPLATES = [
{
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
personality: `# Jarvis - Orchestrator Agent\n\nYou are Jarvis, the orchestrator and COO. You plan, delegate, and coordinate. You never write code directly — you spawn workers. You are direct, capable, and proactive. Your job is to get things done without hand-holding.\n\n## Core Traits\n- Direct and concise\n- Resourceful — figure it out before asking\n- Proactive — find problems to solve\n- Delegator — workers execute, you orchestrate`,
primaryModel: "opus",
fallbackModels: ["sonnet"],
toolPermissions: ["read", "write", "exec", "browser", "web_search", "memory_search"],
discordChannel: "jarvis",
isActive: true,
isDefault: true,
},
{
name: "builder",
displayName: "Builder",
role: "coding",
personality: `# Builder - Coding Agent\n\nYou are Builder, the coding agent. You implement features, fix bugs, and write tests. You work in worktrees, follow the E2E delivery protocol, and never skip quality gates. You are methodical and thorough.\n\n## Core Traits\n- Works in git worktrees (never touches main directly)\n- Runs lint + typecheck + tests before every commit\n- Follows the Mosaic E2E delivery framework\n- Never marks a task done until CI is green`,
primaryModel: "codex",
fallbackModels: ["sonnet", "haiku"],
toolPermissions: ["read", "write", "exec"],
discordChannel: "builder",
isActive: true,
isDefault: true,
},
{
name: "medic",
displayName: "Medic",
role: "monitoring",
personality: `# Medic - Health Monitoring Agent\n\nYou are Medic, the health monitoring agent. You watch services, check deployments, alert on anomalies, and verify system health. You are vigilant, calm, and proactive.\n\n## Core Traits\n- Monitors service health proactively\n- Alerts clearly and concisely\n- Tracks uptime and deployment status\n- Never panics — diagnoses methodically`,
primaryModel: "haiku",
fallbackModels: ["sonnet"],
toolPermissions: ["read", "exec"],
discordChannel: "medic-alerts",
isActive: true,
isDefault: true,
},
];
export async function seedAgentTemplates(prisma: PrismaClient): Promise<void> {
for (const template of AGENT_TEMPLATES) {
await prisma.agentTemplate.upsert({
where: { name: template.name },
update: {},
create: {
name: template.name,
displayName: template.displayName,
role: template.role,
personality: template.personality,
primaryModel: template.primaryModel,
fallbackModels: template.fallbackModels,
toolPermissions: template.toolPermissions,
discordChannel: template.discordChannel,
isActive: template.isActive,
isDefault: template.isDefault,
},
});
}
console.log("✅ Agent templates seeded:", AGENT_TEMPLATES.map((t) => t.name).join(", "));
}

View File

@@ -0,0 +1,43 @@
import { IsString, IsBoolean, IsOptional, IsArray, MinLength } from "class-validator";
export class CreateUserAgentDto {
@IsString()
@MinLength(1)
templateId?: string;
@IsString()
@MinLength(1)
name!: string;
@IsString()
@MinLength(1)
displayName!: string;
@IsString()
@MinLength(1)
role!: string;
@IsString()
@MinLength(1)
personality!: string;
@IsString()
@IsOptional()
primaryModel?: string;
@IsArray()
@IsOptional()
fallbackModels?: string[];
@IsArray()
@IsOptional()
toolPermissions?: string[];
@IsString()
@IsOptional()
discordChannel?: string;
@IsBoolean()
@IsOptional()
isActive?: boolean;
}

View File

@@ -0,0 +1,4 @@
import { PartialType } from "@nestjs/mapped-types";
import { CreateUserAgentDto } from "./create-user-agent.dto";
export class UpdateUserAgentDto extends PartialType(CreateUserAgentDto) {}

View File

@@ -0,0 +1,70 @@
import {
Controller,
Get,
Post,
Patch,
Delete,
Body,
Param,
UseGuards,
ParseUUIDPipe,
} from "@nestjs/common";
import { UserAgentService } from "./user-agent.service";
import { CreateUserAgentDto } from "./dto/create-user-agent.dto";
import { UpdateUserAgentDto } from "./dto/update-user-agent.dto";
import { AuthGuard } from "../auth/guards/auth.guard";
import { CurrentUser } from "../auth/decorators/current-user.decorator";
import type { AuthUser } from "@mosaic/shared";
@Controller("agents")
@UseGuards(AuthGuard)
export class UserAgentController {
constructor(private readonly userAgentService: UserAgentService) {}
@Get()
findAll(@CurrentUser() user: AuthUser) {
return this.userAgentService.findAll(user.id);
}
@Get("status")
getAllStatuses(@CurrentUser() user: AuthUser) {
return this.userAgentService.getAllStatuses(user.id);
}
@Get(":id")
findOne(@CurrentUser() user: AuthUser, @Param("id", ParseUUIDPipe) id: string) {
return this.userAgentService.findOne(user.id, id);
}
@Get(":id/status")
getStatus(@CurrentUser() user: AuthUser, @Param("id", ParseUUIDPipe) id: string) {
return this.userAgentService.getStatus(user.id, id);
}
@Post()
create(@CurrentUser() user: AuthUser, @Body() dto: CreateUserAgentDto) {
return this.userAgentService.create(user.id, dto);
}
@Post("from-template/:templateId")
createFromTemplate(
@CurrentUser() user: AuthUser,
@Param("templateId", ParseUUIDPipe) templateId: string
) {
return this.userAgentService.createFromTemplate(user.id, templateId);
}
@Patch(":id")
update(
@CurrentUser() user: AuthUser,
@Param("id", ParseUUIDPipe) id: string,
@Body() dto: UpdateUserAgentDto
) {
return this.userAgentService.update(user.id, id, dto);
}
@Delete(":id")
remove(@CurrentUser() user: AuthUser, @Param("id", ParseUUIDPipe) id: string) {
return this.userAgentService.remove(user.id, id);
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { UserAgentService } from "./user-agent.service";
import { UserAgentController } from "./user-agent.controller";
import { PrismaModule } from "../prisma/prisma.module";
import { AuthModule } from "../auth/auth.module";
@Module({
imports: [PrismaModule, AuthModule],
controllers: [UserAgentController],
providers: [UserAgentService],
exports: [UserAgentService],
})
export class UserAgentModule {}

View File

@@ -0,0 +1,300 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { UserAgentService } from "./user-agent.service";
import { PrismaService } from "../prisma/prisma.service";
import { NotFoundException, ConflictException, ForbiddenException } from "@nestjs/common";
describe("UserAgentService", () => {
let service: UserAgentService;
let prisma: PrismaService;
const mockPrismaService = {
userAgent: {
findMany: vi.fn(),
findUnique: vi.fn(),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
},
agentTemplate: {
findUnique: vi.fn(),
},
};
const mockUserId = "550e8400-e29b-41d4-a716-446655440001";
const mockAgentId = "550e8400-e29b-41d4-a716-446655440002";
const mockTemplateId = "550e8400-e29b-41d4-a716-446655440003";
const mockAgent = {
id: mockAgentId,
userId: mockUserId,
templateId: null,
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
personality: "Capable, direct, proactive.",
primaryModel: "opus",
fallbackModels: ["sonnet"],
toolPermissions: ["all"],
discordChannel: "jarvis",
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
};
const mockTemplate = {
id: mockTemplateId,
name: "builder",
displayName: "Builder",
role: "coding",
personality: "Focused, thorough.",
primaryModel: "codex",
fallbackModels: ["sonnet"],
toolPermissions: ["exec", "read", "write"],
discordChannel: "builder",
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
UserAgentService,
{
provide: PrismaService,
useValue: mockPrismaService,
},
],
}).compile();
service = module.get<UserAgentService>(UserAgentService);
prisma = module.get<PrismaService>(PrismaService);
vi.clearAllMocks();
});
it("should be defined", () => {
expect(service).toBeDefined();
});
describe("findAll", () => {
it("should return all agents for a user", async () => {
mockPrismaService.userAgent.findMany.mockResolvedValue([mockAgent]);
const result = await service.findAll(mockUserId);
expect(result).toEqual([mockAgent]);
expect(mockPrismaService.userAgent.findMany).toHaveBeenCalledWith({
where: { userId: mockUserId },
orderBy: { createdAt: "asc" },
});
});
it("should return empty array if no agents", async () => {
mockPrismaService.userAgent.findMany.mockResolvedValue([]);
const result = await service.findAll(mockUserId);
expect(result).toEqual([]);
});
});
describe("findOne", () => {
it("should return an agent by id", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
const result = await service.findOne(mockUserId, mockAgentId);
expect(result).toEqual(mockAgent);
});
it("should throw NotFoundException if agent not found", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(null);
await expect(service.findOne(mockUserId, mockAgentId)).rejects.toThrow(NotFoundException);
});
it("should throw ForbiddenException if agent belongs to different user", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue({
...mockAgent,
userId: "different-user-id",
});
await expect(service.findOne(mockUserId, mockAgentId)).rejects.toThrow(ForbiddenException);
});
});
describe("findByName", () => {
it("should return an agent by name", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
const result = await service.findByName(mockUserId, "jarvis");
expect(result).toEqual(mockAgent);
expect(mockPrismaService.userAgent.findUnique).toHaveBeenCalledWith({
where: { userId_name: { userId: mockUserId, name: "jarvis" } },
});
});
it("should throw NotFoundException if agent not found", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(null);
await expect(service.findByName(mockUserId, "nonexistent")).rejects.toThrow(
NotFoundException
);
});
});
describe("create", () => {
it("should create a new agent", async () => {
const createDto = {
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
personality: "Capable, direct, proactive.",
};
mockPrismaService.userAgent.findUnique.mockResolvedValue(null);
mockPrismaService.userAgent.create.mockResolvedValue(mockAgent);
const result = await service.create(mockUserId, createDto);
expect(result).toEqual(mockAgent);
});
it("should throw ConflictException if agent name already exists", async () => {
const createDto = {
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
personality: "Capable, direct, proactive.",
};
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
await expect(service.create(mockUserId, createDto)).rejects.toThrow(ConflictException);
});
it("should throw NotFoundException if templateId is invalid", async () => {
const createDto = {
name: "custom",
displayName: "Custom",
role: "custom",
personality: "Custom agent",
templateId: "nonexistent-template",
};
mockPrismaService.userAgent.findUnique.mockResolvedValue(null);
mockPrismaService.agentTemplate.findUnique.mockResolvedValue(null);
await expect(service.create(mockUserId, createDto)).rejects.toThrow(NotFoundException);
});
});
describe("createFromTemplate", () => {
it("should create an agent from a template", async () => {
mockPrismaService.agentTemplate.findUnique.mockResolvedValue(mockTemplate);
mockPrismaService.userAgent.findUnique.mockResolvedValue(null);
mockPrismaService.userAgent.create.mockResolvedValue({
...mockAgent,
templateId: mockTemplateId,
name: mockTemplate.name,
displayName: mockTemplate.displayName,
role: mockTemplate.role,
});
const result = await service.createFromTemplate(mockUserId, mockTemplateId);
expect(result.name).toBe(mockTemplate.name);
expect(result.displayName).toBe(mockTemplate.displayName);
});
it("should throw NotFoundException if template not found", async () => {
mockPrismaService.agentTemplate.findUnique.mockResolvedValue(null);
await expect(service.createFromTemplate(mockUserId, mockTemplateId)).rejects.toThrow(
NotFoundException
);
});
it("should throw ConflictException if agent name already exists", async () => {
mockPrismaService.agentTemplate.findUnique.mockResolvedValue(mockTemplate);
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
await expect(service.createFromTemplate(mockUserId, mockTemplateId)).rejects.toThrow(
ConflictException
);
});
});
describe("update", () => {
it("should update an agent", async () => {
const updateDto = { displayName: "Updated Jarvis" };
const updatedAgent = { ...mockAgent, ...updateDto };
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
mockPrismaService.userAgent.update.mockResolvedValue(updatedAgent);
const result = await service.update(mockUserId, mockAgentId, updateDto);
expect(result.displayName).toBe("Updated Jarvis");
});
it("should throw ConflictException if new name already exists", async () => {
const updateDto = { name: "existing-name" };
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
// Second call checks for existing name
mockPrismaService.userAgent.findUnique.mockResolvedValue({ ...mockAgent, id: "other-id" });
await expect(service.update(mockUserId, mockAgentId, updateDto)).rejects.toThrow(
ConflictException
);
});
});
describe("remove", () => {
it("should delete an agent", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
mockPrismaService.userAgent.delete.mockResolvedValue(mockAgent);
const result = await service.remove(mockUserId, mockAgentId);
expect(result).toEqual(mockAgent);
});
});
describe("getStatus", () => {
it("should return agent status", async () => {
mockPrismaService.userAgent.findUnique.mockResolvedValue(mockAgent);
const result = await service.getStatus(mockUserId, mockAgentId);
expect(result).toEqual({
id: mockAgentId,
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
isActive: true,
});
});
});
describe("getAllStatuses", () => {
it("should return all agent statuses", async () => {
mockPrismaService.userAgent.findMany.mockResolvedValue([mockAgent]);
const result = await service.getAllStatuses(mockUserId);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({
id: mockAgentId,
name: "jarvis",
displayName: "Jarvis",
role: "orchestrator",
isActive: true,
});
});
});
});

View File

@@ -0,0 +1,153 @@
import {
Injectable,
NotFoundException,
ConflictException,
ForbiddenException,
} from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { CreateUserAgentDto } from "./dto/create-user-agent.dto";
import { UpdateUserAgentDto } from "./dto/update-user-agent.dto";
export interface AgentStatusResponse {
id: string;
name: string;
displayName: string;
role: string;
isActive: boolean;
containerStatus?: "running" | "stopped" | "unknown";
}
@Injectable()
export class UserAgentService {
constructor(private readonly prisma: PrismaService) {}
async findAll(userId: string) {
return this.prisma.userAgent.findMany({
where: { userId },
orderBy: { createdAt: "asc" },
});
}
async findOne(userId: string, id: string) {
const agent = await this.prisma.userAgent.findUnique({ where: { id } });
if (!agent) throw new NotFoundException(`UserAgent ${id} not found`);
if (agent.userId !== userId) throw new ForbiddenException("Access denied to this agent");
return agent;
}
async findByName(userId: string, name: string) {
const agent = await this.prisma.userAgent.findUnique({
where: { userId_name: { userId, name } },
});
if (!agent) throw new NotFoundException(`UserAgent "${name}" not found for user`);
return agent;
}
async create(userId: string, dto: CreateUserAgentDto) {
// Check for unique name within user scope
const existing = await this.prisma.userAgent.findUnique({
where: { userId_name: { userId, name: dto.name } },
});
if (existing)
throw new ConflictException(`UserAgent "${dto.name}" already exists for this user`);
// If templateId provided, verify it exists
if (dto.templateId) {
const template = await this.prisma.agentTemplate.findUnique({
where: { id: dto.templateId },
});
if (!template) throw new NotFoundException(`AgentTemplate ${dto.templateId} not found`);
}
return this.prisma.userAgent.create({
data: {
userId,
templateId: dto.templateId ?? null,
name: dto.name,
displayName: dto.displayName,
role: dto.role,
personality: dto.personality,
primaryModel: dto.primaryModel ?? null,
fallbackModels: dto.fallbackModels ?? ([] as string[]),
toolPermissions: dto.toolPermissions ?? ([] as string[]),
discordChannel: dto.discordChannel ?? null,
isActive: dto.isActive ?? true,
},
});
}
async createFromTemplate(userId: string, templateId: string) {
const template = await this.prisma.agentTemplate.findUnique({
where: { id: templateId },
});
if (!template) throw new NotFoundException(`AgentTemplate ${templateId} not found`);
// Check for unique name within user scope
const existing = await this.prisma.userAgent.findUnique({
where: { userId_name: { userId, name: template.name } },
});
if (existing)
throw new ConflictException(`UserAgent "${template.name}" already exists for this user`);
return this.prisma.userAgent.create({
data: {
userId,
templateId: template.id,
name: template.name,
displayName: template.displayName,
role: template.role,
personality: template.personality,
primaryModel: template.primaryModel,
fallbackModels: template.fallbackModels as string[],
toolPermissions: template.toolPermissions as string[],
discordChannel: template.discordChannel,
isActive: template.isActive,
},
});
}
async update(userId: string, id: string, dto: UpdateUserAgentDto) {
const agent = await this.findOne(userId, id);
// If name is being changed, check for uniqueness
if (dto.name && dto.name !== agent.name) {
const existing = await this.prisma.userAgent.findUnique({
where: { userId_name: { userId, name: dto.name } },
});
if (existing)
throw new ConflictException(`UserAgent "${dto.name}" already exists for this user`);
}
return this.prisma.userAgent.update({
where: { id },
data: dto,
});
}
async remove(userId: string, id: string) {
await this.findOne(userId, id);
return this.prisma.userAgent.delete({ where: { id } });
}
async getStatus(userId: string, id: string): Promise<AgentStatusResponse> {
const agent = await this.findOne(userId, id);
return {
id: agent.id,
name: agent.name,
displayName: agent.displayName,
role: agent.role,
isActive: agent.isActive,
};
}
async getAllStatuses(userId: string): Promise<AgentStatusResponse[]> {
const agents = await this.findAll(userId);
return agents.map((agent) => ({
id: agent.id,
name: agent.name,
displayName: agent.displayName,
role: agent.role,
isActive: agent.isActive,
}));
}
}

View File

@@ -29,6 +29,25 @@ export class WorkspacesController {
return this.workspacesService.getUserWorkspaces(user.id); return this.workspacesService.getUserWorkspaces(user.id);
} }
/**
* GET /api/workspaces/:workspaceId/stats
* Returns member, project, and domain counts for a workspace.
*/
@Get(":workspaceId/stats")
async getStats(@Param("workspaceId") workspaceId: string) {
return this.workspacesService.getStats(workspaceId);
}
/**
* GET /api/workspaces/:workspaceId/members
* Returns the list of members for a workspace.
*/
@Get(":workspaceId/members")
@UseGuards(WorkspaceGuard)
async getMembers(@Param("workspaceId") workspaceId: string) {
return this.workspacesService.getMembers(workspaceId);
}
/** /**
* POST /api/workspaces/:workspaceId/members * POST /api/workspaces/:workspaceId/members
* Add a member to a workspace with the specified role. * Add a member to a workspace with the specified role.

View File

@@ -321,6 +321,18 @@ export class WorkspacesService {
}); });
} }
/**
* Get members of a workspace.
*/
async getMembers(workspaceId: string) {
return this.prisma.workspaceMember.findMany({
where: { workspaceId },
include: {
user: { select: { id: true, name: true, email: true, createdAt: true } },
},
orderBy: { joinedAt: "asc" },
});
}
private assertCanAssignRole( private assertCanAssignRole(
actorRole: WorkspaceMemberRole, actorRole: WorkspaceMemberRole,
requestedRole: WorkspaceMemberRole requestedRole: WorkspaceMemberRole
@@ -342,4 +354,15 @@ export class WorkspacesService {
private isUniqueConstraintError(error: unknown): error is Prisma.PrismaClientKnownRequestError { private isUniqueConstraintError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
return error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002"; return error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002";
} }
async getStats(
workspaceId: string
): Promise<{ memberCount: number; projectCount: number; domainCount: number }> {
const [memberCount, projectCount, domainCount] = await Promise.all([
this.prisma.workspaceMember.count({ where: { workspaceId } }),
this.prisma.project.count({ where: { workspaceId } }),
this.prisma.domain.count({ where: { workspaceId } }),
]);
return { memberCount, projectCount, domainCount };
}
} }

View File

@@ -601,9 +601,21 @@ class TestCoordinatorIntegration:
coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.02) coordinator = Coordinator(queue_manager=queue_manager, poll_interval=0.02)
task = asyncio.create_task(coordinator.start()) task = asyncio.create_task(coordinator.start())
await asyncio.sleep(0.5) # Allow time for processing
await coordinator.stop()
# Poll for completion with timeout instead of fixed sleep
deadline = asyncio.get_event_loop().time() + 5.0 # 5 second timeout
while asyncio.get_event_loop().time() < deadline:
all_completed = True
for i in range(157, 162):
item = queue_manager.get_item(i)
if item is None or item.status != QueueItemStatus.COMPLETED:
all_completed = False
break
if all_completed:
break
await asyncio.sleep(0.05)
await coordinator.stop()
task.cancel() task.cancel()
try: try:
await task await task

View File

@@ -1,6 +1,6 @@
# Base image for all stages # Base image for all stages
# Uses Debian slim (glibc) instead of Alpine (musl) for native addon compatibility. # Uses Debian slim (glibc) instead of Alpine (musl) for native addon compatibility.
FROM node:24-slim AS base FROM git.mosaicstack.dev/mosaic/node-base:24-slim AS base
# Install pnpm globally # Install pnpm globally
RUN corepack enable && corepack prepare pnpm@10.27.0 --activate RUN corepack enable && corepack prepare pnpm@10.27.0 --activate
@@ -21,6 +21,10 @@ FROM base AS deps
COPY packages/shared/package.json ./packages/shared/ COPY packages/shared/package.json ./packages/shared/
COPY packages/config/package.json ./packages/config/ COPY packages/config/package.json ./packages/config/
COPY apps/orchestrator/package.json ./apps/orchestrator/ COPY apps/orchestrator/package.json ./apps/orchestrator/
# API schema is available via apps/orchestrator/prisma/schema.prisma symlink
# Copy npm configuration for native binary architecture hints
COPY .npmrc ./
# Install ALL dependencies (not just production) # Install ALL dependencies (not just production)
# No cache mount — Kaniko builds are ephemeral in CI # No cache mount — Kaniko builds are ephemeral in CI
@@ -43,6 +47,15 @@ COPY --from=deps /app/packages/shared/node_modules ./packages/shared/node_module
COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules COPY --from=deps /app/packages/config/node_modules ./packages/config/node_modules
COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules COPY --from=deps /app/apps/orchestrator/node_modules ./apps/orchestrator/node_modules
# The repo has apps/orchestrator/prisma/schema.prisma as a symlink for CI use.
# Kaniko resolves destination symlinks on COPY, which fails because the symlink
# target (../../api/prisma/schema.prisma) doesn't exist in the container.
# Fix: remove the dangling symlink first, then copy the real schema file there.
RUN rm -f apps/orchestrator/prisma/schema.prisma
COPY apps/api/prisma/schema.prisma ./apps/orchestrator/prisma/schema.prisma
# pnpm turbo build runs prisma:generate (--schema=./prisma/schema.prisma) from the
# orchestrator package context — no cross-package project-root issues.
# Build the orchestrator app using TurboRepo # Build the orchestrator app using TurboRepo
RUN pnpm turbo build --filter=@mosaic/orchestrator RUN pnpm turbo build --filter=@mosaic/orchestrator
@@ -54,7 +67,7 @@ RUN find ./apps/orchestrator/dist \( -name '*.spec.js' -o -name '*.spec.js.map'
# ====================== # ======================
# Production stage # Production stage
# ====================== # ======================
FROM node:24-slim AS production FROM git.mosaicstack.dev/mosaic/node-base:24-slim AS production
# Add metadata labels # Add metadata labels
LABEL maintainer="mosaic-team@mosaicstack.dev" LABEL maintainer="mosaic-team@mosaicstack.dev"
@@ -65,13 +78,12 @@ LABEL org.opencontainers.image.vendor="Mosaic Stack"
LABEL org.opencontainers.image.title="Mosaic Orchestrator" LABEL org.opencontainers.image.title="Mosaic Orchestrator"
LABEL org.opencontainers.image.description="Agent orchestration service for Mosaic Stack" LABEL org.opencontainers.image.description="Agent orchestration service for Mosaic Stack"
# Install dumb-init for proper signal handling (static binary from GitHub, # dumb-init, ca-certificates pre-installed in base image
# avoids apt-get which fails under Kaniko with bookworm GPG signature errors)
ADD https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 /usr/local/bin/dumb-init
# Single RUN to minimize Kaniko filesystem snapshots (each RUN = full snapshot) # Single RUN to minimize Kaniko filesystem snapshots (each RUN = full snapshot)
# - Remove npm/npx to reduce image size (not used in production)
# - Create non-root user
RUN rm -rf /usr/local/lib/node_modules/npm /usr/local/bin/npm /usr/local/bin/npx \ RUN rm -rf /usr/local/lib/node_modules/npm /usr/local/bin/npm /usr/local/bin/npx \
&& chmod 755 /usr/local/bin/dumb-init \
&& groupadd -g 1001 nodejs && useradd -m -u 1001 -g nodejs nestjs && groupadd -g 1001 nodejs && useradd -m -u 1001 -g nodejs nestjs
WORKDIR /app WORKDIR /app

View File

@@ -1,32 +1,35 @@
{ {
"name": "@mosaic/orchestrator", "name": "@mosaic/orchestrator",
"version": "0.0.20", "version": "0.0.23",
"private": true, "private": true,
"scripts": { "scripts": {
"dev": "nest start --watch",
"build": "nest build", "build": "nest build",
"dev": "nest start --watch",
"lint": "eslint src/",
"lint:fix": "eslint src/ --fix",
"prisma:generate": "prisma generate --schema=./prisma/schema.prisma",
"start": "node dist/main.js", "start": "node dist/main.js",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch", "start:debug": "nest start --debug --watch",
"start:dev": "nest start --watch",
"start:prod": "node dist/main.js", "start:prod": "node dist/main.js",
"test": "vitest", "test": "vitest",
"test:watch": "vitest watch",
"test:e2e": "vitest run --config tests/integration/vitest.config.ts", "test:e2e": "vitest run --config tests/integration/vitest.config.ts",
"test:perf": "vitest run --config tests/performance/vitest.config.ts", "test:perf": "vitest run --config tests/performance/vitest.config.ts",
"typecheck": "tsc --noEmit", "test:watch": "vitest watch",
"lint": "eslint src/", "typecheck": "tsc --noEmit"
"lint:fix": "eslint src/ --fix"
}, },
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.72.1", "@anthropic-ai/sdk": "^0.72.1",
"@mosaic/config": "workspace:*", "@mosaic/config": "workspace:*",
"@mosaic/shared": "workspace:*", "@mosaic/shared": "workspace:*",
"@nestjs/axios": "^4.0.1",
"@nestjs/bullmq": "^11.0.4", "@nestjs/bullmq": "^11.0.4",
"@nestjs/common": "^11.1.12", "@nestjs/common": "^11.1.12",
"@nestjs/config": "^4.0.2", "@nestjs/config": "^4.0.2",
"@nestjs/core": "^11.1.12", "@nestjs/core": "^11.1.12",
"@nestjs/platform-express": "^11.1.12", "@nestjs/platform-express": "^11.1.12",
"@nestjs/throttler": "^6.5.0", "@nestjs/throttler": "^6.5.0",
"@prisma/client": "^6.19.2",
"bullmq": "^5.67.2", "bullmq": "^5.67.2",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.1", "class-validator": "^0.14.1",
@@ -45,6 +48,7 @@
"@types/express": "^5.0.1", "@types/express": "^5.0.1",
"@types/node": "^22.13.4", "@types/node": "^22.13.4",
"@vitest/coverage-v8": "^4.0.18", "@vitest/coverage-v8": "^4.0.18",
"prisma": "^6.19.2",
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsconfig-paths": "^4.2.0", "tsconfig-paths": "^4.2.0",
"typescript": "^5.8.2", "typescript": "^5.8.2",

View File

@@ -0,0 +1 @@
../../api/prisma/schema.prisma

View File

@@ -0,0 +1,10 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../prisma/prisma.module";
import { AgentIngestionService } from "./agent-ingestion.service";
@Module({
imports: [PrismaModule],
providers: [AgentIngestionService],
exports: [AgentIngestionService],
})
export class AgentIngestionModule {}

View File

@@ -0,0 +1,141 @@
import { Injectable, Logger } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
export type AgentConversationRole = "agent" | "user" | "system" | "operator";
@Injectable()
export class AgentIngestionService {
private readonly logger = new Logger(AgentIngestionService.name);
constructor(private readonly prisma: PrismaService) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
async recordAgentSpawned(
agentId: string,
parentAgentId?: string,
missionId?: string,
taskId?: string,
agentType?: string
): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
parentSessionId: parentAgentId ?? null,
missionId,
taskId,
agentType,
status: "spawning",
},
update: {
parentSessionId: parentAgentId ?? null,
missionId,
taskId,
agentType,
status: "spawning",
completedAt: null,
},
});
this.logger.debug(`Recorded spawned state for agent ${agentId}`);
}
async recordAgentStarted(agentId: string): Promise<void> {
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "running",
},
update: {
status: "running",
},
});
this.logger.debug(`Recorded running state for agent ${agentId}`);
}
async recordAgentCompleted(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "completed",
completedAt,
},
update: {
status: "completed",
completedAt,
},
});
this.logger.debug(`Recorded completed state for agent ${agentId}`);
}
async recordAgentFailed(agentId: string, error?: string): Promise<void> {
const completedAt = new Date();
const metadata = error ? this.toJsonValue({ error }) : undefined;
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "failed",
completedAt,
...(metadata && { metadata }),
},
update: {
status: "failed",
completedAt,
...(metadata && { metadata }),
},
});
this.logger.debug(`Recorded failed state for agent ${agentId}`);
}
async recordAgentKilled(agentId: string): Promise<void> {
const completedAt = new Date();
await this.prisma.agentSessionTree.upsert({
where: { sessionId: agentId },
create: {
sessionId: agentId,
status: "killed",
completedAt,
},
update: {
status: "killed",
completedAt,
},
});
this.logger.debug(`Recorded killed state for agent ${agentId}`);
}
async recordMessage(
sessionId: string,
role: AgentConversationRole,
content: string,
provider = "internal",
metadata?: Record<string, unknown>
): Promise<void> {
await this.prisma.agentConversationMessage.create({
data: {
sessionId,
role,
content,
provider,
...(metadata && { metadata: this.toJsonValue(metadata) }),
},
});
this.logger.debug(`Recorded message for session ${sessionId}`);
}
}

View File

@@ -0,0 +1,54 @@
import {
Body,
Controller,
Delete,
Get,
Param,
Patch,
Post,
UseGuards,
UsePipes,
ValidationPipe,
} from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentProvidersService } from "./agent-providers.service";
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
@Controller("agent-providers")
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
export class AgentProvidersController {
constructor(private readonly agentProvidersService: AgentProvidersService) {}
@Get()
async list(): Promise<AgentProviderConfig[]> {
return this.agentProvidersService.list();
}
@Get(":id")
async getById(@Param("id") id: string): Promise<AgentProviderConfig> {
return this.agentProvidersService.getById(id);
}
@Post()
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async create(@Body() dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
return this.agentProvidersService.create(dto);
}
@Patch(":id")
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async update(
@Param("id") id: string,
@Body() dto: UpdateAgentProviderDto
): Promise<AgentProviderConfig> {
return this.agentProvidersService.update(id, dto);
}
@Delete(":id")
async delete(@Param("id") id: string): Promise<AgentProviderConfig> {
return this.agentProvidersService.delete(id);
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../../prisma/prisma.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { EncryptionService } from "../../security/encryption.service";
import { AgentProvidersController } from "./agent-providers.controller";
import { AgentProvidersService } from "./agent-providers.service";
@Module({
imports: [PrismaModule],
controllers: [AgentProvidersController],
providers: [OrchestratorApiKeyGuard, EncryptionService, AgentProvidersService],
})
export class AgentProvidersModule {}

View File

@@ -0,0 +1,299 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { NotFoundException } from "@nestjs/common";
import { EncryptionService } from "../../security/encryption.service";
import { AgentProvidersService } from "./agent-providers.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentProvidersService", () => {
let service: AgentProvidersService;
let prisma: {
agentProviderConfig: {
findMany: ReturnType<typeof vi.fn>;
findUnique: ReturnType<typeof vi.fn>;
create: ReturnType<typeof vi.fn>;
update: ReturnType<typeof vi.fn>;
delete: ReturnType<typeof vi.fn>;
};
};
let encryptionService: {
encryptIfNeeded: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
prisma = {
agentProviderConfig: {
findMany: vi.fn(),
findUnique: vi.fn(),
create: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
},
};
encryptionService = {
encryptIfNeeded: vi.fn((value: string) => `enc:${value}`),
};
service = new AgentProvidersService(
prisma as unknown as PrismaService,
encryptionService as unknown as EncryptionService
);
});
it("lists all provider configs", async () => {
const expected = [
{
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Primary",
provider: "openai",
gatewayUrl: "https://gateway.example.com",
credentials: {},
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
},
];
prisma.agentProviderConfig.findMany.mockResolvedValue(expected);
const result = await service.list();
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
});
expect(result).toEqual(expected);
});
it("returns a single provider config", async () => {
const expected = {
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Primary",
provider: "openai",
gatewayUrl: "https://gateway.example.com",
credentials: { apiKeyRef: "vault:openai" },
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
};
prisma.agentProviderConfig.findUnique.mockResolvedValue(expected);
const result = await service.getById("cfg-1");
expect(prisma.agentProviderConfig.findUnique).toHaveBeenCalledWith({
where: { id: "cfg-1" },
});
expect(result).toEqual(expected);
});
it("throws NotFoundException when provider config is missing", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
await expect(service.getById("missing")).rejects.toBeInstanceOf(NotFoundException);
});
it("creates a provider config with default credentials", async () => {
const created = {
id: "cfg-created",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "New Provider",
provider: "claude",
gatewayUrl: "https://gateway.example.com",
credentials: {},
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
};
prisma.agentProviderConfig.create.mockResolvedValue(created);
const result = await service.create({
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "New Provider",
provider: "claude",
gatewayUrl: "https://gateway.example.com",
});
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
data: {
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "New Provider",
provider: "claude",
gatewayUrl: "https://gateway.example.com",
credentials: {},
},
});
expect(encryptionService.encryptIfNeeded).not.toHaveBeenCalled();
expect(result).toEqual(created);
});
it("encrypts openclaw token credentials when creating provider config", async () => {
const created = {
id: "cfg-openclaw",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "OpenClaw",
provider: "openclaw",
gatewayUrl: "https://openclaw.example.com",
credentials: { apiToken: "enc:top-secret" },
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
};
prisma.agentProviderConfig.create.mockResolvedValue(created);
const result = await service.create({
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "OpenClaw",
provider: "openclaw",
gatewayUrl: "https://openclaw.example.com",
credentials: { apiToken: "top-secret" },
});
expect(encryptionService.encryptIfNeeded).toHaveBeenCalledWith("top-secret");
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
data: {
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "OpenClaw",
provider: "openclaw",
gatewayUrl: "https://openclaw.example.com",
credentials: { apiToken: "enc:top-secret" },
},
});
expect(result).toEqual(created);
});
it("updates a provider config", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue({
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Primary",
provider: "openai",
gatewayUrl: "https://gateway.example.com",
credentials: {},
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
});
const updated = {
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Secondary",
provider: "openai",
gatewayUrl: "https://gateway2.example.com",
credentials: { apiKeyRef: "vault:new" },
isActive: false,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
};
prisma.agentProviderConfig.update.mockResolvedValue(updated);
const result = await service.update("cfg-1", {
name: "Secondary",
gatewayUrl: "https://gateway2.example.com",
credentials: { apiKeyRef: "vault:new" },
isActive: false,
});
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
where: { id: "cfg-1" },
data: {
name: "Secondary",
gatewayUrl: "https://gateway2.example.com",
credentials: { apiKeyRef: "vault:new" },
isActive: false,
},
});
expect(encryptionService.encryptIfNeeded).not.toHaveBeenCalled();
expect(result).toEqual(updated);
});
it("encrypts openclaw token credentials when updating provider config", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue({
id: "cfg-openclaw",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "OpenClaw",
provider: "openclaw",
gatewayUrl: "https://openclaw.example.com",
credentials: { apiToken: "enc:existing" },
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
});
const updated = {
id: "cfg-openclaw",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "OpenClaw",
provider: "openclaw",
gatewayUrl: "https://openclaw.example.com",
credentials: { apiToken: "enc:rotated-token" },
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
};
prisma.agentProviderConfig.update.mockResolvedValue(updated);
const result = await service.update("cfg-openclaw", {
credentials: { apiToken: "rotated-token" },
});
expect(encryptionService.encryptIfNeeded).toHaveBeenCalledWith("rotated-token");
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
where: { id: "cfg-openclaw" },
data: {
credentials: { apiToken: "enc:rotated-token" },
},
});
expect(result).toEqual(updated);
});
it("throws NotFoundException when updating a missing provider config", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
await expect(service.update("missing", { name: "Updated" })).rejects.toBeInstanceOf(
NotFoundException
);
expect(prisma.agentProviderConfig.update).not.toHaveBeenCalled();
});
it("deletes a provider config", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue({
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Primary",
provider: "openai",
gatewayUrl: "https://gateway.example.com",
credentials: {},
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
});
const deleted = {
id: "cfg-1",
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
name: "Primary",
provider: "openai",
gatewayUrl: "https://gateway.example.com",
credentials: {},
isActive: true,
createdAt: new Date("2026-03-07T18:00:00.000Z"),
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
};
prisma.agentProviderConfig.delete.mockResolvedValue(deleted);
const result = await service.delete("cfg-1");
expect(prisma.agentProviderConfig.delete).toHaveBeenCalledWith({
where: { id: "cfg-1" },
});
expect(result).toEqual(deleted);
});
it("throws NotFoundException when deleting a missing provider config", async () => {
prisma.agentProviderConfig.findUnique.mockResolvedValue(null);
await expect(service.delete("missing")).rejects.toBeInstanceOf(NotFoundException);
expect(prisma.agentProviderConfig.delete).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,102 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import type { AgentProviderConfig, Prisma } from "@prisma/client";
import { EncryptionService } from "../../security/encryption.service";
import { PrismaService } from "../../prisma/prisma.service";
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
const OPENCLAW_PROVIDER_TYPE = "openclaw";
const OPENCLAW_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
@Injectable()
export class AgentProvidersService {
constructor(
private readonly prisma: PrismaService,
private readonly encryptionService: EncryptionService
) {}
async list(): Promise<AgentProviderConfig[]> {
return this.prisma.agentProviderConfig.findMany({
orderBy: [{ createdAt: "desc" }, { id: "desc" }],
});
}
async getById(id: string): Promise<AgentProviderConfig> {
const providerConfig = await this.prisma.agentProviderConfig.findUnique({
where: { id },
});
if (!providerConfig) {
throw new NotFoundException(`Agent provider config with id ${id} not found`);
}
return providerConfig;
}
async create(dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
const credentials = this.sanitizeCredentials(dto.provider, dto.credentials ?? {});
return this.prisma.agentProviderConfig.create({
data: {
workspaceId: dto.workspaceId,
name: dto.name,
provider: dto.provider,
gatewayUrl: dto.gatewayUrl,
credentials: this.toJsonValue(credentials),
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
},
});
}
async update(id: string, dto: UpdateAgentProviderDto): Promise<AgentProviderConfig> {
const existingConfig = await this.getById(id);
const provider = dto.provider ?? existingConfig.provider;
const data: Prisma.AgentProviderConfigUpdateInput = {
...(dto.workspaceId !== undefined ? { workspaceId: dto.workspaceId } : {}),
...(dto.name !== undefined ? { name: dto.name } : {}),
...(dto.provider !== undefined ? { provider: dto.provider } : {}),
...(dto.gatewayUrl !== undefined ? { gatewayUrl: dto.gatewayUrl } : {}),
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
...(dto.credentials !== undefined
? { credentials: this.toJsonValue(this.sanitizeCredentials(provider, dto.credentials)) }
: {}),
};
return this.prisma.agentProviderConfig.update({
where: { id },
data,
});
}
async delete(id: string): Promise<AgentProviderConfig> {
await this.getById(id);
return this.prisma.agentProviderConfig.delete({
where: { id },
});
}
private sanitizeCredentials(
provider: string,
credentials: Record<string, unknown>
): Record<string, unknown> {
if (provider.toLowerCase() !== OPENCLAW_PROVIDER_TYPE) {
return credentials;
}
const nextCredentials: Record<string, unknown> = { ...credentials };
for (const key of OPENCLAW_TOKEN_KEYS) {
const tokenValue = nextCredentials[key];
if (typeof tokenValue === "string" && tokenValue.length > 0) {
nextCredentials[key] = this.encryptionService.encryptIfNeeded(tokenValue);
}
}
return nextCredentials;
}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
}

View File

@@ -0,0 +1,26 @@
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
export class CreateAgentProviderDto {
@IsUUID()
workspaceId!: string;
@IsString()
@IsNotEmpty()
name!: string;
@IsString()
@IsNotEmpty()
provider!: string;
@IsString()
@IsNotEmpty()
gatewayUrl!: string;
@IsOptional()
@IsObject()
credentials?: Record<string, unknown>;
@IsOptional()
@IsBoolean()
isActive?: boolean;
}

View File

@@ -0,0 +1,30 @@
import { IsBoolean, IsNotEmpty, IsObject, IsOptional, IsString, IsUUID } from "class-validator";
export class UpdateAgentProviderDto {
@IsOptional()
@IsUUID()
workspaceId?: string;
@IsOptional()
@IsString()
@IsNotEmpty()
name?: string;
@IsOptional()
@IsString()
@IsNotEmpty()
provider?: string;
@IsOptional()
@IsString()
@IsNotEmpty()
gatewayUrl?: string;
@IsOptional()
@IsObject()
credentials?: Record<string, unknown>;
@IsOptional()
@IsBoolean()
isActive?: boolean;
}

View File

@@ -0,0 +1,172 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentControlService } from "./agent-control.service";
import { PrismaService } from "../../prisma/prisma.service";
import { KillswitchService } from "../../killswitch/killswitch.service";
describe("AgentControlService", () => {
let service: AgentControlService;
let prisma: {
agentSessionTree: {
findUnique: ReturnType<typeof vi.fn>;
updateMany: ReturnType<typeof vi.fn>;
};
agentConversationMessage: {
create: ReturnType<typeof vi.fn>;
};
operatorAuditLog: {
create: ReturnType<typeof vi.fn>;
};
};
let killswitchService: {
killAgent: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findUnique: vi.fn(),
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
agentConversationMessage: {
create: vi.fn().mockResolvedValue(undefined),
},
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
killswitchService = {
killAgent: vi.fn().mockResolvedValue(undefined),
};
service = new AgentControlService(
prisma as unknown as PrismaService,
killswitchService as unknown as KillswitchService
);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("injectMessage", () => {
it("creates conversation message and audit log when tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue({ id: "tree-1" });
await service.injectMessage("agent-123", "operator-abc", "Please continue");
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
where: { sessionId: "agent-123" },
select: { id: true },
});
expect(prisma.agentConversationMessage.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
role: "operator",
content: "Please continue",
provider: "internal",
metadata: {},
},
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-123",
userId: "operator-abc",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Please continue",
},
},
},
});
});
it("creates only audit log when no tree entry exists", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
await service.injectMessage("agent-456", "operator-def", "Nudge message");
expect(prisma.agentConversationMessage.create).not.toHaveBeenCalled();
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-456",
userId: "operator-def",
provider: "internal",
action: "inject",
metadata: {
payload: {
message: "Nudge message",
},
},
},
});
});
});
describe("pauseAgent", () => {
it("updates tree status to paused and creates audit log", async () => {
await service.pauseAgent("agent-789", "operator-pause");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-789" },
data: { status: "paused" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-789",
userId: "operator-pause",
provider: "internal",
action: "pause",
metadata: {
payload: {},
},
},
});
});
});
describe("resumeAgent", () => {
it("updates tree status to running and creates audit log", async () => {
await service.resumeAgent("agent-321", "operator-resume");
expect(prisma.agentSessionTree.updateMany).toHaveBeenCalledWith({
where: { sessionId: "agent-321" },
data: { status: "running" },
});
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-321",
userId: "operator-resume",
provider: "internal",
action: "resume",
metadata: {
payload: {},
},
},
});
});
});
describe("killAgent", () => {
it("delegates kill to killswitch and logs audit", async () => {
await service.killAgent("agent-654", "operator-kill", false);
expect(killswitchService.killAgent).toHaveBeenCalledWith("agent-654");
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: "agent-654",
userId: "operator-kill",
provider: "internal",
action: "kill",
metadata: {
payload: {
force: false,
},
},
},
});
});
});
});

View File

@@ -0,0 +1,77 @@
import { Injectable } from "@nestjs/common";
import type { Prisma } from "@prisma/client";
import { KillswitchService } from "../../killswitch/killswitch.service";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentControlService {
constructor(
private readonly prisma: PrismaService,
private readonly killswitchService: KillswitchService
) {}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
private async createOperatorAuditLog(
agentId: string,
operatorId: string,
action: "inject" | "pause" | "resume" | "kill",
payload: Record<string, unknown>
): Promise<void> {
await this.prisma.operatorAuditLog.create({
data: {
sessionId: agentId,
userId: operatorId,
provider: "internal",
action,
metadata: this.toJsonValue({ payload }),
},
});
}
async injectMessage(agentId: string, operatorId: string, message: string): Promise<void> {
const treeEntry = await this.prisma.agentSessionTree.findUnique({
where: { sessionId: agentId },
select: { id: true },
});
if (treeEntry) {
await this.prisma.agentConversationMessage.create({
data: {
sessionId: agentId,
role: "operator",
content: message,
provider: "internal",
metadata: this.toJsonValue({}),
},
});
}
await this.createOperatorAuditLog(agentId, operatorId, "inject", { message });
}
async pauseAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "paused" },
});
await this.createOperatorAuditLog(agentId, operatorId, "pause", {});
}
async resumeAgent(agentId: string, operatorId: string): Promise<void> {
await this.prisma.agentSessionTree.updateMany({
where: { sessionId: agentId },
data: { status: "running" },
});
await this.createOperatorAuditLog(agentId, operatorId, "resume", {});
}
async killAgent(agentId: string, operatorId: string, force = true): Promise<void> {
await this.killswitchService.killAgent(agentId);
await this.createOperatorAuditLog(agentId, operatorId, "kill", { force });
}
}

View File

@@ -0,0 +1,103 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentMessagesService } from "./agent-messages.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentMessagesService", () => {
let service: AgentMessagesService;
let prisma: {
agentConversationMessage: {
findMany: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentConversationMessage: {
findMany: vi.fn(),
count: vi.fn(),
},
};
service = new AgentMessagesService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("getMessages", () => {
it("returns paginated messages from Prisma", async () => {
const sessionId = "agent-123";
const messages = [
{
id: "msg-1",
sessionId,
provider: "internal",
role: "assistant",
content: "First message",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
metadata: {},
},
{
id: "msg-2",
sessionId,
provider: "internal",
role: "user",
content: "Second message",
timestamp: new Date("2026-03-07T15:59:00.000Z"),
metadata: {},
},
];
prisma.agentConversationMessage.findMany.mockResolvedValue(messages);
prisma.agentConversationMessage.count.mockResolvedValue(2);
const result = await service.getMessages(sessionId, 50, 0);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: 50,
skip: 0,
});
expect(prisma.agentConversationMessage.count).toHaveBeenCalledWith({ where: { sessionId } });
expect(result).toEqual({
messages,
total: 2,
});
});
it("applies limit and cursor (skip) correctly", async () => {
const sessionId = "agent-456";
const limit = 10;
const cursor = 20;
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(42);
await service.getMessages(sessionId, limit, cursor);
expect(prisma.agentConversationMessage.findMany).toHaveBeenCalledWith({
where: { sessionId },
orderBy: { timestamp: "desc" },
take: limit,
skip: cursor,
});
});
it("returns empty messages array when no messages exist", async () => {
const sessionId = "agent-empty";
prisma.agentConversationMessage.findMany.mockResolvedValue([]);
prisma.agentConversationMessage.count.mockResolvedValue(0);
const result = await service.getMessages(sessionId, 25, 0);
expect(result).toEqual({
messages: [],
total: 0,
});
});
});
});

View File

@@ -0,0 +1,84 @@
import { Injectable } from "@nestjs/common";
import { type AgentConversationMessage, type Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
@Injectable()
export class AgentMessagesService {
constructor(private readonly prisma: PrismaService) {}
async getMessages(
sessionId: string,
limit: number,
skip: number
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
const where = { sessionId };
const [messages, total] = await Promise.all([
this.prisma.agentConversationMessage.findMany({
where,
orderBy: {
timestamp: "desc",
},
take: limit,
skip,
}),
this.prisma.agentConversationMessage.count({ where }),
]);
return {
messages,
total,
};
}
async getReplayMessages(sessionId: string, limit = 50): Promise<AgentConversationMessage[]> {
const messages = await this.prisma.agentConversationMessage.findMany({
where: { sessionId },
orderBy: {
timestamp: "desc",
},
take: limit,
});
return messages.reverse();
}
async getMessagesAfter(
sessionId: string,
lastSeenTimestamp: Date,
lastSeenMessageId: string | null
): Promise<AgentConversationMessage[]> {
const where: Prisma.AgentConversationMessageWhereInput = {
sessionId,
...(lastSeenMessageId
? {
OR: [
{
timestamp: {
gt: lastSeenTimestamp,
},
},
{
timestamp: lastSeenTimestamp,
id: {
gt: lastSeenMessageId,
},
},
],
}
: {
timestamp: {
gt: lastSeenTimestamp,
},
}),
};
return this.prisma.agentConversationMessage.findMany({
where,
orderBy: [{ timestamp: "asc" }, { id: "asc" }],
});
}
}

View File

@@ -0,0 +1,202 @@
import { Logger } from "@nestjs/common";
import type {
AgentMessage,
AgentSession,
AgentSessionList,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { AgentProviderRegistry } from "./agent-provider.registry";
import { InternalAgentProvider } from "./internal-agent.provider";
type MockProvider = IAgentProvider & {
listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
};
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
return;
};
const createProvider = (providerId: string, sessions: AgentSession[] = []): MockProvider => {
return {
providerId,
providerType: providerId,
displayName: providerId,
listSessions: vi.fn().mockResolvedValue({
sessions,
total: sessions.length,
} as AgentSessionList),
getSession: vi.fn().mockResolvedValue(null),
getMessages: vi.fn().mockResolvedValue([]),
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
pauseSession: vi.fn().mockResolvedValue(undefined),
resumeSession: vi.fn().mockResolvedValue(undefined),
killSession: vi.fn().mockResolvedValue(undefined),
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
isAvailable: vi.fn().mockResolvedValue(true),
};
};
describe("AgentProviderRegistry", () => {
let registry: AgentProviderRegistry;
let internalProvider: MockProvider;
beforeEach(() => {
internalProvider = createProvider("internal");
registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider);
});
afterEach(() => {
vi.restoreAllMocks();
});
it("registers InternalAgentProvider on module init", () => {
registry.onModuleInit();
expect(registry.getProvider("internal")).toBe(internalProvider);
});
it("registers providers and returns null for unknown provider ids", () => {
const externalProvider = createProvider("openclaw");
registry.registerProvider(externalProvider);
expect(registry.getProvider("openclaw")).toBe(externalProvider);
expect(registry.getProvider("missing")).toBeNull();
});
it("aggregates and sorts sessions from all providers", async () => {
const internalSessions: AgentSession[] = [
{
id: "session-older",
providerId: "internal",
providerType: "internal",
status: "active",
createdAt: new Date("2026-03-07T10:00:00.000Z"),
updatedAt: new Date("2026-03-07T10:10:00.000Z"),
},
];
const externalSessions: AgentSession[] = [
{
id: "session-newer",
providerId: "openclaw",
providerType: "external",
status: "paused",
createdAt: new Date("2026-03-07T09:00:00.000Z"),
updatedAt: new Date("2026-03-07T10:20:00.000Z"),
},
];
internalProvider.listSessions.mockResolvedValue({
sessions: internalSessions,
total: internalSessions.length,
} as AgentSessionList);
const externalProvider = createProvider("openclaw", externalSessions);
registry.onModuleInit();
registry.registerProvider(externalProvider);
const result = await registry.listAllSessions();
expect(result.map((session) => session.id)).toEqual(["session-newer", "session-older"]);
expect(internalProvider.listSessions).toHaveBeenCalledTimes(1);
expect(externalProvider.listSessions).toHaveBeenCalledTimes(1);
});
it("skips provider failures and logs warning", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const healthyProvider = createProvider("healthy", [
{
id: "session-1",
providerId: "healthy",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T11:00:00.000Z"),
updatedAt: new Date("2026-03-07T11:00:00.000Z"),
},
]);
const failingProvider = createProvider("failing");
failingProvider.listSessions.mockRejectedValue(new Error("provider offline"));
registry.onModuleInit();
registry.registerProvider(healthyProvider);
registry.registerProvider(failingProvider);
const result = await registry.listAllSessions();
expect(result).toHaveLength(1);
expect(result[0]?.id).toBe("session-1");
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Failed to list sessions for provider failing")
);
});
it("finds a provider for an existing session", async () => {
const targetSession: AgentSession = {
id: "session-found",
providerId: "openclaw",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T12:00:00.000Z"),
updatedAt: new Date("2026-03-07T12:10:00.000Z"),
};
const openclawProvider = createProvider("openclaw");
openclawProvider.getSession.mockResolvedValue(targetSession);
registry.onModuleInit();
registry.registerProvider(openclawProvider);
const result = await registry.getProviderForSession(targetSession.id);
expect(result).toEqual({
provider: openclawProvider,
session: targetSession,
});
expect(internalProvider.getSession).toHaveBeenCalledWith(targetSession.id);
expect(openclawProvider.getSession).toHaveBeenCalledWith(targetSession.id);
});
it("returns null when no provider has the requested session", async () => {
const openclawProvider = createProvider("openclaw");
registry.onModuleInit();
registry.registerProvider(openclawProvider);
await expect(registry.getProviderForSession("missing-session")).resolves.toBeNull();
});
it("continues searching providers when getSession throws", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const failingProvider = createProvider("failing");
failingProvider.getSession.mockRejectedValue(new Error("provider timeout"));
const healthySession: AgentSession = {
id: "session-healthy",
providerId: "healthy",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T12:15:00.000Z"),
updatedAt: new Date("2026-03-07T12:16:00.000Z"),
};
const healthyProvider = createProvider("healthy");
healthyProvider.getSession.mockResolvedValue(healthySession);
registry.onModuleInit();
registry.registerProvider(failingProvider);
registry.registerProvider(healthyProvider);
const result = await registry.getProviderForSession(healthySession.id);
expect(result).toEqual({ provider: healthyProvider, session: healthySession });
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Failed to get session session-healthy for provider failing")
);
});
});

View File

@@ -0,0 +1,79 @@
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import type { AgentSession, IAgentProvider } from "@mosaic/shared";
import { InternalAgentProvider } from "./internal-agent.provider";
@Injectable()
export class AgentProviderRegistry implements OnModuleInit {
private readonly logger = new Logger(AgentProviderRegistry.name);
private readonly providers = new Map<string, IAgentProvider>();
constructor(private readonly internalProvider: InternalAgentProvider) {}
onModuleInit(): void {
this.registerProvider(this.internalProvider);
}
registerProvider(provider: IAgentProvider): void {
const existingProvider = this.providers.get(provider.providerId);
if (existingProvider !== undefined) {
this.logger.warn(`Replacing existing provider registration for ${provider.providerId}`);
}
this.providers.set(provider.providerId, provider);
}
getProvider(providerId: string): IAgentProvider | null {
return this.providers.get(providerId) ?? null;
}
async getProviderForSession(
sessionId: string
): Promise<{ provider: IAgentProvider; session: AgentSession } | null> {
for (const provider of this.providers.values()) {
try {
const session = await provider.getSession(sessionId);
if (session !== null) {
return {
provider,
session,
};
}
} catch (error) {
this.logger.warn(
`Failed to get session ${sessionId} for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
);
}
}
return null;
}
async listAllSessions(): Promise<AgentSession[]> {
const providers = [...this.providers.values()];
const sessionsByProvider = await Promise.all(
providers.map(async (provider) => {
try {
const { sessions } = await provider.listSessions();
return sessions;
} catch (error) {
this.logger.warn(
`Failed to list sessions for provider ${provider.providerId}: ${this.toErrorMessage(error)}`
);
return [];
}
})
);
return sessionsByProvider
.flat()
.sort((left, right) => right.updatedAt.getTime() - left.updatedAt.getTime());
}
private toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
}

View File

@@ -0,0 +1,245 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { AgentTreeService } from "./agent-tree.service";
import { PrismaService } from "../../prisma/prisma.service";
describe("AgentTreeService", () => {
let service: AgentTreeService;
let prisma: {
agentSessionTree: {
findMany: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
findUnique: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
prisma = {
agentSessionTree: {
findMany: vi.fn(),
count: vi.fn(),
findUnique: vi.fn(),
},
};
service = new AgentTreeService(prisma as unknown as PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("listSessions", () => {
it("returns paginated sessions and cursor", async () => {
const sessions = [
{
id: "tree-2",
sessionId: "agent-2",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-2",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
completedAt: null,
metadata: {},
},
{
id: "tree-1",
sessionId: "agent-1",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: null,
metadata: {},
},
];
prisma.agentSessionTree.findMany.mockResolvedValue(sessions);
prisma.agentSessionTree.count.mockResolvedValue(7);
const result = await service.listSessions(undefined, 2);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: undefined,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 2,
});
expect(prisma.agentSessionTree.count).toHaveBeenCalledWith();
expect(result.sessions).toEqual(sessions);
expect(result.total).toBe(7);
expect(result.cursor).toBeTypeOf("string");
});
it("applies cursor filter when provided", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
prisma.agentSessionTree.count.mockResolvedValue(0);
const cursorDate = "2026-03-07T10:00:00.000Z";
const cursorSessionId = "agent-5";
const cursor = Buffer.from(
JSON.stringify({
spawnedAt: cursorDate,
sessionId: cursorSessionId,
}),
"utf8"
).toString("base64url");
await service.listSessions(cursor, 25);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: {
OR: [
{
spawnedAt: {
lt: new Date(cursorDate),
},
},
{
spawnedAt: new Date(cursorDate),
sessionId: {
lt: cursorSessionId,
},
},
],
},
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 25,
});
});
it("ignores invalid cursor values", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
prisma.agentSessionTree.count.mockResolvedValue(0);
await service.listSessions("invalid-cursor", 10);
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
where: undefined,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: 10,
});
});
});
describe("getSession", () => {
it("returns matching session entry", async () => {
const session = {
id: "tree-1",
sessionId: "agent-123",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T11:00:00.000Z"),
completedAt: null,
metadata: {},
};
prisma.agentSessionTree.findUnique.mockResolvedValue(session);
const result = await service.getSession("agent-123");
expect(prisma.agentSessionTree.findUnique).toHaveBeenCalledWith({
where: { sessionId: "agent-123" },
});
expect(result).toEqual(session);
});
it("returns null when session does not exist", async () => {
prisma.agentSessionTree.findUnique.mockResolvedValue(null);
const result = await service.getSession("agent-missing");
expect(result).toBeNull();
});
});
describe("getTree", () => {
it("returns mapped entries from Prisma", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-1",
sessionId: "agent-1",
parentSessionId: "agent-root",
provider: "internal",
missionId: "mission-1",
taskId: "task-1",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: new Date("2026-03-07T11:00:00.000Z"),
metadata: {},
},
]);
const result = await service.getTree();
expect(prisma.agentSessionTree.findMany).toHaveBeenCalledWith({
orderBy: { spawnedAt: "desc" },
take: 200,
});
expect(result).toEqual([
{
sessionId: "agent-1",
parentSessionId: "agent-root",
status: "running",
agentType: "worker",
taskSource: "queue",
spawnedAt: "2026-03-07T10:00:00.000Z",
completedAt: "2026-03-07T11:00:00.000Z",
},
]);
});
it("returns empty array when no entries exist", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([]);
const result = await service.getTree();
expect(result).toEqual([]);
});
it("maps null parentSessionId and completedAt correctly", async () => {
prisma.agentSessionTree.findMany.mockResolvedValue([
{
id: "tree-2",
sessionId: "agent-root",
parentSessionId: null,
provider: "internal",
missionId: null,
taskId: null,
taskSource: null,
agentType: null,
status: "spawning",
spawnedAt: new Date("2026-03-07T09:00:00.000Z"),
completedAt: null,
metadata: {},
},
]);
const result = await service.getTree();
expect(result).toEqual([
{
sessionId: "agent-root",
parentSessionId: null,
status: "spawning",
agentType: null,
taskSource: null,
spawnedAt: "2026-03-07T09:00:00.000Z",
completedAt: null,
},
]);
});
});
});

View File

@@ -0,0 +1,146 @@
import { Injectable } from "@nestjs/common";
import type { AgentSessionTree, Prisma } from "@prisma/client";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
import { PrismaService } from "../../prisma/prisma.service";
const DEFAULT_PAGE_LIMIT = 50;
const MAX_PAGE_LIMIT = 200;
interface SessionCursor {
spawnedAt: Date;
sessionId: string;
}
export interface AgentSessionTreeListResult {
sessions: AgentSessionTree[];
total: number;
cursor?: string;
}
@Injectable()
export class AgentTreeService {
constructor(private readonly prisma: PrismaService) {}
async listSessions(
cursor?: string,
limit = DEFAULT_PAGE_LIMIT
): Promise<AgentSessionTreeListResult> {
const safeLimit = this.normalizeLimit(limit);
const parsedCursor = this.parseCursor(cursor);
const where: Prisma.AgentSessionTreeWhereInput | undefined = parsedCursor
? {
OR: [
{
spawnedAt: {
lt: parsedCursor.spawnedAt,
},
},
{
spawnedAt: parsedCursor.spawnedAt,
sessionId: {
lt: parsedCursor.sessionId,
},
},
],
}
: undefined;
const [sessions, total] = await Promise.all([
this.prisma.agentSessionTree.findMany({
where,
orderBy: [{ spawnedAt: "desc" }, { sessionId: "desc" }],
take: safeLimit,
}),
this.prisma.agentSessionTree.count(),
]);
const nextCursor =
sessions.length === safeLimit
? this.serializeCursor(sessions[sessions.length - 1])
: undefined;
return {
sessions,
total,
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
};
}
async getSession(sessionId: string): Promise<AgentSessionTree | null> {
return this.prisma.agentSessionTree.findUnique({
where: { sessionId },
});
}
async getTree(): Promise<AgentTreeResponseDto[]> {
const entries = await this.prisma.agentSessionTree.findMany({
orderBy: { spawnedAt: "desc" },
take: 200,
});
const response: AgentTreeResponseDto[] = [];
for (const entry of entries) {
response.push({
sessionId: entry.sessionId,
parentSessionId: entry.parentSessionId ?? null,
status: entry.status,
agentType: entry.agentType ?? null,
taskSource: entry.taskSource ?? null,
spawnedAt: entry.spawnedAt.toISOString(),
completedAt: entry.completedAt?.toISOString() ?? null,
});
}
return response;
}
private normalizeLimit(limit: number): number {
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_PAGE_LIMIT;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_PAGE_LIMIT);
}
private serializeCursor(entry: Pick<AgentSessionTree, "spawnedAt" | "sessionId">): string {
return Buffer.from(
JSON.stringify({
spawnedAt: entry.spawnedAt.toISOString(),
sessionId: entry.sessionId,
}),
"utf8"
).toString("base64url");
}
private parseCursor(cursor?: string): SessionCursor | null {
if (!cursor) {
return null;
}
try {
const decoded = Buffer.from(cursor, "base64url").toString("utf8");
const parsed = JSON.parse(decoded) as {
spawnedAt?: string;
sessionId?: string;
};
if (typeof parsed.spawnedAt !== "string" || typeof parsed.sessionId !== "string") {
return null;
}
const spawnedAt = new Date(parsed.spawnedAt);
if (Number.isNaN(spawnedAt.getTime())) {
return null;
}
return {
spawnedAt,
sessionId: parsed.sessionId,
};
} catch {
return null;
}
}
}

View File

@@ -5,6 +5,9 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import type { KillAllResult } from "../../killswitch/killswitch.service"; import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => { describe("AgentsController - Killswitch Endpoints", () => {
@@ -27,6 +30,20 @@ describe("AgentsController - Killswitch Endpoints", () => {
subscribe: ReturnType<typeof vi.fn>; subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>; getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
let mockMessagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let mockControlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let mockTreeService: {
getTree: ReturnType<typeof vi.fn>;
}; };
beforeEach(() => { beforeEach(() => {
@@ -61,6 +78,23 @@ describe("AgentsController - Killswitch Endpoints", () => {
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
data: { heartbeat: true }, data: { heartbeat: true },
}), }),
getRecentEvents: vi.fn().mockReturnValue([]),
};
mockMessagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
mockControlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
mockTreeService = {
getTree: vi.fn().mockResolvedValue([]),
}; };
controller = new AgentsController( controller = new AgentsController(
@@ -68,7 +102,10 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockSpawnerService as unknown as AgentSpawnerService, mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService, mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService, mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService mockEventsService as unknown as AgentEventsService,
mockMessagesService as unknown as AgentMessagesService,
mockControlService as unknown as AgentControlService,
mockTreeService as unknown as AgentTreeService
); );
}); });

View File

@@ -4,6 +4,9 @@ import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => { describe("AgentsController", () => {
@@ -30,6 +33,19 @@ describe("AgentsController", () => {
createHeartbeat: ReturnType<typeof vi.fn>; createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>; getRecentEvents: ReturnType<typeof vi.fn>;
}; };
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let controlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
};
let treeService: {
getTree: ReturnType<typeof vi.fn>;
};
beforeEach(() => { beforeEach(() => {
// Create mock services // Create mock services
@@ -69,13 +85,32 @@ describe("AgentsController", () => {
getRecentEvents: vi.fn().mockReturnValue([]), getRecentEvents: vi.fn().mockReturnValue([]),
}; };
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn().mockResolvedValue([]),
getMessagesAfter: vi.fn().mockResolvedValue([]),
};
controlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
};
treeService = {
getTree: vi.fn().mockResolvedValue([]),
};
// Create controller with mocked services // Create controller with mocked services
controller = new AgentsController( controller = new AgentsController(
queueService as unknown as QueueService, queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService, spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService, lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService, killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService eventsService as unknown as AgentEventsService,
messagesService as unknown as AgentMessagesService,
controlService as unknown as AgentControlService,
treeService as unknown as AgentTreeService
); );
}); });
@@ -87,6 +122,27 @@ describe("AgentsController", () => {
expect(controller).toBeDefined(); expect(controller).toBeDefined();
}); });
describe("getAgentTree", () => {
it("should return tree entries", async () => {
const entries = [
{
sessionId: "agent-1",
parentSessionId: null,
status: "running",
agentType: "worker",
taskSource: "internal",
spawnedAt: "2026-03-07T00:00:00.000Z",
completedAt: null,
},
];
treeService.getTree.mockResolvedValue(entries);
await expect(controller.getAgentTree()).resolves.toEqual(entries);
expect(treeService.getTree).toHaveBeenCalledTimes(1);
});
});
describe("listAgents", () => { describe("listAgents", () => {
it("should return empty array when no agents exist", () => { it("should return empty array when no agents exist", () => {
// Arrange // Arrange
@@ -365,6 +421,93 @@ describe("AgentsController", () => {
}); });
}); });
describe("agent control endpoints", () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
it("should inject an operator message", async () => {
const req = { apiKey: "control-key" };
const result = await controller.injectAgentMessage(
agentId,
{ message: "pause and summarize" },
req
);
expect(controlService.injectMessage).toHaveBeenCalledWith(
agentId,
"control-key",
"pause and summarize"
);
expect(result).toEqual({ message: `Message injected into agent ${agentId}` });
});
it("should default operator id when request api key is missing", async () => {
await controller.injectAgentMessage(agentId, { message: "continue" }, {});
expect(controlService.injectMessage).toHaveBeenCalledWith(agentId, "operator", "continue");
});
it("should pause an agent", async () => {
const result = await controller.pauseAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.pauseAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} paused` });
});
it("should resume an agent", async () => {
const result = await controller.resumeAgent(agentId, {}, { apiKey: "ops-user" });
expect(controlService.resumeAgent).toHaveBeenCalledWith(agentId, "ops-user");
expect(result).toEqual({ message: `Agent ${agentId} resumed` });
});
});
describe("getAgentMessages", () => {
it("should return paginated message history", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 25,
skip: 10,
};
const response = {
messages: [
{
id: "msg-1",
sessionId: agentId,
role: "agent",
content: "hello",
provider: "internal",
timestamp: new Date("2026-03-07T03:00:00.000Z"),
metadata: {},
},
],
total: 101,
};
messagesService.getMessages.mockResolvedValue(response);
const result = await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 25, 10);
expect(result).toEqual(response);
});
it("should use default pagination values", async () => {
const agentId = "0b64079f-4487-42b9-92eb-cf8ea0042a64";
const query = {
limit: 50,
skip: 0,
};
messagesService.getMessages.mockResolvedValue({ messages: [], total: 0 });
await controller.getAgentMessages(agentId, query);
expect(messagesService.getMessages).toHaveBeenCalledWith(agentId, 50, 0);
});
});
describe("getRecentEvents", () => { describe("getRecentEvents", () => {
it("should return recent events with default limit", () => { it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([ eventsService.getRecentEvents.mockReturnValue([

View File

@@ -14,7 +14,9 @@ import {
Sse, Sse,
MessageEvent, MessageEvent,
Query, Query,
Request,
} from "@nestjs/common"; } from "@nestjs/common";
import type { AgentConversationMessage } from "@prisma/client";
import { Throttle } from "@nestjs/throttler"; import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs"; import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service"; import { QueueService } from "../../queue/queue.service";
@@ -25,6 +27,13 @@ import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"; import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { GetMessagesQueryDto } from "./dto/get-messages-query.dto";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { AgentTreeResponseDto } from "./dto/agent-tree-response.dto";
import { InjectAgentDto } from "./dto/inject-agent.dto";
import { PauseAgentDto, ResumeAgentDto } from "./dto/control-agent.dto";
/** /**
* Controller for agent management endpoints * Controller for agent management endpoints
@@ -47,7 +56,10 @@ export class AgentsController {
private readonly spawnerService: AgentSpawnerService, private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService, private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService, private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService private readonly eventsService: AgentEventsService,
private readonly messagesService: AgentMessagesService,
private readonly agentControlService: AgentControlService,
private readonly agentTreeService: AgentTreeService
) {} ) {}
/** /**
@@ -69,6 +81,7 @@ export class AgentsController {
// Spawn agent using spawner service // Spawn agent using spawner service
const spawnResponse = this.spawnerService.spawnAgent({ const spawnResponse = this.spawnerService.spawnAgent({
taskId: dto.taskId, taskId: dto.taskId,
...(dto.parentAgentId !== undefined ? { parentAgentId: dto.parentAgentId } : {}),
agentType: dto.agentType, agentType: dto.agentType,
context: dto.context, context: dto.context,
}); });
@@ -133,7 +146,7 @@ export class AgentsController {
* Return recent orchestrator events for non-streaming consumers. * Return recent orchestrator events for non-streaming consumers.
*/ */
@Get("events/recent") @Get("events/recent")
@Throttle({ status: { limit: 200, ttl: 60000 } }) @Throttle({ default: { limit: 1000, ttl: 60000 } })
getRecentEvents(@Query("limit") limit?: string): { getRecentEvents(@Query("limit") limit?: string): {
events: ReturnType<AgentEventsService["getRecentEvents"]>; events: ReturnType<AgentEventsService["getRecentEvents"]>;
} { } {
@@ -143,6 +156,13 @@ export class AgentsController {
}; };
} }
@Get("tree")
@UseGuards(OrchestratorApiKeyGuard)
@Throttle({ default: { limit: 200, ttl: 60000 } })
async getAgentTree(): Promise<AgentTreeResponseDto[]> {
return this.agentTreeService.getTree();
}
/** /**
* List all agents * List all agents
* @returns Array of all agent sessions with their status * @returns Array of all agent sessions with their status
@@ -185,6 +205,107 @@ export class AgentsController {
} }
} }
/**
* Get paginated message history for an agent.
*/
@Get(":agentId/messages")
@Throttle({ status: { limit: 200, ttl: 60000 } })
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getAgentMessages(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Query() query: GetMessagesQueryDto
): Promise<{
messages: AgentConversationMessage[];
total: number;
}> {
return this.messagesService.getMessages(agentId, query.limit, query.skip);
}
/**
* Stream per-agent conversation messages as server-sent events (SSE).
*/
@Sse(":agentId/messages/stream")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamAgentMessages(@Param("agentId", ParseUUIDPipe) agentId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
const emitMessage = (message: AgentConversationMessage): void => {
if (isClosed) {
return;
}
subscriber.next({
data: this.toMessageStreamPayload(message),
});
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
};
void this.messagesService
.getReplayMessages(agentId, 50)
.then((messages) => {
if (isClosed) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
if (messages.length === 0) {
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
}
})
.catch((error: unknown) => {
this.logger.error(
`Failed to load replay messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
lastSeenTimestamp = new Date();
lastSeenMessageId = null;
});
const pollInterval = setInterval(() => {
if (isClosed) {
return;
}
void this.messagesService
.getMessagesAfter(agentId, lastSeenTimestamp, lastSeenMessageId)
.then((messages) => {
if (isClosed || messages.length === 0) {
return;
}
messages.forEach((message) => {
emitMessage(message);
});
})
.catch((error: unknown) => {
this.logger.error(
`Failed to poll messages for ${agentId}: ${error instanceof Error ? error.message : String(error)}`
);
});
}, 1000);
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: { type: "heartbeat" } });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(pollInterval);
clearInterval(heartbeat);
};
});
}
/** /**
* Get agent status * Get agent status
* @param agentId Agent ID to query * @param agentId Agent ID to query
@@ -269,6 +390,57 @@ export class AgentsController {
} }
} }
@Post(":agentId/inject")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async injectAgentMessage(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() dto: InjectAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.injectMessage(agentId, operatorId, dto.message);
return {
message: `Message injected into agent ${agentId}`,
};
}
@Post(":agentId/pause")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async pauseAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: PauseAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.pauseAgent(agentId, operatorId);
return {
message: `Agent ${agentId} paused`,
};
}
@Post(":agentId/resume")
@Throttle({ default: { limit: 10, ttl: 60000 } })
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async resumeAgent(
@Param("agentId", ParseUUIDPipe) agentId: string,
@Body() _dto: ResumeAgentDto,
@Request() req: { apiKey?: string }
): Promise<{ message: string }> {
const operatorId = req.apiKey ?? "operator";
await this.agentControlService.resumeAgent(agentId, operatorId);
return {
message: `Agent ${agentId} resumed`,
};
}
/** /**
* Kill all active agents * Kill all active agents
* @returns Summary of kill operation * @returns Summary of kill operation
@@ -301,4 +473,24 @@ export class AgentsController {
throw error; throw error;
} }
} }
private toMessageStreamPayload(message: AgentConversationMessage): {
messageId: string;
sessionId: string;
role: string;
content: string;
provider: string;
timestamp: string;
metadata: unknown;
} {
return {
messageId: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
provider: message.provider,
timestamp: message.timestamp.toISOString(),
metadata: message.metadata,
};
}
} }

View File

@@ -6,10 +6,25 @@ import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module"; import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service"; import { AgentEventsService } from "./agent-events.service";
import { PrismaModule } from "../../prisma/prisma.module";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentControlService } from "./agent-control.service";
import { AgentTreeService } from "./agent-tree.service";
import { InternalAgentProvider } from "./internal-agent.provider";
import { AgentProviderRegistry } from "./agent-provider.registry";
@Module({ @Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule], imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule, PrismaModule],
controllers: [AgentsController], controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard, AgentEventsService], providers: [
OrchestratorApiKeyGuard,
AgentEventsService,
AgentMessagesService,
AgentControlService,
AgentTreeService,
InternalAgentProvider,
AgentProviderRegistry,
],
exports: [InternalAgentProvider, AgentProviderRegistry],
}) })
export class AgentsModule {} export class AgentsModule {}

View File

@@ -0,0 +1,9 @@
export class AgentTreeResponseDto {
sessionId!: string;
parentSessionId!: string | null;
status!: string;
agentType!: string | null;
taskSource!: string | null;
spawnedAt!: string;
completedAt!: string | null;
}

View File

@@ -0,0 +1,3 @@
export class PauseAgentDto {}
export class ResumeAgentDto {}

View File

@@ -0,0 +1,37 @@
import { plainToInstance } from "class-transformer";
import { validate } from "class-validator";
import { describe, expect, it } from "vitest";
import { GetMessagesQueryDto } from "./get-messages-query.dto";
describe("GetMessagesQueryDto", () => {
it("should use defaults when empty", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {});
const errors = await validate(dto);
expect(errors).toHaveLength(0);
expect(dto.limit).toBe(50);
expect(dto.skip).toBe(0);
});
it("should reject limit greater than 200", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 201,
skip: 0,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "limit")).toBe(true);
});
it("should reject negative skip", async () => {
const dto = plainToInstance(GetMessagesQueryDto, {
limit: 50,
skip: -1,
});
const errors = await validate(dto);
expect(errors.length).toBeGreaterThan(0);
expect(errors.some((error) => error.property === "skip")).toBe(true);
});
});

View File

@@ -0,0 +1,17 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, Max, Min } from "class-validator";
export class GetMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(0)
skip = 0;
}

View File

@@ -0,0 +1,7 @@
import { IsNotEmpty, IsString } from "class-validator";
export class InjectAgentDto {
@IsString()
@IsNotEmpty()
message!: string;
}

View File

@@ -116,6 +116,10 @@ export class SpawnAgentDto {
@IsOptional() @IsOptional()
@IsIn(["strict", "standard", "minimal", "custom"]) @IsIn(["strict", "standard", "minimal", "custom"])
gateProfile?: GateProfileType; gateProfile?: GateProfileType;
@IsOptional()
@IsString()
parentAgentId?: string;
} }
/** /**

View File

@@ -0,0 +1,216 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
import { AgentControlService } from "./agent-control.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentTreeService } from "./agent-tree.service";
import { InternalAgentProvider } from "./internal-agent.provider";
describe("InternalAgentProvider", () => {
let provider: InternalAgentProvider;
let messagesService: {
getMessages: ReturnType<typeof vi.fn>;
getReplayMessages: ReturnType<typeof vi.fn>;
getMessagesAfter: ReturnType<typeof vi.fn>;
};
let controlService: {
injectMessage: ReturnType<typeof vi.fn>;
pauseAgent: ReturnType<typeof vi.fn>;
resumeAgent: ReturnType<typeof vi.fn>;
killAgent: ReturnType<typeof vi.fn>;
};
let treeService: {
listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
messagesService = {
getMessages: vi.fn(),
getReplayMessages: vi.fn(),
getMessagesAfter: vi.fn(),
};
controlService = {
injectMessage: vi.fn().mockResolvedValue(undefined),
pauseAgent: vi.fn().mockResolvedValue(undefined),
resumeAgent: vi.fn().mockResolvedValue(undefined),
killAgent: vi.fn().mockResolvedValue(undefined),
};
treeService = {
listSessions: vi.fn(),
getSession: vi.fn(),
};
provider = new InternalAgentProvider(
messagesService as unknown as AgentMessagesService,
controlService as unknown as AgentControlService,
treeService as unknown as AgentTreeService
);
});
it("maps paginated sessions", async () => {
const sessionEntry: AgentSessionTree = {
id: "tree-1",
sessionId: "session-1",
parentSessionId: "parent-1",
provider: "internal",
missionId: null,
taskId: "task-123",
taskSource: "queue",
agentType: "worker",
status: "running",
spawnedAt: new Date("2026-03-07T10:00:00.000Z"),
completedAt: null,
metadata: { branch: "feat/test" },
};
treeService.listSessions.mockResolvedValue({
sessions: [sessionEntry],
total: 1,
cursor: "next-cursor",
});
const result = await provider.listSessions("cursor-1", 25);
expect(treeService.listSessions).toHaveBeenCalledWith("cursor-1", 25);
expect(result).toEqual({
sessions: [
{
id: "session-1",
providerId: "internal",
providerType: "internal",
label: "task-123",
status: "active",
parentSessionId: "parent-1",
createdAt: new Date("2026-03-07T10:00:00.000Z"),
updatedAt: new Date("2026-03-07T10:00:00.000Z"),
metadata: { branch: "feat/test" },
},
],
total: 1,
cursor: "next-cursor",
});
});
it("returns null for missing session", async () => {
treeService.getSession.mockResolvedValue(null);
const result = await provider.getSession("missing-session");
expect(treeService.getSession).toHaveBeenCalledWith("missing-session");
expect(result).toBeNull();
});
it("maps message history and parses skip cursor", async () => {
const message: AgentConversationMessage = {
id: "msg-1",
sessionId: "session-1",
provider: "internal",
role: "agent",
content: "hello",
timestamp: new Date("2026-03-07T10:05:00.000Z"),
metadata: { tokens: 42 },
};
messagesService.getMessages.mockResolvedValue({
messages: [message],
total: 10,
});
const result = await provider.getMessages("session-1", 30, "2");
expect(messagesService.getMessages).toHaveBeenCalledWith("session-1", 30, 2);
expect(result).toEqual([
{
id: "msg-1",
sessionId: "session-1",
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T10:05:00.000Z"),
metadata: { tokens: 42 },
},
]);
});
it("routes control operations through AgentControlService", async () => {
const injectResult = await provider.injectMessage("session-1", "new instruction");
await provider.pauseSession("session-1");
await provider.resumeSession("session-1");
await provider.killSession("session-1", false);
expect(controlService.injectMessage).toHaveBeenCalledWith(
"session-1",
"internal-provider",
"new instruction"
);
expect(injectResult).toEqual({ accepted: true });
expect(controlService.pauseAgent).toHaveBeenCalledWith("session-1", "internal-provider");
expect(controlService.resumeAgent).toHaveBeenCalledWith("session-1", "internal-provider");
expect(controlService.killAgent).toHaveBeenCalledWith("session-1", "internal-provider", false);
});
it("streams replay and incremental messages", async () => {
const replayMessage: AgentConversationMessage = {
id: "msg-replay",
sessionId: "session-1",
provider: "internal",
role: "agent",
content: "replay",
timestamp: new Date("2026-03-07T10:00:00.000Z"),
metadata: {},
};
const incrementalMessage: AgentConversationMessage = {
id: "msg-live",
sessionId: "session-1",
provider: "internal",
role: "operator",
content: "live",
timestamp: new Date("2026-03-07T10:00:01.000Z"),
metadata: {},
};
messagesService.getReplayMessages.mockResolvedValue([replayMessage]);
messagesService.getMessagesAfter
.mockResolvedValueOnce([incrementalMessage])
.mockResolvedValueOnce([]);
const iterator = provider.streamMessages("session-1")[Symbol.asyncIterator]();
const first = await iterator.next();
const second = await iterator.next();
expect(first.done).toBe(false);
expect(first.value).toEqual({
id: "msg-replay",
sessionId: "session-1",
role: "assistant",
content: "replay",
timestamp: new Date("2026-03-07T10:00:00.000Z"),
metadata: {},
});
expect(second.done).toBe(false);
expect(second.value).toEqual({
id: "msg-live",
sessionId: "session-1",
role: "user",
content: "live",
timestamp: new Date("2026-03-07T10:00:01.000Z"),
metadata: {},
});
await iterator.return?.();
expect(messagesService.getReplayMessages).toHaveBeenCalledWith("session-1", 50);
expect(messagesService.getMessagesAfter).toHaveBeenCalledWith(
"session-1",
new Date("2026-03-07T10:00:00.000Z"),
"msg-replay"
);
});
it("reports provider availability", async () => {
await expect(provider.isAvailable()).resolves.toBe(true);
});
});

View File

@@ -0,0 +1,218 @@
import { Injectable } from "@nestjs/common";
import type {
AgentMessage,
AgentMessageRole,
AgentSession,
AgentSessionList,
AgentSessionStatus,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import type { AgentConversationMessage, AgentSessionTree } from "@prisma/client";
import { AgentControlService } from "./agent-control.service";
import { AgentMessagesService } from "./agent-messages.service";
import { AgentTreeService } from "./agent-tree.service";
const DEFAULT_SESSION_LIMIT = 50;
const DEFAULT_MESSAGE_LIMIT = 50;
const MAX_MESSAGE_LIMIT = 200;
const STREAM_POLL_INTERVAL_MS = 1000;
const INTERNAL_OPERATOR_ID = "internal-provider";
@Injectable()
export class InternalAgentProvider implements IAgentProvider {
readonly providerId = "internal";
readonly providerType = "internal";
readonly displayName = "Internal Orchestrator";
constructor(
private readonly messagesService: AgentMessagesService,
private readonly controlService: AgentControlService,
private readonly treeService: AgentTreeService
) {}
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
const {
sessions,
total,
cursor: nextCursor,
} = await this.treeService.listSessions(cursor, limit);
return {
sessions: sessions.map((session) => this.toAgentSession(session)),
total,
...(nextCursor !== undefined ? { cursor: nextCursor } : {}),
};
}
async getSession(sessionId: string): Promise<AgentSession | null> {
const session = await this.treeService.getSession(sessionId);
return session ? this.toAgentSession(session) : null;
}
async getMessages(
sessionId: string,
limit = DEFAULT_MESSAGE_LIMIT,
before?: string
): Promise<AgentMessage[]> {
const safeLimit = this.normalizeMessageLimit(limit);
const skip = this.parseSkip(before);
const result = await this.messagesService.getMessages(sessionId, safeLimit, skip);
return result.messages.map((message) => this.toAgentMessage(message));
}
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
await this.controlService.injectMessage(sessionId, INTERNAL_OPERATOR_ID, content);
return {
accepted: true,
};
}
async pauseSession(sessionId: string): Promise<void> {
await this.controlService.pauseAgent(sessionId, INTERNAL_OPERATOR_ID);
}
async resumeSession(sessionId: string): Promise<void> {
await this.controlService.resumeAgent(sessionId, INTERNAL_OPERATOR_ID);
}
async killSession(sessionId: string, force = true): Promise<void> {
await this.controlService.killAgent(sessionId, INTERNAL_OPERATOR_ID, force);
}
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
const replayMessages = await this.messagesService.getReplayMessages(
sessionId,
DEFAULT_MESSAGE_LIMIT
);
let lastSeenTimestamp = new Date();
let lastSeenMessageId: string | null = null;
for (const message of replayMessages) {
yield this.toAgentMessage(message);
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
}
for (;;) {
const newMessages = await this.messagesService.getMessagesAfter(
sessionId,
lastSeenTimestamp,
lastSeenMessageId
);
for (const message of newMessages) {
yield this.toAgentMessage(message);
lastSeenTimestamp = message.timestamp;
lastSeenMessageId = message.id;
}
await this.delay(STREAM_POLL_INTERVAL_MS);
}
}
isAvailable(): Promise<boolean> {
return Promise.resolve(true);
}
private toAgentSession(session: AgentSessionTree): AgentSession {
const metadata = this.toMetadata(session.metadata);
return {
id: session.sessionId,
providerId: this.providerId,
providerType: this.providerType,
...(session.taskId !== null ? { label: session.taskId } : {}),
status: this.toSessionStatus(session.status),
...(session.parentSessionId !== null ? { parentSessionId: session.parentSessionId } : {}),
createdAt: session.spawnedAt,
updatedAt: session.completedAt ?? session.spawnedAt,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toAgentMessage(message: AgentConversationMessage): AgentMessage {
const metadata = this.toMetadata(message.metadata);
return {
id: message.id,
sessionId: message.sessionId,
role: this.toMessageRole(message.role),
content: message.content,
timestamp: message.timestamp,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toSessionStatus(status: string): AgentSessionStatus {
switch (status) {
case "running":
return "active";
case "paused":
return "paused";
case "completed":
return "completed";
case "failed":
case "killed":
return "failed";
case "spawning":
default:
return "idle";
}
}
private toMessageRole(role: string): AgentMessageRole {
switch (role) {
case "agent":
case "assistant":
return "assistant";
case "system":
return "system";
case "tool":
return "tool";
case "operator":
case "user":
default:
return "user";
}
}
private normalizeMessageLimit(limit: number): number {
const normalized = Number.isFinite(limit) ? Math.trunc(limit) : DEFAULT_MESSAGE_LIMIT;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_MESSAGE_LIMIT);
}
private parseSkip(before?: string): number {
if (!before) {
return 0;
}
const parsed = Number.parseInt(before, 10);
if (Number.isNaN(parsed) || parsed < 0) {
return 0;
}
return parsed;
}
private toMetadata(value: unknown): Record<string, unknown> | undefined {
if (value !== null && typeof value === "object" && !Array.isArray(value)) {
return value as Record<string, unknown>;
}
return undefined;
}
private async delay(ms: number): Promise<void> {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
}

View File

@@ -0,0 +1,21 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
export class GetMissionControlAuditLogQueryDto {
@IsOptional()
@IsString()
sessionId?: string;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
page = 1;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit = 50;
}

View File

@@ -0,0 +1,15 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, IsString, Max, Min } from "class-validator";
export class GetMissionControlMessagesQueryDto {
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(200)
limit?: number;
@IsOptional()
@IsString()
before?: string;
}

View File

@@ -0,0 +1,7 @@
import { IsBoolean, IsOptional } from "class-validator";
export class KillSessionDto {
@IsOptional()
@IsBoolean()
force?: boolean;
}

View File

@@ -0,0 +1,67 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AgentSession } from "@mosaic/shared";
import type { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
import { MissionControlController } from "./mission-control.controller";
import { MissionControlService } from "./mission-control.service";
describe("MissionControlController", () => {
let controller: MissionControlController;
let registry: {
listAllSessions: ReturnType<typeof vi.fn>;
getProviderForSession: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
registry = {
listAllSessions: vi.fn(),
getProviderForSession: vi.fn(),
};
const prisma = {
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
const service = new MissionControlService(
registry as unknown as AgentProviderRegistry,
prisma as unknown as PrismaService
);
controller = new MissionControlController(service);
});
it("Phase 1 gate: unified sessions endpoint returns internal provider sessions", async () => {
const internalSession: AgentSession = {
id: "session-internal-1",
providerId: "internal",
providerType: "internal",
status: "active",
createdAt: new Date("2026-03-07T20:00:00.000Z"),
updatedAt: new Date("2026-03-07T20:01:00.000Z"),
};
const externalSession: AgentSession = {
id: "session-openclaw-1",
providerId: "openclaw",
providerType: "external",
status: "active",
createdAt: new Date("2026-03-07T20:02:00.000Z"),
updatedAt: new Date("2026-03-07T20:03:00.000Z"),
};
registry.listAllSessions.mockResolvedValue([internalSession, externalSession]);
const response = await controller.listSessions();
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
expect(response.sessions).toEqual([internalSession, externalSession]);
expect(response.sessions).toContainEqual(
expect.objectContaining({
id: "session-internal-1",
providerId: "internal",
})
);
});
});

View File

@@ -0,0 +1,192 @@
import {
Body,
Controller,
Get,
Header,
HttpCode,
MessageEvent,
Param,
Post,
Query,
Request,
Sse,
UseGuards,
UsePipes,
ValidationPipe,
} from "@nestjs/common";
import type { AgentMessage, AgentSession, InjectResult } from "@mosaic/shared";
import { Observable } from "rxjs";
import { AuthGuard } from "../../auth/guards/auth.guard";
import { InjectAgentDto } from "../agents/dto/inject-agent.dto";
import { GetMissionControlAuditLogQueryDto } from "./dto/get-mission-control-audit-log-query.dto";
import { GetMissionControlMessagesQueryDto } from "./dto/get-mission-control-messages-query.dto";
import { KillSessionDto } from "./dto/kill-session.dto";
import { MissionControlService, type MissionControlAuditLogPage } from "./mission-control.service";
const DEFAULT_OPERATOR_ID = "mission-control";
interface MissionControlRequest {
user?: {
id?: string;
};
}
@Controller("api/mission-control")
@UseGuards(AuthGuard)
export class MissionControlController {
constructor(private readonly missionControlService: MissionControlService) {}
@Get("sessions")
async listSessions(): Promise<{ sessions: AgentSession[] }> {
const sessions = await this.missionControlService.listSessions();
return { sessions };
}
@Get("sessions/:sessionId")
getSession(@Param("sessionId") sessionId: string): Promise<AgentSession> {
return this.missionControlService.getSession(sessionId);
}
@Get("sessions/:sessionId/messages")
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async getMessages(
@Param("sessionId") sessionId: string,
@Query() query: GetMissionControlMessagesQueryDto
): Promise<{ messages: AgentMessage[] }> {
const messages = await this.missionControlService.getMessages(
sessionId,
query.limit,
query.before
);
return { messages };
}
@Get("audit-log")
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
getAuditLog(
@Query() query: GetMissionControlAuditLogQueryDto
): Promise<MissionControlAuditLogPage> {
return this.missionControlService.getAuditLog(query.sessionId, query.page, query.limit);
}
@Post("sessions/:sessionId/inject")
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
injectMessage(
@Param("sessionId") sessionId: string,
@Body() dto: InjectAgentDto,
@Request() req: MissionControlRequest
): Promise<InjectResult> {
return this.missionControlService.injectMessage(
sessionId,
dto.message,
this.resolveOperatorId(req)
);
}
@Post("sessions/:sessionId/pause")
@HttpCode(200)
async pauseSession(
@Param("sessionId") sessionId: string,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.pauseSession(sessionId, this.resolveOperatorId(req));
return { message: `Session ${sessionId} paused` };
}
@Post("sessions/:sessionId/resume")
@HttpCode(200)
async resumeSession(
@Param("sessionId") sessionId: string,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.resumeSession(sessionId, this.resolveOperatorId(req));
return { message: `Session ${sessionId} resumed` };
}
@Post("sessions/:sessionId/kill")
@HttpCode(200)
@UsePipes(new ValidationPipe({ transform: true, whitelist: true }))
async killSession(
@Param("sessionId") sessionId: string,
@Body() dto: KillSessionDto,
@Request() req: MissionControlRequest
): Promise<{ message: string }> {
await this.missionControlService.killSession(
sessionId,
dto.force ?? true,
this.resolveOperatorId(req)
);
return { message: `Session ${sessionId} killed` };
}
@Sse("sessions/:sessionId/stream")
@Header("Content-Type", "text/event-stream")
@Header("Cache-Control", "no-cache")
streamSessionMessages(@Param("sessionId") sessionId: string): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
let iterator: AsyncIterator<AgentMessage> | null = null;
void this.missionControlService
.streamMessages(sessionId)
.then(async (stream) => {
iterator = stream[Symbol.asyncIterator]();
for (;;) {
if (isClosed) {
break;
}
const next = (await iterator.next()) as { done: boolean; value: AgentMessage };
if (next.done) {
break;
}
subscriber.next({
data: this.toStreamPayload(next.value),
});
}
subscriber.complete();
})
.catch((error: unknown) => {
subscriber.error(error);
});
return () => {
isClosed = true;
void iterator?.return?.();
};
});
}
private resolveOperatorId(req: MissionControlRequest): string {
const operatorId = req.user?.id;
return typeof operatorId === "string" && operatorId.length > 0
? operatorId
: DEFAULT_OPERATOR_ID;
}
private toStreamPayload(message: AgentMessage): {
id: string;
sessionId: string;
role: string;
content: string;
timestamp: string;
metadata?: Record<string, unknown>;
} {
return {
id: message.id,
sessionId: message.sessionId,
role: message.role,
content: message.content,
timestamp: message.timestamp.toISOString(),
...(message.metadata !== undefined ? { metadata: message.metadata } : {}),
};
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { AgentsModule } from "../agents/agents.module";
import { AuthModule } from "../../auth/auth.module";
import { PrismaModule } from "../../prisma/prisma.module";
import { MissionControlController } from "./mission-control.controller";
import { MissionControlService } from "./mission-control.service";
@Module({
imports: [AgentsModule, AuthModule, PrismaModule],
controllers: [MissionControlController],
providers: [MissionControlService],
})
export class MissionControlModule {}

View File

@@ -0,0 +1,213 @@
import { NotFoundException } from "@nestjs/common";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
import type { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
import { MissionControlService } from "./mission-control.service";
type MockProvider = IAgentProvider & {
listSessions: ReturnType<typeof vi.fn>;
getSession: ReturnType<typeof vi.fn>;
getMessages: ReturnType<typeof vi.fn>;
injectMessage: ReturnType<typeof vi.fn>;
pauseSession: ReturnType<typeof vi.fn>;
resumeSession: ReturnType<typeof vi.fn>;
killSession: ReturnType<typeof vi.fn>;
streamMessages: ReturnType<typeof vi.fn>;
};
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
return;
};
const createProvider = (providerId = "internal"): MockProvider => ({
providerId,
providerType: providerId,
displayName: providerId,
listSessions: vi.fn().mockResolvedValue({ sessions: [], total: 0 }),
getSession: vi.fn().mockResolvedValue(null),
getMessages: vi.fn().mockResolvedValue([]),
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
pauseSession: vi.fn().mockResolvedValue(undefined),
resumeSession: vi.fn().mockResolvedValue(undefined),
killSession: vi.fn().mockResolvedValue(undefined),
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
isAvailable: vi.fn().mockResolvedValue(true),
});
describe("MissionControlService", () => {
let service: MissionControlService;
let registry: {
listAllSessions: ReturnType<typeof vi.fn>;
getProviderForSession: ReturnType<typeof vi.fn>;
};
let prisma: {
operatorAuditLog: {
create: ReturnType<typeof vi.fn>;
};
};
const session: AgentSession = {
id: "session-1",
providerId: "internal",
providerType: "internal",
status: "active",
createdAt: new Date("2026-03-07T14:00:00.000Z"),
updatedAt: new Date("2026-03-07T14:01:00.000Z"),
};
beforeEach(() => {
registry = {
listAllSessions: vi.fn().mockResolvedValue([session]),
getProviderForSession: vi.fn().mockResolvedValue(null),
};
prisma = {
operatorAuditLog: {
create: vi.fn().mockResolvedValue(undefined),
},
};
service = new MissionControlService(
registry as unknown as AgentProviderRegistry,
prisma as unknown as PrismaService
);
});
it("lists sessions from the registry", async () => {
await expect(service.listSessions()).resolves.toEqual([session]);
expect(registry.listAllSessions).toHaveBeenCalledTimes(1);
});
it("returns a session when it is found", async () => {
const provider = createProvider("internal");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.getSession(session.id)).resolves.toEqual(session);
});
it("throws NotFoundException when session lookup fails", async () => {
await expect(service.getSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
});
it("gets messages from the resolved provider", async () => {
const provider = createProvider("openclaw");
const messages: AgentMessage[] = [
{
id: "message-1",
sessionId: session.id,
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T14:01:00.000Z"),
},
];
provider.getMessages.mockResolvedValue(messages);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.getMessages(session.id, 25, "10")).resolves.toEqual(messages);
expect(provider.getMessages).toHaveBeenCalledWith(session.id, 25, "10");
});
it("injects a message and writes an audit log", async () => {
const provider = createProvider("internal");
const injectResult: InjectResult = { accepted: true, messageId: "msg-1" };
provider.injectMessage.mockResolvedValue(injectResult);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.injectMessage(session.id, "ship it", "operator-1")).resolves.toEqual(
injectResult
);
expect(provider.injectMessage).toHaveBeenCalledWith(session.id, "ship it");
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: session.id,
userId: "operator-1",
provider: "internal",
action: "inject",
content: "ship it",
metadata: {
payload: { message: "ship it" },
},
},
});
});
it("pauses and resumes using default operator id", async () => {
const provider = createProvider("openclaw");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await service.pauseSession(session.id);
await service.resumeSession(session.id);
expect(provider.pauseSession).toHaveBeenCalledWith(session.id);
expect(provider.resumeSession).toHaveBeenCalledWith(session.id);
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
data: {
sessionId: session.id,
userId: "mission-control",
provider: "openclaw",
action: "pause",
metadata: {
payload: {},
},
},
});
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(2, {
data: {
sessionId: session.id,
userId: "mission-control",
provider: "openclaw",
action: "resume",
metadata: {
payload: {},
},
},
});
});
it("kills with provided force value and writes audit log", async () => {
const provider = createProvider("openclaw");
registry.getProviderForSession.mockResolvedValue({ provider, session });
await service.killSession(session.id, false, "operator-2");
expect(provider.killSession).toHaveBeenCalledWith(session.id, false);
expect(prisma.operatorAuditLog.create).toHaveBeenCalledWith({
data: {
sessionId: session.id,
userId: "operator-2",
provider: "openclaw",
action: "kill",
metadata: {
payload: { force: false },
},
},
});
});
it("resolves provider message stream", async () => {
const provider = createProvider("internal");
const messageStream = (async function* (): AsyncIterable<AgentMessage> {
yield {
id: "message-1",
sessionId: session.id,
role: "assistant",
content: "stream",
timestamp: new Date("2026-03-07T14:03:00.000Z"),
};
})();
provider.streamMessages.mockReturnValue(messageStream);
registry.getProviderForSession.mockResolvedValue({ provider, session });
await expect(service.streamMessages(session.id)).resolves.toBe(messageStream);
expect(provider.streamMessages).toHaveBeenCalledWith(session.id);
});
it("does not write audit log when session cannot be resolved", async () => {
await expect(service.pauseSession("missing-session")).rejects.toBeInstanceOf(NotFoundException);
expect(prisma.operatorAuditLog.create).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,186 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import type { AgentMessage, AgentSession, IAgentProvider, InjectResult } from "@mosaic/shared";
import type { Prisma } from "@prisma/client";
import { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
type MissionControlAction = "inject" | "pause" | "resume" | "kill";
const DEFAULT_OPERATOR_ID = "mission-control";
export interface AuditLogEntry {
id: string;
userId: string;
sessionId: string;
provider: string;
action: string;
content: string | null;
metadata: Prisma.JsonValue;
createdAt: Date;
}
export interface MissionControlAuditLogPage {
items: AuditLogEntry[];
total: number;
page: number;
pages: number;
}
@Injectable()
export class MissionControlService {
constructor(
private readonly registry: AgentProviderRegistry,
private readonly prisma: PrismaService
) {}
listSessions(): Promise<AgentSession[]> {
return this.registry.listAllSessions();
}
async getSession(sessionId: string): Promise<AgentSession> {
const resolved = await this.registry.getProviderForSession(sessionId);
if (!resolved) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
return resolved.session;
}
async getMessages(sessionId: string, limit?: number, before?: string): Promise<AgentMessage[]> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
return provider.getMessages(sessionId, limit, before);
}
async getAuditLog(
sessionId: string | undefined,
page: number,
limit: number
): Promise<MissionControlAuditLogPage> {
const normalizedSessionId = sessionId?.trim();
const where: Prisma.OperatorAuditLogWhereInput =
normalizedSessionId && normalizedSessionId.length > 0
? { sessionId: normalizedSessionId }
: {};
const [total, items] = await this.prisma.$transaction([
this.prisma.operatorAuditLog.count({ where }),
this.prisma.operatorAuditLog.findMany({
where,
orderBy: { createdAt: "desc" },
skip: (page - 1) * limit,
take: limit,
}),
]);
return {
items,
total,
page,
pages: total === 0 ? 0 : Math.ceil(total / limit),
};
}
async injectMessage(
sessionId: string,
message: string,
operatorId = DEFAULT_OPERATOR_ID
): Promise<InjectResult> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
const result = await provider.injectMessage(sessionId, message);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "inject",
content: message,
payload: { message },
});
return result;
}
async pauseSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.pauseSession(sessionId);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "pause",
payload: {},
});
}
async resumeSession(sessionId: string, operatorId = DEFAULT_OPERATOR_ID): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.resumeSession(sessionId);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "resume",
payload: {},
});
}
async killSession(
sessionId: string,
force = true,
operatorId = DEFAULT_OPERATOR_ID
): Promise<void> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
await provider.killSession(sessionId, force);
await this.writeOperatorAuditLog({
sessionId,
providerId: provider.providerId,
operatorId,
action: "kill",
payload: { force },
});
}
async streamMessages(sessionId: string): Promise<AsyncIterable<AgentMessage>> {
const { provider } = await this.getProviderForSessionOrThrow(sessionId);
return provider.streamMessages(sessionId);
}
private async getProviderForSessionOrThrow(
sessionId: string
): Promise<{ provider: IAgentProvider; session: AgentSession }> {
const resolved = await this.registry.getProviderForSession(sessionId);
if (!resolved) {
throw new NotFoundException(`Session ${sessionId} not found`);
}
return resolved;
}
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
return value as Prisma.InputJsonValue;
}
private async writeOperatorAuditLog(params: {
sessionId: string;
providerId: string;
operatorId: string;
action: MissionControlAction;
content?: string;
payload: Record<string, unknown>;
}): Promise<void> {
await this.prisma.operatorAuditLog.create({
data: {
sessionId: params.sessionId,
userId: params.operatorId,
provider: params.providerId,
action: params.action,
...(params.content !== undefined ? { content: params.content } : {}),
metadata: this.toJsonValue({ payload: params.payload }),
},
});
}
}

View File

@@ -0,0 +1,145 @@
import type { HttpService } from "@nestjs/axios";
import type { AgentMessage } from "@mosaic/shared";
import { Readable } from "node:stream";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
describe("OpenClawSseBridge", () => {
let bridge: OpenClawSseBridge;
let httpService: {
axiosRef: {
get: ReturnType<typeof vi.fn>;
};
};
beforeEach(() => {
httpService = {
axiosRef: {
get: vi.fn(),
},
};
bridge = new OpenClawSseBridge(httpService as unknown as HttpService);
});
afterEach(() => {
vi.useRealTimers();
});
it("maps message and status events, and skips heartbeats", async () => {
httpService.axiosRef.get.mockResolvedValue({
data: Readable.from([
'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n',
"event: heartbeat\ndata: {}\n\n",
'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T16:00:01.000Z"}\n\n',
"data: [DONE]\n\n",
]),
});
const messages = await collectMessages(
bridge.streamSession("https://gateway.example.com/", "session-1", {
Authorization: "Bearer test-token",
})
);
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
"https://gateway.example.com/api/sessions/session-1/stream",
{
headers: {
Authorization: "Bearer test-token",
Accept: "text/event-stream",
},
responseType: "stream",
}
);
expect(messages).toHaveLength(2);
expect(messages[0]).toEqual({
id: "msg-1",
sessionId: "session-1",
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
});
expect(messages[1]).toEqual({
id: expect.any(String),
sessionId: "session-1",
role: "system",
content: "Session status changed to paused",
timestamp: new Date("2026-03-07T16:00:01.000Z"),
metadata: {
status: "paused",
timestamp: "2026-03-07T16:00:01.000Z",
},
});
});
it("retries after disconnect and resumes streaming", async () => {
vi.useFakeTimers();
httpService.axiosRef.get
.mockResolvedValueOnce({
data: Readable.from([
'event: message\ndata: {"id":"msg-1","content":"first","timestamp":"2026-03-07T16:10:00.000Z"}\n\n',
]),
})
.mockResolvedValueOnce({
data: Readable.from(["data: [DONE]\n\n"]),
});
const consumePromise = collectMessages(
bridge.streamSession("https://gateway.example.com", "session-1", {
Authorization: "Bearer test-token",
})
);
await vi.advanceTimersByTimeAsync(2000);
const messages = await consumePromise;
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(2);
expect(messages).toEqual([
{
id: "msg-1",
sessionId: "session-1",
role: "user",
content: "first",
timestamp: new Date("2026-03-07T16:10:00.000Z"),
},
]);
});
it("throws after exhausting reconnect retries", async () => {
vi.useFakeTimers();
httpService.axiosRef.get.mockRejectedValue(new Error("socket closed"));
const consumePromise = collectMessages(
bridge.streamSession("https://gateway.example.com", "session-1", {
Authorization: "Bearer test-token",
})
);
const rejection = expect(consumePromise).rejects.toThrow(
"Failed to reconnect OpenClaw stream for session session-1 after 5 retries: socket closed"
);
for (let attempt = 0; attempt < 5; attempt += 1) {
await vi.advanceTimersByTimeAsync(2000);
}
await rejection;
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6);
});
});
async function collectMessages(stream: AsyncIterable<AgentMessage>): Promise<AgentMessage[]> {
const messages: AgentMessage[] = [];
for await (const message of stream) {
messages.push(message);
}
return messages;
}

View File

@@ -0,0 +1,420 @@
import { HttpService } from "@nestjs/axios";
import { Injectable } from "@nestjs/common";
import type { AgentMessage, AgentMessageRole } from "@mosaic/shared";
import { randomUUID } from "node:crypto";
const STREAM_RETRY_DELAY_MS = 2000;
const STREAM_MAX_RETRIES = 5;
type JsonRecord = Record<string, unknown>;
type AsyncChunkStream = AsyncIterable<string | Uint8Array | Buffer>;
type ParsedStreamEvent =
| {
type: "message";
message: AgentMessage;
}
| {
type: "done";
};
@Injectable()
export class OpenClawSseBridge {
constructor(private readonly httpService: HttpService) {}
async *streamSession(
baseUrl: string,
sessionId: string,
headers: Record<string, string>
): AsyncIterable<AgentMessage> {
let retryCount = 0;
let lastError: unknown = new Error("OpenClaw stream disconnected");
while (retryCount <= STREAM_MAX_RETRIES) {
try {
const response = await this.httpService.axiosRef.get(
this.buildStreamUrl(baseUrl, sessionId),
{
headers: {
...headers,
Accept: "text/event-stream",
},
responseType: "stream",
}
);
const stream = this.asAsyncChunkStream(response.data);
if (stream === null) {
throw new Error("OpenClaw stream response is not readable");
}
retryCount = 0;
let streamCompleted = false;
for await (const event of this.parseStream(stream, sessionId)) {
if (event.type === "done") {
streamCompleted = true;
break;
}
yield event.message;
}
if (streamCompleted) {
return;
}
lastError = new Error("OpenClaw stream disconnected");
} catch (error) {
lastError = error;
}
if (retryCount >= STREAM_MAX_RETRIES) {
throw new Error(
`Failed to reconnect OpenClaw stream for session ${sessionId} after ${String(STREAM_MAX_RETRIES)} retries: ${this.toErrorMessage(lastError)}`
);
}
retryCount += 1;
await this.delay(STREAM_RETRY_DELAY_MS);
}
}
private async *parseStream(
stream: AsyncChunkStream,
sessionId: string
): AsyncGenerator<ParsedStreamEvent> {
const decoder = new TextDecoder();
let buffer = "";
for await (const chunk of stream) {
const textChunk = typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
buffer += textChunk.replace(/\r\n/gu, "\n");
const rawEvents = buffer.split("\n\n");
buffer = rawEvents.pop() ?? "";
for (const rawEvent of rawEvents) {
const parsedEvent = this.parseRawEvent(rawEvent);
if (parsedEvent === null) {
continue;
}
if (parsedEvent.data === "[DONE]") {
yield {
type: "done",
};
return;
}
const payload = this.tryParseJson(parsedEvent.data) ?? parsedEvent.data;
const message = this.mapEventToMessage(parsedEvent.type, payload, sessionId);
if (message !== null) {
yield {
type: "message",
message,
};
}
}
}
buffer += decoder.decode();
const trailingEvent = this.parseRawEvent(buffer.trim());
if (trailingEvent === null) {
return;
}
if (trailingEvent.data === "[DONE]") {
yield {
type: "done",
};
return;
}
const payload = this.tryParseJson(trailingEvent.data) ?? trailingEvent.data;
const message = this.mapEventToMessage(trailingEvent.type, payload, sessionId);
if (message !== null) {
yield {
type: "message",
message,
};
}
}
private parseRawEvent(rawEvent: string): { type: string; data: string } | null {
if (rawEvent.trim().length === 0) {
return null;
}
let type = "message";
const dataLines: string[] = [];
for (const line of rawEvent.split("\n")) {
const trimmedLine = line.trimEnd();
if (trimmedLine.length === 0 || trimmedLine.startsWith(":")) {
continue;
}
if (trimmedLine.startsWith("event:")) {
type = trimmedLine.slice(6).trim().toLowerCase();
continue;
}
if (trimmedLine.startsWith("data:")) {
dataLines.push(trimmedLine.slice(5).trimStart());
}
}
if (dataLines.length > 0) {
return {
type,
data: dataLines.join("\n").trim(),
};
}
const trimmedEvent = rawEvent.trim();
if (trimmedEvent.startsWith("{") || trimmedEvent.startsWith("[")) {
return {
type,
data: trimmedEvent,
};
}
return null;
}
private mapEventToMessage(
eventType: string,
payload: unknown,
fallbackSessionId: string
): AgentMessage | null {
switch (eventType) {
case "heartbeat":
return null;
case "status":
return this.toStatusMessage(payload, fallbackSessionId);
case "message":
default:
return this.toAgentMessage(payload, fallbackSessionId);
}
}
private toStatusMessage(value: unknown, sessionId: string): AgentMessage | null {
if (typeof value === "string") {
const status = value.trim();
if (status.length === 0) {
return null;
}
return {
id: randomUUID(),
sessionId,
role: "system",
content: `Session status changed to ${status}`,
timestamp: new Date(),
metadata: {
status,
},
};
}
if (!this.isRecord(value)) {
return null;
}
const status = this.readString(value.status);
if (!status) {
return null;
}
return {
id: randomUUID(),
sessionId,
role: "system",
content: `Session status changed to ${status}`,
timestamp: this.parseDate(value.timestamp ?? value.updatedAt),
metadata: value,
};
}
private toAgentMessage(value: unknown, fallbackSessionId: string): AgentMessage | null {
if (typeof value === "string") {
const content = value.trim();
if (content.length === 0) {
return null;
}
return {
id: randomUUID(),
sessionId: fallbackSessionId,
role: "assistant",
content,
timestamp: new Date(),
};
}
let candidate: JsonRecord | null = null;
if (this.isRecord(value) && this.isRecord(value.message)) {
candidate = value.message;
} else if (this.isRecord(value)) {
candidate = value;
}
if (candidate === null) {
return null;
}
const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId;
if (!sessionId) {
return null;
}
const content = this.extractMessageContent(
candidate.content ?? candidate.text ?? candidate.message
);
if (content.length === 0) {
return null;
}
const metadata = this.toMetadata(candidate.metadata);
return {
id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(),
sessionId,
role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)),
content,
timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt),
...(metadata !== undefined ? { metadata } : {}),
};
}
private extractMessageContent(content: unknown): string {
if (typeof content === "string") {
return content.trim();
}
if (Array.isArray(content)) {
const parts: string[] = [];
for (const part of content) {
if (typeof part === "string") {
const trimmed = part.trim();
if (trimmed.length > 0) {
parts.push(trimmed);
}
continue;
}
if (!this.isRecord(part)) {
continue;
}
const text = this.readString(part.text) ?? this.readString(part.content);
if (text !== undefined && text.trim().length > 0) {
parts.push(text.trim());
}
}
return parts.join("\n\n").trim();
}
if (this.isRecord(content)) {
const text = this.readString(content.text) ?? this.readString(content.content);
return text?.trim() ?? "";
}
return "";
}
private toMessageRole(role?: string): AgentMessageRole {
switch (role?.toLowerCase()) {
case "assistant":
case "agent":
return "assistant";
case "system":
return "system";
case "tool":
return "tool";
case "operator":
case "user":
default:
return "user";
}
}
private parseDate(value: unknown, fallback = new Date()): Date {
if (value instanceof Date) {
return value;
}
if (typeof value === "string" || typeof value === "number") {
const parsed = new Date(value);
if (!Number.isNaN(parsed.getTime())) {
return parsed;
}
}
return fallback;
}
private toMetadata(value: unknown): Record<string, unknown> | undefined {
if (this.isRecord(value)) {
return value;
}
return undefined;
}
private buildStreamUrl(baseUrl: string, sessionId: string): string {
const normalizedBaseUrl = baseUrl.replace(/\/$/u, "");
return new URL(
`/api/sessions/${encodeURIComponent(sessionId)}/stream`,
`${normalizedBaseUrl}/`
).toString();
}
private tryParseJson(value: string): unknown {
try {
return JSON.parse(value) as unknown;
} catch {
return null;
}
}
private asAsyncChunkStream(value: unknown): AsyncChunkStream | null {
if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) {
return value as AsyncChunkStream;
}
return null;
}
private isRecord(value: unknown): value is JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
private readString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
private async delay(ms: number): Promise<void> {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
private toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
}

View File

@@ -0,0 +1,24 @@
import { HttpService } from "@nestjs/axios";
import { Injectable } from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
import { OpenClawProvider } from "./openclaw.provider";
@Injectable()
export class OpenClawProviderFactory {
constructor(
private readonly encryptionService: EncryptionService,
private readonly httpService: HttpService,
private readonly openClawSseBridge: OpenClawSseBridge
) {}
createProvider(config: AgentProviderConfig): OpenClawProvider {
return new OpenClawProvider(
config,
this.encryptionService,
this.httpService,
this.openClawSseBridge
);
}
}

View File

@@ -0,0 +1,183 @@
import type { HttpService } from "@nestjs/axios";
import { ServiceUnavailableException } from "@nestjs/common";
import type { AgentMessage } from "@mosaic/shared";
import type { AgentProviderConfig } from "@prisma/client";
import { Readable } from "node:stream";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
import { OpenClawProvider } from "./openclaw.provider";
describe("Phase 3 gate: OpenClaw provider config registered in DB → provider loaded on boot → sessions returned from /api/mission-control/sessions → inject/pause/kill proxied to gateway", () => {
let provider: OpenClawProvider;
let httpService: {
axiosRef: {
get: ReturnType<typeof vi.fn>;
post: ReturnType<typeof vi.fn>;
};
};
let encryptionService: {
decryptIfNeeded: ReturnType<typeof vi.fn>;
};
const config: AgentProviderConfig = {
id: "cfg-openclaw-1",
workspaceId: "workspace-1",
name: "openclaw-home",
provider: "openclaw",
gatewayUrl: "https://gateway.example.com",
credentials: {
apiToken: "enc:token",
},
isActive: true,
createdAt: new Date("2026-03-07T15:00:00.000Z"),
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
};
beforeEach(() => {
httpService = {
axiosRef: {
get: vi.fn(),
post: vi.fn(),
},
};
encryptionService = {
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
};
provider = new OpenClawProvider(
config,
encryptionService as unknown as EncryptionService,
httpService as unknown as HttpService,
new OpenClawSseBridge(httpService as unknown as HttpService)
);
});
afterEach(() => {
vi.useRealTimers();
});
it("maps listSessions from mocked OpenClaw gateway HTTP responses", async () => {
httpService.axiosRef.get.mockResolvedValue({
data: {
sessions: [
{
id: "session-1",
status: "running",
createdAt: "2026-03-07T15:01:00.000Z",
updatedAt: "2026-03-07T15:02:00.000Z",
},
],
total: 1,
},
});
await expect(provider.listSessions()).resolves.toEqual({
sessions: [
{
id: "session-1",
providerId: "openclaw-home",
providerType: "openclaw",
status: "active",
createdAt: new Date("2026-03-07T15:01:00.000Z"),
updatedAt: new Date("2026-03-07T15:02:00.000Z"),
},
],
total: 1,
});
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
"https://gateway.example.com/api/sessions",
{
headers: {
Authorization: "Bearer plain-token",
},
params: {
limit: 50,
},
}
);
});
it("maps streamMessages from mock SSE events into AgentMessage output", async () => {
httpService.axiosRef.get.mockResolvedValue({
data: Readable.from([
'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello from stream","timestamp":"2026-03-07T15:03:00.000Z"}\n\n',
'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T15:04:00.000Z"}\n\n',
"data: [DONE]\n\n",
]),
});
const messages = await collectMessages(provider.streamMessages("session-1"));
expect(messages).toEqual([
{
id: "msg-1",
sessionId: "session-1",
role: "assistant",
content: "hello from stream",
timestamp: new Date("2026-03-07T15:03:00.000Z"),
},
{
id: expect.any(String),
sessionId: "session-1",
role: "system",
content: "Session status changed to paused",
timestamp: new Date("2026-03-07T15:04:00.000Z"),
metadata: {
status: "paused",
timestamp: "2026-03-07T15:04:00.000Z",
},
},
]);
});
it("handles unavailable gateway errors", async () => {
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unavailable"));
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
await expect(provider.listSessions()).rejects.toThrow("gateway unavailable");
});
it("handles bad token decryption errors", async () => {
encryptionService.decryptIfNeeded.mockImplementation(() => {
throw new Error("bad token");
});
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
await expect(provider.listSessions()).rejects.toThrow("Failed to decrypt API token");
});
it("handles malformed SSE stream responses", async () => {
vi.useFakeTimers();
httpService.axiosRef.get.mockResolvedValue({
data: {
malformed: true,
},
});
const streamPromise = collectMessages(provider.streamMessages("session-malformed"));
const rejection = expect(streamPromise).rejects.toThrow(
"OpenClaw provider openclaw-home failed to stream messages for session session-malformed"
);
for (let attempt = 0; attempt < 5; attempt += 1) {
await vi.advanceTimersByTimeAsync(2000);
}
await rejection;
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6);
});
});
async function collectMessages(stream: AsyncIterable<AgentMessage>): Promise<AgentMessage[]> {
const messages: AgentMessage[] = [];
for await (const message of stream) {
messages.push(message);
}
return messages;
}

View File

@@ -0,0 +1,271 @@
import type { HttpService } from "@nestjs/axios";
import { ServiceUnavailableException } from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
import { OpenClawProvider } from "./openclaw.provider";
describe("OpenClawProvider", () => {
let provider: OpenClawProvider;
let httpService: {
axiosRef: {
get: ReturnType<typeof vi.fn>;
post: ReturnType<typeof vi.fn>;
};
};
let encryptionService: {
decryptIfNeeded: ReturnType<typeof vi.fn>;
};
let sseBridge: {
streamSession: ReturnType<typeof vi.fn>;
};
const config: AgentProviderConfig = {
id: "cfg-openclaw-1",
workspaceId: "workspace-1",
name: "openclaw-home",
provider: "openclaw",
gatewayUrl: "https://gateway.example.com/",
credentials: {
apiToken: "enc:token-value",
displayName: "Home OpenClaw",
},
isActive: true,
createdAt: new Date("2026-03-07T15:00:00.000Z"),
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
};
beforeEach(() => {
httpService = {
axiosRef: {
get: vi.fn(),
post: vi.fn(),
},
};
encryptionService = {
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
};
sseBridge = {
streamSession: vi.fn(),
};
provider = new OpenClawProvider(
config,
encryptionService as unknown as EncryptionService,
httpService as unknown as HttpService,
sseBridge as unknown as OpenClawSseBridge
);
});
it("maps listSessions from OpenClaw API", async () => {
httpService.axiosRef.get.mockResolvedValue({
data: {
sessions: [
{
id: "session-1",
status: "running",
createdAt: "2026-03-07T15:01:00.000Z",
updatedAt: "2026-03-07T15:02:00.000Z",
},
],
total: 1,
cursor: "next-cursor",
},
});
const result = await provider.listSessions("cursor-1", 25);
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
"https://gateway.example.com/api/sessions",
{
headers: {
Authorization: "Bearer plain-token",
},
params: {
cursor: "cursor-1",
limit: 25,
},
}
);
expect(result).toEqual({
sessions: [
{
id: "session-1",
providerId: "openclaw-home",
providerType: "openclaw",
status: "active",
createdAt: new Date("2026-03-07T15:01:00.000Z"),
updatedAt: new Date("2026-03-07T15:02:00.000Z"),
},
],
total: 1,
cursor: "next-cursor",
});
expect(encryptionService.decryptIfNeeded).toHaveBeenCalledWith("enc:token-value");
});
it("returns null from getSession when OpenClaw returns 404", async () => {
httpService.axiosRef.get.mockRejectedValue({
response: {
status: 404,
},
});
await expect(provider.getSession("missing-session")).resolves.toBeNull();
});
it("maps getMessages response", async () => {
httpService.axiosRef.get.mockResolvedValue({
data: {
messages: [
{
id: "message-1",
sessionId: "session-1",
role: "agent",
content: "hello",
timestamp: "2026-03-07T15:03:00.000Z",
metadata: {
tokens: 128,
},
},
],
},
});
const result = await provider.getMessages("session-1", 20, "before-cursor");
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
"https://gateway.example.com/api/messages",
{
headers: {
Authorization: "Bearer plain-token",
},
params: {
sessionId: "session-1",
limit: 20,
before: "before-cursor",
},
}
);
expect(result).toEqual([
{
id: "message-1",
sessionId: "session-1",
role: "assistant",
content: "hello",
timestamp: new Date("2026-03-07T15:03:00.000Z"),
metadata: {
tokens: 128,
},
},
]);
});
it("maps inject and control endpoints", async () => {
httpService.axiosRef.post
.mockResolvedValueOnce({
data: {
accepted: true,
messageId: "message-2",
},
})
.mockResolvedValueOnce({ data: {} })
.mockResolvedValueOnce({ data: {} })
.mockResolvedValueOnce({ data: {} });
await expect(provider.injectMessage("session-1", "barge in")).resolves.toEqual({
accepted: true,
messageId: "message-2",
});
await provider.pauseSession("session-1");
await provider.resumeSession("session-1");
await provider.killSession("session-1", false);
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
1,
"https://gateway.example.com/api/sessions/session-1/inject",
{ content: "barge in" },
{
headers: {
Authorization: "Bearer plain-token",
},
}
);
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
2,
"https://gateway.example.com/api/sessions/session-1/pause",
{},
{
headers: {
Authorization: "Bearer plain-token",
},
}
);
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
3,
"https://gateway.example.com/api/sessions/session-1/resume",
{},
{
headers: {
Authorization: "Bearer plain-token",
},
}
);
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
4,
"https://gateway.example.com/api/sessions/session-1/kill",
{ force: false },
{
headers: {
Authorization: "Bearer plain-token",
},
}
);
});
it("delegates streaming to OpenClawSseBridge", async () => {
const streamedMessage = {
id: "message-stream",
sessionId: "session-stream",
role: "assistant",
content: "stream hello",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
};
sseBridge.streamSession.mockReturnValue(
(async function* () {
yield streamedMessage;
})()
);
const messages: Array<unknown> = [];
for await (const message of provider.streamMessages("session-stream")) {
messages.push(message);
}
expect(sseBridge.streamSession).toHaveBeenCalledWith(
"https://gateway.example.com",
"session-stream",
{
Authorization: "Bearer plain-token",
}
);
expect(messages).toEqual([streamedMessage]);
});
it("throws ServiceUnavailableException for request failures", async () => {
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable"));
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
});
it("returns false from isAvailable when gateway check fails", async () => {
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable"));
await expect(provider.isAvailable()).resolves.toBe(false);
});
});

View File

@@ -0,0 +1,613 @@
import { HttpService } from "@nestjs/axios";
import { Injectable, ServiceUnavailableException } from "@nestjs/common";
import type {
AgentMessage,
AgentMessageRole,
AgentSession,
AgentSessionList,
AgentSessionStatus,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import type { AgentProviderConfig } from "@prisma/client";
import { randomUUID } from "node:crypto";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
const DEFAULT_SESSION_LIMIT = 50;
const DEFAULT_MESSAGE_LIMIT = 50;
const MAX_MESSAGE_LIMIT = 200;
const OPENCLAW_PROVIDER_TYPE = "openclaw";
const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
const DISPLAY_NAME_KEYS = ["displayName", "label"] as const;
type JsonRecord = Record<string, unknown>;
interface HttpErrorWithResponse {
response?: {
status?: number;
};
}
@Injectable()
export class OpenClawProvider implements IAgentProvider {
readonly providerId: string;
readonly providerType = OPENCLAW_PROVIDER_TYPE;
readonly displayName: string;
constructor(
private readonly config: AgentProviderConfig,
private readonly encryptionService: EncryptionService,
private readonly httpService: HttpService,
private readonly sseBridge: OpenClawSseBridge
) {
this.providerId = this.config.name;
this.displayName = this.resolveDisplayName();
}
validateBaseUrl(): void {
void this.resolveBaseUrl();
}
validateToken(): void {
void this.resolveApiToken();
}
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
const safeLimit = this.normalizeLimit(limit, DEFAULT_SESSION_LIMIT);
const params: Record<string, number | string> = { limit: safeLimit };
if (typeof cursor === "string" && cursor.length > 0) {
params.cursor = cursor;
}
try {
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
headers: this.authHeaders(),
params,
});
const page = this.extractSessionPage(response.data);
const sessions = page.records
.map((record) => this.toAgentSession(record))
.filter((session): session is AgentSession => session !== null);
return {
sessions,
total: page.total ?? sessions.length,
...(page.cursor !== undefined ? { cursor: page.cursor } : {}),
};
} catch (error) {
throw this.toServiceUnavailable("list sessions", error);
}
}
async getSession(sessionId: string): Promise<AgentSession | null> {
try {
const response = await this.httpService.axiosRef.get(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}`),
{
headers: this.authHeaders(),
}
);
const payload = this.unwrapContainer(response.data, ["session", "data"]);
return this.toAgentSession(payload);
} catch (error) {
if (this.getHttpStatus(error) === 404) {
return null;
}
throw this.toServiceUnavailable(`get session ${sessionId}`, error);
}
}
async getMessages(
sessionId: string,
limit = DEFAULT_MESSAGE_LIMIT,
before?: string
): Promise<AgentMessage[]> {
const safeLimit = this.normalizeLimit(limit, DEFAULT_MESSAGE_LIMIT);
const params: Record<string, number | string> = {
sessionId,
limit: safeLimit,
};
if (typeof before === "string" && before.length > 0) {
params.before = before;
}
try {
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/messages"), {
headers: this.authHeaders(),
params,
});
return this.extractMessageRecords(response.data)
.map((record) => this.toAgentMessage(record, sessionId))
.filter((message): message is AgentMessage => message !== null);
} catch (error) {
throw this.toServiceUnavailable(`get messages for session ${sessionId}`, error);
}
}
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
try {
const response = await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/inject`),
{ content },
{
headers: this.authHeaders(),
}
);
const payload = this.isRecord(response.data) ? response.data : {};
return {
accepted: typeof payload.accepted === "boolean" ? payload.accepted : true,
...(this.readString(payload.messageId) !== undefined
? { messageId: this.readString(payload.messageId) }
: {}),
};
} catch (error) {
throw this.toServiceUnavailable(`inject message into session ${sessionId}`, error);
}
}
async pauseSession(sessionId: string): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/pause`),
{},
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`pause session ${sessionId}`, error);
}
}
async resumeSession(sessionId: string): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/resume`),
{},
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`resume session ${sessionId}`, error);
}
}
async killSession(sessionId: string, force = true): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/kill`),
{ force },
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`kill session ${sessionId}`, error);
}
}
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
try {
yield* this.sseBridge.streamSession(this.resolveBaseUrl(), sessionId, this.authHeaders());
} catch (error) {
throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error);
}
}
async isAvailable(): Promise<boolean> {
try {
this.validateBaseUrl();
this.validateToken();
await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
headers: this.authHeaders(),
params: { limit: 1 },
});
return true;
} catch {
return false;
}
}
private extractSessionPage(payload: unknown): {
records: unknown[];
total?: number;
cursor?: string;
} {
if (Array.isArray(payload)) {
return {
records: payload,
total: payload.length,
};
}
if (!this.isRecord(payload)) {
return {
records: [],
};
}
let records: unknown[] = [];
if (Array.isArray(payload.sessions)) {
records = payload.sessions;
} else if (Array.isArray(payload.items)) {
records = payload.items;
} else if (Array.isArray(payload.data)) {
records = payload.data;
}
const total = typeof payload.total === "number" ? payload.total : undefined;
const cursor = this.readString(payload.cursor) ?? this.readString(payload.nextCursor);
return {
records,
total,
...(cursor !== undefined ? { cursor } : {}),
};
}
private extractMessageRecords(payload: unknown): unknown[] {
if (Array.isArray(payload)) {
return payload;
}
if (!this.isRecord(payload)) {
return [];
}
if (Array.isArray(payload.messages)) {
return payload.messages;
}
if (Array.isArray(payload.items)) {
return payload.items;
}
if (Array.isArray(payload.data)) {
return payload.data;
}
return [];
}
private unwrapContainer(payload: unknown, keys: string[]): unknown {
if (!this.isRecord(payload)) {
return payload;
}
for (const key of keys) {
if (key in payload) {
return payload[key];
}
}
return payload;
}
private toAgentSession(record: unknown): AgentSession | null {
if (!this.isRecord(record)) {
return null;
}
const id =
this.readString(record.id) ??
this.readString(record.sessionId) ??
this.readString(record.key);
if (!id) {
return null;
}
const createdAt = this.parseDate(record.createdAt ?? record.spawnedAt ?? record.startedAt);
const updatedAt = this.parseDate(
record.updatedAt ?? record.completedAt ?? record.lastActivityAt ?? record.endedAt,
createdAt
);
const label =
this.readString(record.label) ??
this.readString(record.title) ??
this.readString(record.name) ??
undefined;
const parentSessionId = this.readString(record.parentSessionId) ?? undefined;
const metadata = this.toMetadata(record.metadata);
return {
id,
providerId: this.providerId,
providerType: this.providerType,
...(label !== undefined ? { label } : {}),
status: this.toSessionStatus(this.readString(record.status)),
...(parentSessionId !== undefined ? { parentSessionId } : {}),
createdAt,
updatedAt,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toAgentMessage(value: unknown, fallbackSessionId?: string): AgentMessage | null {
if (typeof value === "string") {
const content = value.trim();
if (content.length === 0 || fallbackSessionId === undefined) {
return null;
}
return {
id: randomUUID(),
sessionId: fallbackSessionId,
role: "assistant",
content,
timestamp: new Date(),
};
}
let candidate: JsonRecord | null = null;
if (this.isRecord(value) && this.isRecord(value.message)) {
candidate = value.message;
} else if (this.isRecord(value)) {
candidate = value;
}
if (candidate === null) {
return null;
}
const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId;
if (!sessionId) {
return null;
}
const content = this.extractMessageContent(
candidate.content ?? candidate.text ?? candidate.message
);
if (content.length === 0) {
return null;
}
const metadata = this.toMetadata(candidate.metadata);
return {
id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(),
sessionId,
role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)),
content,
timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt),
...(metadata !== undefined ? { metadata } : {}),
};
}
private extractMessageContent(content: unknown): string {
if (typeof content === "string") {
return content.trim();
}
if (Array.isArray(content)) {
const parts: string[] = [];
for (const part of content) {
if (typeof part === "string") {
const trimmed = part.trim();
if (trimmed.length > 0) {
parts.push(trimmed);
}
continue;
}
if (!this.isRecord(part)) {
continue;
}
const text = this.readString(part.text) ?? this.readString(part.content);
if (text !== undefined && text.trim().length > 0) {
parts.push(text.trim());
}
}
return parts.join("\n\n").trim();
}
if (this.isRecord(content)) {
const text = this.readString(content.text) ?? this.readString(content.content);
return text?.trim() ?? "";
}
return "";
}
private toSessionStatus(status?: string): AgentSessionStatus {
switch (status?.toLowerCase()) {
case "active":
case "running":
return "active";
case "paused":
return "paused";
case "completed":
case "done":
case "succeeded":
return "completed";
case "failed":
case "error":
case "killed":
case "terminated":
case "cancelled":
return "failed";
case "idle":
case "pending":
case "queued":
default:
return "idle";
}
}
private toMessageRole(role?: string): AgentMessageRole {
switch (role?.toLowerCase()) {
case "assistant":
case "agent":
return "assistant";
case "system":
return "system";
case "tool":
return "tool";
case "operator":
case "user":
default:
return "user";
}
}
private normalizeLimit(value: number, fallback: number): number {
const normalized = Number.isFinite(value) ? Math.trunc(value) : fallback;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_MESSAGE_LIMIT);
}
private parseDate(value: unknown, fallback = new Date()): Date {
if (value instanceof Date) {
return value;
}
if (typeof value === "string" || typeof value === "number") {
const parsed = new Date(value);
if (!Number.isNaN(parsed.getTime())) {
return parsed;
}
}
return fallback;
}
private toMetadata(value: unknown): Record<string, unknown> | undefined {
if (this.isRecord(value)) {
return value;
}
return undefined;
}
private resolveDisplayName(): string {
const credentials = this.readCredentials();
for (const key of DISPLAY_NAME_KEYS) {
const value = this.readString(credentials[key]);
if (value !== undefined) {
return value;
}
}
return this.config.name;
}
private resolveBaseUrl(): string {
const configRecord = this.config as unknown as JsonRecord;
const rawBaseUrl =
this.readString(this.config.gatewayUrl) ?? this.readString(configRecord.baseUrl);
if (rawBaseUrl === undefined) {
throw new Error(`OpenClaw provider ${this.providerId} is missing gateway URL`);
}
try {
const parsed = new URL(rawBaseUrl);
return parsed.toString().replace(/\/$/u, "");
} catch {
throw new Error(`OpenClaw provider ${this.providerId} has invalid gateway URL`);
}
}
private resolveApiToken(): string {
const configRecord = this.config as unknown as JsonRecord;
const credentials = this.readCredentials();
const rawToken =
this.readString(configRecord.apiToken) ??
this.readString(configRecord.token) ??
this.readString(configRecord.bearerToken) ??
this.findFirstString(credentials, API_TOKEN_KEYS);
if (rawToken === undefined) {
throw new Error(`OpenClaw provider ${this.providerId} is missing apiToken credentials`);
}
try {
return this.encryptionService.decryptIfNeeded(rawToken);
} catch (error) {
throw new Error(`Failed to decrypt API token: ${this.toErrorMessage(error)}`);
}
}
private readCredentials(): JsonRecord {
return this.isRecord(this.config.credentials) ? this.config.credentials : {};
}
private findFirstString(record: JsonRecord, keys: readonly string[]): string | undefined {
for (const key of keys) {
const value = this.readString(record[key]);
if (value !== undefined) {
return value;
}
}
return undefined;
}
private authHeaders(extraHeaders: Record<string, string> = {}): Record<string, string> {
return {
Authorization: `Bearer ${this.resolveApiToken()}`,
...extraHeaders,
};
}
private buildUrl(path: string): string {
return new URL(path, `${this.resolveBaseUrl()}/`).toString();
}
private isRecord(value: unknown): value is JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
private readString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
private getHttpStatus(error: unknown): number | undefined {
if (typeof error !== "object" || error === null || !("response" in error)) {
return undefined;
}
const response = (error as HttpErrorWithResponse).response;
return typeof response?.status === "number" ? response.status : undefined;
}
private toServiceUnavailable(operation: string, error: unknown): ServiceUnavailableException {
return new ServiceUnavailableException(
`OpenClaw provider ${this.providerId} failed to ${operation}: ${this.toErrorMessage(error)}`
);
}
private toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
}

View File

@@ -0,0 +1,131 @@
import { Logger } from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { PrismaService } from "../../prisma/prisma.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory";
import { ProvidersModule } from "./providers.module";
type MockOpenClawProvider = {
providerId: string;
validateBaseUrl: ReturnType<typeof vi.fn>;
validateToken: ReturnType<typeof vi.fn>;
isAvailable: ReturnType<typeof vi.fn>;
};
describe("ProvidersModule", () => {
let moduleRef: ProvidersModule;
let prisma: {
agentProviderConfig: {
findMany: ReturnType<typeof vi.fn>;
};
};
let registry: {
registerProvider: ReturnType<typeof vi.fn>;
};
let factory: {
createProvider: ReturnType<typeof vi.fn>;
};
const config: AgentProviderConfig = {
id: "cfg-openclaw-1",
workspaceId: "workspace-1",
name: "openclaw-home",
provider: "openclaw",
gatewayUrl: "https://gateway.example.com",
credentials: { apiToken: "enc:token-value" },
isActive: true,
createdAt: new Date("2026-03-07T15:00:00.000Z"),
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
};
beforeEach(() => {
prisma = {
agentProviderConfig: {
findMany: vi.fn(),
},
};
registry = {
registerProvider: vi.fn(),
};
factory = {
createProvider: vi.fn(),
};
moduleRef = new ProvidersModule(
prisma as unknown as PrismaService,
registry as unknown as AgentProviderRegistry,
factory as unknown as OpenClawProviderFactory
);
});
it("registers reachable OpenClaw providers", async () => {
const provider: MockOpenClawProvider = {
providerId: "openclaw-home",
validateBaseUrl: vi.fn(),
validateToken: vi.fn(),
isAvailable: vi.fn().mockResolvedValue(true),
};
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
factory.createProvider.mockReturnValue(provider);
await moduleRef.onModuleInit();
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
where: {
provider: "openclaw",
isActive: true,
},
orderBy: [{ createdAt: "asc" }, { id: "asc" }],
});
expect(factory.createProvider).toHaveBeenCalledWith(config);
expect(provider.validateBaseUrl).toHaveBeenCalledTimes(1);
expect(provider.validateToken).toHaveBeenCalledTimes(1);
expect(provider.isAvailable).toHaveBeenCalledTimes(1);
expect(registry.registerProvider).toHaveBeenCalledWith(provider);
});
it("skips provider registration when gateway is unreachable", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const provider: MockOpenClawProvider = {
providerId: "openclaw-home",
validateBaseUrl: vi.fn(),
validateToken: vi.fn(),
isAvailable: vi.fn().mockResolvedValue(false),
};
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
factory.createProvider.mockReturnValue(provider);
await moduleRef.onModuleInit();
expect(registry.registerProvider).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Skipping OpenClaw provider openclaw-home")
);
});
it("skips provider registration when token decryption fails", async () => {
const errorSpy = vi.spyOn(Logger.prototype, "error").mockImplementation(() => undefined);
const provider: MockOpenClawProvider = {
providerId: "openclaw-home",
validateBaseUrl: vi.fn(),
validateToken: vi.fn().mockImplementation(() => {
throw new Error("Failed to decrypt API token");
}),
isAvailable: vi.fn().mockResolvedValue(true),
};
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
factory.createProvider.mockReturnValue(provider);
await moduleRef.onModuleInit();
expect(registry.registerProvider).not.toHaveBeenCalled();
expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining("token decryption failed"));
expect(provider.isAvailable).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,95 @@
import { HttpModule } from "@nestjs/axios";
import { Logger, Module, OnModuleInit } from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { PrismaModule } from "../../prisma/prisma.module";
import { PrismaService } from "../../prisma/prisma.service";
import { EncryptionService } from "../../security/encryption.service";
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
import { AgentsModule } from "../agents/agents.module";
import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory";
import { OpenClawSseBridge } from "./openclaw/openclaw-sse.bridge";
const OPENCLAW_PROVIDER_TYPE = "openclaw";
@Module({
imports: [
AgentsModule,
PrismaModule,
HttpModule.register({
timeout: 10000,
maxRedirects: 5,
}),
],
providers: [EncryptionService, OpenClawSseBridge, OpenClawProviderFactory],
})
export class ProvidersModule implements OnModuleInit {
private readonly logger = new Logger(ProvidersModule.name);
constructor(
private readonly prisma: PrismaService,
private readonly registry: AgentProviderRegistry,
private readonly openClawProviderFactory: OpenClawProviderFactory
) {}
async onModuleInit(): Promise<void> {
const configs = await this.prisma.agentProviderConfig.findMany({
where: {
provider: OPENCLAW_PROVIDER_TYPE,
isActive: true,
},
orderBy: [{ createdAt: "asc" }, { id: "asc" }],
});
for (const config of configs) {
await this.registerProvider(config);
}
}
private async registerProvider(config: AgentProviderConfig): Promise<void> {
const provider = this.openClawProviderFactory.createProvider(config);
try {
provider.validateBaseUrl();
} catch (error) {
this.logger.warn(
`Skipping OpenClaw provider ${config.name}: invalid configuration (${this.toErrorMessage(error)})`
);
return;
}
try {
provider.validateToken();
} catch (error) {
this.logger.error(
`Skipping OpenClaw provider ${config.name}: token decryption failed (${this.toErrorMessage(error)})`
);
return;
}
try {
const available = await provider.isAvailable();
if (!available) {
this.logger.warn(
`Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable`
);
return;
}
} catch (error) {
this.logger.warn(
`Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable (${this.toErrorMessage(error)})`
);
return;
}
this.registry.registerProvider(provider);
this.logger.log(`Registered OpenClaw provider ${provider.providerId}`);
}
private toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
}

View File

@@ -4,7 +4,10 @@ import { BullModule } from "@nestjs/bullmq";
import { ThrottlerModule } from "@nestjs/throttler"; import { ThrottlerModule } from "@nestjs/throttler";
import { HealthModule } from "./api/health/health.module"; import { HealthModule } from "./api/health/health.module";
import { AgentsModule } from "./api/agents/agents.module"; import { AgentsModule } from "./api/agents/agents.module";
import { MissionControlModule } from "./api/mission-control/mission-control.module";
import { QueueApiModule } from "./api/queue/queue-api.module"; import { QueueApiModule } from "./api/queue/queue-api.module";
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
import { ProvidersModule } from "./api/providers/providers.module";
import { CoordinatorModule } from "./coordinator/coordinator.module"; import { CoordinatorModule } from "./coordinator/coordinator.module";
import { BudgetModule } from "./budget/budget.module"; import { BudgetModule } from "./budget/budget.module";
import { CIModule } from "./ci"; import { CIModule } from "./ci";
@@ -51,6 +54,9 @@ import { orchestratorConfig } from "./config/orchestrator.config";
]), ]),
HealthModule, HealthModule,
AgentsModule, AgentsModule,
AgentProvidersModule,
ProvidersModule,
MissionControlModule,
QueueApiModule, QueueApiModule,
CoordinatorModule, CoordinatorModule,
BudgetModule, BudgetModule,

View File

@@ -0,0 +1,9 @@
import { Module } from "@nestjs/common";
import { OrchestratorApiKeyGuard } from "../common/guards/api-key.guard";
import { AuthGuard } from "./guards/auth.guard";
@Module({
providers: [OrchestratorApiKeyGuard, AuthGuard],
exports: [OrchestratorApiKeyGuard, AuthGuard],
})
export class AuthModule {}

View File

@@ -0,0 +1,11 @@
import { CanActivate, ExecutionContext, Injectable } from "@nestjs/common";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
@Injectable()
export class AuthGuard implements CanActivate {
constructor(private readonly apiKeyGuard: OrchestratorApiKeyGuard) {}
canActivate(context: ExecutionContext): boolean | Promise<boolean> {
return this.apiKeyGuard.canActivate(context);
}
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from "@nestjs/common";
import { PrismaService } from "./prisma.service";
@Global()
@Module({
providers: [PrismaService],
exports: [PrismaService],
})
export class PrismaModule {}

View File

@@ -0,0 +1,26 @@
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { PrismaClient } from "@prisma/client";
/**
* Lightweight Prisma service for orchestrator ingestion persistence.
*/
@Injectable()
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(PrismaService.name);
constructor() {
super({
log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"],
});
}
async onModuleInit(): Promise<void> {
await this.$connect();
this.logger.log("Database connection established");
}
async onModuleDestroy(): Promise<void> {
await this.$disconnect();
this.logger.log("Database connection closed");
}
}

Some files were not shown because too many files have changed in this diff Show More