Some checks failed
ci/woodpecker/push/api Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
278 lines
8.1 KiB
TypeScript
278 lines
8.1 KiB
TypeScript
import { Injectable, Logger, NotFoundException, ConflictException } from "@nestjs/common";
|
|
import { Prisma } from "@prisma/client";
|
|
import { EMBEDDING_DIMENSION } from "@mosaic/shared";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
import { EmbeddingService } from "../knowledge/services/embedding.service";
|
|
import type { IngestConversationDto, SearchConversationDto, ListConversationsDto } from "./dto";
|
|
|
|
/**
|
|
* Shape of a raw conversation archive row from $queryRaw vector search
|
|
*/
|
|
interface RawConversationResult {
|
|
id: string;
|
|
workspace_id: string;
|
|
session_id: string;
|
|
agent_id: string;
|
|
messages: unknown;
|
|
message_count: number;
|
|
summary: string;
|
|
started_at: Date;
|
|
ended_at: Date | null;
|
|
metadata: unknown;
|
|
created_at: Date;
|
|
updated_at: Date;
|
|
similarity: number;
|
|
}
|
|
|
|
/**
|
|
* Paginated response wrapper
|
|
*/
|
|
export interface PaginatedConversations<T> {
|
|
data: T[];
|
|
pagination: {
|
|
page: number;
|
|
limit: number;
|
|
total: number;
|
|
totalPages: number;
|
|
};
|
|
}
|
|
|
|
@Injectable()
|
|
export class ConversationArchiveService {
|
|
private readonly logger = new Logger(ConversationArchiveService.name);
|
|
private readonly defaultSimilarityThreshold = 0.5;
|
|
|
|
constructor(
|
|
private readonly prisma: PrismaService,
|
|
private readonly embedding: EmbeddingService
|
|
) {}
|
|
|
|
/**
|
|
* Ingest a conversation session log.
|
|
* Generates a vector embedding from the summary + message content and stores it alongside the record.
|
|
*/
|
|
async ingest(workspaceId: string, dto: IngestConversationDto): Promise<{ id: string }> {
|
|
// Verify no duplicate session in this workspace
|
|
const existing = await this.prisma.conversationArchive.findUnique({
|
|
where: { workspaceId_sessionId: { workspaceId, sessionId: dto.sessionId } },
|
|
select: { id: true },
|
|
});
|
|
|
|
if (existing) {
|
|
throw new ConflictException(
|
|
`Conversation session '${dto.sessionId}' already exists in this workspace`
|
|
);
|
|
}
|
|
|
|
const messageCount = dto.messages.length;
|
|
|
|
// Create record first to get ID for embedding
|
|
const record = await this.prisma.conversationArchive.create({
|
|
data: {
|
|
workspaceId,
|
|
sessionId: dto.sessionId,
|
|
agentId: dto.agentId,
|
|
messages: dto.messages as unknown as Prisma.InputJsonValue,
|
|
messageCount,
|
|
summary: dto.summary,
|
|
startedAt: new Date(dto.startedAt),
|
|
endedAt: dto.endedAt ? new Date(dto.endedAt) : null,
|
|
metadata: (dto.metadata ?? {}) as Prisma.InputJsonValue,
|
|
},
|
|
select: { id: true },
|
|
});
|
|
|
|
// Generate and store embedding asynchronously (non-blocking for ingest)
|
|
if (this.embedding.isConfigured()) {
|
|
const textForEmbedding = this.buildEmbeddingText(dto.summary, dto.messages);
|
|
this.storeEmbedding(record.id, textForEmbedding).catch((err: unknown) => {
|
|
this.logger.error(`Failed to store embedding for conversation ${record.id}`, err);
|
|
});
|
|
}
|
|
|
|
this.logger.log(`Ingested conversation ${record.id} (session: ${dto.sessionId})`);
|
|
return { id: record.id };
|
|
}
|
|
|
|
/**
|
|
* Semantic vector search across conversation archives in a workspace.
|
|
*/
|
|
async search(
|
|
workspaceId: string,
|
|
dto: SearchConversationDto
|
|
): Promise<PaginatedConversations<RawConversationResult>> {
|
|
if (!this.embedding.isConfigured()) {
|
|
throw new ConflictException("Semantic search requires OpenAI API key to be configured");
|
|
}
|
|
|
|
const limit = dto.limit ?? 20;
|
|
const threshold = dto.similarityThreshold ?? this.defaultSimilarityThreshold;
|
|
const distanceThreshold = 1 - threshold;
|
|
|
|
const queryEmbedding = await this.embedding.generateEmbedding(dto.query);
|
|
const embeddingStr = `[${queryEmbedding.join(",")}]`;
|
|
|
|
const agentFilter = dto.agentId ? Prisma.sql`AND ca.agent_id = ${dto.agentId}` : Prisma.sql``;
|
|
|
|
const rows = await this.prisma.$queryRaw<RawConversationResult[]>`
|
|
SELECT
|
|
ca.id,
|
|
ca.workspace_id,
|
|
ca.session_id,
|
|
ca.agent_id,
|
|
ca.messages,
|
|
ca.message_count,
|
|
ca.summary,
|
|
ca.started_at,
|
|
ca.ended_at,
|
|
ca.metadata,
|
|
ca.created_at,
|
|
ca.updated_at,
|
|
(1 - (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION}))) AS similarity
|
|
FROM conversation_archives ca
|
|
WHERE ca.workspace_id = ${workspaceId}::uuid
|
|
AND ca.embedding IS NOT NULL
|
|
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
|
|
${agentFilter}
|
|
ORDER BY ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})
|
|
LIMIT ${limit}
|
|
`;
|
|
|
|
const countResult = await this.prisma.$queryRaw<[{ count: bigint }]>`
|
|
SELECT COUNT(*) AS count
|
|
FROM conversation_archives ca
|
|
WHERE ca.workspace_id = ${workspaceId}::uuid
|
|
AND ca.embedding IS NOT NULL
|
|
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
|
|
${agentFilter}
|
|
`;
|
|
|
|
const total = Number(countResult[0].count);
|
|
|
|
return {
|
|
data: rows,
|
|
pagination: {
|
|
page: 1,
|
|
limit,
|
|
total,
|
|
totalPages: Math.ceil(total / limit),
|
|
},
|
|
};
|
|
}
|
|
|
|
/**
|
|
* List conversation archives with filtering and pagination.
|
|
*/
|
|
async findAll(
|
|
workspaceId: string,
|
|
query: ListConversationsDto
|
|
): Promise<PaginatedConversations<object>> {
|
|
const page = query.page ?? 1;
|
|
const limit = query.limit ?? 20;
|
|
const skip = (page - 1) * limit;
|
|
|
|
const where: Prisma.ConversationArchiveWhereInput = {
|
|
workspaceId,
|
|
...(query.agentId ? { agentId: query.agentId } : {}),
|
|
...(query.startedAfter || query.startedBefore
|
|
? {
|
|
startedAt: {
|
|
...(query.startedAfter ? { gte: new Date(query.startedAfter) } : {}),
|
|
...(query.startedBefore ? { lte: new Date(query.startedBefore) } : {}),
|
|
},
|
|
}
|
|
: {}),
|
|
};
|
|
|
|
const [total, records] = await Promise.all([
|
|
this.prisma.conversationArchive.count({ where }),
|
|
this.prisma.conversationArchive.findMany({
|
|
where,
|
|
select: {
|
|
id: true,
|
|
workspaceId: true,
|
|
sessionId: true,
|
|
agentId: true,
|
|
messageCount: true,
|
|
summary: true,
|
|
startedAt: true,
|
|
endedAt: true,
|
|
metadata: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
},
|
|
orderBy: { startedAt: "desc" },
|
|
skip,
|
|
take: limit,
|
|
}),
|
|
]);
|
|
|
|
return {
|
|
data: records,
|
|
pagination: {
|
|
page,
|
|
limit,
|
|
total,
|
|
totalPages: Math.ceil(total / limit),
|
|
},
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Get a single conversation archive by ID.
|
|
*/
|
|
async findOne(workspaceId: string, id: string): Promise<object> {
|
|
const record = await this.prisma.conversationArchive.findFirst({
|
|
where: { id, workspaceId },
|
|
select: {
|
|
id: true,
|
|
workspaceId: true,
|
|
sessionId: true,
|
|
agentId: true,
|
|
messages: true,
|
|
messageCount: true,
|
|
summary: true,
|
|
startedAt: true,
|
|
endedAt: true,
|
|
metadata: true,
|
|
createdAt: true,
|
|
updatedAt: true,
|
|
},
|
|
});
|
|
|
|
if (!record) {
|
|
throw new NotFoundException(`Conversation archive '${id}' not found`);
|
|
}
|
|
|
|
return record;
|
|
}
|
|
|
|
/**
|
|
* Build text content for embedding from summary and messages.
|
|
*/
|
|
private buildEmbeddingText(
|
|
summary: string,
|
|
messages: { role: string; content: string }[]
|
|
): string {
|
|
const messageText = messages.map((m) => `${m.role}: ${m.content}`).join("\n");
|
|
return `${summary}\n\n${messageText}`.trim();
|
|
}
|
|
|
|
/**
|
|
* Generate embedding and store it on the conversation_archives row.
|
|
*/
|
|
private async storeEmbedding(id: string, text: string): Promise<void> {
|
|
const vector = await this.embedding.generateEmbedding(text);
|
|
const embeddingStr = `[${vector.join(",")}]`;
|
|
|
|
await this.prisma.$executeRaw`
|
|
UPDATE conversation_archives
|
|
SET embedding = ${embeddingStr}::vector(${EMBEDDING_DIMENSION}),
|
|
updated_at = NOW()
|
|
WHERE id = ${id}::uuid
|
|
`;
|
|
|
|
this.logger.log(`Stored embedding for conversation ${id}`);
|
|
}
|
|
}
|