Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
991 lines
28 KiB
TypeScript
991 lines
28 KiB
TypeScript
import { describe, it, expect, beforeEach, vi } from "vitest";
|
|
import { Test, TestingModule } from "@nestjs/testing";
|
|
import { RunnerJobsService } from "./runner-jobs.service";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import { BullMqService } from "../bullmq/bullmq.service";
|
|
import { WebSocketGateway } from "../websocket/websocket.gateway";
|
|
import { RunnerJobStatus } from "@prisma/client";
|
|
import { NotFoundException, BadRequestException } from "@nestjs/common";
|
|
import { CreateJobDto, QueryJobsDto } from "./dto";
|
|
|
|
describe("RunnerJobsService", () => {
|
|
let service: RunnerJobsService;
|
|
let prisma: PrismaService;
|
|
let bullMq: BullMqService;
|
|
|
|
const mockPrismaService = {
|
|
runnerJob: {
|
|
create: vi.fn(),
|
|
findMany: vi.fn(),
|
|
count: vi.fn(),
|
|
findUnique: vi.fn(),
|
|
update: vi.fn(),
|
|
updateMany: vi.fn(),
|
|
},
|
|
jobEvent: {
|
|
findMany: vi.fn(),
|
|
findUnique: vi.fn(),
|
|
},
|
|
};
|
|
|
|
const mockBullMqService = {
|
|
addJob: vi.fn(),
|
|
getQueue: vi.fn(),
|
|
};
|
|
|
|
const mockWebSocketGateway = {
|
|
emitJobCreated: vi.fn(),
|
|
emitJobStatusChanged: vi.fn(),
|
|
emitJobProgress: vi.fn(),
|
|
};
|
|
|
|
beforeEach(async () => {
|
|
const module: TestingModule = await Test.createTestingModule({
|
|
providers: [
|
|
RunnerJobsService,
|
|
{
|
|
provide: PrismaService,
|
|
useValue: mockPrismaService,
|
|
},
|
|
{
|
|
provide: BullMqService,
|
|
useValue: mockBullMqService,
|
|
},
|
|
{
|
|
provide: WebSocketGateway,
|
|
useValue: mockWebSocketGateway,
|
|
},
|
|
],
|
|
}).compile();
|
|
|
|
service = module.get<RunnerJobsService>(RunnerJobsService);
|
|
prisma = module.get<PrismaService>(PrismaService);
|
|
bullMq = module.get<BullMqService>(BullMqService);
|
|
|
|
// Clear all mocks before each test
|
|
vi.clearAllMocks();
|
|
});
|
|
|
|
it("should be defined", () => {
|
|
expect(service).toBeDefined();
|
|
});
|
|
|
|
describe("create", () => {
|
|
it("should create a job and add it to BullMQ queue", async () => {
|
|
const workspaceId = "workspace-123";
|
|
const createDto: CreateJobDto = {
|
|
type: "git-status",
|
|
priority: 5,
|
|
data: { repo: "test-repo" },
|
|
};
|
|
|
|
const mockJob = {
|
|
id: "job-123",
|
|
workspaceId,
|
|
type: "git-status",
|
|
status: RunnerJobStatus.PENDING,
|
|
priority: 5,
|
|
progressPercent: 0,
|
|
result: null,
|
|
error: null,
|
|
createdAt: new Date(),
|
|
startedAt: null,
|
|
completedAt: null,
|
|
agentTaskId: null,
|
|
};
|
|
|
|
const mockBullMqJob = {
|
|
id: "bull-job-123",
|
|
name: "runner-job",
|
|
};
|
|
|
|
mockPrismaService.runnerJob.create.mockResolvedValue(mockJob);
|
|
mockBullMqService.addJob.mockResolvedValue(mockBullMqJob);
|
|
|
|
const result = await service.create(workspaceId, createDto);
|
|
|
|
expect(result).toEqual(mockJob);
|
|
expect(prisma.runnerJob.create).toHaveBeenCalledWith({
|
|
data: {
|
|
workspace: { connect: { id: workspaceId } },
|
|
type: "git-status",
|
|
priority: 5,
|
|
status: RunnerJobStatus.PENDING,
|
|
progressPercent: 0,
|
|
result: { repo: "test-repo" },
|
|
},
|
|
});
|
|
expect(bullMq.addJob).toHaveBeenCalledWith(
|
|
"mosaic-jobs-runner",
|
|
"runner-job",
|
|
{
|
|
jobId: "job-123",
|
|
workspaceId,
|
|
type: "git-status",
|
|
data: { repo: "test-repo" },
|
|
},
|
|
{ priority: 5 }
|
|
);
|
|
});
|
|
|
|
it("should create a job with agentTaskId if provided", async () => {
|
|
const workspaceId = "workspace-123";
|
|
const createDto: CreateJobDto = {
|
|
type: "code-task",
|
|
agentTaskId: "agent-task-123",
|
|
priority: 8,
|
|
};
|
|
|
|
const mockJob = {
|
|
id: "job-456",
|
|
workspaceId,
|
|
type: "code-task",
|
|
status: RunnerJobStatus.PENDING,
|
|
priority: 8,
|
|
progressPercent: 0,
|
|
result: null,
|
|
error: null,
|
|
createdAt: new Date(),
|
|
startedAt: null,
|
|
completedAt: null,
|
|
agentTaskId: "agent-task-123",
|
|
};
|
|
|
|
mockPrismaService.runnerJob.create.mockResolvedValue(mockJob);
|
|
mockBullMqService.addJob.mockResolvedValue({ id: "bull-job-456" });
|
|
|
|
const result = await service.create(workspaceId, createDto);
|
|
|
|
expect(result).toEqual(mockJob);
|
|
expect(prisma.runnerJob.create).toHaveBeenCalledWith({
|
|
data: {
|
|
workspace: { connect: { id: workspaceId } },
|
|
type: "code-task",
|
|
priority: 8,
|
|
status: RunnerJobStatus.PENDING,
|
|
progressPercent: 0,
|
|
agentTask: { connect: { id: "agent-task-123" } },
|
|
},
|
|
});
|
|
});
|
|
|
|
it("should use default priority of 5 if not provided", async () => {
|
|
const workspaceId = "workspace-123";
|
|
const createDto: CreateJobDto = {
|
|
type: "priority-calc",
|
|
};
|
|
|
|
const mockJob = {
|
|
id: "job-789",
|
|
workspaceId,
|
|
type: "priority-calc",
|
|
status: RunnerJobStatus.PENDING,
|
|
priority: 5,
|
|
progressPercent: 0,
|
|
result: null,
|
|
error: null,
|
|
createdAt: new Date(),
|
|
startedAt: null,
|
|
completedAt: null,
|
|
agentTaskId: null,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.create.mockResolvedValue(mockJob);
|
|
mockBullMqService.addJob.mockResolvedValue({ id: "bull-job-789" });
|
|
|
|
await service.create(workspaceId, createDto);
|
|
|
|
expect(prisma.runnerJob.create).toHaveBeenCalledWith({
|
|
data: {
|
|
workspace: { connect: { id: workspaceId } },
|
|
type: "priority-calc",
|
|
priority: 5,
|
|
status: RunnerJobStatus.PENDING,
|
|
progressPercent: 0,
|
|
},
|
|
});
|
|
});
|
|
});
|
|
|
|
describe("findAll", () => {
|
|
it("should return paginated jobs with filters", async () => {
|
|
const query: QueryJobsDto = {
|
|
workspaceId: "workspace-123",
|
|
status: RunnerJobStatus.PENDING,
|
|
page: 1,
|
|
limit: 10,
|
|
};
|
|
|
|
const mockJobs = [
|
|
{
|
|
id: "job-1",
|
|
workspaceId: "workspace-123",
|
|
type: "git-status",
|
|
status: RunnerJobStatus.PENDING,
|
|
priority: 5,
|
|
progressPercent: 0,
|
|
createdAt: new Date(),
|
|
},
|
|
];
|
|
|
|
mockPrismaService.runnerJob.findMany.mockResolvedValue(mockJobs);
|
|
mockPrismaService.runnerJob.count.mockResolvedValue(1);
|
|
|
|
const result = await service.findAll(query);
|
|
|
|
expect(result).toEqual({
|
|
data: mockJobs,
|
|
meta: {
|
|
total: 1,
|
|
page: 1,
|
|
limit: 10,
|
|
totalPages: 1,
|
|
},
|
|
});
|
|
});
|
|
|
|
it("should handle multiple status filters", async () => {
|
|
const query: QueryJobsDto = {
|
|
workspaceId: "workspace-123",
|
|
status: [RunnerJobStatus.RUNNING, RunnerJobStatus.QUEUED],
|
|
page: 1,
|
|
limit: 50,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findMany.mockResolvedValue([]);
|
|
mockPrismaService.runnerJob.count.mockResolvedValue(0);
|
|
|
|
await service.findAll(query);
|
|
|
|
expect(prisma.runnerJob.findMany).toHaveBeenCalledWith({
|
|
where: {
|
|
workspaceId: "workspace-123",
|
|
status: { in: [RunnerJobStatus.RUNNING, RunnerJobStatus.QUEUED] },
|
|
},
|
|
include: {
|
|
agentTask: {
|
|
select: { id: true, title: true, status: true },
|
|
},
|
|
},
|
|
orderBy: {
|
|
createdAt: "desc",
|
|
},
|
|
skip: 0,
|
|
take: 50,
|
|
});
|
|
});
|
|
|
|
it("should filter by type", async () => {
|
|
const query: QueryJobsDto = {
|
|
workspaceId: "workspace-123",
|
|
type: "code-task",
|
|
page: 1,
|
|
limit: 50,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findMany.mockResolvedValue([]);
|
|
mockPrismaService.runnerJob.count.mockResolvedValue(0);
|
|
|
|
await service.findAll(query);
|
|
|
|
expect(prisma.runnerJob.findMany).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
where: {
|
|
workspaceId: "workspace-123",
|
|
type: "code-task",
|
|
},
|
|
})
|
|
);
|
|
});
|
|
|
|
it("should use default pagination values", async () => {
|
|
const query: QueryJobsDto = {
|
|
workspaceId: "workspace-123",
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findMany.mockResolvedValue([]);
|
|
mockPrismaService.runnerJob.count.mockResolvedValue(0);
|
|
|
|
await service.findAll(query);
|
|
|
|
expect(prisma.runnerJob.findMany).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
skip: 0,
|
|
take: 50,
|
|
})
|
|
);
|
|
});
|
|
});
|
|
|
|
describe("findOne", () => {
|
|
it("should return a single job by ID", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
type: "git-status",
|
|
status: RunnerJobStatus.COMPLETED,
|
|
priority: 5,
|
|
progressPercent: 100,
|
|
result: { status: "success" },
|
|
error: null,
|
|
createdAt: new Date(),
|
|
startedAt: new Date(),
|
|
completedAt: new Date(),
|
|
agentTask: null,
|
|
steps: [],
|
|
events: [],
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob);
|
|
|
|
const result = await service.findOne(jobId, workspaceId);
|
|
|
|
expect(result).toEqual(mockJob);
|
|
expect(prisma.runnerJob.findUnique).toHaveBeenCalledWith({
|
|
where: {
|
|
id: jobId,
|
|
workspaceId,
|
|
},
|
|
include: {
|
|
agentTask: {
|
|
select: { id: true, title: true, status: true },
|
|
},
|
|
steps: {
|
|
orderBy: { ordinal: "asc" },
|
|
},
|
|
events: {
|
|
orderBy: { timestamp: "asc" },
|
|
},
|
|
},
|
|
});
|
|
});
|
|
|
|
it("should throw NotFoundException if job not found", async () => {
|
|
const jobId = "nonexistent-job";
|
|
const workspaceId = "workspace-123";
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
|
|
|
|
await expect(service.findOne(jobId, workspaceId)).rejects.toThrow(NotFoundException);
|
|
await expect(service.findOne(jobId, workspaceId)).rejects.toThrow(
|
|
`RunnerJob with ID ${jobId} not found`
|
|
);
|
|
});
|
|
});
|
|
|
|
describe("cancel", () => {
|
|
it("should cancel a pending job", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
status: RunnerJobStatus.PENDING,
|
|
version: 1,
|
|
};
|
|
|
|
const mockUpdatedJob = {
|
|
...mockExistingJob,
|
|
status: RunnerJobStatus.CANCELLED,
|
|
completedAt: new Date(),
|
|
version: 2,
|
|
};
|
|
|
|
// First findUnique returns existing job, second returns updated job
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce(mockExistingJob)
|
|
.mockResolvedValueOnce(mockUpdatedJob);
|
|
// updateMany returns count for optimistic locking
|
|
mockPrismaService.runnerJob.updateMany.mockResolvedValue({ count: 1 });
|
|
|
|
const result = await service.cancel(jobId, workspaceId);
|
|
|
|
expect(result).toEqual(mockUpdatedJob);
|
|
expect(mockPrismaService.runnerJob.updateMany).toHaveBeenCalledWith({
|
|
where: { id: jobId, workspaceId, version: mockExistingJob.version },
|
|
data: {
|
|
status: RunnerJobStatus.CANCELLED,
|
|
completedAt: expect.any(Date),
|
|
version: { increment: 1 },
|
|
},
|
|
});
|
|
});
|
|
|
|
it("should cancel a queued job", async () => {
|
|
const jobId = "job-456";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
status: RunnerJobStatus.QUEUED,
|
|
version: 1,
|
|
};
|
|
|
|
const mockUpdatedJob = {
|
|
...mockExistingJob,
|
|
status: RunnerJobStatus.CANCELLED,
|
|
version: 2,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce(mockExistingJob)
|
|
.mockResolvedValueOnce(mockUpdatedJob);
|
|
mockPrismaService.runnerJob.updateMany.mockResolvedValue({ count: 1 });
|
|
|
|
await service.cancel(jobId, workspaceId);
|
|
|
|
expect(mockPrismaService.runnerJob.updateMany).toHaveBeenCalled();
|
|
});
|
|
|
|
it("should throw NotFoundException if job not found", async () => {
|
|
const jobId = "nonexistent-job";
|
|
const workspaceId = "workspace-123";
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
|
|
|
|
await expect(service.cancel(jobId, workspaceId)).rejects.toThrow(NotFoundException);
|
|
});
|
|
|
|
it("should throw BadRequestException if job is already completed", async () => {
|
|
const jobId = "job-789";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockExistingJob);
|
|
|
|
await expect(service.cancel(jobId, workspaceId)).rejects.toThrow(BadRequestException);
|
|
await expect(service.cancel(jobId, workspaceId)).rejects.toThrow(
|
|
"Cannot cancel job with status COMPLETED"
|
|
);
|
|
});
|
|
|
|
it("should throw BadRequestException if job is already cancelled", async () => {
|
|
const jobId = "job-999";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
status: RunnerJobStatus.CANCELLED,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockExistingJob);
|
|
|
|
await expect(service.cancel(jobId, workspaceId)).rejects.toThrow(BadRequestException);
|
|
});
|
|
});
|
|
|
|
describe("retry", () => {
|
|
it("should retry a failed job", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
type: "git-status",
|
|
status: RunnerJobStatus.FAILED,
|
|
priority: 5,
|
|
result: { repo: "test-repo" },
|
|
};
|
|
|
|
const mockNewJob = {
|
|
id: "job-new",
|
|
workspaceId,
|
|
type: "git-status",
|
|
status: RunnerJobStatus.PENDING,
|
|
priority: 5,
|
|
progressPercent: 0,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockExistingJob);
|
|
mockPrismaService.runnerJob.create.mockResolvedValue(mockNewJob);
|
|
mockBullMqService.addJob.mockResolvedValue({ id: "bull-job-new" });
|
|
|
|
const result = await service.retry(jobId, workspaceId);
|
|
|
|
expect(result).toEqual(mockNewJob);
|
|
expect(prisma.runnerJob.create).toHaveBeenCalledWith({
|
|
data: {
|
|
workspace: { connect: { id: workspaceId } },
|
|
type: "git-status",
|
|
priority: 5,
|
|
status: RunnerJobStatus.PENDING,
|
|
progressPercent: 0,
|
|
result: { repo: "test-repo" },
|
|
},
|
|
});
|
|
expect(bullMq.addJob).toHaveBeenCalled();
|
|
});
|
|
|
|
it("should throw NotFoundException if job not found", async () => {
|
|
const jobId = "nonexistent-job";
|
|
const workspaceId = "workspace-123";
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
|
|
|
|
await expect(service.retry(jobId, workspaceId)).rejects.toThrow(NotFoundException);
|
|
});
|
|
|
|
it("should throw BadRequestException if job is not failed", async () => {
|
|
const jobId = "job-456";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockExistingJob = {
|
|
id: jobId,
|
|
workspaceId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockExistingJob);
|
|
|
|
await expect(service.retry(jobId, workspaceId)).rejects.toThrow(BadRequestException);
|
|
await expect(service.retry(jobId, workspaceId)).rejects.toThrow("Can only retry failed jobs");
|
|
});
|
|
});
|
|
|
|
describe("streamEvents", () => {
|
|
it("should stream events and close when job completes", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
// Mock response object
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
setHeader: vi.fn(),
|
|
};
|
|
|
|
// Mock initial job lookup
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED, // Second call for status check
|
|
});
|
|
|
|
// Mock events
|
|
const mockEvents = [
|
|
{
|
|
id: "event-1",
|
|
jobId,
|
|
stepId: "step-1",
|
|
type: "step.started",
|
|
timestamp: new Date(),
|
|
payload: { name: "Running tests", phase: "validation" },
|
|
},
|
|
];
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents);
|
|
|
|
// Execute streamEvents
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify job lookup was called
|
|
expect(prisma.runnerJob.findUnique).toHaveBeenCalledWith({
|
|
where: { id: jobId, workspaceId },
|
|
select: { id: true, status: true },
|
|
});
|
|
|
|
// Verify events were written
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("step.started"));
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("stream.complete"));
|
|
expect(mockRes.end).toHaveBeenCalled();
|
|
});
|
|
|
|
it("should throw NotFoundException if job not found", async () => {
|
|
const jobId = "nonexistent-job";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
|
|
|
|
await expect(service.streamEvents(jobId, workspaceId, mockRes as never)).rejects.toThrow(
|
|
NotFoundException
|
|
);
|
|
await expect(service.streamEvents(jobId, workspaceId, mockRes as never)).rejects.toThrow(
|
|
`RunnerJob with ID ${jobId} not found`
|
|
);
|
|
});
|
|
|
|
it("should clean up interval on connection close", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn((event: string, handler: () => void) => {
|
|
if (event === "close") {
|
|
// Immediately trigger close to break the loop
|
|
setTimeout(() => handler(), 10);
|
|
}
|
|
}),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
});
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue([]);
|
|
|
|
// Start streaming and wait for it to complete
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify cleanup
|
|
expect(mockRes.on).toHaveBeenCalledWith("close", expect.any(Function));
|
|
expect(mockRes.end).toHaveBeenCalled();
|
|
});
|
|
|
|
it("should call clearInterval in finally block to prevent memory leaks", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
// Spy on global setInterval and clearInterval
|
|
const mockIntervalId = 12345;
|
|
const setIntervalSpy = vi
|
|
.spyOn(global, "setInterval")
|
|
.mockReturnValue(mockIntervalId as never);
|
|
const clearIntervalSpy = vi.spyOn(global, "clearInterval").mockImplementation(() => {});
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
};
|
|
|
|
// Mock job to complete immediately
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
});
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue([]);
|
|
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify setInterval was called for keep-alive ping
|
|
expect(setIntervalSpy).toHaveBeenCalled();
|
|
|
|
// Verify clearInterval was called with the interval ID to prevent memory leak
|
|
expect(clearIntervalSpy).toHaveBeenCalledWith(mockIntervalId);
|
|
|
|
// Cleanup spies
|
|
setIntervalSpy.mockRestore();
|
|
clearIntervalSpy.mockRestore();
|
|
});
|
|
|
|
it("should clear interval even when stream throws an error", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
// Spy on global setInterval and clearInterval
|
|
const mockIntervalId = 54321;
|
|
const setIntervalSpy = vi
|
|
.spyOn(global, "setInterval")
|
|
.mockReturnValue(mockIntervalId as never);
|
|
const clearIntervalSpy = vi.spyOn(global, "clearInterval").mockImplementation(() => {});
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
});
|
|
|
|
// Simulate a fatal error during event polling
|
|
mockPrismaService.jobEvent.findMany.mockRejectedValue(new Error("Fatal database failure"));
|
|
|
|
// The method should throw but still clean up
|
|
await expect(service.streamEvents(jobId, workspaceId, mockRes as never)).rejects.toThrow(
|
|
"Fatal database failure"
|
|
);
|
|
|
|
// Verify clearInterval was called even on error (via finally block)
|
|
expect(clearIntervalSpy).toHaveBeenCalledWith(mockIntervalId);
|
|
|
|
// Cleanup spies
|
|
setIntervalSpy.mockRestore();
|
|
clearIntervalSpy.mockRestore();
|
|
});
|
|
|
|
// ERROR RECOVERY TESTS - Issue #187
|
|
|
|
it("should support resuming stream from lastEventId", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
const lastEventId = "event-5";
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
};
|
|
|
|
// Mock initial job lookup
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
});
|
|
|
|
// Mock finding the last event for timestamp lookup
|
|
mockPrismaService.jobEvent.findUnique.mockResolvedValue({
|
|
id: lastEventId,
|
|
timestamp: new Date("2026-01-01T12:00:00Z"),
|
|
});
|
|
|
|
// Mock events starting after the lastEventId
|
|
const mockEvents = [
|
|
{
|
|
id: "event-6",
|
|
jobId,
|
|
stepId: "step-2",
|
|
type: "step.started",
|
|
timestamp: new Date("2026-01-01T12:01:00Z"),
|
|
payload: { name: "Next step" },
|
|
},
|
|
];
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents);
|
|
|
|
// Execute streamEvents with lastEventId
|
|
await service.streamEventsFrom(jobId, workspaceId, mockRes as never, lastEventId);
|
|
|
|
// Verify events query used lastEventId as cursor
|
|
expect(prisma.jobEvent.findMany).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
where: expect.objectContaining({
|
|
id: { gt: lastEventId },
|
|
}),
|
|
})
|
|
);
|
|
});
|
|
|
|
it("should send event IDs for reconnection support", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
});
|
|
|
|
const mockEvents = [
|
|
{
|
|
id: "event-123",
|
|
jobId,
|
|
stepId: "step-1",
|
|
type: "step.started",
|
|
timestamp: new Date(),
|
|
payload: { name: "Test" },
|
|
},
|
|
];
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue(mockEvents);
|
|
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify event ID was sent
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("id: event-123"));
|
|
});
|
|
|
|
it("should handle database connection errors gracefully", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
let closeHandler: (() => void) | null = null;
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn((event: string, handler: () => void) => {
|
|
if (event === "close") {
|
|
closeHandler = handler;
|
|
}
|
|
}),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
});
|
|
|
|
// Simulate database error during event polling (non-retryable)
|
|
const dbError = new Error("Fatal database error");
|
|
mockPrismaService.jobEvent.findMany.mockRejectedValue(dbError);
|
|
|
|
// Should propagate non-retryable error
|
|
await expect(service.streamEvents(jobId, workspaceId, mockRes as never)).rejects.toThrow(
|
|
"Fatal database error"
|
|
);
|
|
|
|
// Verify error event was written
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("event: error"));
|
|
});
|
|
|
|
it("should send retry hint on transient errors", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
let callCount = 0;
|
|
let closeHandler: (() => void) | null = null;
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn((event: string, handler: () => void) => {
|
|
if (event === "close") {
|
|
closeHandler = handler;
|
|
}
|
|
}),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
});
|
|
|
|
// Simulate transient error, then success
|
|
mockPrismaService.jobEvent.findMany.mockImplementation(() => {
|
|
callCount++;
|
|
if (callCount === 1) {
|
|
return Promise.reject(new Error("Temporary connection issue"));
|
|
}
|
|
return Promise.resolve([]);
|
|
});
|
|
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify error event was sent with retryable flag
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("event: error"));
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining('"retryable":true'));
|
|
// Verify stream completed after retry
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("stream.complete"));
|
|
});
|
|
|
|
it("should respect client disconnect and stop polling", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
let closeHandler: (() => void) | null = null;
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn((event: string, handler: () => void) => {
|
|
if (event === "close") {
|
|
closeHandler = handler;
|
|
// Trigger close after first poll
|
|
setTimeout(() => handler(), 100);
|
|
}
|
|
}),
|
|
writableEnded: false,
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique.mockResolvedValue({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
});
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue([]);
|
|
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify cleanup happened
|
|
expect(mockRes.end).toHaveBeenCalled();
|
|
|
|
// Verify we didn't query excessively after disconnect
|
|
const queryCount = mockPrismaService.jobEvent.findMany.mock.calls.length;
|
|
expect(queryCount).toBeLessThan(5); // Should stop quickly after disconnect
|
|
});
|
|
|
|
it("should include connection metadata in stream headers", async () => {
|
|
const jobId = "job-123";
|
|
const workspaceId = "workspace-123";
|
|
|
|
const mockRes = {
|
|
write: vi.fn(),
|
|
end: vi.fn(),
|
|
on: vi.fn(),
|
|
writableEnded: false,
|
|
setHeader: vi.fn(),
|
|
};
|
|
|
|
mockPrismaService.runnerJob.findUnique
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.RUNNING,
|
|
})
|
|
.mockResolvedValueOnce({
|
|
id: jobId,
|
|
status: RunnerJobStatus.COMPLETED,
|
|
});
|
|
|
|
mockPrismaService.jobEvent.findMany.mockResolvedValue([]);
|
|
|
|
await service.streamEvents(jobId, workspaceId, mockRes as never);
|
|
|
|
// Verify SSE headers include retry recommendation
|
|
expect(mockRes.write).toHaveBeenCalledWith(expect.stringMatching(/retry: \d+/));
|
|
});
|
|
});
|
|
});
|