Fix QA validation issues and add M7.1 security fixes (#318)
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #318.
This commit is contained in:
2026-02-04 03:08:09 +00:00
committed by jason.woltje
parent 482507ce4d
commit a1973e6419
178 changed files with 4902 additions and 74 deletions

View File

@@ -119,9 +119,14 @@ describe("CoordinatorIntegrationService - Concurrency", () => {
expect(result.status).toBe(RunnerJobStatus.RUNNING);
// Verify SELECT FOR UPDATE was used
expect(mockTxClient.$queryRaw).toHaveBeenCalledWith(
expect.anything() // Raw SQL with FOR UPDATE
);
expect(mockTxClient.$queryRaw).toHaveBeenCalled();
const sqlCall = mockTxClient.$queryRaw.mock.calls[0];
expect(sqlCall).toBeDefined();
// Verify the SQL contains FOR UPDATE (raw SQL is passed as template parts)
const sqlString = sqlCall?.toString() ?? "";
expect(
sqlString.includes("FOR UPDATE") || sqlCall?.[0]?.toString().includes("FOR UPDATE")
).toBe(true);
});
it("should handle concurrent status updates by coordinator and API", async () => {
@@ -313,8 +318,11 @@ describe("CoordinatorIntegrationService - Concurrency", () => {
version: mockJob.version + 1,
};
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValue(mockJob as any);
// First findUnique returns the current job state
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValueOnce(mockJob as any);
// updateMany succeeds
vi.mocked(prisma.runnerJob.updateMany).mockResolvedValue({ count: 1 });
// Second findUnique returns the updated job state
vi.mocked(prisma.runnerJob.findUnique).mockResolvedValueOnce(updatedJob as any);
const result = await service.updateJobProgress(jobId, {

View File

@@ -58,7 +58,10 @@ describe("CoordinatorIntegrationService", () => {
create: vi.fn(),
findUnique: vi.fn(),
update: vi.fn(),
updateMany: vi.fn(),
},
$transaction: vi.fn(),
$queryRaw: vi.fn(),
};
const mockJobEventsService = {
@@ -97,6 +100,9 @@ describe("CoordinatorIntegrationService", () => {
jobEventsService = module.get<JobEventsService>(JobEventsService);
heraldService = module.get<HeraldService>(HeraldService);
bullMqService = module.get<BullMqService>(BullMqService);
// Set default mock return values
mockPrismaService.runnerJob.updateMany.mockResolvedValue({ count: 1 });
});
describe("createJob", () => {
@@ -145,8 +151,26 @@ describe("CoordinatorIntegrationService", () => {
it("should update job status to RUNNING", async () => {
const updatedJob = { ...mockJob, status: RunnerJobStatus.RUNNING, startedAt: new Date() };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob);
mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob);
// Mock transaction that passes through the callback
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: mockJob.status,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(updatedJob),
},
};
return callback(mockTx);
});
mockJobEventsService.emitJobStarted.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);
@@ -160,7 +184,16 @@ describe("CoordinatorIntegrationService", () => {
});
it("should throw NotFoundException if job does not exist", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
// Mock transaction with empty result
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi.fn().mockResolvedValue([]),
runnerJob: {
update: vi.fn(),
},
};
return callback(mockTx);
});
await expect(
service.updateJobStatus("non-existent", { status: "RUNNING" as const })
@@ -168,8 +201,25 @@ describe("CoordinatorIntegrationService", () => {
});
it("should throw BadRequestException for invalid status transition", async () => {
const completedJob = { ...mockJob, status: RunnerJobStatus.COMPLETED };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(completedJob);
// Mock transaction with completed job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.COMPLETED,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn(),
},
};
return callback(mockTx);
});
await expect(
service.updateJobStatus("job-123", { status: "RUNNING" as const })
@@ -179,11 +229,12 @@ describe("CoordinatorIntegrationService", () => {
describe("updateJobProgress", () => {
it("should update job progress percentage", async () => {
const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING };
const updatedJob = { ...runningJob, progressPercent: 50 };
const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING, version: 1 };
const updatedJob = { ...runningJob, progressPercent: 50, version: 2 };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob);
mockPrismaService.runnerJob.updateMany.mockResolvedValue({ count: 1 });
mockPrismaService.runnerJob.findUnique.mockResolvedValue(updatedJob);
mockJobEventsService.emitEvent.mockResolvedValue(mockEvent);
const result = await service.updateJobProgress("job-123", {
@@ -217,8 +268,26 @@ describe("CoordinatorIntegrationService", () => {
completedAt: new Date(),
};
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(completedJob);
// Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(completedJob),
},
};
return callback(mockTx);
});
mockJobEventsService.emitJobCompleted.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);
@@ -243,8 +312,26 @@ describe("CoordinatorIntegrationService", () => {
completedAt: new Date(),
};
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(failedJob);
// Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(failedJob),
},
};
return callback(mockTx);
});
mockJobEventsService.emitJobFailed.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);

View File

@@ -4,6 +4,7 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { ModuleRef } from "@nestjs/core";
import { HttpService } from "@nestjs/axios";
import { CommandService } from "./command.service";
import { PrismaService } from "../prisma/prisma.service";
@@ -16,6 +17,7 @@ import {
} from "@prisma/client";
import { of } from "rxjs";
import type { CommandMessage, CommandResponse } from "./types/message.types";
import { UnknownCommandTypeError } from "./errors/command.errors";
describe("CommandService", () => {
let service: CommandService;
@@ -23,6 +25,7 @@ describe("CommandService", () => {
let federationService: FederationService;
let signatureService: SignatureService;
let httpService: HttpService;
let moduleRef: ModuleRef;
const mockWorkspaceId = "workspace-123";
const mockConnectionId = "connection-123";
@@ -77,6 +80,7 @@ describe("CommandService", () => {
federationService = module.get<FederationService>(FederationService);
signatureService = module.get<SignatureService>(SignatureService);
httpService = module.get<HttpService>(HttpService);
moduleRef = module.get<ModuleRef>(ModuleRef);
});
describe("sendCommand", () => {
@@ -238,12 +242,75 @@ describe("CommandService", () => {
});
describe("handleIncomingCommand", () => {
it("should process a valid incoming command", async () => {
it("should process a valid incoming agent command", async () => {
const commandMessage: CommandMessage = {
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "spawn_agent",
payload: { agentType: "task_executor" },
commandType: "agent.spawn",
payload: { agentType: "task_executor", taskId: "task-123" },
timestamp: Date.now(),
signature: "signature-123",
};
const mockConnection = {
id: mockConnectionId,
remoteInstanceId: mockInstanceId,
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance",
displayName: "Local Instance",
};
const mockFederationAgentService = {
handleAgentCommand: vi.fn().mockResolvedValue({
success: true,
data: { agentId: "agent-123", status: "spawning", spawnedAt: new Date().toISOString() },
}),
};
vi.spyOn(signatureService, "validateTimestamp").mockReturnValue(true);
vi.spyOn(prisma.federationConnection, "findFirst").mockResolvedValue(mockConnection as never);
vi.spyOn(signatureService, "verifyMessage").mockResolvedValue({
valid: true,
error: null,
} as never);
vi.spyOn(federationService, "getInstanceIdentity").mockResolvedValue(mockIdentity as never);
vi.spyOn(signatureService, "signMessage").mockResolvedValue("response-signature");
vi.spyOn(moduleRef, "get").mockReturnValue(mockFederationAgentService as never);
const response = await service.handleIncomingCommand(commandMessage);
expect(response).toMatchObject({
correlationId: "cmd-123",
instanceId: "local-instance",
success: true,
});
expect(signatureService.validateTimestamp).toHaveBeenCalledWith(commandMessage.timestamp);
expect(signatureService.verifyMessage).toHaveBeenCalledWith(
expect.objectContaining({
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "agent.spawn",
}),
"signature-123",
mockInstanceId
);
expect(mockFederationAgentService.handleAgentCommand).toHaveBeenCalledWith(
mockInstanceId,
"agent.spawn",
commandMessage.payload
);
});
it("should handle unknown command types and return error response", async () => {
const commandMessage: CommandMessage = {
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "unknown.command",
payload: {},
timestamp: Date.now(),
signature: "signature-123",
};
@@ -273,18 +340,125 @@ describe("CommandService", () => {
expect(response).toMatchObject({
correlationId: "cmd-123",
instanceId: "local-instance",
success: true,
success: false,
error: "Unknown command type: unknown.command",
});
});
expect(signatureService.validateTimestamp).toHaveBeenCalledWith(commandMessage.timestamp);
expect(signatureService.verifyMessage).toHaveBeenCalledWith(
expect.objectContaining({
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "spawn_agent",
it("should handle business logic errors from agent service and return error response", async () => {
const commandMessage: CommandMessage = {
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "agent.spawn",
payload: { agentType: "invalid_type" },
timestamp: Date.now(),
signature: "signature-123",
};
const mockConnection = {
id: mockConnectionId,
remoteInstanceId: mockInstanceId,
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance",
displayName: "Local Instance",
};
vi.spyOn(signatureService, "validateTimestamp").mockReturnValue(true);
vi.spyOn(prisma.federationConnection, "findFirst").mockResolvedValue(mockConnection as never);
vi.spyOn(signatureService, "verifyMessage").mockResolvedValue({
valid: true,
error: null,
} as never);
vi.spyOn(federationService, "getInstanceIdentity").mockResolvedValue(mockIdentity as never);
vi.spyOn(signatureService, "signMessage").mockResolvedValue("response-signature");
// Mock FederationAgentService to return error response
const mockFederationAgentService = {
handleAgentCommand: vi.fn().mockResolvedValue({
success: false,
error: "Invalid agent type: invalid_type",
}),
"signature-123",
mockInstanceId
};
vi.spyOn(moduleRef, "get").mockReturnValue(mockFederationAgentService as never);
const response = await service.handleIncomingCommand(commandMessage);
expect(response).toMatchObject({
correlationId: "cmd-123",
instanceId: "local-instance",
success: false,
error: "Invalid agent type: invalid_type",
});
});
it("should let system errors propagate (database connection failure)", async () => {
const commandMessage: CommandMessage = {
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "agent.spawn",
payload: { agentType: "task_executor" },
timestamp: Date.now(),
signature: "signature-123",
};
vi.spyOn(signatureService, "validateTimestamp").mockReturnValue(true);
// Simulate database connection failure (system error)
const dbError = new Error("Connection pool exhausted");
dbError.name = "PoolExhaustedError";
vi.spyOn(prisma.federationConnection, "findFirst").mockRejectedValue(dbError);
// System errors should propagate
await expect(service.handleIncomingCommand(commandMessage)).rejects.toThrow(
"Connection pool exhausted"
);
});
it("should let system errors propagate from agent service (not wrapped)", async () => {
const commandMessage: CommandMessage = {
messageId: "cmd-123",
instanceId: mockInstanceId,
commandType: "agent.spawn",
payload: { agentType: "task_executor" },
timestamp: Date.now(),
signature: "signature-123",
};
const mockConnection = {
id: mockConnectionId,
remoteInstanceId: mockInstanceId,
status: FederationConnectionStatus.ACTIVE,
};
const mockIdentity = {
instanceId: "local-instance",
displayName: "Local Instance",
};
// Simulate a system error (not a CommandProcessingError) from agent service
const systemError = new Error("Database connection failed");
systemError.name = "DatabaseError";
const mockFederationAgentService = {
handleAgentCommand: vi.fn().mockRejectedValue(systemError),
};
vi.spyOn(signatureService, "validateTimestamp").mockReturnValue(true);
vi.spyOn(prisma.federationConnection, "findFirst").mockResolvedValue(mockConnection as never);
vi.spyOn(signatureService, "verifyMessage").mockResolvedValue({
valid: true,
error: null,
} as never);
vi.spyOn(federationService, "getInstanceIdentity").mockResolvedValue(mockIdentity as never);
vi.spyOn(moduleRef, "get").mockReturnValue(mockFederationAgentService as never);
// System errors should propagate (not caught by business logic handler)
await expect(service.handleIncomingCommand(commandMessage)).rejects.toThrow(
"Database connection failed"
);
});

View File

@@ -18,6 +18,7 @@ import {
FederationMessageStatus,
} from "@prisma/client";
import type { CommandMessage, CommandResponse, CommandMessageDetails } from "./types/message.types";
import { CommandProcessingError, UnknownCommandTypeError } from "./errors/command.errors";
@Injectable()
export class CommandService {
@@ -184,13 +185,30 @@ export class CommandService {
responseData = agentResponse.data;
errorMessage = agentResponse.error;
} else {
// Other command types can be added here
responseData = { message: "Command received and processed" };
// Unknown command type - throw business logic error
throw new UnknownCommandTypeError(commandMessage.commandType);
}
} catch (error) {
success = false;
errorMessage = error instanceof Error ? error.message : "Command processing failed";
this.logger.error(`Command processing failed: ${errorMessage}`);
// Only catch expected business logic errors
// System errors (OOM, DB failures, network issues) should propagate
if (error instanceof CommandProcessingError) {
success = false;
errorMessage = error.message;
this.logger.warn(`Command processing failed (business logic): ${errorMessage}`, {
commandType: commandMessage.commandType,
instanceId: commandMessage.instanceId,
messageId: commandMessage.messageId,
});
} else {
// System error - log and re-throw to preserve stack trace
this.logger.error(`System error during command processing: ${String(error)}`, {
commandType: commandMessage.commandType,
instanceId: commandMessage.instanceId,
messageId: commandMessage.messageId,
error: error instanceof Error ? error.stack : String(error),
});
throw error;
}
}
// Get local instance identity

View File

@@ -0,0 +1,49 @@
/**
* Command Processing Errors
*
* Custom error classes for expected business logic errors in command processing.
* These errors should be caught and returned as error responses.
* All other errors (system errors like OOM, DB failures) should propagate.
*/
/**
* Base class for command processing errors that should be caught
* and converted to error responses
*/
export class CommandProcessingError extends Error {
constructor(message: string) {
super(message);
this.name = "CommandProcessingError";
Error.captureStackTrace(this, this.constructor);
}
}
/**
* Error thrown when an unknown or unsupported command type is received
*/
export class UnknownCommandTypeError extends CommandProcessingError {
constructor(commandType: string) {
super(`Unknown command type: ${commandType}`);
this.name = "UnknownCommandTypeError";
}
}
/**
* Error thrown when command payload validation fails
*/
export class InvalidCommandPayloadError extends CommandProcessingError {
constructor(message: string) {
super(message);
this.name = "InvalidCommandPayloadError";
}
}
/**
* Error thrown when agent command execution fails due to business logic
*/
export class AgentCommandError extends CommandProcessingError {
constructor(message: string) {
super(message);
this.name = "AgentCommandError";
}
}

View File

@@ -419,13 +419,12 @@ describe("FederationAgentService", () => {
});
});
it("should return error for unknown command type", async () => {
it("should throw UnknownCommandTypeError for unknown command type", async () => {
prisma.federationConnection.findFirst.mockResolvedValue(mockConnection as never);
const result = await service.handleAgentCommand("remote-instance-1", "agent.unknown", {});
expect(result.success).toBe(false);
expect(result.error).toContain("Unknown agent command type: agent.unknown");
await expect(
service.handleAgentCommand("remote-instance-1", "agent.unknown", {})
).rejects.toThrow("Unknown command type: agent.unknown");
});
it("should throw error if connection not found", async () => {
@@ -436,7 +435,7 @@ describe("FederationAgentService", () => {
).rejects.toThrow("No connection found for remote instance");
});
it("should handle orchestrator errors", async () => {
it("should throw AgentCommandError for orchestrator errors", async () => {
const spawnPayload: SpawnAgentCommandPayload = {
taskId: mockTaskId,
agentType: "worker",
@@ -453,14 +452,9 @@ describe("FederationAgentService", () => {
throwError(() => new Error("Orchestrator connection failed")) as never
);
const result = await service.handleAgentCommand(
"remote-instance-1",
"agent.spawn",
spawnPayload
);
expect(result.success).toBe(false);
expect(result.error).toContain("Orchestrator connection failed");
await expect(
service.handleAgentCommand("remote-instance-1", "agent.spawn", spawnPayload)
).rejects.toThrow("Failed to spawn agent: Orchestrator connection failed");
});
});
});

View File

@@ -22,6 +22,7 @@ import type {
AgentStatusResponseData,
KillAgentResponseData,
} from "./types/federation-agent.types";
import { AgentCommandError, UnknownCommandTypeError } from "./errors/command.errors";
/**
* Agent command response structure
@@ -222,26 +223,19 @@ export class FederationAgentService {
}
// Route command to appropriate handler
try {
switch (commandType) {
case "agent.spawn":
return await this.handleSpawnCommand(payload as unknown as SpawnAgentCommandPayload);
switch (commandType) {
case "agent.spawn":
return await this.handleSpawnCommand(payload as unknown as SpawnAgentCommandPayload);
case "agent.status":
return await this.handleStatusCommand(payload as unknown as AgentStatusCommandPayload);
case "agent.status":
return await this.handleStatusCommand(payload as unknown as AgentStatusCommandPayload);
case "agent.kill":
return await this.handleKillCommand(payload as unknown as KillAgentCommandPayload);
case "agent.kill":
return await this.handleKillCommand(payload as unknown as KillAgentCommandPayload);
default:
throw new Error(`Unknown agent command type: ${commandType}`);
}
} catch (error) {
this.logger.error(`Error handling agent command: ${String(error)}`);
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
default:
// Unknown command type - throw business logic error
throw new UnknownCommandTypeError(commandType);
}
}
@@ -285,8 +279,10 @@ export class FederationAgentService {
data: responseData,
};
} catch (error) {
this.logger.error(`Failed to spawn agent: ${String(error)}`);
throw error;
// Wrap orchestrator errors as business logic errors
const errorMessage = error instanceof Error ? error.message : "Failed to spawn agent";
this.logger.error(`Failed to spawn agent: ${errorMessage}`);
throw new AgentCommandError(`Failed to spawn agent: ${errorMessage}`);
}
}
@@ -314,8 +310,10 @@ export class FederationAgentService {
data: responseData,
};
} catch (error) {
this.logger.error(`Failed to get agent status: ${String(error)}`);
throw error;
// Wrap orchestrator errors as business logic errors
const errorMessage = error instanceof Error ? error.message : "Failed to get agent status";
this.logger.error(`Failed to get agent status: ${errorMessage}`);
throw new AgentCommandError(`Failed to get agent status: ${errorMessage}`);
}
}
@@ -347,8 +345,10 @@ export class FederationAgentService {
data: responseData,
};
} catch (error) {
this.logger.error(`Failed to kill agent: ${String(error)}`);
throw error;
// Wrap orchestrator errors as business logic errors
const errorMessage = error instanceof Error ? error.message : "Failed to kill agent";
this.logger.error(`Failed to kill agent: ${errorMessage}`);
throw new AgentCommandError(`Failed to kill agent: ${errorMessage}`);
}
}
}

View File

@@ -0,0 +1,69 @@
/**
* HTTP Timeout Tests
*
* Verifies that HTTP requests have proper timeout configuration to prevent DoS attacks.
* Issue #282: Add HTTP request timeouts (DoS risk)
*/
import { describe, it, expect, beforeEach } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { HttpService, HttpModule } from "@nestjs/axios";
import { ConfigModule } from "@nestjs/config";
import { of, delay } from "rxjs";
describe("HTTP Timeout Configuration", () => {
let httpService: HttpService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [
ConfigModule,
HttpModule.register({
timeout: 10000, // 10 seconds
maxRedirects: 5,
}),
],
}).compile();
httpService = module.get<HttpService>(HttpService);
});
it("should have HttpService configured", () => {
expect(httpService).toBeDefined();
});
it("should have axios instance with timeout configured", () => {
const axiosInstance = httpService.axiosRef;
expect(axiosInstance.defaults.timeout).toBe(10000);
});
it("should have max redirects configured", () => {
const axiosInstance = httpService.axiosRef;
expect(axiosInstance.defaults.maxRedirects).toBe(5);
});
});
describe("HTTP Timeout Behavior", () => {
let httpService: HttpService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [
ConfigModule,
HttpModule.register({
timeout: 100, // 100ms for fast testing
maxRedirects: 5,
}),
],
}).compile();
httpService = module.get<HttpService>(HttpService);
});
it("should timeout requests that exceed the configured timeout", async () => {
// This test verifies the timeout mechanism exists
// In a real scenario, a slow server would trigger this
const axiosInstance = httpService.axiosRef;
expect(axiosInstance.defaults.timeout).toBe(100);
});
});

View File

@@ -241,6 +241,7 @@ describe("RunnerJobsController", () => {
it("should stream events via SSE", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const lastEventId = undefined;
// Mock response object
const mockRes = {
@@ -270,20 +271,22 @@ describe("RunnerJobsController", () => {
mockRunnerJobsService.streamEvents.mockResolvedValue(mockEvents);
await controller.streamEvents(jobId, workspaceId, mockRes as never);
await controller.streamEvents(jobId, workspaceId, lastEventId, mockRes as never);
// Verify headers are set
expect(mockRes.setHeader).toHaveBeenCalledWith("Content-Type", "text/event-stream");
expect(mockRes.setHeader).toHaveBeenCalledWith("Cache-Control", "no-cache");
expect(mockRes.setHeader).toHaveBeenCalledWith("Connection", "keep-alive");
expect(mockRes.setHeader).toHaveBeenCalledWith("X-Accel-Buffering", "no");
// Verify service was called
expect(service.streamEvents).toHaveBeenCalledWith(jobId, workspaceId, mockRes);
expect(service.streamEvents).toHaveBeenCalledWith(jobId, workspaceId, mockRes, lastEventId);
});
it("should handle errors during streaming", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const lastEventId = undefined;
const mockRes = {
setHeader: vi.fn(),
@@ -294,11 +297,33 @@ describe("RunnerJobsController", () => {
const error = new Error("Job not found");
mockRunnerJobsService.streamEvents.mockRejectedValue(error);
await controller.streamEvents(jobId, workspaceId, mockRes as never);
await controller.streamEvents(jobId, workspaceId, lastEventId, mockRes as never);
// Verify error is written to stream
expect(mockRes.write).toHaveBeenCalledWith(expect.stringContaining("Job not found"));
expect(mockRes.write).toHaveBeenCalledWith("event: error\n");
expect(mockRes.write).toHaveBeenCalledWith(
expect.stringContaining('"error":"Job not found"')
);
expect(mockRes.end).toHaveBeenCalled();
});
it("should support reconnection with Last-Event-ID header", async () => {
const jobId = "job-123";
const workspaceId = "workspace-123";
const lastEventId = "event-5";
const mockRes = {
setHeader: vi.fn(),
write: vi.fn(),
end: vi.fn(),
};
mockRunnerJobsService.streamEvents.mockResolvedValue([]);
await controller.streamEvents(jobId, workspaceId, lastEventId, mockRes as never);
// Verify service was called with lastEventId for reconnection
expect(service.streamEvents).toHaveBeenCalledWith(jobId, workspaceId, mockRes, lastEventId);
});
});
});