Files
stack/plugins/telegram/src/index.ts
Jason Woltje 96902bab44
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/pr/ci Pipeline failed
feat(plugins): add Telegram channel plugin
2026-03-13 12:05:42 -05:00

188 lines
5.3 KiB
TypeScript

import { Telegraf } from 'telegraf';
import { io, type Socket } from 'socket.io-client';
/**
 * Settings needed to construct a TelegramPlugin.
 */
interface TelegramPluginConfig {
/** Bot token; passed unchanged to the Telegraf constructor. */
token: string;
/** Gateway base URL; the plugin opens its Socket.IO connection at `<gatewayUrl>/chat`. */
gatewayUrl: string;
}
/**
 * The part of a Telegram message sender that this plugin inspects.
 */
interface TelegramUser {
/** When true, handleTelegramMessage drops the message instead of forwarding it. */
is_bot?: boolean;
}
/**
 * The part of a Telegram chat that this plugin inspects.
 */
interface TelegramChat {
/** Numeric chat ID; stringified to key the chat→conversation map and to build `telegram-<id>`. */
id: number;
}
/**
 * Minimal text-message shape produced by getTextMessage and consumed by
 * handleTelegramMessage.
 */
interface TelegramTextMessage {
/** Chat the message was posted in. */
chat: TelegramChat;
/** Sender, when present on the incoming message. */
from?: TelegramUser;
/** Raw message text; trimmed before it is forwarded to the gateway. */
text: string;
}
/**
 * Channel plugin that relays text between Telegram chats and the gateway's
 * `/chat` Socket.IO namespace.
 *
 * Inbound: every Telegram text message that was not sent by a bot is emitted
 * to the gateway as a `message` event under the conversation ID
 * `telegram-<chatId>`.
 *
 * Outbound: text streamed by the gateway (`agent:start`, then any number of
 * `agent:text`, then `agent:end`) is buffered per conversation and delivered
 * to the originating chat once the agent finishes.
 */
class TelegramPlugin {
  readonly name = 'telegram';

  private bot: Telegraf;
  private socket: Socket | null = null;
  private config: TelegramPluginConfig;

  /** Map Telegram chat ID → Mosaic conversation ID */
  private chatConversations = new Map<string, string>();

  /** Track in-flight responses to avoid duplicate streaming */
  private pendingResponses = new Map<string, string>();

  /**
   * @param config Bot token and gateway URL. No network activity happens
   *   until {@link start} is called.
   */
  constructor(config: TelegramPluginConfig) {
    this.config = config;
    this.bot = new Telegraf(this.config.token);
  }

  /**
   * Connects to the gateway, registers all event handlers and launches the
   * Telegram bot.
   *
   * @returns A promise that resolves once the bot has launched.
   * @throws Whatever `bot.launch()` rejects with before the bot is up; the
   *   gateway socket is closed again in that case.
   */
  async start(): Promise<void> {
    // Connect to gateway WebSocket
    const socket = io(`${this.config.gatewayUrl}/chat`, {
      transports: ['websocket'],
    });
    this.socket = socket;

    socket.on('connect', () => {
      console.log('[telegram] Connected to gateway');
    });

    socket.on('disconnect', (reason: string) => {
      console.error(`[telegram] Disconnected from gateway: ${reason}`);
      // Partial responses cannot be completed once the stream is gone.
      this.pendingResponses.clear();
    });

    socket.on('connect_error', (err: Error) => {
      console.error(`[telegram] Gateway connection error: ${err.message}`);
    });

    // A new agent turn starts with an empty buffer.
    socket.on('agent:start', (data: { conversationId: string }) => {
      this.pendingResponses.set(data.conversationId, '');
    });

    // Handle streaming text from gateway. Text for a conversation without an
    // open buffer (no preceding agent:start) is ignored.
    socket.on('agent:text', (data: { conversationId: string; text: string }) => {
      const pending = this.pendingResponses.get(data.conversationId);
      if (pending !== undefined) {
        this.pendingResponses.set(data.conversationId, pending + data.text);
      }
    });

    // When agent finishes, send the accumulated response
    socket.on('agent:end', (data: { conversationId: string }) => {
      const text = this.pendingResponses.get(data.conversationId);
      // Always close the buffer, even when it is empty; otherwise a stale
      // entry would keep accepting agent:text events after the turn ended.
      this.pendingResponses.delete(data.conversationId);
      if (!text) return;
      this.sendToTelegram(data.conversationId, text).catch((err) => {
        console.error(`[telegram] Error sending response for ${data.conversationId}:`, err);
      });
    });

    // Set up Telegram message handler
    this.bot.on('message', (ctx) => {
      const message = this.getTextMessage(ctx.message);
      if (message) {
        this.handleTelegramMessage(message);
      }
    });

    try {
      await this.launchBot();
    } catch (err) {
      // A failed launch must not leave a live gateway connection behind.
      socket.disconnect();
      this.socket = null;
      throw err;
    }
  }

  /**
   * Stops the Telegram bot and closes the gateway connection.
   *
   * The socket is released even when `bot.stop()` throws (for example because
   * the bot was never launched); the error is still propagated to the caller.
   */
  async stop(): Promise<void> {
    try {
      this.bot.stop('SIGTERM');
    } finally {
      this.socket?.disconnect();
      this.socket = null;
      this.pendingResponses.clear();
    }
  }

  /**
   * Launches the bot and settles as soon as it is up.
   *
   * The promise returned by `bot.launch()` stays pending for as long as
   * long polling runs, so awaiting it directly would block {@link start}
   * until the bot is stopped. Instead this resolves from the `onLaunch`
   * callback.
   * NOTE(review): the callback form assumes Telegraf >= 4.16 — confirm the
   * pinned version. If the callback is never invoked, the promise still
   * settles when `launch()` itself settles, matching the previous behaviour.
   */
  private launchBot(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let launched = false;
      this.bot
        .launch(() => {
          launched = true;
          resolve();
        })
        .then(resolve, (err: unknown) => {
          if (launched) {
            // start() has already resolved; surface late failures in the log.
            console.error('[telegram] Bot stopped unexpectedly:', err);
          } else {
            reject(err);
          }
        });
    });
  }

  /**
   * Forwards one Telegram text message to the gateway.
   *
   * Messages from bots and messages that are empty after trimming are
   * dropped. If the gateway socket is not connected the message is logged
   * and discarded.
   */
  private handleTelegramMessage(message: TelegramTextMessage): void {
    // Ignore bot messages
    if (message.from?.is_bot) return;

    const content = message.text.trim();
    if (!content) return;

    // Get or create conversation for this Telegram chat
    const chatId = String(message.chat.id);
    let conversationId = this.chatConversations.get(chatId);
    if (!conversationId) {
      conversationId = `telegram-${chatId}`;
      this.chatConversations.set(chatId, conversationId);
    }

    // Send to gateway
    if (!this.socket?.connected) {
      console.error(`[telegram] Cannot forward message: not connected to gateway. chat=${chatId}`);
      return;
    }

    this.socket.emit('message', {
      conversationId,
      content,
      role: 'user',
    });
  }

  /**
   * Narrows an arbitrary update payload to the fields this plugin needs.
   *
   * @returns The `chat`, `from` and `text` fields when `text` is a string and
   *   `chat.id` is a number; otherwise `null`.
   */
  private getTextMessage(message: unknown): TelegramTextMessage | null {
    if (!message || typeof message !== 'object') return null;

    const candidate = message as Partial<TelegramTextMessage>;
    if (typeof candidate.text !== 'string') return null;
    if (!candidate.chat || typeof candidate.chat.id !== 'number') return null;

    return {
      chat: candidate.chat,
      from: candidate.from,
      text: candidate.text,
    };
  }

  /**
   * Delivers a finished agent response to the chat mapped to `conversationId`.
   *
   * The text is split into chunks of at most 4000 characters. A chunk that
   * fails to send is logged and the remaining chunks are still attempted.
   */
  private async sendToTelegram(conversationId: string, text: string): Promise<void> {
    // Find the Telegram chat for this conversation
    let chatId: string | undefined;
    for (const [candidateChatId, mappedConversation] of this.chatConversations) {
      if (mappedConversation === conversationId) {
        chatId = candidateChatId;
        break;
      }
    }
    if (!chatId) {
      console.error(`[telegram] No chat found for conversation ${conversationId}`);
      return;
    }

    // Chunk responses for Telegram's 4096-char limit
    for (const chunk of this.chunkText(text, 4000)) {
      // Telegram rejects messages without visible text, so do not spend an
      // API call on a chunk that is only whitespace.
      if (chunk.trim().length === 0) continue;
      try {
        await this.bot.telegram.sendMessage(chatId, chunk);
      } catch (err) {
        console.error(`[telegram] Failed to send message to chat ${chatId}:`, err);
      }
    }
  }

  /**
   * Splits `text` into pieces of at most `maxLength` UTF-16 code units.
   *
   * Each piece ends at the last newline within the limit when there is one;
   * otherwise it is cut at the limit, backing up by one unit if the cut
   * would separate a surrogate pair. Leading whitespace of every follow-up
   * piece is removed. Text that already fits is returned unchanged as a
   * single element.
   *
   * @param maxLength Upper bound per piece; values below 1 are treated as 1
   *   so the loop always makes progress.
   */
  private chunkText(text: string, maxLength: number): string[] {
    const limit = Math.max(1, Math.floor(maxLength) || 1);
    if (text.length <= limit) return [text];

    const chunks: string[] = [];
    let remaining = text;

    while (remaining.length > limit) {
      // Try to break at a newline
      let breakPoint = remaining.lastIndexOf('\n', limit);
      if (breakPoint <= 0) {
        breakPoint = limit;
        const before = remaining.charCodeAt(breakPoint - 1);
        const after = remaining.charCodeAt(breakPoint);
        const splitsPair =
          before >= 0xd800 && before <= 0xdbff && after >= 0xdc00 && after <= 0xdfff;
        // Keep both halves of an astral character (e.g. an emoji) together.
        if (splitsPair && breakPoint > 1) breakPoint -= 1;
      }
      chunks.push(remaining.slice(0, breakPoint));
      remaining = remaining.slice(breakPoint).trimStart();
    }

    if (remaining.length > 0) chunks.push(remaining);
    return chunks;
  }
}
// Public surface of this module: the plugin class and its configuration type.
export { TelegramPlugin };
export type { TelegramPluginConfig };
/**
 * Version string exported alongside the plugin.
 * NOTE(review): presumably mirrors the package version — confirm it is kept in sync.
 */
export const VERSION = '0.0.5';