feat(ms22-p2): add Discord channel → agent routing (#688)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #688.
This commit is contained in:
@@ -343,6 +343,11 @@ RATE_LIMIT_STORAGE=redis
|
|||||||
# DISCORD_CONTROL_CHANNEL_ID=channel-id-for-commands
|
# DISCORD_CONTROL_CHANNEL_ID=channel-id-for-commands
|
||||||
# DISCORD_WORKSPACE_ID=your-workspace-uuid
|
# DISCORD_WORKSPACE_ID=your-workspace-uuid
|
||||||
#
|
#
|
||||||
|
# Agent channel routing: Maps Discord channels to specific agents.
|
||||||
|
# Format: <channelId>:<agentName>,<channelId>:<agentName>
|
||||||
|
# Example: 123456789:jarvis,987654321:builder
|
||||||
|
# DISCORD_AGENT_CHANNELS=
|
||||||
|
#
|
||||||
# SECURITY: DISCORD_WORKSPACE_ID must be a valid workspace UUID from your database.
|
# SECURITY: DISCORD_WORKSPACE_ID must be a valid workspace UUID from your database.
|
||||||
# All Discord commands will execute within this workspace context for proper
|
# All Discord commands will execute within this workspace context for proper
|
||||||
# multi-tenant isolation. Each Discord bot instance should be configured for
|
# multi-tenant isolation. Each Discord bot instance should be configured for
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { MatrixService } from "./matrix/matrix.service";
|
|||||||
import { StitcherService } from "../stitcher/stitcher.service";
|
import { StitcherService } from "../stitcher/stitcher.service";
|
||||||
import { PrismaService } from "../prisma/prisma.service";
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
import { BullMqService } from "../bullmq/bullmq.service";
|
import { BullMqService } from "../bullmq/bullmq.service";
|
||||||
|
import { ChatProxyService } from "../chat-proxy/chat-proxy.service";
|
||||||
import { CHAT_PROVIDERS } from "./bridge.constants";
|
import { CHAT_PROVIDERS } from "./bridge.constants";
|
||||||
import type { IChatProvider } from "./interfaces";
|
import type { IChatProvider } from "./interfaces";
|
||||||
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||||
@@ -89,6 +90,7 @@ interface SavedEnvVars {
|
|||||||
MATRIX_CONTROL_ROOM_ID?: string;
|
MATRIX_CONTROL_ROOM_ID?: string;
|
||||||
MATRIX_WORKSPACE_ID?: string;
|
MATRIX_WORKSPACE_ID?: string;
|
||||||
ENCRYPTION_KEY?: string;
|
ENCRYPTION_KEY?: string;
|
||||||
|
MOSAIC_SECRET_KEY?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
describe("BridgeModule", () => {
|
describe("BridgeModule", () => {
|
||||||
@@ -106,6 +108,7 @@ describe("BridgeModule", () => {
|
|||||||
MATRIX_CONTROL_ROOM_ID: process.env.MATRIX_CONTROL_ROOM_ID,
|
MATRIX_CONTROL_ROOM_ID: process.env.MATRIX_CONTROL_ROOM_ID,
|
||||||
MATRIX_WORKSPACE_ID: process.env.MATRIX_WORKSPACE_ID,
|
MATRIX_WORKSPACE_ID: process.env.MATRIX_WORKSPACE_ID,
|
||||||
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY,
|
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY,
|
||||||
|
MOSAIC_SECRET_KEY: process.env.MOSAIC_SECRET_KEY,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Clear all bridge env vars
|
// Clear all bridge env vars
|
||||||
@@ -120,6 +123,8 @@ describe("BridgeModule", () => {
|
|||||||
|
|
||||||
// Set encryption key (needed by StitcherService)
|
// Set encryption key (needed by StitcherService)
|
||||||
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
|
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
|
||||||
|
// Set MOSAIC_SECRET_KEY (needed by CryptoService via ChatProxyModule)
|
||||||
|
process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long";
|
||||||
|
|
||||||
// Clear ready callbacks
|
// Clear ready callbacks
|
||||||
mockReadyCallbacks.length = 0;
|
mockReadyCallbacks.length = 0;
|
||||||
@@ -149,6 +154,10 @@ describe("BridgeModule", () => {
|
|||||||
.useValue({})
|
.useValue({})
|
||||||
.overrideProvider(BullMqService)
|
.overrideProvider(BullMqService)
|
||||||
.useValue({})
|
.useValue({})
|
||||||
|
.overrideProvider(ChatProxyService)
|
||||||
|
.useValue({
|
||||||
|
proxyChat: vi.fn().mockResolvedValue(new Response()),
|
||||||
|
})
|
||||||
.compile();
|
.compile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import { MatrixRoomService } from "./matrix/matrix-room.service";
|
|||||||
import { MatrixStreamingService } from "./matrix/matrix-streaming.service";
|
import { MatrixStreamingService } from "./matrix/matrix-streaming.service";
|
||||||
import { CommandParserService } from "./parser/command-parser.service";
|
import { CommandParserService } from "./parser/command-parser.service";
|
||||||
import { StitcherModule } from "../stitcher/stitcher.module";
|
import { StitcherModule } from "../stitcher/stitcher.module";
|
||||||
|
import { ChatProxyModule } from "../chat-proxy/chat-proxy.module";
|
||||||
|
import { PrismaModule } from "../prisma/prisma.module";
|
||||||
import { CHAT_PROVIDERS } from "./bridge.constants";
|
import { CHAT_PROVIDERS } from "./bridge.constants";
|
||||||
import type { IChatProvider } from "./interfaces";
|
import type { IChatProvider } from "./interfaces";
|
||||||
|
|
||||||
@@ -28,7 +30,7 @@ const logger = new Logger("BridgeModule");
|
|||||||
* MatrixRoomService handles workspace-to-Matrix-room mapping.
|
* MatrixRoomService handles workspace-to-Matrix-room mapping.
|
||||||
*/
|
*/
|
||||||
@Module({
|
@Module({
|
||||||
imports: [StitcherModule],
|
imports: [StitcherModule, ChatProxyModule, PrismaModule],
|
||||||
providers: [
|
providers: [
|
||||||
CommandParserService,
|
CommandParserService,
|
||||||
MatrixRoomService,
|
MatrixRoomService,
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { Test, TestingModule } from "@nestjs/testing";
|
import { Test, TestingModule } from "@nestjs/testing";
|
||||||
import { DiscordService } from "./discord.service";
|
import { DiscordService } from "./discord.service";
|
||||||
import { StitcherService } from "../../stitcher/stitcher.service";
|
import { StitcherService } from "../../stitcher/stitcher.service";
|
||||||
|
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
import { Client, Events, GatewayIntentBits, Message } from "discord.js";
|
import { Client, Events, GatewayIntentBits, Message } from "discord.js";
|
||||||
import { vi, describe, it, expect, beforeEach } from "vitest";
|
import { vi, describe, it, expect, beforeEach } from "vitest";
|
||||||
import type { ChatMessage, ChatCommand } from "../interfaces";
|
import type { ChatMessage, ChatCommand } from "../interfaces";
|
||||||
@@ -61,6 +63,8 @@ vi.mock("discord.js", () => {
|
|||||||
describe("DiscordService", () => {
|
describe("DiscordService", () => {
|
||||||
let service: DiscordService;
|
let service: DiscordService;
|
||||||
let stitcherService: StitcherService;
|
let stitcherService: StitcherService;
|
||||||
|
let chatProxyService: ChatProxyService;
|
||||||
|
let prismaService: PrismaService;
|
||||||
|
|
||||||
const mockStitcherService = {
|
const mockStitcherService = {
|
||||||
dispatchJob: vi.fn().mockResolvedValue({
|
dispatchJob: vi.fn().mockResolvedValue({
|
||||||
@@ -71,12 +75,29 @@ describe("DiscordService", () => {
|
|||||||
trackJobEvent: vi.fn().mockResolvedValue(undefined),
|
trackJobEvent: vi.fn().mockResolvedValue(undefined),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const mockChatProxyService = {
|
||||||
|
proxyChat: vi.fn().mockResolvedValue(
|
||||||
|
new Response('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\ndata: [DONE]\n\n', {
|
||||||
|
headers: { "Content-Type": "text/event-stream" },
|
||||||
|
})
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockPrismaService = {
|
||||||
|
workspace: {
|
||||||
|
findUnique: vi.fn().mockResolvedValue({
|
||||||
|
ownerId: "owner-user-id",
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
// Set environment variables for testing
|
// Set environment variables for testing
|
||||||
process.env.DISCORD_BOT_TOKEN = "test-token";
|
process.env.DISCORD_BOT_TOKEN = "test-token";
|
||||||
process.env.DISCORD_GUILD_ID = "test-guild-id";
|
process.env.DISCORD_GUILD_ID = "test-guild-id";
|
||||||
process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id";
|
process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id";
|
||||||
process.env.DISCORD_WORKSPACE_ID = "test-workspace-id";
|
process.env.DISCORD_WORKSPACE_ID = "test-workspace-id";
|
||||||
|
process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder";
|
||||||
|
|
||||||
// Clear callbacks
|
// Clear callbacks
|
||||||
mockReadyCallbacks.length = 0;
|
mockReadyCallbacks.length = 0;
|
||||||
@@ -89,11 +110,21 @@ describe("DiscordService", () => {
|
|||||||
provide: StitcherService,
|
provide: StitcherService,
|
||||||
useValue: mockStitcherService,
|
useValue: mockStitcherService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: ChatProxyService,
|
||||||
|
useValue: mockChatProxyService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: PrismaService,
|
||||||
|
useValue: mockPrismaService,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
service = module.get<DiscordService>(DiscordService);
|
service = module.get<DiscordService>(DiscordService);
|
||||||
stitcherService = module.get<StitcherService>(StitcherService);
|
stitcherService = module.get<StitcherService>(StitcherService);
|
||||||
|
chatProxyService = module.get<ChatProxyService>(ChatProxyService);
|
||||||
|
prismaService = module.get<PrismaService>(PrismaService);
|
||||||
|
|
||||||
// Clear all mocks
|
// Clear all mocks
|
||||||
vi.clearAllMocks();
|
vi.clearAllMocks();
|
||||||
@@ -449,6 +480,14 @@ describe("DiscordService", () => {
|
|||||||
provide: StitcherService,
|
provide: StitcherService,
|
||||||
useValue: mockStitcherService,
|
useValue: mockStitcherService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: ChatProxyService,
|
||||||
|
useValue: mockChatProxyService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: PrismaService,
|
||||||
|
useValue: mockPrismaService,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
@@ -470,6 +509,14 @@ describe("DiscordService", () => {
|
|||||||
provide: StitcherService,
|
provide: StitcherService,
|
||||||
useValue: mockStitcherService,
|
useValue: mockStitcherService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: ChatProxyService,
|
||||||
|
useValue: mockChatProxyService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: PrismaService,
|
||||||
|
useValue: mockPrismaService,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
@@ -492,6 +539,14 @@ describe("DiscordService", () => {
|
|||||||
provide: StitcherService,
|
provide: StitcherService,
|
||||||
useValue: mockStitcherService,
|
useValue: mockStitcherService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: ChatProxyService,
|
||||||
|
useValue: mockChatProxyService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: PrismaService,
|
||||||
|
useValue: mockPrismaService,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
@@ -654,4 +709,150 @@ describe("DiscordService", () => {
|
|||||||
expect(loggedError.statusCode).toBe(408);
|
expect(loggedError.statusCode).toBe(408);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("Agent Channel Routing", () => {
|
||||||
|
it("should load agent channel mappings from environment", () => {
|
||||||
|
// The service should have loaded the agent channels from DISCORD_AGENT_CHANNELS
|
||||||
|
expect((service as any).agentChannels.size).toBe(2);
|
||||||
|
expect((service as any).agentChannels.get("jarvis-channel")).toBe("jarvis");
|
||||||
|
expect((service as any).agentChannels.get("builder-channel")).toBe("builder");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should handle empty agent channels config", async () => {
|
||||||
|
delete process.env.DISCORD_AGENT_CHANNELS;
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
providers: [
|
||||||
|
DiscordService,
|
||||||
|
{
|
||||||
|
provide: StitcherService,
|
||||||
|
useValue: mockStitcherService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: ChatProxyService,
|
||||||
|
useValue: mockChatProxyService,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: PrismaService,
|
||||||
|
useValue: mockPrismaService,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
const newService = module.get<DiscordService>(DiscordService);
|
||||||
|
expect((newService as any).agentChannels.size).toBe(0);
|
||||||
|
|
||||||
|
// Restore for other tests
|
||||||
|
process.env.DISCORD_AGENT_CHANNELS = "jarvis-channel:jarvis,builder-channel:builder";
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should route messages in agent channels to ChatProxyService", async () => {
|
||||||
|
const mockChannel = {
|
||||||
|
send: vi.fn().mockResolvedValue({}),
|
||||||
|
isTextBased: () => true,
|
||||||
|
sendTyping: vi.fn(),
|
||||||
|
};
|
||||||
|
(mockClient.channels.fetch as any).mockResolvedValue(mockChannel);
|
||||||
|
|
||||||
|
// Create a mock streaming response
|
||||||
|
const mockStreamResponse = new Response(
|
||||||
|
'data: {"choices":[{"delta":{"content":"Test response"}}]}\n\ndata: [DONE]\n\n',
|
||||||
|
{ headers: { "Content-Type": "text/event-stream" } }
|
||||||
|
);
|
||||||
|
mockChatProxyService.proxyChat.mockResolvedValue(mockStreamResponse);
|
||||||
|
|
||||||
|
await service.connect();
|
||||||
|
|
||||||
|
// Simulate a message in the jarvis channel
|
||||||
|
const message: ChatMessage = {
|
||||||
|
id: "msg-agent-1",
|
||||||
|
channelId: "jarvis-channel",
|
||||||
|
authorId: "user-1",
|
||||||
|
authorName: "TestUser",
|
||||||
|
content: "Hello Jarvis!",
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Call handleAgentChat directly
|
||||||
|
await (service as any).handleAgentChat(message, "jarvis");
|
||||||
|
|
||||||
|
// Verify ChatProxyService was called with workspace owner's ID and agent name
|
||||||
|
expect(mockChatProxyService.proxyChat).toHaveBeenCalledWith(
|
||||||
|
"owner-user-id",
|
||||||
|
[{ role: "user", content: "Hello Jarvis!" }],
|
||||||
|
undefined,
|
||||||
|
"jarvis"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify response was sent to channel
|
||||||
|
expect(mockChannel.send).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should not route empty messages", async () => {
|
||||||
|
const message: ChatMessage = {
|
||||||
|
id: "msg-empty",
|
||||||
|
channelId: "jarvis-channel",
|
||||||
|
authorId: "user-1",
|
||||||
|
authorName: "TestUser",
|
||||||
|
content: " ",
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
await (service as any).handleAgentChat(message, "jarvis");
|
||||||
|
|
||||||
|
expect(mockChatProxyService.proxyChat).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should handle ChatProxyService errors gracefully", async () => {
|
||||||
|
const mockChannel = {
|
||||||
|
send: vi.fn().mockResolvedValue({}),
|
||||||
|
isTextBased: () => true,
|
||||||
|
sendTyping: vi.fn(),
|
||||||
|
};
|
||||||
|
(mockClient.channels.fetch as any).mockResolvedValue(mockChannel);
|
||||||
|
|
||||||
|
mockChatProxyService.proxyChat.mockRejectedValue(new Error("Agent not found"));
|
||||||
|
|
||||||
|
await service.connect();
|
||||||
|
|
||||||
|
const message: ChatMessage = {
|
||||||
|
id: "msg-error",
|
||||||
|
channelId: "jarvis-channel",
|
||||||
|
authorId: "user-1",
|
||||||
|
authorName: "TestUser",
|
||||||
|
content: "Hello",
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
await (service as any).handleAgentChat(message, "jarvis");
|
||||||
|
|
||||||
|
// Should send error message to channel
|
||||||
|
expect(mockChannel.send).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Failed to get response from jarvis")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should split long messages for Discord", () => {
|
||||||
|
const longContent = "A".repeat(5000);
|
||||||
|
const chunks = (service as any).splitMessageForDiscord(longContent);
|
||||||
|
|
||||||
|
// Should split into chunks of 2000 or less
|
||||||
|
expect(chunks.length).toBeGreaterThan(1);
|
||||||
|
for (const chunk of chunks) {
|
||||||
|
expect(chunk.length).toBeLessThanOrEqual(2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reassembled content should match original
|
||||||
|
expect(chunks.join("")).toBe(longContent.trim());
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should prefer paragraph breaks when splitting messages", () => {
|
||||||
|
const content = "A".repeat(1500) + "\n\n" + "B".repeat(1500);
|
||||||
|
const chunks = (service as any).splitMessageForDiscord(content);
|
||||||
|
|
||||||
|
expect(chunks.length).toBe(2);
|
||||||
|
expect(chunks[0]).toContain("A");
|
||||||
|
expect(chunks[1]).toContain("B");
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { Injectable, Logger } from "@nestjs/common";
|
import { Injectable, Logger } from "@nestjs/common";
|
||||||
import { Client, Events, GatewayIntentBits, TextChannel, ThreadChannel } from "discord.js";
|
import { Client, Events, GatewayIntentBits, TextChannel, ThreadChannel } from "discord.js";
|
||||||
import { StitcherService } from "../../stitcher/stitcher.service";
|
import { StitcherService } from "../../stitcher/stitcher.service";
|
||||||
|
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
import { sanitizeForLogging } from "../../common/utils";
|
import { sanitizeForLogging } from "../../common/utils";
|
||||||
import type {
|
import type {
|
||||||
IChatProvider,
|
IChatProvider,
|
||||||
@@ -17,6 +19,7 @@ import type {
|
|||||||
* - Connect to Discord via bot token
|
* - Connect to Discord via bot token
|
||||||
* - Listen for commands in designated channels
|
* - Listen for commands in designated channels
|
||||||
* - Forward commands to stitcher
|
* - Forward commands to stitcher
|
||||||
|
* - Route messages in agent channels to specific agents via ChatProxyService
|
||||||
* - Receive status updates from herald
|
* - Receive status updates from herald
|
||||||
* - Post updates to threads
|
* - Post updates to threads
|
||||||
*/
|
*/
|
||||||
@@ -28,12 +31,21 @@ export class DiscordService implements IChatProvider {
|
|||||||
private readonly botToken: string;
|
private readonly botToken: string;
|
||||||
private readonly controlChannelId: string;
|
private readonly controlChannelId: string;
|
||||||
private readonly workspaceId: string;
|
private readonly workspaceId: string;
|
||||||
|
private readonly agentChannels = new Map<string, string>();
|
||||||
|
private workspaceOwnerId: string | null = null;
|
||||||
|
|
||||||
constructor(private readonly stitcherService: StitcherService) {
|
constructor(
|
||||||
|
private readonly stitcherService: StitcherService,
|
||||||
|
private readonly chatProxyService: ChatProxyService,
|
||||||
|
private readonly prisma: PrismaService
|
||||||
|
) {
|
||||||
this.botToken = process.env.DISCORD_BOT_TOKEN ?? "";
|
this.botToken = process.env.DISCORD_BOT_TOKEN ?? "";
|
||||||
this.controlChannelId = process.env.DISCORD_CONTROL_CHANNEL_ID ?? "";
|
this.controlChannelId = process.env.DISCORD_CONTROL_CHANNEL_ID ?? "";
|
||||||
this.workspaceId = process.env.DISCORD_WORKSPACE_ID ?? "";
|
this.workspaceId = process.env.DISCORD_WORKSPACE_ID ?? "";
|
||||||
|
|
||||||
|
// Load agent channel mappings from environment
|
||||||
|
this.loadAgentChannels();
|
||||||
|
|
||||||
// Initialize Discord client with required intents
|
// Initialize Discord client with required intents
|
||||||
this.client = new Client({
|
this.client = new Client({
|
||||||
intents: [
|
intents: [
|
||||||
@@ -46,6 +58,51 @@ export class DiscordService implements IChatProvider {
|
|||||||
this.setupEventHandlers();
|
this.setupEventHandlers();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load agent channel mappings from environment variables.
|
||||||
|
* Format: DISCORD_AGENT_CHANNELS=<channelId>:<agentName>,<channelId>:<agentName>
|
||||||
|
* Example: DISCORD_AGENT_CHANNELS=123456:jarvis,789012:builder
|
||||||
|
*/
|
||||||
|
private loadAgentChannels(): void {
|
||||||
|
const channelsConfig = process.env.DISCORD_AGENT_CHANNELS ?? "";
|
||||||
|
if (!channelsConfig) {
|
||||||
|
this.logger.debug("No agent channels configured (DISCORD_AGENT_CHANNELS not set)");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const channels = channelsConfig.split(",").map((pair) => pair.trim());
|
||||||
|
for (const channel of channels) {
|
||||||
|
const [channelId, agentName] = channel.split(":");
|
||||||
|
if (channelId && agentName) {
|
||||||
|
this.agentChannels.set(channelId.trim(), agentName.trim());
|
||||||
|
this.logger.log(`Agent channel mapped: ${channelId.trim()} → ${agentName.trim()}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the workspace owner's user ID for chat proxy routing.
|
||||||
|
* Caches the result after first lookup.
|
||||||
|
*/
|
||||||
|
private async getWorkspaceOwnerId(): Promise<string> {
|
||||||
|
if (this.workspaceOwnerId) {
|
||||||
|
return this.workspaceOwnerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
const workspace = await this.prisma.workspace.findUnique({
|
||||||
|
where: { id: this.workspaceId },
|
||||||
|
select: { ownerId: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!workspace) {
|
||||||
|
throw new Error(`Workspace not found: ${this.workspaceId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.workspaceOwnerId = workspace.ownerId;
|
||||||
|
this.logger.debug(`Workspace owner resolved: ${workspace.ownerId}`);
|
||||||
|
return workspace.ownerId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Setup event handlers for Discord client
|
* Setup event handlers for Discord client
|
||||||
*/
|
*/
|
||||||
@@ -60,9 +117,6 @@ export class DiscordService implements IChatProvider {
|
|||||||
// Ignore bot messages
|
// Ignore bot messages
|
||||||
if (message.author.bot) return;
|
if (message.author.bot) return;
|
||||||
|
|
||||||
// Check if message is in control channel
|
|
||||||
if (message.channelId !== this.controlChannelId) return;
|
|
||||||
|
|
||||||
// Parse message into ChatMessage format
|
// Parse message into ChatMessage format
|
||||||
const chatMessage: ChatMessage = {
|
const chatMessage: ChatMessage = {
|
||||||
id: message.id,
|
id: message.id,
|
||||||
@@ -74,6 +128,16 @@ export class DiscordService implements IChatProvider {
|
|||||||
...(message.channel.isThread() && { threadId: message.channelId }),
|
...(message.channel.isThread() && { threadId: message.channelId }),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Check if message is in an agent channel
|
||||||
|
const agentName = this.agentChannels.get(message.channelId);
|
||||||
|
if (agentName) {
|
||||||
|
void this.handleAgentChat(chatMessage, agentName);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if message is in control channel for commands
|
||||||
|
if (message.channelId !== this.controlChannelId) return;
|
||||||
|
|
||||||
// Parse command
|
// Parse command
|
||||||
const command = this.parseCommand(chatMessage);
|
const command = this.parseCommand(chatMessage);
|
||||||
if (command) {
|
if (command) {
|
||||||
@@ -394,4 +458,150 @@ export class DiscordService implements IChatProvider {
|
|||||||
|
|
||||||
await this.sendMessage(message.channelId, helpMessage);
|
await this.sendMessage(message.channelId, helpMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle agent chat - Route message to specific agent via ChatProxyService
|
||||||
|
* Messages in agent channels are sent directly to the agent without requiring @mosaic prefix.
|
||||||
|
*/
|
||||||
|
private async handleAgentChat(message: ChatMessage, agentName: string): Promise<void> {
|
||||||
|
this.logger.log(
|
||||||
|
`Routing message from ${message.authorName} to agent "${agentName}" in channel ${message.channelId}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// Ignore empty messages
|
||||||
|
if (!message.content.trim()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Get workspace owner ID for routing
|
||||||
|
const userId = await this.getWorkspaceOwnerId();
|
||||||
|
|
||||||
|
// Build message history (just the user's message for now)
|
||||||
|
const messages = [{ role: "user" as const, content: message.content }];
|
||||||
|
|
||||||
|
// Send typing indicator while waiting for response
|
||||||
|
const channel = await this.client.channels.fetch(message.channelId);
|
||||||
|
if (channel?.isTextBased()) {
|
||||||
|
void (channel as TextChannel).sendTyping();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxy to agent
|
||||||
|
const response = await this.chatProxyService.proxyChat(
|
||||||
|
userId,
|
||||||
|
messages,
|
||||||
|
undefined,
|
||||||
|
agentName
|
||||||
|
);
|
||||||
|
|
||||||
|
// Stream the response to channel
|
||||||
|
await this.streamResponseToChannel(message.channelId, response);
|
||||||
|
|
||||||
|
this.logger.debug(`Agent "${agentName}" response sent to channel ${message.channelId}`);
|
||||||
|
} catch (error) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.error(`Failed to route message to agent "${agentName}": ${errorMessage}`);
|
||||||
|
await this.sendMessage(
|
||||||
|
message.channelId,
|
||||||
|
`Failed to get response from ${agentName}. Please try again later.`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream SSE response from chat proxy and send to Discord channel.
|
||||||
|
* Collects the full response and sends as a single message for reliability.
|
||||||
|
*/
|
||||||
|
private async streamResponseToChannel(channelId: string, response: Response): Promise<string> {
|
||||||
|
const reader = response.body?.getReader();
|
||||||
|
if (!reader) {
|
||||||
|
throw new Error("Response body is not readable");
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let fullContent = "";
|
||||||
|
let buffer = "";
|
||||||
|
|
||||||
|
try {
|
||||||
|
let readResult = await reader.read();
|
||||||
|
while (!readResult.done) {
|
||||||
|
const { value } = readResult;
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
const lines = buffer.split("\n");
|
||||||
|
buffer = lines.pop() ?? "";
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
if (line.startsWith("data: ")) {
|
||||||
|
const data = line.slice(6);
|
||||||
|
if (data === "[DONE]") continue;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(data) as {
|
||||||
|
choices?: { delta?: { content?: string } }[];
|
||||||
|
};
|
||||||
|
const content = parsed.choices?.[0]?.delta?.content;
|
||||||
|
if (content) {
|
||||||
|
fullContent += content;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Skip invalid JSON
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
readResult = await reader.read();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the full response to Discord
|
||||||
|
if (fullContent.trim()) {
|
||||||
|
// Discord has a 2000 character limit, split if needed
|
||||||
|
const chunks = this.splitMessageForDiscord(fullContent);
|
||||||
|
for (const chunk of chunks) {
|
||||||
|
await this.sendMessage(channelId, chunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullContent;
|
||||||
|
} finally {
|
||||||
|
reader.releaseLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Split a message into chunks that fit within Discord's 2000 character limit.
|
||||||
|
* Tries to split on paragraph or sentence boundaries when possible.
|
||||||
|
*/
|
||||||
|
private splitMessageForDiscord(content: string, maxLength = 2000): string[] {
|
||||||
|
if (content.length <= maxLength) {
|
||||||
|
return [content];
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunks: string[] = [];
|
||||||
|
let remaining = content;
|
||||||
|
|
||||||
|
while (remaining.length > maxLength) {
|
||||||
|
// Try to find a good break point
|
||||||
|
let breakPoint = remaining.lastIndexOf("\n\n", maxLength);
|
||||||
|
if (breakPoint < maxLength * 0.5) {
|
||||||
|
breakPoint = remaining.lastIndexOf("\n", maxLength);
|
||||||
|
}
|
||||||
|
if (breakPoint < maxLength * 0.5) {
|
||||||
|
breakPoint = remaining.lastIndexOf(". ", maxLength);
|
||||||
|
}
|
||||||
|
if (breakPoint < maxLength * 0.5) {
|
||||||
|
breakPoint = remaining.lastIndexOf(" ", maxLength);
|
||||||
|
}
|
||||||
|
if (breakPoint < maxLength * 0.5) {
|
||||||
|
breakPoint = maxLength - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks.push(remaining.slice(0, breakPoint + 1).trim());
|
||||||
|
remaining = remaining.slice(breakPoint + 1).trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remaining.length > 0) {
|
||||||
|
chunks.push(remaining);
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunks;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import { StitcherService } from "../../stitcher/stitcher.service";
|
|||||||
import { HeraldService } from "../../herald/herald.service";
|
import { HeraldService } from "../../herald/herald.service";
|
||||||
import { PrismaService } from "../../prisma/prisma.service";
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
import { BullMqService } from "../../bullmq/bullmq.service";
|
import { BullMqService } from "../../bullmq/bullmq.service";
|
||||||
|
import { ChatProxyService } from "../../chat-proxy/chat-proxy.service";
|
||||||
import type { IChatProvider } from "../interfaces";
|
import type { IChatProvider } from "../interfaces";
|
||||||
import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types";
|
import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types";
|
||||||
|
|
||||||
@@ -192,6 +193,7 @@ function setDiscordEnv(): void {
|
|||||||
|
|
||||||
function setEncryptionKey(): void {
|
function setEncryptionKey(): void {
|
||||||
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
|
process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
|
||||||
|
process.env.MOSAIC_SECRET_KEY = "test-mosaic-secret-key-minimum-32-characters-long";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -205,6 +207,10 @@ async function compileBridgeModule(): Promise<TestingModule> {
|
|||||||
.useValue({})
|
.useValue({})
|
||||||
.overrideProvider(BullMqService)
|
.overrideProvider(BullMqService)
|
||||||
.useValue({})
|
.useValue({})
|
||||||
|
.overrideProvider(ChatProxyService)
|
||||||
|
.useValue({
|
||||||
|
proxyChat: vi.fn().mockResolvedValue(new Response()),
|
||||||
|
})
|
||||||
.compile();
|
.compile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user