diff --git a/.env.example b/.env.example index 36dbd73..720cb59 100644 --- a/.env.example +++ b/.env.example @@ -343,6 +343,11 @@ RATE_LIMIT_STORAGE=redis # DISCORD_CONTROL_CHANNEL_ID=channel-id-for-commands # DISCORD_WORKSPACE_ID=your-workspace-uuid # +# Agent channel routing: Maps Discord channels to specific agents. +# Format: :,: +# Example: 123456789:jarvis,987654321:builder +# DISCORD_AGENT_CHANNELS= +# # SECURITY: DISCORD_WORKSPACE_ID must be a valid workspace UUID from your database. # All Discord commands will execute within this workspace context for proper # multi-tenant isolation. Each Discord bot instance should be configured for diff --git a/apps/api/src/bridge/bridge.module.spec.ts b/apps/api/src/bridge/bridge.module.spec.ts index 6660e7f..99405c8 100644 --- a/apps/api/src/bridge/bridge.module.spec.ts +++ b/apps/api/src/bridge/bridge.module.spec.ts @@ -5,6 +5,7 @@ import { MatrixService } from "./matrix/matrix.service"; import { StitcherService } from "../stitcher/stitcher.service"; import { PrismaService } from "../prisma/prisma.service"; import { BullMqService } from "../bullmq/bullmq.service"; +import { ChatProxyService } from "../chat-proxy/chat-proxy.service"; import { CHAT_PROVIDERS } from "./bridge.constants"; import type { IChatProvider } from "./interfaces"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; @@ -89,6 +90,7 @@ interface SavedEnvVars { MATRIX_CONTROL_ROOM_ID?: string; MATRIX_WORKSPACE_ID?: string; ENCRYPTION_KEY?: string; + MOSAIC_SECRET_KEY?: string; } describe("BridgeModule", () => { @@ -106,6 +108,7 @@ describe("BridgeModule", () => { MATRIX_CONTROL_ROOM_ID: process.env.MATRIX_CONTROL_ROOM_ID, MATRIX_WORKSPACE_ID: process.env.MATRIX_WORKSPACE_ID, ENCRYPTION_KEY: process.env.ENCRYPTION_KEY, + MOSAIC_SECRET_KEY: process.env.MOSAIC_SECRET_KEY, }; // Clear all bridge env vars @@ -120,6 +123,8 @@ describe("BridgeModule", () => { // Set encryption key (needed by StitcherService) process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; + // Set MOSAIC_SECRET_KEY (needed by CryptoService via ChatProxyModule) + process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long"; // Clear ready callbacks mockReadyCallbacks.length = 0; @@ -149,6 +154,10 @@ describe("BridgeModule", () => { .useValue({}) .overrideProvider(BullMqService) .useValue({}) + .overrideProvider(ChatProxyService) + .useValue({ + proxyChat: vi.fn().mockResolvedValue(new Response()), + }) .compile(); } diff --git a/apps/api/src/bridge/bridge.module.ts b/apps/api/src/bridge/bridge.module.ts index d68d204..4d90993 100644 --- a/apps/api/src/bridge/bridge.module.ts +++ b/apps/api/src/bridge/bridge.module.ts @@ -5,6 +5,8 @@ import { MatrixRoomService } from "./matrix/matrix-room.service"; import { MatrixStreamingService } from "./matrix/matrix-streaming.service"; import { CommandParserService } from "./parser/command-parser.service"; import { StitcherModule } from "../stitcher/stitcher.module"; +import { ChatProxyModule } from "../chat-proxy/chat-proxy.module"; +import { PrismaModule } from "../prisma/prisma.module"; import { CHAT_PROVIDERS } from "./bridge.constants"; import type { IChatProvider } from "./interfaces"; @@ -28,7 +30,7 @@ const logger = new Logger("BridgeModule"); * MatrixRoomService handles workspace-to-Matrix-room mapping. */ @Module({ - imports: [StitcherModule], + imports: [StitcherModule, ChatProxyModule, PrismaModule], providers: [ CommandParserService, MatrixRoomService, diff --git a/apps/api/src/bridge/discord/discord.service.spec.ts b/apps/api/src/bridge/discord/discord.service.spec.ts index 30d8d2a..cdf6e4e 100644 --- a/apps/api/src/bridge/discord/discord.service.spec.ts +++ b/apps/api/src/bridge/discord/discord.service.spec.ts @@ -1,6 +1,8 @@ import { Test, TestingModule } from "@nestjs/testing"; import { DiscordService } from "./discord.service"; import { StitcherService } from "../../stitcher/stitcher.service"; +import { ChatProxyService } from "../../chat-proxy/chat-proxy.service"; +import { PrismaService } from "../../prisma/prisma.service"; import { Client, Events, GatewayIntentBits, Message } from "discord.js"; import { vi, describe, it, expect, beforeEach } from "vitest"; import type { ChatMessage, ChatCommand } from "../interfaces"; @@ -61,6 +63,8 @@ vi.mock("discord.js", () => { describe("DiscordService", () => { let service: DiscordService; let stitcherService: StitcherService; + let chatProxyService: ChatProxyService; + let prismaService: PrismaService; const mockStitcherService = { dispatchJob: vi.fn().mockResolvedValue({ @@ -71,12 +75,29 @@ describe("DiscordService", () => { trackJobEvent: vi.fn().mockResolvedValue(undefined), }; + const mockChatProxyService = { + proxyChat: vi.fn().mockResolvedValue( + new Response('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\ndata: [DONE]\n\n', { + headers: { "Content-Type": "text/event-stream" }, + }) + ), + }; + + const mockPrismaService = { + workspace: { + findUnique: vi.fn().mockResolvedValue({ + ownerId: "owner-user-id", + }), + }, + }; + beforeEach(async () => { // Set environment variables for testing process.env.DISCORD_BOT_TOKEN = "test-token"; process.env.DISCORD_GUILD_ID = "test-guild-id"; process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id"; process.env.DISCORD_WORKSPACE_ID = "test-workspace-id"; + process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder"; // Clear callbacks mockReadyCallbacks.length = 0; @@ -89,11 +110,21 @@ describe("DiscordService", () => { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: ChatProxyService, + useValue: mockChatProxyService, + }, + { + provide: PrismaService, + useValue: mockPrismaService, + }, ], }).compile(); service = module.get(DiscordService); stitcherService = module.get(StitcherService); + chatProxyService = module.get(ChatProxyService); + prismaService = module.get(PrismaService); // Clear all mocks vi.clearAllMocks(); @@ -449,6 +480,14 @@ describe("DiscordService", () => { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: ChatProxyService, + useValue: mockChatProxyService, + }, + { + provide: PrismaService, + useValue: mockPrismaService, + }, ], }).compile(); @@ -470,6 +509,14 @@ describe("DiscordService", () => { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: ChatProxyService, + useValue: mockChatProxyService, + }, + { + provide: PrismaService, + useValue: mockPrismaService, + }, ], }).compile(); @@ -492,6 +539,14 @@ describe("DiscordService", () => { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: ChatProxyService, + useValue: mockChatProxyService, + }, + { + provide: PrismaService, + useValue: mockPrismaService, + }, ], }).compile(); @@ -654,4 +709,150 @@ describe("DiscordService", () => { expect(loggedError.statusCode).toBe(408); }); }); + + describe("Agent Channel Routing", () => { + it("should load agent channel mappings from environment", () => { + // The service should have loaded the agent channels from DISCORD_AGENT_CHANNELS + expect((service as any).agentChannels.size).toBe(2); + expect((service as any).agentChannels.get("jarvis-channel")).toBe("jarvis"); + expect((service as any).agentChannels.get("builder-channel")).toBe("builder"); + }); + + it("should handle empty agent channels config", async () => { + delete process.env.DISCORD_AGENT_CHANNELS; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + DiscordService, + { + provide: StitcherService, + useValue: mockStitcherService, + }, + { + provide: ChatProxyService, + useValue: mockChatProxyService, + }, + { + provide: PrismaService, + useValue: mockPrismaService, + }, + ], + }).compile(); + + const newService = module.get(DiscordService); + expect((newService as any).agentChannels.size).toBe(0); + + // Restore for other tests + process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder"; + }); + + it("should route messages in agent channels to ChatProxyService", async () => { + const mockChannel = { + send: vi.fn().mockResolvedValue({}), + isTextBased: () => true, + sendTyping: vi.fn(), + }; + (mockClient.channels.fetch as any).mockResolvedValue(mockChannel); + + // Create a mock streaming response + const mockStreamResponse = new Response( + 'data: {"choices":[{"delta":{"content":"Test response"}}]}\n\ndata: [DONE]\n\n', + { headers: { "Content-Type": "text/event-stream" } } + ); + mockChatProxyService.proxyChat.mockResolvedValue(mockStreamResponse); + + await service.connect(); + + // Simulate a message in the jarvis channel + const message: ChatMessage = { + id: "msg-agent-1", + channelId: "jarvis-channel", + authorId: "user-1", + authorName: "TestUser", + content: "Hello Jarvis!", + timestamp: new Date(), + }; + + // Call handleAgentChat directly + await (service as any).handleAgentChat(message, "jarvis"); + + // Verify ChatProxyService was called with workspace owner's ID and agent name + expect(mockChatProxyService.proxyChat).toHaveBeenCalledWith( + "owner-user-id", + [{ role: "user", content: "Hello Jarvis!" }], + undefined, + "jarvis" + ); + + // Verify response was sent to channel + expect(mockChannel.send).toHaveBeenCalled(); + }); + + it("should not route empty messages", async () => { + const message: ChatMessage = { + id: "msg-empty", + channelId: "jarvis-channel", + authorId: "user-1", + authorName: "TestUser", + content: " ", + timestamp: new Date(), + }; + + await (service as any).handleAgentChat(message, "jarvis"); + + expect(mockChatProxyService.proxyChat).not.toHaveBeenCalled(); + }); + + it("should handle ChatProxyService errors gracefully", async () => { + const mockChannel = { + send: vi.fn().mockResolvedValue({}), + isTextBased: () => true, + sendTyping: vi.fn(), + }; + (mockClient.channels.fetch as any).mockResolvedValue(mockChannel); + + mockChatProxyService.proxyChat.mockRejectedValue(new Error("Agent not found")); + + await service.connect(); + + const message: ChatMessage = { + id: "msg-error", + channelId: "jarvis-channel", + authorId: "user-1", + authorName: "TestUser", + content: "Hello", + timestamp: new Date(), + }; + + await (service as any).handleAgentChat(message, "jarvis"); + + // Should send error message to channel + expect(mockChannel.send).toHaveBeenCalledWith( + expect.stringContaining("Failed to get response from jarvis") + ); + }); + + it("should split long messages for Discord", () => { + const longContent = "A".repeat(5000); + const chunks = (service as any).splitMessageForDiscord(longContent); + + // Should split into chunks of 2000 or less + expect(chunks.length).toBeGreaterThan(1); + for (const chunk of chunks) { + expect(chunk.length).toBeLessThanOrEqual(2000); + } + + // Reassembled content should match original + expect(chunks.join("")).toBe(longContent.trim()); + }); + + it("should prefer paragraph breaks when splitting messages", () => { + const content = "A".repeat(1500) + "\n\n" + "B".repeat(1500); + const chunks = (service as any).splitMessageForDiscord(content); + + expect(chunks.length).toBe(2); + expect(chunks[0]).toContain("A"); + expect(chunks[1]).toContain("B"); + }); + }); }); diff --git a/apps/api/src/bridge/discord/discord.service.ts b/apps/api/src/bridge/discord/discord.service.ts index 2b7e488..4666830 100644 --- a/apps/api/src/bridge/discord/discord.service.ts +++ b/apps/api/src/bridge/discord/discord.service.ts @@ -1,6 +1,8 @@ import { Injectable, Logger } from "@nestjs/common"; import { Client, Events, GatewayIntentBits, TextChannel, ThreadChannel } from "discord.js"; import { StitcherService } from "../../stitcher/stitcher.service"; +import { ChatProxyService } from "../../chat-proxy/chat-proxy.service"; +import { PrismaService } from "../../prisma/prisma.service"; import { sanitizeForLogging } from "../../common/utils"; import type { IChatProvider, @@ -17,6 +19,7 @@ import type { * - Connect to Discord via bot token * - Listen for commands in designated channels * - Forward commands to stitcher + * - Route messages in agent channels to specific agents via ChatProxyService * - Receive status updates from herald * - Post updates to threads */ @@ -28,12 +31,21 @@ export class DiscordService implements IChatProvider { private readonly botToken: string; private readonly controlChannelId: string; private readonly workspaceId: string; + private readonly agentChannels = new Map(); + private workspaceOwnerId: string | null = null; - constructor(private readonly stitcherService: StitcherService) { + constructor( + private readonly stitcherService: StitcherService, + private readonly chatProxyService: ChatProxyService, + private readonly prisma: PrismaService + ) { this.botToken = process.env.DISCORD_BOT_TOKEN ?? ""; this.controlChannelId = process.env.DISCORD_CONTROL_CHANNEL_ID ?? ""; this.workspaceId = process.env.DISCORD_WORKSPACE_ID ?? ""; + // Load agent channel mappings from environment + this.loadAgentChannels(); + // Initialize Discord client with required intents this.client = new Client({ intents: [ @@ -46,6 +58,51 @@ export class DiscordService implements IChatProvider { this.setupEventHandlers(); } + /** + * Load agent channel mappings from environment variables. + * Format: DISCORD_AGENT_CHANNELS=:,: + * Example: DISCORD_AGENT_CHANNELS=123456:jarvis,789012:builder + */ + private loadAgentChannels(): void { + const channelsConfig = process.env.DISCORD_AGENT_CHANNELS ?? ""; + if (!channelsConfig) { + this.logger.debug("No agent channels configured (DISCORD_AGENT_CHANNELS not set)"); + return; + } + + const channels = channelsConfig.split(",").map((pair) => pair.trim()); + for (const channel of channels) { + const [channelId, agentName] = channel.split(":"); + if (channelId && agentName) { + this.agentChannels.set(channelId.trim(), agentName.trim()); + this.logger.log(`Agent channel mapped: ${channelId.trim()} → ${agentName.trim()}`); + } + } + } + + /** + * Get the workspace owner's user ID for chat proxy routing. + * Caches the result after first lookup. + */ + private async getWorkspaceOwnerId(): Promise { + if (this.workspaceOwnerId) { + return this.workspaceOwnerId; + } + + const workspace = await this.prisma.workspace.findUnique({ + where: { id: this.workspaceId }, + select: { ownerId: true }, + }); + + if (!workspace) { + throw new Error(`Workspace not found: ${this.workspaceId}`); + } + + this.workspaceOwnerId = workspace.ownerId; + this.logger.debug(`Workspace owner resolved: ${workspace.ownerId}`); + return workspace.ownerId; + } + /** * Setup event handlers for Discord client */ @@ -60,9 +117,6 @@ export class DiscordService implements IChatProvider { // Ignore bot messages if (message.author.bot) return; - // Check if message is in control channel - if (message.channelId !== this.controlChannelId) return; - // Parse message into ChatMessage format const chatMessage: ChatMessage = { id: message.id, @@ -74,6 +128,16 @@ export class DiscordService implements IChatProvider { ...(message.channel.isThread() && { threadId: message.channelId }), }; + // Check if message is in an agent channel + const agentName = this.agentChannels.get(message.channelId); + if (agentName) { + void this.handleAgentChat(chatMessage, agentName); + return; + } + + // Check if message is in control channel for commands + if (message.channelId !== this.controlChannelId) return; + // Parse command const command = this.parseCommand(chatMessage); if (command) { @@ -394,4 +458,150 @@ export class DiscordService implements IChatProvider { await this.sendMessage(message.channelId, helpMessage); } + + /** + * Handle agent chat - Route message to specific agent via ChatProxyService + * Messages in agent channels are sent directly to the agent without requiring @mosaic prefix. + */ + private async handleAgentChat(message: ChatMessage, agentName: string): Promise { + this.logger.log( + `Routing message from ${message.authorName} to agent "${agentName}" in channel ${message.channelId}` + ); + + // Ignore empty messages + if (!message.content.trim()) { + return; + } + + try { + // Get workspace owner ID for routing + const userId = await this.getWorkspaceOwnerId(); + + // Build message history (just the user's message for now) + const messages = [{ role: "user" as const, content: message.content }]; + + // Send typing indicator while waiting for response + const channel = await this.client.channels.fetch(message.channelId); + if (channel?.isTextBased()) { + void (channel as TextChannel).sendTyping(); + } + + // Proxy to agent + const response = await this.chatProxyService.proxyChat( + userId, + messages, + undefined, + agentName + ); + + // Stream the response to channel + await this.streamResponseToChannel(message.channelId, response); + + this.logger.debug(`Agent "${agentName}" response sent to channel ${message.channelId}`); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Failed to route message to agent "${agentName}": ${errorMessage}`); + await this.sendMessage( + message.channelId, + `Failed to get response from ${agentName}. Please try again later.` + ); + } + } + + /** + * Stream SSE response from chat proxy and send to Discord channel. + * Collects the full response and sends as a single message for reliability. + */ + private async streamResponseToChannel(channelId: string, response: Response): Promise { + const reader = response.body?.getReader(); + if (!reader) { + throw new Error("Response body is not readable"); + } + + const decoder = new TextDecoder(); + let fullContent = ""; + let buffer = ""; + + try { + let readResult = await reader.read(); + while (!readResult.done) { + const { value } = readResult; + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + if (line.startsWith("data: ")) { + const data = line.slice(6); + if (data === "[DONE]") continue; + + try { + const parsed = JSON.parse(data) as { + choices?: { delta?: { content?: string } }[]; + }; + const content = parsed.choices?.[0]?.delta?.content; + if (content) { + fullContent += content; + } + } catch { + // Skip invalid JSON + } + } + } + readResult = await reader.read(); + } + + // Send the full response to Discord + if (fullContent.trim()) { + // Discord has a 2000 character limit, split if needed + const chunks = this.splitMessageForDiscord(fullContent); + for (const chunk of chunks) { + await this.sendMessage(channelId, chunk); + } + } + + return fullContent; + } finally { + reader.releaseLock(); + } + } + + /** + * Split a message into chunks that fit within Discord's 2000 character limit. + * Tries to split on paragraph or sentence boundaries when possible. + */ + private splitMessageForDiscord(content: string, maxLength = 2000): string[] { + if (content.length <= maxLength) { + return [content]; + } + + const chunks: string[] = []; + let remaining = content; + + while (remaining.length > maxLength) { + // Try to find a good break point + let breakPoint = remaining.lastIndexOf("\n\n", maxLength); + if (breakPoint < maxLength * 0.5) { + breakPoint = remaining.lastIndexOf("\n", maxLength); + } + if (breakPoint < maxLength * 0.5) { + breakPoint = remaining.lastIndexOf(". ", maxLength); + } + if (breakPoint < maxLength * 0.5) { + breakPoint = remaining.lastIndexOf(" ", maxLength); + } + if (breakPoint < maxLength * 0.5) { + breakPoint = maxLength - 1; + } + + chunks.push(remaining.slice(0, breakPoint + 1).trim()); + remaining = remaining.slice(breakPoint + 1).trim(); + } + + if (remaining.length > 0) { + chunks.push(remaining); + } + + return chunks; + } } diff --git a/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts b/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts index 20c3700..1059db8 100644 --- a/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts +++ b/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts @@ -28,6 +28,7 @@ import { StitcherService } from "../../stitcher/stitcher.service"; import { HeraldService } from "../../herald/herald.service"; import { PrismaService } from "../../prisma/prisma.service"; import { BullMqService } from "../../bullmq/bullmq.service"; +import { ChatProxyService } from "../../chat-proxy/chat-proxy.service"; import type { IChatProvider } from "../interfaces"; import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types"; @@ -192,6 +193,7 @@ function setDiscordEnv(): void { function setEncryptionKey(): void { process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; + process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long"; } /** @@ -205,6 +207,10 @@ async function compileBridgeModule(): Promise { .useValue({}) .overrideProvider(BullMqService) .useValue({}) + .overrideProvider(ChatProxyService) + .useValue({ + proxyChat: vi.fn().mockResolvedValue(new Response()), + }) .compile(); }