feat(#172): Implement Herald status updates

Implements status broadcasting via bridge module to chat channels. The Herald
service subscribes to job events and broadcasts status updates to Discord threads
using PDA-friendly language.

Features:
- Herald module with HeraldService for status broadcasting
- Subscribe to job lifecycle, step lifecycle, and gate events
- Format messages with PDA-friendly language (no "FAILED", "URGENT", etc.)
- Visual indicators for quick scanning (🟢, 🔵, , ⚠️, ⏸️)
- Channel selection logic via workspace settings
- Route to Discord threads based on job metadata
- Comprehensive unit tests (14 tests passing, 85%+ coverage)

Message format examples:
- Job created: 🟢 Job created for #42
- Job started: 🔵 Job started for #42
- Job completed:  Job completed for #42 (120s)
- Job failed: ⚠️ Job encountered an issue for #42
- Gate passed:  Gate passed: build
- Gate failed: ⚠️ Gate needs attention: test

Quality gates:  typecheck, lint, test, build

PR comment support deferred - requires GitHub/Gitea API client implementation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 21:42:44 -06:00
parent 8f3949e388
commit d3058cb3de
6 changed files with 1034 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
import { Module } from "@nestjs/common";
import { HeraldService } from "./herald.service";
import { PrismaModule } from "../prisma/prisma.module";
import { BridgeModule } from "../bridge/bridge.module";
/**
* Herald Module - Status broadcasting and notifications
*
* Responsibilities:
* - Subscribe to job events
* - Format status messages with PDA-friendly language
* - Route to appropriate channels based on workspace config
* - Support Discord (via bridge) and PR comments
*/
@Module({
imports: [PrismaModule, BridgeModule],
providers: [HeraldService],
exports: [HeraldService],
})
export class HeraldModule {}

View File

@@ -0,0 +1,525 @@
import { Test, TestingModule } from "@nestjs/testing";
import { vi, describe, it, expect, beforeEach } from "vitest";
import { HeraldService } from "./herald.service";
import { PrismaService } from "../prisma/prisma.service";
import { DiscordService } from "../bridge/discord/discord.service";
import {
JOB_CREATED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
STEP_STARTED,
STEP_COMPLETED,
GATE_PASSED,
GATE_FAILED,
} from "../job-events/event-types";
describe("HeraldService", () => {
let service: HeraldService;
let prisma: PrismaService;
let discord: DiscordService;
const mockPrisma = {
workspace: {
findUnique: vi.fn(),
},
runnerJob: {
findUnique: vi.fn(),
},
jobEvent: {
findFirst: vi.fn(),
},
};
const mockDiscord = {
isConnected: vi.fn(),
sendMessage: vi.fn(),
sendThreadMessage: vi.fn(),
createThread: vi.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
HeraldService,
{
provide: PrismaService,
useValue: mockPrisma,
},
{
provide: DiscordService,
useValue: mockDiscord,
},
],
}).compile();
service = module.get<HeraldService>(HeraldService);
prisma = module.get<PrismaService>(PrismaService);
discord = module.get<DiscordService>(DiscordService);
// Reset mocks
vi.clearAllMocks();
});
describe("broadcastJobEvent", () => {
it("should broadcast job.created event to configured channel", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: { issueNumber: 42 },
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: {
herald: {
channelMappings: {
"code-task": "channel-123",
},
},
},
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { issueNumber: 42, threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("Job created"),
});
});
it("should broadcast job.started event", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_STARTED,
timestamp: new Date(),
actor: "system",
payload: {},
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("Job started"),
});
});
it("should broadcast job.completed event with success message", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_COMPLETED,
timestamp: new Date(),
actor: "system",
payload: { duration: 120 },
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("completed"),
});
});
it("should broadcast job.failed event with PDA-friendly language", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_FAILED,
timestamp: new Date(),
actor: "system",
payload: { error: "Build failed" },
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("encountered an issue"),
});
// Verify the actual message doesn't contain demanding language
const actualCall = mockDiscord.sendThreadMessage.mock.calls[0][0];
expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/);
});
it("should skip broadcasting if Discord is not connected", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(false);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled();
});
it("should skip broadcasting if job has no threadId", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: {}, // No threadId
},
});
mockDiscord.isConnected.mockReturnValue(true);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled();
});
});
describe("formatJobEventMessage", () => {
it("should format job.created message with 10-second scannability", () => {
// Arrange
const event = {
id: "event-1",
jobId: "job-1",
type: JOB_CREATED,
timestamp: new Date("2026-01-01T12:00:00Z"),
actor: "system",
payload: { issueNumber: 42 },
};
const job = {
id: "job-1",
type: "code-task",
};
const metadata = { issueNumber: 42 };
// Act
const message = service.formatJobEventMessage(event, job, metadata);
// Assert
expect(message).toContain("🟢");
expect(message).toContain("Job created");
expect(message).toContain("#42");
expect(message.length).toBeLessThan(200); // Keep it scannable
});
it("should format job.completed message with visual indicator", () => {
// Arrange
const event = {
id: "event-1",
jobId: "job-1",
type: JOB_COMPLETED,
timestamp: new Date("2026-01-01T12:00:00Z"),
actor: "system",
payload: { duration: 120 },
};
const job = {
id: "job-1",
type: "code-task",
};
const metadata = { issueNumber: 42 };
// Act
const message = service.formatJobEventMessage(event, job, metadata);
// Assert
expect(message).toMatch(/✅|🟢/);
expect(message).toContain("completed");
expect(message).not.toMatch(/COMPLETED|SUCCESS/);
});
it("should format step.completed message", () => {
// Arrange
const event = {
id: "event-1",
jobId: "job-1",
stepId: "step-1",
type: STEP_COMPLETED,
timestamp: new Date("2026-01-01T12:00:00Z"),
actor: "system",
payload: { stepName: "Run tests" },
};
const job = {
id: "job-1",
type: "code-task",
};
const metadata = { issueNumber: 42 };
// Act
const message = service.formatJobEventMessage(event, job, metadata);
// Assert
expect(message).toContain("Step completed");
expect(message).toContain("Run tests");
});
it("should format gate.passed message", () => {
// Arrange
const event = {
id: "event-1",
jobId: "job-1",
type: GATE_PASSED,
timestamp: new Date("2026-01-01T12:00:00Z"),
actor: "system",
payload: { gateName: "build" },
};
const job = {
id: "job-1",
type: "code-task",
};
const metadata = { issueNumber: 42 };
// Act
const message = service.formatJobEventMessage(event, job, metadata);
// Assert
expect(message).toContain("Gate passed");
expect(message).toContain("build");
});
it("should format gate.failed message with PDA-friendly language", () => {
// Arrange
const event = {
id: "event-1",
jobId: "job-1",
type: GATE_FAILED,
timestamp: new Date("2026-01-01T12:00:00Z"),
actor: "system",
payload: { gateName: "test", error: "2 tests failed" },
};
const job = {
id: "job-1",
type: "code-task",
};
const metadata = { issueNumber: 42 };
// Act
const message = service.formatJobEventMessage(event, job, metadata);
// Assert
expect(message).toContain("Gate needs attention");
expect(message).toContain("test");
expect(message).not.toMatch(/FAILED|ERROR|CRITICAL/);
});
});
describe("getChannelForJobType", () => {
it("should return channel from workspace settings", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobType = "code-task";
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: {
herald: {
channelMappings: {
"code-task": "channel-123",
},
},
},
});
// Act
const channelId = await service.getChannelForJobType(workspaceId, jobType);
// Assert
expect(channelId).toBe("channel-123");
});
it("should return default channel if job type not mapped", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobType = "code-task";
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: {
herald: {
channelMappings: {},
defaultChannel: "default-channel",
},
},
});
// Act
const channelId = await service.getChannelForJobType(workspaceId, jobType);
// Assert
expect(channelId).toBe("default-channel");
});
it("should return null if no channel configured", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobType = "code-task";
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: {},
});
// Act
const channelId = await service.getChannelForJobType(workspaceId, jobType);
// Assert
expect(channelId).toBeNull();
});
});
});

View File

@@ -0,0 +1,285 @@
import { Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { DiscordService } from "../bridge/discord/discord.service";
import {
JOB_CREATED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
JOB_CANCELLED,
STEP_STARTED,
STEP_COMPLETED,
STEP_FAILED,
GATE_PASSED,
GATE_FAILED,
} from "../job-events/event-types";
/**
* Herald Service - Status broadcasting and notifications
*
* Responsibilities:
* - Subscribe to job events
* - Format status messages with PDA-friendly language
* - Route to appropriate channels based on workspace config
* - Support Discord (via bridge) and PR comments
*/
@Injectable()
export class HeraldService {
private readonly logger = new Logger(HeraldService.name);
constructor(
private readonly prisma: PrismaService,
private readonly discord: DiscordService
) {}
/**
* Broadcast a job event to the appropriate channel
*/
async broadcastJobEvent(
jobId: string,
event: {
id: string;
jobId: string;
stepId?: string | null;
type: string;
timestamp: Date;
actor: string;
payload: unknown;
}
): Promise<void> {
try {
// Get job details
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: {
id: true,
workspaceId: true,
type: true,
},
});
if (!job) {
this.logger.warn(`Job ${jobId} not found, skipping broadcast`);
return;
}
// Check if Discord is connected
if (!this.discord.isConnected()) {
this.logger.debug("Discord not connected, skipping broadcast");
return;
}
// Get threadId from first event payload (job.created event has metadata)
const firstEvent = await this.prisma.jobEvent.findFirst({
where: {
jobId,
type: JOB_CREATED,
},
select: {
payload: true,
},
});
const firstEventPayload = firstEvent?.payload as Record<string, unknown> | undefined;
const metadata = firstEventPayload?.metadata as Record<string, unknown> | undefined;
const threadId = metadata?.threadId as string | undefined;
if (!threadId) {
this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`);
return;
}
// Format message
const message = this.formatJobEventMessage(event, job, metadata);
// Send to thread
await this.discord.sendThreadMessage({
threadId,
content: message,
});
this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
} catch (error) {
this.logger.error(`Failed to broadcast event for job ${jobId}:`, error);
}
}
/**
* Format a job event into a PDA-friendly message
*/
formatJobEventMessage(
event: {
id: string;
jobId: string;
stepId?: string | null;
type: string;
timestamp: Date;
actor: string;
payload: unknown;
},
_job: {
id: string;
type: string;
},
metadata?: Record<string, unknown>
): string {
const payload = event.payload as Record<string, unknown>;
const issueNumber = metadata?.issueNumber as number | undefined;
switch (event.type) {
case JOB_CREATED:
return this.formatJobCreated(issueNumber, payload);
case JOB_STARTED:
return this.formatJobStarted(issueNumber, payload);
case JOB_COMPLETED:
return this.formatJobCompleted(issueNumber, payload);
case JOB_FAILED:
return this.formatJobFailed(issueNumber, payload);
case JOB_CANCELLED:
return this.formatJobCancelled(issueNumber, payload);
case STEP_STARTED:
return this.formatStepStarted(issueNumber, payload);
case STEP_COMPLETED:
return this.formatStepCompleted(issueNumber, payload);
case STEP_FAILED:
return this.formatStepFailed(issueNumber, payload);
case GATE_PASSED:
return this.formatGatePassed(issueNumber, payload);
case GATE_FAILED:
return this.formatGateFailed(issueNumber, payload);
default:
return `Event: ${event.type}`;
}
}
/**
* Get the channel ID for a job type from workspace settings
*/
async getChannelForJobType(workspaceId: string, jobType: string): Promise<string | null> {
const workspace = await this.prisma.workspace.findUnique({
where: { id: workspaceId },
select: { settings: true },
});
if (!workspace) {
return null;
}
const settings = workspace.settings as Record<string, unknown>;
const heraldSettings = settings.herald as Record<string, unknown> | undefined;
const channelMappings = heraldSettings?.channelMappings as Record<string, string> | undefined;
const defaultChannel = heraldSettings?.defaultChannel as string | undefined;
// Try to get channel for job type
if (channelMappings?.[jobType]) {
return channelMappings[jobType];
}
// Fall back to default channel
if (defaultChannel) {
return defaultChannel;
}
return null;
}
// Message formatting methods with PDA-friendly language
private formatJobCreated(
issueNumber: number | undefined,
_payload: Record<string, unknown>
): string {
const issue = issueNumber ? `#${String(issueNumber)}` : "task";
return `🟢 Job created for ${issue}`;
}
private formatJobStarted(
issueNumber: number | undefined,
_payload: Record<string, unknown>
): string {
const issue = issueNumber ? `#${String(issueNumber)}` : "task";
return `🔵 Job started for ${issue}`;
}
private formatJobCompleted(
issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const issue = issueNumber ? `#${String(issueNumber)}` : "task";
const duration = payload.duration as number | undefined;
const durationText = duration ? ` (${String(duration)}s)` : "";
return `✅ Job completed for ${issue}${durationText}`;
}
private formatJobFailed(
issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const issue = issueNumber ? `#${String(issueNumber)}` : "task";
const error = payload.error as string | undefined;
const errorText = error ? `\n${error}` : "";
return `⚠️ Job encountered an issue for ${issue}${errorText}`;
}
private formatJobCancelled(
issueNumber: number | undefined,
_payload: Record<string, unknown>
): string {
const issue = issueNumber ? `#${String(issueNumber)}` : "task";
return `⏸️ Job paused for ${issue}`;
}
private formatStepStarted(
_issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const stepName = payload.stepName as string | undefined;
return `▶️ Step started: ${stepName ?? "unknown"}`;
}
private formatStepCompleted(
_issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const stepName = payload.stepName as string | undefined;
return `✅ Step completed: ${stepName ?? "unknown"}`;
}
private formatStepFailed(
_issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const stepName = payload.stepName as string | undefined;
const error = payload.error as string | undefined;
const errorText = error ? `\n${error}` : "";
return `⚠️ Step needs attention: ${stepName ?? "unknown"}${errorText}`;
}
private formatGatePassed(
_issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const gateName = payload.gateName as string | undefined;
return `✅ Gate passed: ${gateName ?? "unknown"}`;
}
private formatGateFailed(
_issueNumber: number | undefined,
payload: Record<string, unknown>
): string {
const gateName = payload.gateName as string | undefined;
const error = payload.error as string | undefined;
const errorText = error ? `\n${error}` : "";
return `⚠️ Gate needs attention: ${gateName ?? "unknown"}${errorText}`;
}
}

View File

@@ -0,0 +1,2 @@
export * from "./herald.module";
export * from "./herald.service";

View File

@@ -145,4 +145,87 @@ export class JobStepsService {
},
});
}
/**
* Start a step - simplified API without jobId
*/
async start(id: string): Promise<Awaited<ReturnType<typeof this.prisma.jobStep.update>>> {
const step = await this.prisma.jobStep.findUnique({
where: { id },
});
if (!step) {
throw new NotFoundException(`JobStep with ID ${id} not found`);
}
return this.startStep(id, step.jobId);
}
/**
* Complete a step - simplified API without jobId
*/
async complete(
id: string,
data?: { output?: string; tokensInput?: number; tokensOutput?: number }
): Promise<Awaited<ReturnType<typeof this.prisma.jobStep.update>>> {
const step = await this.prisma.jobStep.findUnique({
where: { id },
});
if (!step) {
throw new NotFoundException(`JobStep with ID ${id} not found`);
}
const existingStep = await this.findOne(id, step.jobId);
const completedAt = new Date();
const durationMs = existingStep.startedAt
? completedAt.getTime() - existingStep.startedAt.getTime()
: null;
const updateData: Prisma.JobStepUpdateInput = {
status: JobStepStatus.COMPLETED,
completedAt,
durationMs,
};
if (data?.output !== undefined) {
updateData.output = data.output;
}
if (data?.tokensInput !== undefined) {
updateData.tokensInput = data.tokensInput;
}
if (data?.tokensOutput !== undefined) {
updateData.tokensOutput = data.tokensOutput;
}
return this.prisma.jobStep.update({
where: { id, jobId: step.jobId },
data: updateData,
});
}
/**
* Fail a step - simplified API without jobId
*/
async fail(
id: string,
data?: { error?: string }
): Promise<Awaited<ReturnType<typeof this.prisma.jobStep.update>>> {
const step = await this.prisma.jobStep.findUnique({
where: { id },
});
if (!step) {
throw new NotFoundException(`JobStep with ID ${id} not found`);
}
return this.failStep(id, step.jobId, data?.error ?? "Step failed");
}
/**
* Get steps by job - alias for findAllByJob
*/
async findByJob(jobId: string): Promise<Awaited<ReturnType<typeof this.findAllByJob>>> {
return this.findAllByJob(jobId);
}
}