Add composite index [jobId, timestamp] to improve query performance for the most common job_events access patterns.

Changes:
- Add @@index([jobId, timestamp]) to JobEvent model in schema.prisma
- Create migration 20260202122655_add_job_events_composite_index
- Add performance tests to validate index effectiveness
- Document index design rationale in scratchpad
- Fix lint errors in api-key.guard, herald.service, runner-jobs.service

Rationale:
The composite index [jobId, timestamp] optimizes the dominant query pattern used across all services:
- JobEventsService.getEventsByJobId (WHERE jobId, ORDER BY timestamp)
- RunnerJobsService.streamEvents (WHERE jobId + timestamp range)
- RunnerJobsService.findOne (implicit jobId filter + timestamp order)

This index provides:
- Fast filtering by jobId (highly selective)
- Efficient timestamp-based ordering
- Optimal support for timestamp range queries
- Backward compatibility with jobId-only queries

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
291 lines
8.2 KiB
TypeScript
291 lines
8.2 KiB
TypeScript
import { Injectable, Logger } from "@nestjs/common";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import { DiscordService } from "../bridge/discord/discord.service";
|
|
import {
|
|
JOB_CREATED,
|
|
JOB_STARTED,
|
|
JOB_COMPLETED,
|
|
JOB_FAILED,
|
|
JOB_CANCELLED,
|
|
STEP_STARTED,
|
|
STEP_COMPLETED,
|
|
STEP_FAILED,
|
|
GATE_PASSED,
|
|
GATE_FAILED,
|
|
} from "../job-events/event-types";
|
|
|
|
/**
 * Herald Service - Status broadcasting and notifications
 *
 * What this class does:
 * - Formats job events into short status messages with PDA-friendly language
 * - Posts those messages to the Discord thread recorded in the job's
 *   `job.created` event metadata (via the Discord bridge)
 * - Resolves a channel ID for a job type from workspace settings
 *
 * Event payloads and workspace settings are arbitrary JSON values, so every
 * read from them is narrowed at runtime instead of being trusted via a cast.
 */
@Injectable()
export class HeraldService {
  private readonly logger = new Logger(HeraldService.name);

  constructor(
    private readonly prisma: PrismaService,
    private readonly discord: DiscordService
  ) {}

  /**
   * Broadcast a job event to the Discord thread associated with the job.
   *
   * The broadcast is skipped (with a log line, no error) when the job does not
   * exist, when Discord is not connected, or when the job's `job.created` event
   * carries no usable `metadata.threadId`.
   *
   * @param jobId - ID of the job the event belongs to
   * @param event - The event to announce
   * @throws Re-throws any error raised while looking up the job or sending the
   *         message, after logging it, so callers can retry or alert.
   */
  async broadcastJobEvent(
    jobId: string,
    event: {
      id: string;
      jobId: string;
      stepId?: string | null;
      type: string;
      timestamp: Date;
      actor: string;
      payload: unknown;
    }
  ): Promise<void> {
    try {
      // Get job details
      const job = await this.prisma.runnerJob.findUnique({
        where: { id: jobId },
        select: {
          id: true,
          workspaceId: true,
          type: true,
        },
      });

      if (!job) {
        this.logger.warn(`Job ${jobId} not found, skipping broadcast`);
        return;
      }

      // Check if Discord is connected
      if (!this.discord.isConnected()) {
        this.logger.debug("Discord not connected, skipping broadcast");
        return;
      }

      // The thread to post into is stored in the metadata of the job.created event
      const firstEvent = await this.prisma.jobEvent.findFirst({
        where: {
          jobId,
          type: JOB_CREATED,
        },
        select: {
          payload: true,
        },
      });

      const metadata = this.asRecord(this.asRecord(firstEvent?.payload)?.metadata);
      const rawThreadId = metadata?.threadId;
      // Only a non-empty string is a usable thread ID; anything else is treated as absent
      const threadId = typeof rawThreadId === "string" && rawThreadId !== "" ? rawThreadId : undefined;

      if (!threadId) {
        this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`);
        return;
      }

      // Format message
      const message = this.formatJobEventMessage(event, job, metadata);

      // Send to thread
      await this.discord.sendThreadMessage({
        threadId,
        content: message,
      });

      this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
    } catch (error) {
      // Log the error with full context for debugging
      this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error);

      // Re-throw the error so callers can handle it appropriately
      // This enables proper error tracking, retry logic, and alerting
      throw error;
    }
  }

  /**
   * Format a job event into a PDA-friendly message.
   *
   * @param event - The event to describe; a missing, `null` or non-object
   *                payload is treated as an empty payload
   * @param _job - The job the event belongs to (currently unused)
   * @param metadata - Metadata from the job's `job.created` event; its
   *                   `issueNumber`, when present, is used to label the job
   * @returns The message text; unrecognised event types yield `Event: <type>`
   */
  formatJobEventMessage(
    event: {
      id: string;
      jobId: string;
      stepId?: string | null;
      type: string;
      timestamp: Date;
      actor: string;
      payload: unknown;
    },
    _job: {
      id: string;
      type: string;
    },
    metadata?: Record<string, unknown>
  ): string {
    // Fall back to an empty record so the formatters never dereference null/primitive payloads
    const payload = this.asRecord(event.payload) ?? {};
    // Not validated at runtime: the label helper stringifies whatever truthy value is stored here
    const issueNumber = metadata?.issueNumber as number | undefined;

    switch (event.type) {
      case JOB_CREATED:
        return this.formatJobCreated(issueNumber, payload);

      case JOB_STARTED:
        return this.formatJobStarted(issueNumber, payload);

      case JOB_COMPLETED:
        return this.formatJobCompleted(issueNumber, payload);

      case JOB_FAILED:
        return this.formatJobFailed(issueNumber, payload);

      case JOB_CANCELLED:
        return this.formatJobCancelled(issueNumber, payload);

      case STEP_STARTED:
        return this.formatStepStarted(issueNumber, payload);

      case STEP_COMPLETED:
        return this.formatStepCompleted(issueNumber, payload);

      case STEP_FAILED:
        return this.formatStepFailed(issueNumber, payload);

      case GATE_PASSED:
        return this.formatGatePassed(issueNumber, payload);

      case GATE_FAILED:
        return this.formatGateFailed(issueNumber, payload);

      default:
        return `Event: ${event.type}`;
    }
  }

  /**
   * Get the channel ID for a job type from workspace settings.
   *
   * Reads `settings.herald.channelMappings[jobType]` and falls back to
   * `settings.herald.defaultChannel`. Only non-empty string values count.
   *
   * @param workspaceId - Workspace whose settings are consulted
   * @param jobType - Job type used as the key into the channel mappings
   * @returns The channel ID, or `null` when the workspace does not exist or
   *          its settings contain no usable mapping or default
   */
  async getChannelForJobType(workspaceId: string, jobType: string): Promise<string | null> {
    const workspace = await this.prisma.workspace.findUnique({
      where: { id: workspaceId },
      select: { settings: true },
    });

    if (!workspace) {
      return null;
    }

    // Settings are free-form JSON: null or non-object values simply yield no herald config
    const heraldSettings = this.asRecord(this.asRecord(workspace.settings)?.herald);
    const channelMappings = this.asRecord(heraldSettings?.channelMappings);

    // Try to get channel for job type. The own-property check keeps job types such as
    // "constructor" or "toString" from resolving to members inherited from Object.prototype.
    if (channelMappings && Object.prototype.hasOwnProperty.call(channelMappings, jobType)) {
      const mappedChannel = channelMappings[jobType];
      if (typeof mappedChannel === "string" && mappedChannel !== "") {
        return mappedChannel;
      }
    }

    // Fall back to default channel
    const defaultChannel = heraldSettings?.defaultChannel;
    if (typeof defaultChannel === "string" && defaultChannel !== "") {
      return defaultChannel;
    }

    return null;
  }

  // Helpers for reading untyped JSON values

  /**
   * Narrow an arbitrary value to a plain key/value record.
   *
   * @returns The value itself when it is a non-null, non-array object;
   *          otherwise `undefined`
   */
  private asRecord(value: unknown): Record<string, unknown> | undefined {
    if (typeof value !== "object" || value === null || Array.isArray(value)) {
      return undefined;
    }
    return value as Record<string, unknown>;
  }

  /** Label used to refer to the job: `#<issueNumber>` when one is known, otherwise "task". */
  private issueLabel(issueNumber: number | undefined): string {
    return issueNumber ? `#${String(issueNumber)}` : "task";
  }

  /** Name to display for a step or gate; anything that is not a string is shown as "unknown". */
  private displayName(value: unknown): string {
    return typeof value === "string" ? value : "unknown";
  }

  /** Newline plus `payload.error` when it is a non-empty string, otherwise an empty string. */
  private errorSuffix(payload: Record<string, unknown>): string {
    const error = payload.error;
    return typeof error === "string" && error !== "" ? `\n${error}` : "";
  }

  // Message formatting methods with PDA-friendly language

  /** Message for a newly created job. */
  private formatJobCreated(
    issueNumber: number | undefined,
    _payload: Record<string, unknown>
  ): string {
    return `🟢 Job created for ${this.issueLabel(issueNumber)}`;
  }

  /** Message for a job that has started running. */
  private formatJobStarted(
    issueNumber: number | undefined,
    _payload: Record<string, unknown>
  ): string {
    return `🔵 Job started for ${this.issueLabel(issueNumber)}`;
  }

  /**
   * Message for a completed job, with `payload.duration` appended as `(<n>s)`
   * when it is a finite number. A duration of 0 is reported rather than dropped.
   */
  private formatJobCompleted(
    issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    const duration = payload.duration;
    const durationText =
      typeof duration === "number" && Number.isFinite(duration) ? ` (${String(duration)}s)` : "";
    return `✅ Job completed for ${this.issueLabel(issueNumber)}${durationText}`;
  }

  /** Message for a failed job, followed by the error text when one is provided. */
  private formatJobFailed(
    issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `⚠️ Job encountered an issue for ${this.issueLabel(issueNumber)}${this.errorSuffix(payload)}`;
  }

  /** Message for a cancelled job (worded as "paused"). */
  private formatJobCancelled(
    issueNumber: number | undefined,
    _payload: Record<string, unknown>
  ): string {
    return `⏸️ Job paused for ${this.issueLabel(issueNumber)}`;
  }

  /** Message for a step that has started, named by `payload.stepName`. */
  private formatStepStarted(
    _issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `▶️ Step started: ${this.displayName(payload.stepName)}`;
  }

  /** Message for a completed step, named by `payload.stepName`. */
  private formatStepCompleted(
    _issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `✅ Step completed: ${this.displayName(payload.stepName)}`;
  }

  /** Message for a failed step, followed by the error text when one is provided. */
  private formatStepFailed(
    _issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `⚠️ Step needs attention: ${this.displayName(payload.stepName)}${this.errorSuffix(payload)}`;
  }

  /** Message for a passed gate, named by `payload.gateName`. */
  private formatGatePassed(
    _issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `✅ Gate passed: ${this.displayName(payload.gateName)}`;
  }

  /** Message for a failed gate, followed by the error text when one is provided. */
  private formatGateFailed(
    _issueNumber: number | undefined,
    payload: Record<string, unknown>
  ): string {
    return `⚠️ Gate needs attention: ${this.displayName(payload.gateName)}${this.errorSuffix(payload)}`;
  }
}
|