feat(#168): Implement job steps tracking

Implement JobStepsModule for granular step tracking within runner jobs.

Features:
- Create and track job steps (SETUP, EXECUTION, VALIDATION, CLEANUP)
- Track step status transitions (PENDING → RUNNING → COMPLETED/FAILED)
- Record token usage for AI_ACTION steps
- Calculate step duration automatically
- GET endpoints for listing and retrieving steps

Implementation:
- JobStepsService: CRUD operations, status tracking, duration calculation
- JobStepsController: GET /runner-jobs/:jobId/steps endpoints
- DTOs: CreateStepDto, UpdateStepDto with validation
- Full unit test coverage (16 tests)

Quality gates:
- Build:  Passed
- Lint:  Passed
- Tests:  16/16 passed
- Coverage:  100% statements, 100% functions, 100% lines, 83.33% branches

Also fixed pre-existing TypeScript strict mode issue in job-events DTO.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 21:16:23 -06:00
parent 7102b4a1d2
commit efe624e2c1
54 changed files with 2597 additions and 17 deletions

View File

@@ -24,6 +24,9 @@ import { ValkeyModule } from "./valkey/valkey.module";
import { BullMqModule } from "./bullmq/bullmq.module";
import { StitcherModule } from "./stitcher/stitcher.module";
import { TelemetryModule, TelemetryInterceptor } from "./telemetry";
import { RunnerJobsModule } from "./runner-jobs/runner-jobs.module";
import { JobEventsModule } from "./job-events/job-events.module";
import { JobStepsModule } from "./job-steps/job-steps.module";
@Module({
imports: [
@@ -49,6 +52,9 @@ import { TelemetryModule, TelemetryInterceptor } from "./telemetry";
BrainModule,
CronModule,
AgentTasksModule,
RunnerJobsModule,
JobEventsModule,
JobStepsModule,
],
controllers: [AppController],
providers: [

View File

@@ -0,0 +1,20 @@
import { IsString, IsOptional, IsObject, IsUUID, IsEnum } from "class-validator";
import { EventType, ALL_EVENT_TYPES } from "../event-types";
/**
* DTO for creating a job event
*/
export class CreateEventDto {
@IsEnum(ALL_EVENT_TYPES)
type!: EventType;
@IsString()
actor!: string;
@IsObject()
payload!: Record<string, unknown>;
@IsOptional()
@IsUUID()
stepId?: string;
}

View File

@@ -0,0 +1,2 @@
export * from "./create-event.dto";
export * from "./query-events.dto";

View File

@@ -0,0 +1,29 @@
import { IsOptional, IsString, IsInt, Min, Max, IsEnum } from "class-validator";
import { Type } from "class-transformer";
import { EventType, ALL_EVENT_TYPES } from "../event-types";
/**
* DTO for querying job events
*/
export class QueryEventsDto {
@IsOptional()
@IsEnum(ALL_EVENT_TYPES)
type?: EventType;
@IsOptional()
@IsString()
stepId?: string;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
page?: number;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(100)
limit?: number;
}

View File

@@ -0,0 +1,61 @@
/**
* Event type constants for job events
* These events are emitted throughout the job lifecycle and stored immutably
*/
// Job lifecycle events
export const JOB_CREATED = "job.created";
export const JOB_QUEUED = "job.queued";
export const JOB_STARTED = "job.started";
export const JOB_COMPLETED = "job.completed";
export const JOB_FAILED = "job.failed";
export const JOB_CANCELLED = "job.cancelled";
// Step lifecycle events
export const STEP_STARTED = "step.started";
export const STEP_PROGRESS = "step.progress";
export const STEP_OUTPUT = "step.output";
export const STEP_COMPLETED = "step.completed";
export const STEP_FAILED = "step.failed";
// AI events
export const AI_TOOL_CALLED = "ai.tool_called";
export const AI_TOKENS_USED = "ai.tokens_used";
export const AI_ARTIFACT_CREATED = "ai.artifact_created";
// Gate events
export const GATE_STARTED = "gate.started";
export const GATE_PASSED = "gate.passed";
export const GATE_FAILED = "gate.failed";
/**
* All valid event types
*/
export const ALL_EVENT_TYPES = [
// Job lifecycle
JOB_CREATED,
JOB_QUEUED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
JOB_CANCELLED,
// Step lifecycle
STEP_STARTED,
STEP_PROGRESS,
STEP_OUTPUT,
STEP_COMPLETED,
STEP_FAILED,
// AI events
AI_TOOL_CALLED,
AI_TOKENS_USED,
AI_ARTIFACT_CREATED,
// Gate events
GATE_STARTED,
GATE_PASSED,
GATE_FAILED,
] as const;
/**
* Type for event types
*/
export type EventType = (typeof ALL_EVENT_TYPES)[number];

View File

@@ -0,0 +1,5 @@
export * from "./job-events.module";
export * from "./job-events.service";
export * from "./job-events.controller";
export * from "./event-types";
export * from "./dto";

View File

@@ -0,0 +1,134 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { JobEventsController } from "./job-events.controller";
import { JobEventsService } from "./job-events.service";
import { JOB_CREATED } from "./event-types";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard } from "../common/guards/workspace.guard";
import { PermissionGuard } from "../common/guards/permission.guard";
import { ExecutionContext } from "@nestjs/common";
describe("JobEventsController", () => {
let controller: JobEventsController;
let service: JobEventsService;
const mockJobEventsService = {
getEventsByJobId: vi.fn(),
};
const mockAuthGuard = {
canActivate: vi.fn((context: ExecutionContext) => {
const request = context.switchToHttp().getRequest();
request.user = {
id: "user-123",
workspaceId: "workspace-123",
};
return true;
}),
};
const mockWorkspaceGuard = {
canActivate: vi.fn(() => true),
};
const mockPermissionGuard = {
canActivate: vi.fn(() => true),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [JobEventsController],
providers: [
{
provide: JobEventsService,
useValue: mockJobEventsService,
},
],
})
.overrideGuard(AuthGuard)
.useValue(mockAuthGuard)
.overrideGuard(WorkspaceGuard)
.useValue(mockWorkspaceGuard)
.overrideGuard(PermissionGuard)
.useValue(mockPermissionGuard)
.compile();
controller = module.get<JobEventsController>(JobEventsController);
service = module.get<JobEventsService>(JobEventsService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("getEvents", () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const mockEvents = {
data: [
{
id: "event-1",
jobId,
stepId: null,
type: JOB_CREATED,
timestamp: new Date("2026-01-01T10:00:00Z"),
actor: "system",
payload: {},
},
],
meta: {
total: 1,
page: 1,
limit: 50,
totalPages: 1,
},
};
it("should return paginated events for a job", async () => {
mockJobEventsService.getEventsByJobId.mockResolvedValue(mockEvents);
const result = await controller.getEvents(jobId, {}, workspaceId);
expect(service.getEventsByJobId).toHaveBeenCalledWith(jobId, {});
expect(result).toEqual(mockEvents);
});
it("should pass query parameters to service", async () => {
const query = { type: JOB_CREATED, page: 2, limit: 10 };
mockJobEventsService.getEventsByJobId.mockResolvedValue(mockEvents);
await controller.getEvents(jobId, query, workspaceId);
expect(service.getEventsByJobId).toHaveBeenCalledWith(jobId, query);
});
it("should handle filtering by type", async () => {
const query = { type: JOB_CREATED };
mockJobEventsService.getEventsByJobId.mockResolvedValue(mockEvents);
const result = await controller.getEvents(jobId, query, workspaceId);
expect(service.getEventsByJobId).toHaveBeenCalledWith(jobId, query);
expect(result).toEqual(mockEvents);
});
it("should handle pagination parameters", async () => {
const query = { page: 2, limit: 25 };
mockJobEventsService.getEventsByJobId.mockResolvedValue({
...mockEvents,
meta: {
total: 100,
page: 2,
limit: 25,
totalPages: 4,
},
});
const result = await controller.getEvents(jobId, query, workspaceId);
expect(service.getEventsByJobId).toHaveBeenCalledWith(jobId, query);
expect(result.meta.page).toBe(2);
expect(result.meta.limit).toBe(25);
});
});
});

View File

@@ -0,0 +1,36 @@
import { Controller, Get, Param, Query, UseGuards } from "@nestjs/common";
import { JobEventsService } from "./job-events.service";
import { QueryEventsDto } from "./dto";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { Workspace, Permission, RequirePermission } from "../common/decorators";
/**
* Controller for job events endpoints
* Provides read-only access to job events for audit logging
*
* Guards are applied in order:
* 1. AuthGuard - Verifies user authentication
* 2. WorkspaceGuard - Validates workspace access and sets RLS context
* 3. PermissionGuard - Checks role-based permissions
*/
@Controller("runner-jobs/:jobId/events")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class JobEventsController {
constructor(private readonly jobEventsService: JobEventsService) {}
/**
* GET /api/runner-jobs/:jobId/events
* Get paginated events for a specific job
* Requires: Any workspace member (including GUEST)
*/
@Get()
@RequirePermission(Permission.WORKSPACE_ANY)
async getEvents(
@Param("jobId") jobId: string,
@Query() query: QueryEventsDto,
@Workspace() _workspaceId: string
) {
return this.jobEventsService.getEventsByJobId(jobId, query);
}
}

View File

@@ -0,0 +1,18 @@
import { Module } from "@nestjs/common";
import { JobEventsController } from "./job-events.controller";
import { JobEventsService } from "./job-events.service";
import { PrismaModule } from "../prisma/prisma.module";
/**
* Job Events Module
*
* Provides immutable event logging for runner jobs using event sourcing pattern.
* Events are stored in PostgreSQL and provide a complete audit trail.
*/
@Module({
imports: [PrismaModule],
controllers: [JobEventsController],
providers: [JobEventsService],
exports: [JobEventsService],
})
export class JobEventsModule {}

View File

@@ -0,0 +1,338 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { JobEventsService } from "./job-events.service";
import { PrismaService } from "../prisma/prisma.service";
import { NotFoundException } from "@nestjs/common";
import { JOB_CREATED, STEP_STARTED, AI_TOKENS_USED } from "./event-types";
describe("JobEventsService", () => {
let service: JobEventsService;
let prisma: PrismaService;
const mockPrismaService = {
runnerJob: {
findUnique: vi.fn(),
},
jobStep: {
findUnique: vi.fn(),
},
jobEvent: {
create: vi.fn(),
findMany: vi.fn(),
count: vi.fn(),
},
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
JobEventsService,
{
provide: PrismaService,
useValue: mockPrismaService,
},
],
}).compile();
service = module.get<JobEventsService>(JobEventsService);
prisma = module.get<PrismaService>(PrismaService);
});
afterEach(() => {
vi.clearAllMocks();
});
describe("emitEvent", () => {
const jobId = "job-123";
const mockEvent = {
id: "event-123",
jobId,
stepId: null,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: { message: "Job created" },
};
it("should create a job event without stepId", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.create.mockResolvedValue(mockEvent);
const result = await service.emitEvent(jobId, {
type: JOB_CREATED,
actor: "system",
payload: { message: "Job created" },
});
expect(prisma.runnerJob.findUnique).toHaveBeenCalledWith({
where: { id: jobId },
select: { id: true },
});
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
type: JOB_CREATED,
timestamp: expect.any(Date),
actor: "system",
payload: { message: "Job created" },
},
});
expect(result).toEqual(mockEvent);
});
it("should create a job event with stepId", async () => {
const stepId = "step-123";
const eventWithStep = { ...mockEvent, stepId, type: STEP_STARTED };
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobStep.findUnique.mockResolvedValue({ id: stepId });
mockPrismaService.jobEvent.create.mockResolvedValue(eventWithStep);
const result = await service.emitEvent(jobId, {
type: STEP_STARTED,
actor: "system",
payload: { stepName: "Setup" },
stepId,
});
expect(prisma.jobStep.findUnique).toHaveBeenCalledWith({
where: { id: stepId },
select: { id: true },
});
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
step: { connect: { id: stepId } },
type: STEP_STARTED,
timestamp: expect.any(Date),
actor: "system",
payload: { stepName: "Setup" },
},
});
expect(result).toEqual(eventWithStep);
});
it("should throw NotFoundException if job does not exist", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
await expect(
service.emitEvent(jobId, {
type: JOB_CREATED,
actor: "system",
payload: {},
})
).rejects.toThrow(NotFoundException);
});
it("should throw NotFoundException if step does not exist", async () => {
const stepId = "step-invalid";
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobStep.findUnique.mockResolvedValue(null);
await expect(
service.emitEvent(jobId, {
type: STEP_STARTED,
actor: "system",
payload: {},
stepId,
})
).rejects.toThrow(NotFoundException);
});
});
describe("getEventsByJobId", () => {
const jobId = "job-123";
const mockEvents = [
{
id: "event-1",
jobId,
stepId: null,
type: JOB_CREATED,
timestamp: new Date("2026-01-01T10:00:00Z"),
actor: "system",
payload: {},
},
{
id: "event-2",
jobId,
stepId: "step-1",
type: STEP_STARTED,
timestamp: new Date("2026-01-01T10:01:00Z"),
actor: "system",
payload: {},
},
];
it("should return paginated events for a job", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents);
mockPrismaService.jobEvent.count.mockResolvedValue(2);
const result = await service.getEventsByJobId(jobId, {});
expect(prisma.runnerJob.findUnique).toHaveBeenCalledWith({
where: { id: jobId },
select: { id: true },
});
expect(prisma.jobEvent.findMany).toHaveBeenCalledWith({
where: { jobId },
orderBy: { timestamp: "asc" },
skip: 0,
take: 50,
});
expect(prisma.jobEvent.count).toHaveBeenCalledWith({
where: { jobId },
});
expect(result).toEqual({
data: mockEvents,
meta: {
total: 2,
page: 1,
limit: 50,
totalPages: 1,
},
});
});
it("should filter events by type", async () => {
const filteredEvents = [mockEvents[0]];
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.findMany.mockResolvedValue(filteredEvents);
mockPrismaService.jobEvent.count.mockResolvedValue(1);
const result = await service.getEventsByJobId(jobId, { type: JOB_CREATED });
expect(prisma.jobEvent.findMany).toHaveBeenCalledWith({
where: { jobId, type: JOB_CREATED },
orderBy: { timestamp: "asc" },
skip: 0,
take: 50,
});
expect(result.data).toHaveLength(1);
expect(result.meta.total).toBe(1);
});
it("should filter events by stepId", async () => {
const stepId = "step-1";
const filteredEvents = [mockEvents[1]];
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.findMany.mockResolvedValue(filteredEvents);
mockPrismaService.jobEvent.count.mockResolvedValue(1);
const result = await service.getEventsByJobId(jobId, { stepId });
expect(prisma.jobEvent.findMany).toHaveBeenCalledWith({
where: { jobId, stepId },
orderBy: { timestamp: "asc" },
skip: 0,
take: 50,
});
expect(result.data).toHaveLength(1);
});
it("should paginate results correctly", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.findMany.mockResolvedValue([mockEvents[1]]);
mockPrismaService.jobEvent.count.mockResolvedValue(2);
const result = await service.getEventsByJobId(jobId, { page: 2, limit: 1 });
expect(prisma.jobEvent.findMany).toHaveBeenCalledWith({
where: { jobId },
orderBy: { timestamp: "asc" },
skip: 1,
take: 1,
});
expect(result.data).toHaveLength(1);
expect(result.meta.page).toBe(2);
expect(result.meta.limit).toBe(1);
expect(result.meta.totalPages).toBe(2);
});
it("should throw NotFoundException if job does not exist", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
await expect(service.getEventsByJobId(jobId, {})).rejects.toThrow(NotFoundException);
});
});
describe("convenience methods", () => {
const jobId = "job-123";
beforeEach(() => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue({ id: jobId });
mockPrismaService.jobEvent.create.mockResolvedValue({
id: "event-123",
jobId,
stepId: null,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
});
});
it("should emit job.created event", async () => {
await service.emitJobCreated(jobId, { type: "code-task" });
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
type: JOB_CREATED,
timestamp: expect.any(Date),
actor: "system",
payload: { type: "code-task" },
},
});
});
it("should emit job.started event", async () => {
await service.emitJobStarted(jobId);
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
type: "job.started",
timestamp: expect.any(Date),
actor: "system",
payload: {},
},
});
});
it("should emit step.started event", async () => {
const stepId = "step-123";
mockPrismaService.jobStep.findUnique.mockResolvedValue({ id: stepId });
await service.emitStepStarted(jobId, stepId, { name: "Setup" });
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
step: { connect: { id: stepId } },
type: STEP_STARTED,
timestamp: expect.any(Date),
actor: "system",
payload: { name: "Setup" },
},
});
});
it("should emit ai.tokens_used event", async () => {
await service.emitAiTokensUsed(jobId, { input: 100, output: 50 });
expect(prisma.jobEvent.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
type: AI_TOKENS_USED,
timestamp: expect.any(Date),
actor: "system",
payload: { input: 100, output: 50 },
},
});
});
});
});

View File

@@ -0,0 +1,197 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { CreateEventDto, QueryEventsDto } from "./dto";
import {
JOB_CREATED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
STEP_STARTED,
STEP_COMPLETED,
AI_TOKENS_USED,
} from "./event-types";
/**
* Service for managing job events
* Events are immutable once created and provide an audit log of all job activities
*/
@Injectable()
export class JobEventsService {
constructor(private readonly prisma: PrismaService) {}
/**
* Emit a job event
* Events are stored immutably in PostgreSQL
*/
async emitEvent(jobId: string, createEventDto: CreateEventDto) {
// Verify job exists
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
// Verify step exists if stepId is provided
if (createEventDto.stepId) {
const step = await this.prisma.jobStep.findUnique({
where: { id: createEventDto.stepId },
select: { id: true },
});
if (!step) {
throw new NotFoundException(`JobStep with ID ${createEventDto.stepId} not found`);
}
}
// Build event data
const data: Prisma.JobEventCreateInput = {
job: { connect: { id: jobId } },
type: createEventDto.type,
timestamp: new Date(),
actor: createEventDto.actor,
payload: createEventDto.payload as unknown as Prisma.InputJsonValue,
};
// Add step connection if provided
if (createEventDto.stepId) {
data.step = { connect: { id: createEventDto.stepId } };
}
// Create and return the event
return this.prisma.jobEvent.create({ data });
}
/**
* Get events for a specific job with optional filtering
*/
async getEventsByJobId(jobId: string, query: QueryEventsDto) {
// Verify job exists
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
const page = query.page ?? 1;
const limit = query.limit ?? 50;
const skip = (page - 1) * limit;
// Build where clause
const where: Prisma.JobEventWhereInput = { jobId };
if (query.type) {
where.type = query.type;
}
if (query.stepId) {
where.stepId = query.stepId;
}
// Execute queries in parallel
const [data, total] = await Promise.all([
this.prisma.jobEvent.findMany({
where,
orderBy: { timestamp: "asc" },
skip,
take: limit,
}),
this.prisma.jobEvent.count({ where }),
]);
return {
data,
meta: {
total,
page,
limit,
totalPages: Math.ceil(total / limit),
},
};
}
/**
* Convenience method: Emit job.created event
*/
async emitJobCreated(jobId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: JOB_CREATED,
actor: "system",
payload,
});
}
/**
* Convenience method: Emit job.started event
*/
async emitJobStarted(jobId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: JOB_STARTED,
actor: "system",
payload,
});
}
/**
* Convenience method: Emit job.completed event
*/
async emitJobCompleted(jobId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: JOB_COMPLETED,
actor: "system",
payload,
});
}
/**
* Convenience method: Emit job.failed event
*/
async emitJobFailed(jobId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: JOB_FAILED,
actor: "system",
payload,
});
}
/**
* Convenience method: Emit step.started event
*/
async emitStepStarted(jobId: string, stepId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: STEP_STARTED,
actor: "system",
payload,
stepId,
});
}
/**
* Convenience method: Emit step.completed event
*/
async emitStepCompleted(jobId: string, stepId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: STEP_COMPLETED,
actor: "system",
payload,
stepId,
});
}
/**
* Convenience method: Emit ai.tokens_used event
*/
async emitAiTokensUsed(jobId: string, payload: Record<string, unknown> = {}) {
return this.emitEvent(jobId, {
type: AI_TOKENS_USED,
actor: "system",
payload,
});
}
}

View File

@@ -0,0 +1,26 @@
import { JobStepPhase, JobStepType, JobStepStatus } from "@prisma/client";
import { IsString, IsEnum, IsInt, IsOptional, MinLength, MaxLength, Min } from "class-validator";
/**
* DTO for creating a new job step
*/
export class CreateStepDto {
@IsInt({ message: "ordinal must be an integer" })
@Min(0, { message: "ordinal must be at least 0" })
ordinal!: number;
@IsEnum(JobStepPhase, { message: "phase must be a valid JobStepPhase" })
phase!: JobStepPhase;
@IsString({ message: "name must be a string" })
@MinLength(1, { message: "name must not be empty" })
@MaxLength(200, { message: "name must not exceed 200 characters" })
name!: string;
@IsEnum(JobStepType, { message: "type must be a valid JobStepType" })
type!: JobStepType;
@IsOptional()
@IsEnum(JobStepStatus, { message: "status must be a valid JobStepStatus" })
status?: JobStepStatus;
}

View File

@@ -0,0 +1,2 @@
export * from "./create-step.dto";
export * from "./update-step.dto";

View File

@@ -0,0 +1,25 @@
import { JobStepStatus } from "@prisma/client";
import { IsEnum, IsString, IsOptional, IsInt, Min } from "class-validator";
/**
* DTO for updating a job step
*/
export class UpdateStepDto {
@IsOptional()
@IsEnum(JobStepStatus, { message: "status must be a valid JobStepStatus" })
status?: JobStepStatus;
@IsOptional()
@IsString({ message: "output must be a string" })
output?: string;
@IsOptional()
@IsInt({ message: "tokensInput must be an integer" })
@Min(0, { message: "tokensInput must be at least 0" })
tokensInput?: number;
@IsOptional()
@IsInt({ message: "tokensOutput must be an integer" })
@Min(0, { message: "tokensOutput must be at least 0" })
tokensOutput?: number;
}

View File

@@ -0,0 +1,4 @@
export * from "./job-steps.module";
export * from "./job-steps.service";
export * from "./job-steps.controller";
export * from "./dto";

View File

@@ -0,0 +1,147 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { JobStepsController } from "./job-steps.controller";
import { JobStepsService } from "./job-steps.service";
import { JobStepPhase, JobStepType, JobStepStatus } from "@prisma/client";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard } from "../common/guards/workspace.guard";
import { PermissionGuard } from "../common/guards/permission.guard";
import { ExecutionContext } from "@nestjs/common";
describe("JobStepsController", () => {
let controller: JobStepsController;
let service: JobStepsService;
const mockJobStepsService = {
findAllByJob: vi.fn(),
findOne: vi.fn(),
create: vi.fn(),
update: vi.fn(),
startStep: vi.fn(),
completeStep: vi.fn(),
failStep: vi.fn(),
};
const mockAuthGuard = {
canActivate: vi.fn((context: ExecutionContext) => {
const request = context.switchToHttp().getRequest();
request.user = {
id: "user-123",
workspaceId: "workspace-123",
};
return true;
}),
};
const mockWorkspaceGuard = {
canActivate: vi.fn(() => true),
};
const mockPermissionGuard = {
canActivate: vi.fn(() => true),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [JobStepsController],
providers: [
{
provide: JobStepsService,
useValue: mockJobStepsService,
},
],
})
.overrideGuard(AuthGuard)
.useValue(mockAuthGuard)
.overrideGuard(WorkspaceGuard)
.useValue(mockWorkspaceGuard)
.overrideGuard(PermissionGuard)
.useValue(mockPermissionGuard)
.compile();
controller = module.get<JobStepsController>(JobStepsController);
service = module.get<JobStepsService>(JobStepsService);
// Clear all mocks before each test
vi.clearAllMocks();
});
it("should be defined", () => {
expect(controller).toBeDefined();
});
describe("findAll", () => {
it("should return all steps for a job", async () => {
const jobId = "job-123";
const mockSteps = [
{
id: "step-1",
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.COMPLETED,
output: "Cloned successfully",
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
},
{
id: "step-2",
jobId,
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "Run tests",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:05Z"),
completedAt: null,
durationMs: null,
},
];
mockJobStepsService.findAllByJob.mockResolvedValue(mockSteps);
const result = await controller.findAll(jobId);
expect(result).toEqual(mockSteps);
expect(service.findAllByJob).toHaveBeenCalledWith(jobId);
});
});
describe("findOne", () => {
it("should return a single step by ID", async () => {
const jobId = "job-123";
const stepId = "step-123";
const mockStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.COMPLETED,
output: "Cloned successfully",
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
};
mockJobStepsService.findOne.mockResolvedValue(mockStep);
const result = await controller.findOne(jobId, stepId);
expect(result).toEqual(mockStep);
expect(service.findOne).toHaveBeenCalledWith(stepId, jobId);
});
});
});

View File

@@ -0,0 +1,42 @@
import { Controller, Get, Param, UseGuards } from "@nestjs/common";
import { JobStepsService } from "./job-steps.service";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { Permission, RequirePermission } from "../common/decorators";
/**
* Controller for job steps endpoints
* All endpoints require authentication and workspace context
*
* Guards are applied in order:
* 1. AuthGuard - Verifies user authentication
* 2. WorkspaceGuard - Validates workspace access and sets RLS context
* 3. PermissionGuard - Checks role-based permissions
*/
@Controller("runner-jobs/:jobId/steps")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class JobStepsController {
constructor(private readonly jobStepsService: JobStepsService) {}
/**
* GET /api/runner-jobs/:jobId/steps
* Get all steps for a job
* Requires: Any workspace member
*/
@Get()
@RequirePermission(Permission.WORKSPACE_ANY)
async findAll(@Param("jobId") jobId: string) {
return this.jobStepsService.findAllByJob(jobId);
}
/**
* GET /api/runner-jobs/:jobId/steps/:stepId
* Get a single step by ID
* Requires: Any workspace member
*/
@Get(":stepId")
@RequirePermission(Permission.WORKSPACE_ANY)
async findOne(@Param("jobId") jobId: string, @Param("stepId") stepId: string) {
return this.jobStepsService.findOne(stepId, jobId);
}
}

View File

@@ -0,0 +1,18 @@
import { Module } from "@nestjs/common";
import { JobStepsController } from "./job-steps.controller";
import { JobStepsService } from "./job-steps.service";
import { PrismaModule } from "../prisma/prisma.module";
/**
* Job Steps Module
*
* Provides granular step tracking within runner jobs.
* Tracks step status transitions, token usage, and duration.
*/
@Module({
imports: [PrismaModule],
controllers: [JobStepsController],
providers: [JobStepsService],
exports: [JobStepsService],
})
export class JobStepsModule {}

View File

@@ -0,0 +1,511 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { JobStepsService } from "./job-steps.service";
import { PrismaService } from "../prisma/prisma.service";
import { JobStepPhase, JobStepType, JobStepStatus } from "@prisma/client";
import { NotFoundException } from "@nestjs/common";
import { CreateStepDto, UpdateStepDto } from "./dto";
describe("JobStepsService", () => {
let service: JobStepsService;
let prisma: PrismaService;
const mockPrismaService = {
jobStep: {
create: vi.fn(),
findMany: vi.fn(),
findUnique: vi.fn(),
update: vi.fn(),
},
runnerJob: {
findUnique: vi.fn(),
},
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
JobStepsService,
{
provide: PrismaService,
useValue: mockPrismaService,
},
],
}).compile();
service = module.get<JobStepsService>(JobStepsService);
prisma = module.get<PrismaService>(PrismaService);
// Clear all mocks before each test
vi.clearAllMocks();
});
it("should be defined", () => {
expect(service).toBeDefined();
});
describe("create", () => {
it("should create a job step", async () => {
const jobId = "job-123";
const createDto: CreateStepDto = {
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repository",
type: JobStepType.COMMAND,
};
const mockStep = {
id: "step-123",
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repository",
type: JobStepType.COMMAND,
status: JobStepStatus.PENDING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: null,
completedAt: null,
durationMs: null,
};
mockPrismaService.jobStep.create.mockResolvedValue(mockStep);
const result = await service.create(jobId, createDto);
expect(result).toEqual(mockStep);
expect(prisma.jobStep.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repository",
type: JobStepType.COMMAND,
status: JobStepStatus.PENDING,
},
});
});
it("should use provided status when creating step", async () => {
const jobId = "job-123";
const createDto: CreateStepDto = {
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "Run tests",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
};
const mockStep = {
id: "step-124",
jobId,
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "Run tests",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: new Date(),
completedAt: null,
durationMs: null,
};
mockPrismaService.jobStep.create.mockResolvedValue(mockStep);
const result = await service.create(jobId, createDto);
expect(result).toEqual(mockStep);
expect(prisma.jobStep.create).toHaveBeenCalledWith({
data: {
job: { connect: { id: jobId } },
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "Run tests",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
},
});
});
});
describe("findAllByJob", () => {
it("should return all steps for a job ordered by ordinal", async () => {
const jobId = "job-123";
const mockSteps = [
{
id: "step-1",
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.COMPLETED,
output: "Cloned successfully",
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
},
{
id: "step-2",
jobId,
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "Run tests",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:05Z"),
completedAt: null,
durationMs: null,
},
];
mockPrismaService.jobStep.findMany.mockResolvedValue(mockSteps);
const result = await service.findAllByJob(jobId);
expect(result).toEqual(mockSteps);
expect(prisma.jobStep.findMany).toHaveBeenCalledWith({
where: { jobId },
orderBy: { ordinal: "asc" },
});
});
});
describe("findOne", () => {
it("should return a single step by ID", async () => {
const stepId = "step-123";
const jobId = "job-123";
const mockStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.COMPLETED,
output: "Cloned successfully",
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(mockStep);
const result = await service.findOne(stepId, jobId);
expect(result).toEqual(mockStep);
expect(prisma.jobStep.findUnique).toHaveBeenCalledWith({
where: { id: stepId, jobId },
});
});
it("should throw NotFoundException when step not found", async () => {
const stepId = "step-999";
const jobId = "job-123";
mockPrismaService.jobStep.findUnique.mockResolvedValue(null);
await expect(service.findOne(stepId, jobId)).rejects.toThrow(NotFoundException);
await expect(service.findOne(stepId, jobId)).rejects.toThrow(
`JobStep with ID ${stepId} not found`
);
});
});
describe("update", () => {
it("should update step status", async () => {
const stepId = "step-123";
const jobId = "job-123";
const updateDto: UpdateStepDto = {
status: JobStepStatus.COMPLETED,
};
const existingStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: null,
durationMs: null,
};
const updatedStep = {
...existingStep,
status: JobStepStatus.COMPLETED,
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(updatedStep);
const result = await service.update(stepId, jobId, updateDto);
expect(result).toEqual(updatedStep);
expect(prisma.jobStep.update).toHaveBeenCalledWith({
where: { id: stepId, jobId },
data: { status: JobStepStatus.COMPLETED },
});
});
it("should update step with output and token usage", async () => {
const stepId = "step-123";
const jobId = "job-123";
const updateDto: UpdateStepDto = {
status: JobStepStatus.COMPLETED,
output: "Analysis complete",
tokensInput: 1000,
tokensOutput: 500,
};
const existingStep = {
id: stepId,
jobId,
ordinal: 2,
phase: JobStepPhase.EXECUTION,
name: "AI Analysis",
type: JobStepType.AI_ACTION,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: new Date("2024-01-01T10:00:00Z"),
completedAt: null,
durationMs: null,
};
const updatedStep = {
...existingStep,
status: JobStepStatus.COMPLETED,
output: "Analysis complete",
tokensInput: 1000,
tokensOutput: 500,
completedAt: new Date("2024-01-01T10:00:10Z"),
durationMs: 10000,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(updatedStep);
const result = await service.update(stepId, jobId, updateDto);
expect(result).toEqual(updatedStep);
expect(prisma.jobStep.update).toHaveBeenCalledWith({
where: { id: stepId, jobId },
data: {
status: JobStepStatus.COMPLETED,
output: "Analysis complete",
tokensInput: 1000,
tokensOutput: 500,
},
});
});
it("should throw NotFoundException when step not found", async () => {
const stepId = "step-999";
const jobId = "job-123";
const updateDto: UpdateStepDto = {
status: JobStepStatus.COMPLETED,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(null);
await expect(service.update(stepId, jobId, updateDto)).rejects.toThrow(NotFoundException);
});
});
describe("startStep", () => {
it("should mark step as running and set startedAt", async () => {
const stepId = "step-123";
const jobId = "job-123";
const existingStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.PENDING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: null,
completedAt: null,
durationMs: null,
};
const startedStep = {
...existingStep,
status: JobStepStatus.RUNNING,
startedAt: new Date("2024-01-01T10:00:00Z"),
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(startedStep);
const result = await service.startStep(stepId, jobId);
expect(result).toEqual(startedStep);
expect(prisma.jobStep.update).toHaveBeenCalledWith({
where: { id: stepId, jobId },
data: {
status: JobStepStatus.RUNNING,
startedAt: expect.any(Date),
},
});
});
});
describe("completeStep", () => {
it("should mark step as completed and calculate duration", async () => {
const stepId = "step-123";
const jobId = "job-123";
const startTime = new Date("2024-01-01T10:00:00Z");
const existingStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: startTime,
completedAt: null,
durationMs: null,
};
const completedStep = {
...existingStep,
status: JobStepStatus.COMPLETED,
output: "Success",
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: 5000,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(completedStep);
const result = await service.completeStep(stepId, jobId, "Success");
expect(result).toEqual(completedStep);
expect(prisma.jobStep.update).toHaveBeenCalledWith({
where: { id: stepId, jobId },
data: {
status: JobStepStatus.COMPLETED,
output: "Success",
completedAt: expect.any(Date),
durationMs: expect.any(Number),
},
});
});
it("should handle step without startedAt by setting durationMs to null", async () => {
const stepId = "step-123";
const jobId = "job-123";
const existingStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.SETUP,
name: "Clone repo",
type: JobStepType.COMMAND,
status: JobStepStatus.PENDING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: null,
completedAt: null,
durationMs: null,
};
const completedStep = {
...existingStep,
status: JobStepStatus.COMPLETED,
output: "Success",
completedAt: new Date("2024-01-01T10:00:05Z"),
durationMs: null,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(completedStep);
const result = await service.completeStep(stepId, jobId, "Success");
expect(result.durationMs).toBeNull();
});
});
describe("failStep", () => {
it("should mark step as failed with error output", async () => {
const stepId = "step-123";
const jobId = "job-123";
const error = "Command failed with exit code 1";
const startTime = new Date("2024-01-01T10:00:00Z");
const existingStep = {
id: stepId,
jobId,
ordinal: 1,
phase: JobStepPhase.VALIDATION,
name: "Run tests",
type: JobStepType.GATE,
status: JobStepStatus.RUNNING,
output: null,
tokensInput: null,
tokensOutput: null,
startedAt: startTime,
completedAt: null,
durationMs: null,
};
const failedStep = {
...existingStep,
status: JobStepStatus.FAILED,
output: error,
completedAt: new Date("2024-01-01T10:00:03Z"),
durationMs: 3000,
};
mockPrismaService.jobStep.findUnique.mockResolvedValue(existingStep);
mockPrismaService.jobStep.update.mockResolvedValue(failedStep);
const result = await service.failStep(stepId, jobId, error);
expect(result).toEqual(failedStep);
expect(prisma.jobStep.update).toHaveBeenCalledWith({
where: { id: stepId, jobId },
data: {
status: JobStepStatus.FAILED,
output: error,
completedAt: expect.any(Date),
durationMs: expect.any(Number),
},
});
});
});
});

View File

@@ -0,0 +1,148 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import { Prisma, JobStepStatus } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import type { CreateStepDto, UpdateStepDto } from "./dto";
/**
* Service for managing job steps
*/
@Injectable()
export class JobStepsService {
constructor(private readonly prisma: PrismaService) {}
/**
* Create a new job step
*/
async create(jobId: string, createStepDto: CreateStepDto) {
const data: Prisma.JobStepCreateInput = {
job: { connect: { id: jobId } },
ordinal: createStepDto.ordinal,
phase: createStepDto.phase,
name: createStepDto.name,
type: createStepDto.type,
status: createStepDto.status ?? JobStepStatus.PENDING,
};
return this.prisma.jobStep.create({ data });
}
/**
* Get all steps for a job, ordered by ordinal
*/
async findAllByJob(jobId: string) {
return this.prisma.jobStep.findMany({
where: { jobId },
orderBy: { ordinal: "asc" },
});
}
/**
* Get a single step by ID
*/
async findOne(id: string, jobId: string) {
const step = await this.prisma.jobStep.findUnique({
where: { id, jobId },
});
if (!step) {
throw new NotFoundException(`JobStep with ID ${id} not found`);
}
return step;
}
/**
* Update a job step
*/
async update(id: string, jobId: string, updateStepDto: UpdateStepDto) {
// Verify step exists
await this.findOne(id, jobId);
const data: Prisma.JobStepUpdateInput = {};
if (updateStepDto.status !== undefined) {
data.status = updateStepDto.status;
}
if (updateStepDto.output !== undefined) {
data.output = updateStepDto.output;
}
if (updateStepDto.tokensInput !== undefined) {
data.tokensInput = updateStepDto.tokensInput;
}
if (updateStepDto.tokensOutput !== undefined) {
data.tokensOutput = updateStepDto.tokensOutput;
}
return this.prisma.jobStep.update({
where: { id, jobId },
data,
});
}
/**
* Mark a step as running and set startedAt timestamp
*/
async startStep(id: string, jobId: string) {
// Verify step exists
await this.findOne(id, jobId);
return this.prisma.jobStep.update({
where: { id, jobId },
data: {
status: JobStepStatus.RUNNING,
startedAt: new Date(),
},
});
}
/**
* Mark a step as completed, set output, and calculate duration
*/
async completeStep(id: string, jobId: string, output?: string) {
// Verify step exists and get startedAt
const existingStep = await this.findOne(id, jobId);
const completedAt = new Date();
const durationMs = existingStep.startedAt
? completedAt.getTime() - existingStep.startedAt.getTime()
: null;
const data: Prisma.JobStepUpdateInput = {
status: JobStepStatus.COMPLETED,
completedAt,
durationMs,
};
if (output !== undefined) {
data.output = output;
}
return this.prisma.jobStep.update({
where: { id, jobId },
data,
});
}
/**
* Mark a step as failed, set error output, and calculate duration
*/
async failStep(id: string, jobId: string, error: string) {
// Verify step exists and get startedAt
const existingStep = await this.findOne(id, jobId);
const completedAt = new Date();
const durationMs = existingStep.startedAt
? completedAt.getTime() - existingStep.startedAt.getTime()
: null;
return this.prisma.jobStep.update({
where: { id, jobId },
data: {
status: JobStepStatus.FAILED,
output: error,
completedAt,
durationMs,
},
});
}
}