Files
stack/apps/api/src/knowledge/knowledge.service.ts
Jason Woltje 3ec2059470
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/pr/woodpecker Pipeline failed
feat: add semantic search with pgvector (closes #68, #69, #70)
Issues resolved:
- #68: pgvector Setup
  * Added pgvector vector index migration for knowledge_embeddings
  * Vector index uses HNSW algorithm with cosine distance
  * Optimized for 1536-dimension OpenAI embeddings

- #69: Embedding Generation Pipeline
  * Created EmbeddingService with OpenAI integration
  * Automatic embedding generation on entry create/update
  * Batch processing endpoint for existing entries
  * Async generation to avoid blocking API responses
  * Content preparation with title weighting

- #70: Semantic Search API
  * POST /api/knowledge/search/semantic - pure vector search
  * POST /api/knowledge/search/hybrid - RRF combined search
  * POST /api/knowledge/embeddings/batch - batch generation
  * Comprehensive test coverage
  * Full documentation in docs/SEMANTIC_SEARCH.md

Technical details:
- Uses OpenAI text-embedding-3-small model (1536 dims)
- HNSW index for O(log n) similarity search
- Reciprocal Rank Fusion for hybrid search
- Graceful degradation when OpenAI not configured
- Async embedding generation for performance

Configuration:
- Added OPENAI_API_KEY to .env.example
- Optional feature - disabled if API key not set
- Falls back to keyword search in hybrid mode
2026-01-30 15:19:13 -06:00

945 lines
23 KiB
TypeScript

import {
Injectable,
NotFoundException,
ConflictException,
} from "@nestjs/common";
import { EntryStatus, Prisma } from "@prisma/client";
import slugify from "slugify";
import { PrismaService } from "../prisma/prisma.service";
import type { CreateEntryDto, UpdateEntryDto, EntryQueryDto } from "./dto";
import type {
KnowledgeEntryWithTags,
PaginatedEntries,
} from "./entities/knowledge-entry.entity";
import type {
KnowledgeEntryVersionWithAuthor,
PaginatedVersions,
} from "./entities/knowledge-entry-version.entity";
import { renderMarkdown } from "./utils/markdown";
import { LinkSyncService } from "./services/link-sync.service";
import { KnowledgeCacheService } from "./services/cache.service";
import { EmbeddingService } from "./services/embedding.service";
/**
* Service for managing knowledge entries
*/
@Injectable()
export class KnowledgeService {
/**
 * Receives the collaborators this service delegates to.
 *
 * @param prisma - Database client used for every entry, version and tag query
 * @param linkSync - Re-syncs wiki links after an entry's content is written
 * @param cache - Entry, search and graph cache that is read and invalidated here
 * @param embedding - Prepares entry text and generates/stores its embedding
 */
constructor(
private readonly prisma: PrismaService,
private readonly linkSync: LinkSyncService,
private readonly cache: KnowledgeCacheService,
private readonly embedding: EmbeddingService
) {}
/**
 * List the entries of a workspace, most recently updated first, one page at a time.
 *
 * @param workspaceId - Workspace whose entries are listed
 * @param query - Optional `page` (default 1), `limit` (default 20), `status`
 *   and `tag` (tag slug) filters
 * @returns The requested page of entries with flattened tags, plus pagination metadata
 */
async findAll(
  workspaceId: string,
  query: EntryQueryDto
): Promise<PaginatedEntries> {
  // `||` is deliberate: a missing *or zero* value falls back to the default
  const page = query.page || 1;
  const limit = query.limit || 20;
  const skip = (page - 1) * limit;

  // Build where clause from the optional filters
  const where: Prisma.KnowledgeEntryWhereInput = {
    workspaceId,
  };
  if (query.status) {
    where.status = query.status;
  }
  if (query.tag) {
    where.tags = {
      some: {
        tag: {
          slug: query.tag,
        },
      },
    };
  }

  // The total count and the page fetch do not depend on each other,
  // so issue both queries together instead of one after the other.
  const [total, entries] = await Promise.all([
    this.prisma.knowledgeEntry.count({ where }),
    this.prisma.knowledgeEntry.findMany({
      where,
      include: {
        tags: {
          include: {
            tag: true,
          },
        },
      },
      orderBy: {
        updatedAt: "desc",
      },
      skip,
      take: limit,
    }),
  ]);

  // Transform to response format (join rows flattened to plain tag objects)
  const data: KnowledgeEntryWithTags[] = entries.map((entry) => ({
    id: entry.id,
    workspaceId: entry.workspaceId,
    slug: entry.slug,
    title: entry.title,
    content: entry.content,
    contentHtml: entry.contentHtml,
    summary: entry.summary,
    status: entry.status,
    visibility: entry.visibility,
    createdAt: entry.createdAt,
    updatedAt: entry.updatedAt,
    createdBy: entry.createdBy,
    updatedBy: entry.updatedBy,
    tags: entry.tags.map((et) => ({
      id: et.tag.id,
      name: et.tag.name,
      slug: et.tag.slug,
      color: et.tag.color,
    })),
  }));

  return {
    data,
    pagination: {
      page,
      limit,
      total,
      totalPages: Math.ceil(total / limit),
    },
  };
}
/**
 * Look up one entry by slug, serving it from the entry cache when present.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Slug of the entry within that workspace
 * @returns The entry with its tags flattened to `{ id, name, slug, color }`
 * @throws NotFoundException when the workspace has no entry with that slug
 */
async findOne(
  workspaceId: string,
  slug: string
): Promise<KnowledgeEntryWithTags> {
  // Fast path: cached copy
  const fromCache = await this.cache.getEntry(workspaceId, slug);
  if (fromCache) {
    return fromCache;
  }

  // Slow path: load the row together with its tag join rows
  const row = await this.prisma.knowledgeEntry.findUnique({
    where: { workspaceId_slug: { workspaceId, slug } },
    include: { tags: { include: { tag: true } } },
  });
  if (!row) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  const tags = row.tags.map(({ tag }) => ({
    id: tag.id,
    name: tag.name,
    slug: tag.slug,
    color: tag.color,
  }));
  const found: KnowledgeEntryWithTags = {
    id: row.id,
    workspaceId: row.workspaceId,
    slug: row.slug,
    title: row.title,
    content: row.content,
    contentHtml: row.contentHtml,
    summary: row.summary,
    status: row.status,
    visibility: row.visibility,
    createdAt: row.createdAt,
    updatedAt: row.updatedAt,
    createdBy: row.createdBy,
    updatedBy: row.updatedBy,
    tags,
  };

  // Remember the result for the next lookup
  await this.cache.setEntry(workspaceId, slug, found);
  return found;
}
/**
 * Create an entry, its first version snapshot and its tag links in one
 * transaction, then sync wiki links, kick off embedding generation and
 * invalidate the workspace's search and graph caches.
 *
 * @param workspaceId - Workspace the entry belongs to
 * @param userId - Author recorded as creator and last updater
 * @param createDto - Title, content and optional summary/status/visibility/tags/changeNote
 * @returns The stored entry with flattened tags
 * @throws Error when the entry cannot be read back after being written
 */
async create(
  workspaceId: string,
  userId: string,
  createDto: CreateEntryDto
): Promise<KnowledgeEntryWithTags> {
  // Derive a workspace-unique slug from the title
  const slug = await this.ensureUniqueSlug(
    workspaceId,
    this.generateSlug(createDto.title)
  );

  // Render markdown to HTML before opening the transaction
  const contentHtml = await renderMarkdown(createDto.content);

  const withTags = { tags: { include: { tag: true } } };
  const created = await this.prisma.$transaction(async (tx) => {
    const row = await tx.knowledgeEntry.create({
      data: {
        workspaceId,
        slug,
        title: createDto.title,
        content: createDto.content,
        contentHtml,
        summary: createDto.summary ?? null,
        status: createDto.status || EntryStatus.DRAFT,
        visibility: createDto.visibility || "PRIVATE",
        createdBy: userId,
        updatedBy: userId,
      },
    });

    // Version 1 mirrors what was just stored
    await tx.knowledgeEntryVersion.create({
      data: {
        entryId: row.id,
        version: 1,
        title: row.title,
        content: row.content,
        summary: row.summary,
        createdBy: userId,
        changeNote: createDto.changeNote || "Initial version",
      },
    });

    const requestedTags = createDto.tags;
    if (requestedTags && requestedTags.length > 0) {
      await this.syncTags(tx, workspaceId, row.id, requestedTags);
    }

    // Re-read so the returned row carries its tag links
    return tx.knowledgeEntry.findUnique({
      where: { id: row.id },
      include: withTags,
    });
  });

  if (!created) {
    throw new Error("Failed to create entry");
  }
  const entryId = created.id;

  // Wiki links are synced once the entry is committed
  await this.linkSync.syncLinks(workspaceId, entryId, createDto.content);

  // Fire-and-forget: the response does not wait for the embedding
  this.generateEntryEmbedding(entryId, created.title, created.content).catch(
    (error) => {
      console.error(`Failed to generate embedding for entry ${entryId}:`, error);
    }
  );

  // A new entry changes search results and the link graph
  await this.cache.invalidateSearches(workspaceId);
  await this.cache.invalidateGraphs(workspaceId);

  const tags = created.tags.map(({ tag }) => ({
    id: tag.id,
    name: tag.name,
    slug: tag.slug,
    color: tag.color,
  }));
  return {
    id: entryId,
    workspaceId: created.workspaceId,
    slug: created.slug,
    title: created.title,
    content: created.content,
    contentHtml: created.contentHtml,
    summary: created.summary,
    status: created.status,
    visibility: created.visibility,
    createdAt: created.createdAt,
    updatedAt: created.updatedAt,
    createdBy: created.createdBy,
    updatedBy: created.updatedBy,
    tags,
  };
}
/**
 * Update an entry identified by its current slug.
 *
 * Only fields present on the DTO (`!== undefined`) are written. A changed title
 * may produce a new slug. Whenever a title or content is supplied, a new version
 * snapshot is recorded, and the embedding is regenerated in the background.
 * Caches for the old and new slug, searches and (on content change) graphs are
 * invalidated afterwards.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Current slug of the entry
 * @param userId - User recorded as last updater and version author
 * @param updateDto - Partial set of fields to change, plus optional tags/changeNote
 * @returns The updated entry with flattened tags
 * @throws NotFoundException when the workspace has no entry with that slug
 * @throws Error when the entry cannot be read back after being written
 */
async update(
  workspaceId: string,
  slug: string,
  userId: string,
  updateDto: UpdateEntryDto
): Promise<KnowledgeEntryWithTags> {
  // Find existing entry together with its latest version number
  const existing = await this.prisma.knowledgeEntry.findUnique({
    where: {
      workspaceId_slug: {
        workspaceId,
        slug,
      },
    },
    include: {
      versions: {
        orderBy: {
          version: "desc",
        },
        take: 1,
      },
    },
  });
  if (!existing) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  // "Provided" means present on the DTO, even when the value is an empty string.
  // Every decision below uses these flags so the stored content, its rendered
  // HTML and the version history cannot drift apart.
  const titleProvided = updateDto.title !== undefined;
  const contentProvided = updateDto.content !== undefined;

  // If title is being updated, generate new slug if needed
  let newSlug = slug;
  if (updateDto.title && updateDto.title !== existing.title) {
    const baseSlug = this.generateSlug(updateDto.title);
    if (baseSlug !== slug) {
      newSlug = await this.ensureUniqueSlug(workspaceId, baseSlug, slug);
    }
  }

  // Re-render whenever content is written, including content cleared to "",
  // which previously left the old HTML in place.
  let contentHtml = existing.contentHtml;
  if (updateDto.content !== undefined) {
    contentHtml = await renderMarkdown(updateDto.content);
  }

  // Build update data object conditionally
  const updateData: Prisma.KnowledgeEntryUpdateInput = {
    updatedBy: userId,
  };
  if (newSlug !== slug) {
    updateData.slug = newSlug;
  }
  if (updateDto.title !== undefined) {
    updateData.title = updateDto.title;
  }
  if (updateDto.content !== undefined) {
    updateData.content = updateDto.content;
    updateData.contentHtml = contentHtml;
  }
  if (updateDto.summary !== undefined) {
    updateData.summary = updateDto.summary ?? null;
  }
  if (updateDto.status !== undefined) {
    updateData.status = updateDto.status;
  }
  if (updateDto.visibility !== undefined) {
    updateData.visibility = updateDto.visibility;
  }

  // Use transaction to ensure atomicity
  const result = await this.prisma.$transaction(async (tx) => {
    // Update entry (still addressed by its old slug)
    const entry = await tx.knowledgeEntry.update({
      where: {
        workspaceId_slug: {
          workspaceId,
          slug,
        },
      },
      data: updateData,
    });

    // Snapshot a new version whenever title or content was written
    if (titleProvided || contentProvided) {
      const latestVersion = existing.versions[0];
      const nextVersion = latestVersion ? latestVersion.version + 1 : 1;
      await tx.knowledgeEntryVersion.create({
        data: {
          entryId: entry.id,
          version: nextVersion,
          title: entry.title,
          content: entry.content,
          summary: entry.summary,
          createdBy: userId,
          changeNote: updateDto.changeNote || `Update version ${nextVersion}`,
        },
      });
    }

    // Handle tags if provided (an empty array clears all tags)
    if (updateDto.tags !== undefined) {
      await this.syncTags(tx, workspaceId, entry.id, updateDto.tags);
    }

    // Fetch with tags
    return tx.knowledgeEntry.findUnique({
      where: { id: entry.id },
      include: {
        tags: {
          include: {
            tag: true,
          },
        },
      },
    });
  });

  if (!result) {
    throw new Error("Failed to update entry");
  }

  // Sync wiki links after entry update (only if content changed)
  if (contentProvided) {
    await this.linkSync.syncLinks(workspaceId, result.id, result.content);
  }

  // Regenerate embedding if content or title changed (async, don't block response)
  if (contentProvided || titleProvided) {
    this.generateEntryEmbedding(result.id, result.title, result.content).catch(
      (error) => {
        console.error(`Failed to generate embedding for entry ${result.id}:`, error);
      }
    );
  }

  // Invalidate old slug cache if slug changed
  if (newSlug !== slug) {
    await this.cache.invalidateEntry(workspaceId, slug);
  }
  // Invalidate new slug cache
  await this.cache.invalidateEntry(workspaceId, result.slug);
  // Invalidate search caches (content/title/tags may have changed)
  await this.cache.invalidateSearches(workspaceId);
  // Invalidate graph caches if links changed
  if (contentProvided) {
    await this.cache.invalidateGraphsForEntry(workspaceId, result.id);
  }

  return {
    id: result.id,
    workspaceId: result.workspaceId,
    slug: result.slug,
    title: result.title,
    content: result.content,
    contentHtml: result.contentHtml,
    summary: result.summary,
    status: result.status,
    visibility: result.visibility,
    createdAt: result.createdAt,
    updatedAt: result.updatedAt,
    createdBy: result.createdBy,
    updatedBy: result.updatedBy,
    tags: result.tags.map((et) => ({
      id: et.tag.id,
      name: et.tag.name,
      slug: et.tag.slug,
      color: et.tag.color,
    })),
  };
}
/**
 * Soft-delete an entry: the row is kept and its status becomes ARCHIVED.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Slug of the entry to archive
 * @param userId - User recorded as last updater
 * @throws NotFoundException when the workspace has no entry with that slug
 */
async remove(workspaceId: string, slug: string, userId: string): Promise<void> {
  // Same unique key addresses both the existence check and the update
  const bySlug = { workspaceId_slug: { workspaceId, slug } };

  const target = await this.prisma.knowledgeEntry.findUnique({ where: bySlug });
  if (!target) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  await this.prisma.knowledgeEntry.update({
    where: bySlug,
    data: { status: EntryStatus.ARCHIVED, updatedBy: userId },
  });

  // Drop everything cached that could still show the entry
  await this.cache.invalidateEntry(workspaceId, slug);
  await this.cache.invalidateSearches(workspaceId);
  await this.cache.invalidateGraphsForEntry(workspaceId, target.id);
}
/**
 * Turn a title (or tag name) into a URL-friendly slug via `slugify`.
 *
 * @param title - Human-readable text to convert
 * @returns The slug produced with the `lower`, `strict` and `trim` options enabled
 */
private generateSlug(title: string): string {
  const slugOptions = { lower: true, strict: true, trim: true };
  return slugify(title, slugOptions);
}
/**
 * Find a slug that is free within the workspace, trying `baseSlug` first and
 * then `baseSlug-1`, `baseSlug-2`, … up to `baseSlug-999`.
 *
 * @param workspaceId - Workspace in which the slug must be unique
 * @param baseSlug - Preferred slug
 * @param currentSlug - Slug of the entry being updated, if any; a candidate
 *   already held by that entry counts as available
 * @returns The first available candidate
 * @throws ConflictException when all 1000 candidates are taken
 */
private async ensureUniqueSlug(
  workspaceId: string,
  baseSlug: string,
  currentSlug?: string
): Promise<string> {
  const maxAttempts = 1000;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // Attempt 0 is the bare slug; later attempts append the attempt number
    const candidate = attempt === 0 ? baseSlug : `${baseSlug}-${attempt}`;

    const holder = await this.prisma.knowledgeEntry.findUnique({
      where: { workspaceId_slug: { workspaceId, slug: candidate } },
    });

    // Free, or held by the very entry that is being updated
    const heldByCurrentEntry =
      holder !== null && Boolean(currentSlug) && holder.slug === currentSlug;
    if (!holder || heldByCurrentEntry) {
      return candidate;
    }
  }

  // Safety limit so a crowded namespace cannot loop forever
  throw new ConflictException(
    "Unable to generate unique slug after 1000 attempts"
  );
}
/**
 * List the version history of an entry, newest version first, one page at a time.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Slug of the entry
 * @param page - 1-based page number (default 1)
 * @param limit - Page size (default 20)
 * @returns The requested page of versions with author `{ id, name, email }`,
 *   plus pagination metadata
 * @throws NotFoundException when the workspace has no entry with that slug
 */
async findVersions(
  workspaceId: string,
  slug: string,
  page: number = 1,
  limit: number = 20
): Promise<PaginatedVersions> {
  // Find the entry to get its ID
  const entry = await this.prisma.knowledgeEntry.findUnique({
    where: {
      workspaceId_slug: {
        workspaceId,
        slug,
      },
    },
  });
  if (!entry) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  const skip = (page - 1) * limit;
  const where = { entryId: entry.id };

  // The total count and the page fetch do not depend on each other,
  // so issue both queries together instead of one after the other.
  const [total, versions] = await Promise.all([
    this.prisma.knowledgeEntryVersion.count({ where }),
    this.prisma.knowledgeEntryVersion.findMany({
      where,
      include: {
        author: {
          select: {
            id: true,
            name: true,
            email: true,
          },
        },
      },
      orderBy: {
        version: "desc",
      },
      skip,
      take: limit,
    }),
  ]);

  // Transform to response format
  const data: KnowledgeEntryVersionWithAuthor[] = versions.map((v) => ({
    id: v.id,
    entryId: v.entryId,
    version: v.version,
    title: v.title,
    content: v.content,
    summary: v.summary,
    createdAt: v.createdAt,
    createdBy: v.createdBy,
    changeNote: v.changeNote,
    author: v.author,
  }));

  return {
    data,
    pagination: {
      page,
      limit,
      total,
      totalPages: Math.ceil(total / limit),
    },
  };
}
/**
 * Fetch one numbered version of an entry, including its author.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Slug of the entry
 * @param version - Version number to load
 * @returns The version snapshot with author `{ id, name, email }`
 * @throws NotFoundException when the entry, or that version of it, does not exist
 */
async findVersion(
  workspaceId: string,
  slug: string,
  version: number
): Promise<KnowledgeEntryVersionWithAuthor> {
  // Resolve the slug to an entry id first
  const owner = await this.prisma.knowledgeEntry.findUnique({
    where: { workspaceId_slug: { workspaceId, slug } },
  });
  if (!owner) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  const authorFields = { id: true, name: true, email: true };
  const snapshot = await this.prisma.knowledgeEntryVersion.findUnique({
    where: { entryId_version: { entryId: owner.id, version } },
    include: { author: { select: authorFields } },
  });
  if (!snapshot) {
    throw new NotFoundException(
      `Version ${version} not found for entry "${slug}"`
    );
  }

  return {
    id: snapshot.id,
    entryId: snapshot.entryId,
    version: snapshot.version,
    title: snapshot.title,
    content: snapshot.content,
    summary: snapshot.summary,
    createdAt: snapshot.createdAt,
    createdBy: snapshot.createdBy,
    changeNote: snapshot.changeNote,
    author: snapshot.author,
  };
}
/**
 * Restore an entry's title, content and summary from an earlier version.
 *
 * The restore is itself recorded as a new version. The slug is left unchanged.
 * Afterwards wiki links are re-synced, the embedding is regenerated in the
 * background, and the entry, search and graph caches are invalidated.
 *
 * @param workspaceId - Workspace that owns the entry
 * @param slug - Slug of the entry
 * @param version - Version number to restore from
 * @param userId - User recorded as last updater and version author
 * @param changeNote - Optional note; defaults to "Restored from version N"
 * @returns The entry after the restore, with flattened tags
 * @throws NotFoundException when the entry or the requested version does not exist
 * @throws Error when the entry cannot be read back after being written
 */
async restoreVersion(
  workspaceId: string,
  slug: string,
  version: number,
  userId: string,
  changeNote?: string
): Promise<KnowledgeEntryWithTags> {
  // Get the version to restore (throws if entry or version is missing)
  const versionToRestore = await this.findVersion(workspaceId, slug, version);

  // Find the current entry together with its latest version number
  const entry = await this.prisma.knowledgeEntry.findUnique({
    where: {
      workspaceId_slug: {
        workspaceId,
        slug,
      },
    },
    include: {
      versions: {
        orderBy: {
          version: "desc",
        },
        take: 1,
      },
    },
  });
  if (!entry) {
    throw new NotFoundException(
      `Knowledge entry with slug "${slug}" not found`
    );
  }

  // Render markdown for the restored content
  const contentHtml = await renderMarkdown(versionToRestore.content);

  // Use transaction to ensure atomicity
  const result = await this.prisma.$transaction(async (tx) => {
    // Update entry with restored content
    const updated = await tx.knowledgeEntry.update({
      where: {
        workspaceId_slug: {
          workspaceId,
          slug,
        },
      },
      data: {
        title: versionToRestore.title,
        content: versionToRestore.content,
        contentHtml,
        summary: versionToRestore.summary,
        updatedBy: userId,
      },
    });

    // Create new version for the restore operation
    const latestVersion = entry.versions[0];
    const nextVersion = latestVersion ? latestVersion.version + 1 : 1;
    await tx.knowledgeEntryVersion.create({
      data: {
        entryId: updated.id,
        version: nextVersion,
        title: updated.title,
        content: updated.content,
        summary: updated.summary,
        createdBy: userId,
        changeNote:
          changeNote || `Restored from version ${version}`,
      },
    });

    // Fetch with tags
    return tx.knowledgeEntry.findUnique({
      where: { id: updated.id },
      include: {
        tags: {
          include: {
            tag: true,
          },
        },
      },
    });
  });

  if (!result) {
    throw new Error("Failed to restore version");
  }

  // Sync wiki links after restore
  await this.linkSync.syncLinks(workspaceId, result.id, result.content);

  // A restore rewrites title and content just like an update does, so the
  // stored embedding must be regenerated too (async, don't block the response).
  this.generateEntryEmbedding(result.id, result.title, result.content).catch(
    (error) => {
      console.error(`Failed to generate embedding for entry ${result.id}:`, error);
    }
  );

  // Invalidate caches (content changed, links may have changed)
  await this.cache.invalidateEntry(workspaceId, slug);
  await this.cache.invalidateSearches(workspaceId);
  await this.cache.invalidateGraphsForEntry(workspaceId, result.id);

  return {
    id: result.id,
    workspaceId: result.workspaceId,
    slug: result.slug,
    title: result.title,
    content: result.content,
    contentHtml: result.contentHtml,
    summary: result.summary,
    status: result.status,
    visibility: result.visibility,
    createdAt: result.createdAt,
    updatedAt: result.updatedAt,
    createdBy: result.createdBy,
    updatedBy: result.updatedBy,
    tags: result.tags.map((et) => ({
      id: et.tag.id,
      name: et.tag.name,
      slug: et.tag.slug,
      color: et.tag.color,
    })),
  };
}
/**
 * Replace an entry's tag links with the given tag names, creating any tag that
 * does not exist yet in the workspace.
 *
 * Names are de-duplicated by slug before any query runs; when several names
 * map to the same slug, the first spelling is the one used if the tag has to
 * be created.
 *
 * @param tx - Transaction client; every query runs inside the caller's transaction
 * @param workspaceId - Workspace the tags belong to
 * @param entryId - Entry whose tag links are replaced
 * @param tagNames - Desired tag names; an empty array simply clears all links
 */
private async syncTags(
  tx: Prisma.TransactionClient,
  workspaceId: string,
  entryId: string,
  tagNames: string[]
): Promise<void> {
  // Remove all existing tag associations
  await tx.knowledgeEntryTag.deleteMany({
    where: { entryId },
  });

  // If no tags provided, we're done
  if (tagNames.length === 0) {
    return;
  }

  // Collapse names that slugify identically (exact repeats, "Foo" vs "foo", ...).
  // Without this, two concurrent find-then-create calls for the same slug can
  // both try to create the tag, or the same tag can be linked to the entry twice.
  const nameBySlug = new Map<string, string>();
  for (const name of tagNames) {
    const tagSlug = this.generateSlug(name);
    if (!nameBySlug.has(tagSlug)) {
      nameBySlug.set(tagSlug, name);
    }
  }

  // Get or create one tag per distinct slug
  const tags = await Promise.all(
    Array.from(nameBySlug, async ([tagSlug, name]) => {
      const found = await tx.knowledgeTag.findUnique({
        where: {
          workspaceId_slug: {
            workspaceId,
            slug: tagSlug,
          },
        },
      });
      if (found) {
        return found;
      }
      return tx.knowledgeTag.create({
        data: {
          workspaceId,
          name,
          slug: tagSlug,
        },
      });
    })
  );

  // Create tag associations
  await Promise.all(
    tags.map((tag) =>
      tx.knowledgeEntryTag.create({
        data: {
          entryId,
          tagId: tag.id,
        },
      })
    )
  );
}
/**
 * Build the embedding input for an entry from its title and content, then
 * have the embedding service generate and store the embedding.
 * Callers in this service invoke it without awaiting the result.
 *
 * @param entryId - Entry the embedding belongs to
 * @param title - Entry title
 * @param content - Entry markdown content
 */
private async generateEntryEmbedding(
  entryId: string,
  title: string,
  content: string
): Promise<void> {
  await this.embedding.generateAndStoreEmbedding(
    entryId,
    this.embedding.prepareContentForEmbedding(title, content)
  );
}
/**
 * Generate embeddings for every matching entry of a workspace in one batch.
 * Intended for back-filling entries that have no embedding yet.
 *
 * @param workspaceId - The workspace ID
 * @param status - Optional status filter; when omitted, every entry that is
 *   not ARCHIVED is included
 * @returns `total` entries selected and `success` count reported by the embedding service
 */
async batchGenerateEmbeddings(
  workspaceId: string,
  status?: EntryStatus
): Promise<{ total: number; success: number }> {
  const filter: Prisma.KnowledgeEntryWhereInput = {
    workspaceId,
    status: status || { not: EntryStatus.ARCHIVED },
  };

  // Only the fields needed to build the embedding input are loaded
  const rows = await this.prisma.knowledgeEntry.findMany({
    where: filter,
    select: { id: true, title: true, content: true },
  });

  const payload = rows.map(({ id, title, content }) => ({
    id,
    content: this.embedding.prepareContentForEmbedding(title, content),
  }));

  const success = await this.embedding.batchGenerateEmbeddings(payload);
  return { total: rows.length, success };
}
}