feat(api): add conversation archive with vector search (MS22-DB-004, MS22-API-004) (#587)
Some checks failed
ci/woodpecker/push/api Pipeline failed
Some checks failed
ci/woodpecker/push/api Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #587.
This commit is contained in:
@@ -0,0 +1,277 @@
|
||||
import { Injectable, Logger, NotFoundException, ConflictException } from "@nestjs/common";
|
||||
import { Prisma } from "@prisma/client";
|
||||
import { EMBEDDING_DIMENSION } from "@mosaic/shared";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { EmbeddingService } from "../knowledge/services/embedding.service";
|
||||
import type { IngestConversationDto, SearchConversationDto, ListConversationsDto } from "./dto";
|
||||
|
||||
/**
|
||||
* Shape of a raw conversation archive row from $queryRaw vector search
|
||||
*/
|
||||
interface RawConversationResult {
|
||||
id: string;
|
||||
workspace_id: string;
|
||||
session_id: string;
|
||||
agent_id: string;
|
||||
messages: unknown;
|
||||
message_count: number;
|
||||
summary: string;
|
||||
started_at: Date;
|
||||
ended_at: Date | null;
|
||||
metadata: unknown;
|
||||
created_at: Date;
|
||||
updated_at: Date;
|
||||
similarity: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Paginated response wrapper
|
||||
*/
|
||||
export interface PaginatedConversations<T> {
|
||||
data: T[];
|
||||
pagination: {
|
||||
page: number;
|
||||
limit: number;
|
||||
total: number;
|
||||
totalPages: number;
|
||||
};
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class ConversationArchiveService {
|
||||
private readonly logger = new Logger(ConversationArchiveService.name);
|
||||
private readonly defaultSimilarityThreshold = 0.5;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly embedding: EmbeddingService
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Ingest a conversation session log.
|
||||
* Generates a vector embedding from the summary + message content and stores it alongside the record.
|
||||
*/
|
||||
async ingest(workspaceId: string, dto: IngestConversationDto): Promise<{ id: string }> {
|
||||
// Verify no duplicate session in this workspace
|
||||
const existing = await this.prisma.conversationArchive.findUnique({
|
||||
where: { workspaceId_sessionId: { workspaceId, sessionId: dto.sessionId } },
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (existing) {
|
||||
throw new ConflictException(
|
||||
`Conversation session '${dto.sessionId}' already exists in this workspace`
|
||||
);
|
||||
}
|
||||
|
||||
const messageCount = dto.messages.length;
|
||||
|
||||
// Create record first to get ID for embedding
|
||||
const record = await this.prisma.conversationArchive.create({
|
||||
data: {
|
||||
workspaceId,
|
||||
sessionId: dto.sessionId,
|
||||
agentId: dto.agentId,
|
||||
messages: dto.messages as unknown as Prisma.InputJsonValue,
|
||||
messageCount,
|
||||
summary: dto.summary,
|
||||
startedAt: new Date(dto.startedAt),
|
||||
endedAt: dto.endedAt ? new Date(dto.endedAt) : null,
|
||||
metadata: (dto.metadata ?? {}) as Prisma.InputJsonValue,
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
// Generate and store embedding asynchronously (non-blocking for ingest)
|
||||
if (this.embedding.isConfigured()) {
|
||||
const textForEmbedding = this.buildEmbeddingText(dto.summary, dto.messages);
|
||||
this.storeEmbedding(record.id, textForEmbedding).catch((err: unknown) => {
|
||||
this.logger.error(`Failed to store embedding for conversation ${record.id}`, err);
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.log(`Ingested conversation ${record.id} (session: ${dto.sessionId})`);
|
||||
return { id: record.id };
|
||||
}
|
||||
|
||||
/**
|
||||
* Semantic vector search across conversation archives in a workspace.
|
||||
*/
|
||||
async search(
|
||||
workspaceId: string,
|
||||
dto: SearchConversationDto
|
||||
): Promise<PaginatedConversations<RawConversationResult>> {
|
||||
if (!this.embedding.isConfigured()) {
|
||||
throw new ConflictException("Semantic search requires OpenAI API key to be configured");
|
||||
}
|
||||
|
||||
const limit = dto.limit ?? 20;
|
||||
const threshold = dto.similarityThreshold ?? this.defaultSimilarityThreshold;
|
||||
const distanceThreshold = 1 - threshold;
|
||||
|
||||
const queryEmbedding = await this.embedding.generateEmbedding(dto.query);
|
||||
const embeddingStr = `[${queryEmbedding.join(",")}]`;
|
||||
|
||||
const agentFilter = dto.agentId ? Prisma.sql`AND ca.agent_id = ${dto.agentId}` : Prisma.sql``;
|
||||
|
||||
const rows = await this.prisma.$queryRaw<RawConversationResult[]>`
|
||||
SELECT
|
||||
ca.id,
|
||||
ca.workspace_id,
|
||||
ca.session_id,
|
||||
ca.agent_id,
|
||||
ca.messages,
|
||||
ca.message_count,
|
||||
ca.summary,
|
||||
ca.started_at,
|
||||
ca.ended_at,
|
||||
ca.metadata,
|
||||
ca.created_at,
|
||||
ca.updated_at,
|
||||
(1 - (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION}))) AS similarity
|
||||
FROM conversation_archives ca
|
||||
WHERE ca.workspace_id = ${workspaceId}::uuid
|
||||
AND ca.embedding IS NOT NULL
|
||||
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
|
||||
${agentFilter}
|
||||
ORDER BY ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})
|
||||
LIMIT ${limit}
|
||||
`;
|
||||
|
||||
const countResult = await this.prisma.$queryRaw<[{ count: bigint }]>`
|
||||
SELECT COUNT(*) AS count
|
||||
FROM conversation_archives ca
|
||||
WHERE ca.workspace_id = ${workspaceId}::uuid
|
||||
AND ca.embedding IS NOT NULL
|
||||
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
|
||||
${agentFilter}
|
||||
`;
|
||||
|
||||
const total = Number(countResult[0].count);
|
||||
|
||||
return {
|
||||
data: rows,
|
||||
pagination: {
|
||||
page: 1,
|
||||
limit,
|
||||
total,
|
||||
totalPages: Math.ceil(total / limit),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* List conversation archives with filtering and pagination.
|
||||
*/
|
||||
async findAll(
|
||||
workspaceId: string,
|
||||
query: ListConversationsDto
|
||||
): Promise<PaginatedConversations<object>> {
|
||||
const page = query.page ?? 1;
|
||||
const limit = query.limit ?? 20;
|
||||
const skip = (page - 1) * limit;
|
||||
|
||||
const where: Prisma.ConversationArchiveWhereInput = {
|
||||
workspaceId,
|
||||
...(query.agentId ? { agentId: query.agentId } : {}),
|
||||
...(query.startedAfter || query.startedBefore
|
||||
? {
|
||||
startedAt: {
|
||||
...(query.startedAfter ? { gte: new Date(query.startedAfter) } : {}),
|
||||
...(query.startedBefore ? { lte: new Date(query.startedBefore) } : {}),
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
const [total, records] = await Promise.all([
|
||||
this.prisma.conversationArchive.count({ where }),
|
||||
this.prisma.conversationArchive.findMany({
|
||||
where,
|
||||
select: {
|
||||
id: true,
|
||||
workspaceId: true,
|
||||
sessionId: true,
|
||||
agentId: true,
|
||||
messageCount: true,
|
||||
summary: true,
|
||||
startedAt: true,
|
||||
endedAt: true,
|
||||
metadata: true,
|
||||
createdAt: true,
|
||||
updatedAt: true,
|
||||
},
|
||||
orderBy: { startedAt: "desc" },
|
||||
skip,
|
||||
take: limit,
|
||||
}),
|
||||
]);
|
||||
|
||||
return {
|
||||
data: records,
|
||||
pagination: {
|
||||
page,
|
||||
limit,
|
||||
total,
|
||||
totalPages: Math.ceil(total / limit),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single conversation archive by ID.
|
||||
*/
|
||||
async findOne(workspaceId: string, id: string): Promise<object> {
|
||||
const record = await this.prisma.conversationArchive.findFirst({
|
||||
where: { id, workspaceId },
|
||||
select: {
|
||||
id: true,
|
||||
workspaceId: true,
|
||||
sessionId: true,
|
||||
agentId: true,
|
||||
messages: true,
|
||||
messageCount: true,
|
||||
summary: true,
|
||||
startedAt: true,
|
||||
endedAt: true,
|
||||
metadata: true,
|
||||
createdAt: true,
|
||||
updatedAt: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!record) {
|
||||
throw new NotFoundException(`Conversation archive '${id}' not found`);
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build text content for embedding from summary and messages.
|
||||
*/
|
||||
private buildEmbeddingText(
|
||||
summary: string,
|
||||
messages: { role: string; content: string }[]
|
||||
): string {
|
||||
const messageText = messages.map((m) => `${m.role}: ${m.content}`).join("\n");
|
||||
return `${summary}\n\n${messageText}`.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate embedding and store it on the conversation_archives row.
|
||||
*/
|
||||
private async storeEmbedding(id: string, text: string): Promise<void> {
|
||||
const vector = await this.embedding.generateEmbedding(text);
|
||||
const embeddingStr = `[${vector.join(",")}]`;
|
||||
|
||||
await this.prisma.$executeRaw`
|
||||
UPDATE conversation_archives
|
||||
SET embedding = ${embeddingStr}::vector(${EMBEDDING_DIMENSION}),
|
||||
updated_at = NOW()
|
||||
WHERE id = ${id}::uuid
|
||||
`;
|
||||
|
||||
this.logger.log(`Stored embedding for conversation ${id}`);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user