feat(#69): implement embedding generation pipeline
Generate embeddings for knowledge entries using Ollama via a BullMQ job queue.

Changes:
- Created OllamaEmbeddingService for Ollama-based embedding generation
- Set up BullMQ queue and processor for async embedding jobs
- Integrated queue into knowledge entry lifecycle (create/update)
- Added rate limiting (1 job/second) and retry logic (3 attempts)
- Added OLLAMA_EMBEDDING_MODEL environment variable configuration
- Implemented dimension normalization (padding/truncating to 1536 dimensions)
- Added graceful degradation when Ollama is unavailable

Test Coverage:
- All 31 embedding-related tests passing
- ollama-embedding.service.spec.ts: 13 tests
- embedding-queue.spec.ts: 6 tests
- embedding.processor.spec.ts: 5 tests
- Build and linting successful

Fixes #69

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
114
apps/api/src/knowledge/queues/embedding-queue.service.ts
Normal file
114
apps/api/src/knowledge/queues/embedding-queue.service.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { InjectQueue } from "@nestjs/bullmq";
|
||||
import { Queue } from "bullmq";
|
||||
|
||||
/**
 * Payload carried by a single embedding generation job on the
 * "embeddings" queue.
 */
export interface EmbeddingJobData {
  /** ID of the knowledge entry the embedding belongs to. */
  entryId: string;
  /** Text content to generate the embedding from. */
  content: string;
  /** Optional embedding model override; when omitted, the processor passes no model and the embedding service's default applies. */
  model?: string;
}
|
||||
|
||||
/**
|
||||
* Service for managing the embedding generation queue
|
||||
*
|
||||
* This service provides an interface to queue embedding jobs
|
||||
* and manage the queue lifecycle.
|
||||
*/
|
||||
@Injectable()
|
||||
export class EmbeddingQueueService {
|
||||
private readonly logger = new Logger(EmbeddingQueueService.name);
|
||||
|
||||
constructor(
|
||||
@InjectQueue("embeddings")
|
||||
private readonly embeddingQueue: Queue<EmbeddingJobData>
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Queue an embedding generation job
|
||||
*
|
||||
* @param entryId - ID of the knowledge entry
|
||||
* @param content - Content to generate embedding for
|
||||
* @param model - Optional model override
|
||||
* @returns Job ID
|
||||
*/
|
||||
async queueEmbeddingJob(entryId: string, content: string, model?: string): Promise<string> {
|
||||
const jobData: EmbeddingJobData = {
|
||||
entryId,
|
||||
content,
|
||||
};
|
||||
|
||||
if (model !== undefined) {
|
||||
jobData.model = model;
|
||||
}
|
||||
|
||||
const job = await this.embeddingQueue.add("generate-embedding", jobData, {
|
||||
// Retry configuration
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 5000, // Start with 5 seconds
|
||||
},
|
||||
// Rate limiting: 1 job per second to avoid overwhelming Ollama
|
||||
delay: 1000,
|
||||
// Remove completed jobs after 24 hours
|
||||
removeOnComplete: {
|
||||
age: 86400, // 24 hours in seconds
|
||||
count: 1000, // Keep max 1000 completed jobs
|
||||
},
|
||||
// Remove failed jobs after 7 days
|
||||
removeOnFail: {
|
||||
age: 604800, // 7 days in seconds
|
||||
count: 100, // Keep max 100 failed jobs for debugging
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Queued embedding job ${job.id ?? "unknown"} for entry ${entryId}`);
|
||||
return job.id ?? "unknown";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue statistics
|
||||
*
|
||||
* @returns Queue job counts
|
||||
*/
|
||||
async getQueueStats(): Promise<{
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
}> {
|
||||
const counts = await this.embeddingQueue.getJobCounts(
|
||||
"waiting",
|
||||
"active",
|
||||
"completed",
|
||||
"failed"
|
||||
);
|
||||
|
||||
return {
|
||||
waiting: counts.waiting ?? 0,
|
||||
active: counts.active ?? 0,
|
||||
completed: counts.completed ?? 0,
|
||||
failed: counts.failed ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean completed jobs older than the grace period
|
||||
*
|
||||
* @param gracePeriodMs - Grace period in milliseconds (default: 24 hours)
|
||||
*/
|
||||
async cleanCompletedJobs(gracePeriodMs = 86400000): Promise<void> {
|
||||
await this.embeddingQueue.clean(gracePeriodMs, 100, "completed");
|
||||
this.logger.log(`Cleaned completed jobs older than ${gracePeriodMs.toString()}ms`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean failed jobs older than the grace period
|
||||
*
|
||||
* @param gracePeriodMs - Grace period in milliseconds (default: 7 days)
|
||||
*/
|
||||
async cleanFailedJobs(gracePeriodMs = 604800000): Promise<void> {
|
||||
await this.embeddingQueue.clean(gracePeriodMs, 100, "failed");
|
||||
this.logger.log(`Cleaned failed jobs older than ${gracePeriodMs.toString()}ms`);
|
||||
}
|
||||
}
|
||||
131
apps/api/src/knowledge/queues/embedding-queue.spec.ts
Normal file
131
apps/api/src/knowledge/queues/embedding-queue.spec.ts
Normal file
@@ -0,0 +1,131 @@
|
||||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { Queue } from "bullmq";
|
||||
import { getQueueToken } from "@nestjs/bullmq";
|
||||
import { EmbeddingQueueService } from "./embedding-queue.service";
|
||||
|
||||
describe("EmbeddingQueueService", () => {
|
||||
let service: EmbeddingQueueService;
|
||||
let queue: Queue;
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
EmbeddingQueueService,
|
||||
{
|
||||
provide: getQueueToken("embeddings"),
|
||||
useValue: {
|
||||
add: vi.fn(),
|
||||
getJobCounts: vi.fn(),
|
||||
clean: vi.fn(),
|
||||
},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<EmbeddingQueueService>(EmbeddingQueueService);
|
||||
queue = module.get<Queue>(getQueueToken("embeddings"));
|
||||
});
|
||||
|
||||
describe("queueEmbeddingJob", () => {
|
||||
it("should queue embedding job with correct data", async () => {
|
||||
const entryId = "entry-123";
|
||||
const content = "test content";
|
||||
const model = "mxbai-embed-large";
|
||||
|
||||
vi.spyOn(queue, "add").mockResolvedValue({} as never);
|
||||
|
||||
await service.queueEmbeddingJob(entryId, content, model);
|
||||
|
||||
expect(queue.add).toHaveBeenCalledWith(
|
||||
"generate-embedding",
|
||||
{
|
||||
entryId,
|
||||
content,
|
||||
model,
|
||||
},
|
||||
expect.objectContaining({
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 5000,
|
||||
},
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should use default model when not specified", async () => {
|
||||
const entryId = "entry-123";
|
||||
const content = "test content";
|
||||
|
||||
vi.spyOn(queue, "add").mockResolvedValue({} as never);
|
||||
|
||||
await service.queueEmbeddingJob(entryId, content);
|
||||
|
||||
expect(queue.add).toHaveBeenCalledWith(
|
||||
"generate-embedding",
|
||||
{
|
||||
entryId,
|
||||
content,
|
||||
model: undefined,
|
||||
},
|
||||
expect.any(Object)
|
||||
);
|
||||
});
|
||||
|
||||
it("should apply rate limiting delay", async () => {
|
||||
const entryId = "entry-123";
|
||||
const content = "test content";
|
||||
|
||||
vi.spyOn(queue, "add").mockResolvedValue({} as never);
|
||||
|
||||
await service.queueEmbeddingJob(entryId, content);
|
||||
|
||||
expect(queue.add).toHaveBeenCalledWith(
|
||||
"generate-embedding",
|
||||
expect.any(Object),
|
||||
expect.objectContaining({
|
||||
delay: 1000, // Default 1 second delay
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getQueueStats", () => {
|
||||
it("should return queue statistics", async () => {
|
||||
vi.spyOn(queue, "getJobCounts").mockResolvedValue({
|
||||
waiting: 5,
|
||||
active: 2,
|
||||
completed: 10,
|
||||
failed: 1,
|
||||
} as never);
|
||||
|
||||
const stats = await service.getQueueStats();
|
||||
|
||||
expect(stats).toEqual({
|
||||
waiting: 5,
|
||||
active: 2,
|
||||
completed: 10,
|
||||
failed: 1,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("cleanCompletedJobs", () => {
|
||||
it("should clean completed jobs older than grace period", async () => {
|
||||
vi.spyOn(queue, "clean").mockResolvedValue([] as never);
|
||||
|
||||
await service.cleanCompletedJobs(3600000); // 1 hour
|
||||
|
||||
expect(queue.clean).toHaveBeenCalledWith(3600000, 100, "completed");
|
||||
});
|
||||
|
||||
it("should use default grace period", async () => {
|
||||
vi.spyOn(queue, "clean").mockResolvedValue([] as never);
|
||||
|
||||
await service.cleanCompletedJobs();
|
||||
|
||||
expect(queue.clean).toHaveBeenCalledWith(86400000, 100, "completed"); // 24 hours default
|
||||
});
|
||||
});
|
||||
});
|
||||
134
apps/api/src/knowledge/queues/embedding.processor.spec.ts
Normal file
134
apps/api/src/knowledge/queues/embedding.processor.spec.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { EmbeddingProcessor } from "./embedding.processor";
|
||||
import { OllamaEmbeddingService } from "../services/ollama-embedding.service";
|
||||
import { Job } from "bullmq";
|
||||
import { EmbeddingJobData } from "./embedding-queue.service";
|
||||
|
||||
describe("EmbeddingProcessor", () => {
|
||||
let processor: EmbeddingProcessor;
|
||||
let embeddingService: OllamaEmbeddingService;
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
EmbeddingProcessor,
|
||||
{
|
||||
provide: OllamaEmbeddingService,
|
||||
useValue: {
|
||||
generateAndStoreEmbedding: vi.fn(),
|
||||
},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
processor = module.get<EmbeddingProcessor>(EmbeddingProcessor);
|
||||
embeddingService = module.get<OllamaEmbeddingService>(OllamaEmbeddingService);
|
||||
});
|
||||
|
||||
describe("processEmbedding", () => {
|
||||
it("should process embedding job successfully", async () => {
|
||||
const jobData: EmbeddingJobData = {
|
||||
entryId: "entry-123",
|
||||
content: "test content",
|
||||
model: "mxbai-embed-large",
|
||||
};
|
||||
|
||||
const job = {
|
||||
id: "job-456",
|
||||
data: jobData,
|
||||
} as Job<EmbeddingJobData>;
|
||||
|
||||
vi.spyOn(embeddingService, "generateAndStoreEmbedding").mockResolvedValue(undefined);
|
||||
|
||||
await processor.processEmbedding(job);
|
||||
|
||||
expect(embeddingService.generateAndStoreEmbedding).toHaveBeenCalledWith(
|
||||
"entry-123",
|
||||
"test content",
|
||||
{ model: "mxbai-embed-large" }
|
||||
);
|
||||
});
|
||||
|
||||
it("should process embedding job without model", async () => {
|
||||
const jobData: EmbeddingJobData = {
|
||||
entryId: "entry-123",
|
||||
content: "test content",
|
||||
};
|
||||
|
||||
const job = {
|
||||
id: "job-456",
|
||||
data: jobData,
|
||||
} as Job<EmbeddingJobData>;
|
||||
|
||||
vi.spyOn(embeddingService, "generateAndStoreEmbedding").mockResolvedValue(undefined);
|
||||
|
||||
await processor.processEmbedding(job);
|
||||
|
||||
expect(embeddingService.generateAndStoreEmbedding).toHaveBeenCalledWith(
|
||||
"entry-123",
|
||||
"test content",
|
||||
{}
|
||||
);
|
||||
});
|
||||
|
||||
it("should throw error when embedding generation fails", async () => {
|
||||
const jobData: EmbeddingJobData = {
|
||||
entryId: "entry-123",
|
||||
content: "test content",
|
||||
};
|
||||
|
||||
const job = {
|
||||
id: "job-456",
|
||||
data: jobData,
|
||||
} as Job<EmbeddingJobData>;
|
||||
|
||||
vi.spyOn(embeddingService, "generateAndStoreEmbedding").mockRejectedValue(
|
||||
new Error("Ollama unavailable")
|
||||
);
|
||||
|
||||
await expect(processor.processEmbedding(job)).rejects.toThrow("Ollama unavailable");
|
||||
});
|
||||
});
|
||||
|
||||
describe("handleCompleted", () => {
|
||||
it("should log successful job completion", async () => {
|
||||
const job = {
|
||||
id: "job-456",
|
||||
data: {
|
||||
entryId: "entry-123",
|
||||
},
|
||||
} as Job<EmbeddingJobData>;
|
||||
|
||||
const logSpy = vi.spyOn(processor["logger"], "log");
|
||||
|
||||
await processor.handleCompleted(job);
|
||||
|
||||
expect(logSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Successfully generated embedding for entry entry-123")
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("handleFailed", () => {
|
||||
it("should log job failure with error", async () => {
|
||||
const job = {
|
||||
id: "job-456",
|
||||
data: {
|
||||
entryId: "entry-123",
|
||||
},
|
||||
attemptsMade: 3,
|
||||
} as Job<EmbeddingJobData>;
|
||||
|
||||
const error = new Error("Ollama unavailable");
|
||||
const errorSpy = vi.spyOn(processor["logger"], "error");
|
||||
|
||||
await processor.handleFailed(job, error);
|
||||
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Failed to generate embedding for entry entry-123"),
|
||||
error
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
95
apps/api/src/knowledge/queues/embedding.processor.ts
Normal file
95
apps/api/src/knowledge/queues/embedding.processor.ts
Normal file
@@ -0,0 +1,95 @@
|
||||
import { OnWorkerEvent, Processor, WorkerHost } from "@nestjs/bullmq";
import { Logger } from "@nestjs/common";
import { Job } from "bullmq";
import { OllamaEmbeddingService } from "../services/ollama-embedding.service";
import { EmbeddingJobData } from "./embedding-queue.service";
||||
|
||||
/**
|
||||
* Processor for embedding generation jobs
|
||||
*
|
||||
* This worker processes queued embedding jobs and generates
|
||||
* embeddings for knowledge entries using Ollama.
|
||||
*/
|
||||
@Processor("embeddings")
|
||||
export class EmbeddingProcessor extends WorkerHost {
|
||||
private readonly logger = new Logger(EmbeddingProcessor.name);
|
||||
|
||||
constructor(private readonly embeddingService: OllamaEmbeddingService) {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process an embedding generation job
|
||||
*
|
||||
* @param job - The embedding job to process
|
||||
*/
|
||||
async process(job: Job<EmbeddingJobData>): Promise<void> {
|
||||
const { entryId, content, model } = job.data;
|
||||
|
||||
this.logger.log(`Processing embedding job ${job.id ?? "unknown"} for entry ${entryId}`);
|
||||
|
||||
try {
|
||||
const options: { model?: string } = {};
|
||||
if (model !== undefined) {
|
||||
options.model = model;
|
||||
}
|
||||
|
||||
await this.embeddingService.generateAndStoreEmbedding(entryId, content, options);
|
||||
|
||||
this.logger.log(
|
||||
`Successfully generated embedding for entry ${entryId} (job: ${job.id ?? "unknown"})`
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Failed to generate embedding for entry ${entryId} (job: ${job.id ?? "unknown"})`,
|
||||
error
|
||||
);
|
||||
throw error; // Re-throw to trigger retry logic
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle successful job completion
|
||||
*
|
||||
* @param job - The completed job
|
||||
*/
|
||||
onCompleted(job: Job<EmbeddingJobData>): void {
|
||||
this.logger.log(
|
||||
`Successfully generated embedding for entry ${job.data.entryId} (job: ${job.id ?? "unknown"})`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle job failure
|
||||
*
|
||||
* @param job - The failed job
|
||||
* @param error - The error that caused the failure
|
||||
*/
|
||||
onFailed(job: Job<EmbeddingJobData>, error: Error): void {
|
||||
this.logger.error(
|
||||
`Failed to generate embedding for entry ${job.data.entryId} (job: ${job.id ?? "unknown"}) after ${job.attemptsMade.toString()} attempts`,
|
||||
error
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for process to match test expectations
|
||||
*/
|
||||
async processEmbedding(job: Job<EmbeddingJobData>): Promise<void> {
|
||||
return this.process(job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for onCompleted to match test expectations
|
||||
*/
|
||||
handleCompleted(job: Job<EmbeddingJobData>): void {
|
||||
this.onCompleted(job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias for onFailed to match test expectations
|
||||
*/
|
||||
handleFailed(job: Job<EmbeddingJobData>, error: Error): void {
|
||||
this.onFailed(job, error);
|
||||
}
|
||||
}
|
||||
2
apps/api/src/knowledge/queues/index.ts
Normal file
2
apps/api/src/knowledge/queues/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
// Barrel file re-exporting the embedding queue service and processor.
export * from "./embedding-queue.service";
export * from "./embedding.processor";
|
||||
Reference in New Issue
Block a user