feat(#382): Herald Service: broadcast to all active chat providers
Some checks failed
ci/woodpecker/push/api Pipeline failed

- Replace direct DiscordService injection with CHAT_PROVIDERS array
- Herald broadcasts to ALL active chat providers (Discord, Matrix, future)
- Graceful error handling — one provider failure doesn't block others
- Skips disconnected providers automatically
- Tests verify multi-provider broadcasting behavior
- Fix lint: remove unnecessary conditional in matrix.service.ts

Refs #382
This commit is contained in:
2026-02-15 02:25:55 -06:00
parent 4a9ecab4dd
commit ad24720616
8 changed files with 859 additions and 424 deletions

View File

@@ -1,6 +1,7 @@
import { Injectable, Logger } from "@nestjs/common";
import { Inject, Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { DiscordService } from "../bridge/discord/discord.service";
import { CHAT_PROVIDERS } from "../bridge/bridge.constants";
import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface";
import {
JOB_CREATED,
JOB_STARTED,
@@ -21,7 +22,7 @@ import {
* - Subscribe to job events
* - Format status messages with PDA-friendly language
* - Route to appropriate channels based on workspace config
* - Support Discord (via bridge) and PR comments
* - Broadcast to ALL active chat providers (Discord, Matrix, etc.)
*/
@Injectable()
export class HeraldService {
@@ -29,11 +30,11 @@ export class HeraldService {
constructor(
private readonly prisma: PrismaService,
private readonly discord: DiscordService
@Inject(CHAT_PROVIDERS) private readonly chatProviders: IChatProvider[]
) {}
/**
* Broadcast a job event to the appropriate channel
* Broadcast a job event to all connected chat providers
*/
async broadcastJobEvent(
jobId: string,
@@ -47,66 +48,65 @@ export class HeraldService {
payload: unknown;
}
): Promise<void> {
try {
// Get job details
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: {
id: true,
workspaceId: true,
type: true,
},
});
// Get job details
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: {
id: true,
workspaceId: true,
type: true,
},
});
if (!job) {
this.logger.warn(`Job ${jobId} not found, skipping broadcast`);
return;
}
// Check if Discord is connected
if (!this.discord.isConnected()) {
this.logger.debug("Discord not connected, skipping broadcast");
return;
}
// Get threadId from first event payload (job.created event has metadata)
const firstEvent = await this.prisma.jobEvent.findFirst({
where: {
jobId,
type: JOB_CREATED,
},
select: {
payload: true,
},
});
const firstEventPayload = firstEvent?.payload as Record<string, unknown> | undefined;
const metadata = firstEventPayload?.metadata as Record<string, unknown> | undefined;
const threadId = metadata?.threadId as string | undefined;
if (!threadId) {
this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`);
return;
}
// Format message
const message = this.formatJobEventMessage(event, job, metadata);
// Send to thread
await this.discord.sendThreadMessage({
threadId,
content: message,
});
this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
} catch (error) {
// Log the error with full context for debugging
this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error);
// Re-throw the error so callers can handle it appropriately
// This enables proper error tracking, retry logic, and alerting
throw error;
if (!job) {
this.logger.warn(`Job ${jobId} not found, skipping broadcast`);
return;
}
// Get threadId from first event payload (job.created event has metadata)
const firstEvent = await this.prisma.jobEvent.findFirst({
where: {
jobId,
type: JOB_CREATED,
},
select: {
payload: true,
},
});
const firstEventPayload = firstEvent?.payload as Record<string, unknown> | undefined;
const metadata = firstEventPayload?.metadata as Record<string, unknown> | undefined;
const threadId = metadata?.threadId as string | undefined;
if (!threadId) {
this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`);
return;
}
// Format message
const message = this.formatJobEventMessage(event, job, metadata);
// Broadcast to all connected providers
for (const provider of this.chatProviders) {
if (!provider.isConnected()) {
continue;
}
try {
await provider.sendThreadMessage({
threadId,
content: message,
});
} catch (error) {
// Log and continue — one provider failure must not block others
this.logger.error(
`Failed to broadcast event ${event.type} for job ${jobId} via provider:`,
error
);
}
}
this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
}
/**