Compare commits

...

2 Commits

Author SHA1 Message Date
f0c0e6b45a feat(api): add conversation archive with vector search (MS22-DB-004, MS22-API-004)
All checks were successful
ci/woodpecker/push/api Pipeline was successful
- Add ConversationArchive Prisma model with pgvector(1536) embedding field
- Migration: 20260228000000_ms22_conversation_archive
- NestJS module at apps/api/src/conversation-archive/ with service, controller, DTOs, spec
- POST /api/conversations/ingest — ingest session logs, auto-embed via EmbeddingService
- POST /api/conversations/search — vector similarity search with agentId filter
- GET  /api/conversations — paginated list with agentId + date range filters
- GET  /api/conversations/:id — fetch full conversation including messages
- Register ConversationArchiveModule in app.module.ts
- 8 unit tests, all passing (vitest)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 20:20:31 -06:00
4b2e48af9c feat(api): add agent memory module (MS22-DB-002, MS22-API-002) (#586)
All checks were successful
ci/woodpecker/push/api Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-01 02:20:15 +00:00
21 changed files with 1291 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
-- CreateTable
CREATE TABLE "agent_memories" (
"id" UUID NOT NULL,
"workspace_id" UUID NOT NULL,
"agent_id" TEXT NOT NULL,
"key" TEXT NOT NULL,
"value" JSONB NOT NULL,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ NOT NULL,
CONSTRAINT "agent_memories_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "agent_memories_workspace_id_agent_id_key_key" ON "agent_memories"("workspace_id", "agent_id", "key");
-- CreateIndex
CREATE INDEX "agent_memories_workspace_id_idx" ON "agent_memories"("workspace_id");
-- CreateIndex
CREATE INDEX "agent_memories_agent_id_idx" ON "agent_memories"("agent_id");
-- AddForeignKey
ALTER TABLE "agent_memories" ADD CONSTRAINT "agent_memories_workspace_id_fkey" FOREIGN KEY ("workspace_id") REFERENCES "workspaces"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1,33 @@
-- CreateTable
CREATE TABLE "conversation_archives" (
"id" UUID NOT NULL,
"workspace_id" UUID NOT NULL,
"session_id" TEXT NOT NULL,
"agent_id" TEXT NOT NULL,
"messages" JSONB NOT NULL,
"message_count" INTEGER NOT NULL,
"summary" TEXT NOT NULL,
"embedding" vector(1536),
"started_at" TIMESTAMPTZ NOT NULL,
"ended_at" TIMESTAMPTZ,
"metadata" JSONB NOT NULL DEFAULT '{}',
"created_at" TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ NOT NULL,
CONSTRAINT "conversation_archives_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "conversation_archives_workspace_id_session_id_key" ON "conversation_archives"("workspace_id", "session_id");
-- CreateIndex
CREATE INDEX "conversation_archives_workspace_id_idx" ON "conversation_archives"("workspace_id");
-- CreateIndex
CREATE INDEX "conversation_archives_agent_id_idx" ON "conversation_archives"("agent_id");
-- CreateIndex
CREATE INDEX "conversation_archives_started_at_idx" ON "conversation_archives"("started_at");
-- AddForeignKey
ALTER TABLE "conversation_archives" ADD CONSTRAINT "conversation_archives_workspace_id_fkey" FOREIGN KEY ("workspace_id") REFERENCES "workspaces"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -299,6 +299,7 @@ model Workspace {
agentSessions AgentSession[]
agentTasks AgentTask[]
findings Finding[]
agentMemories AgentMemory[]
userLayouts UserLayout[]
knowledgeEntries KnowledgeEntry[]
knowledgeTags KnowledgeTag[]
@@ -313,6 +314,7 @@ model Workspace {
llmUsageLogs LlmUsageLog[]
userCredentials UserCredential[]
terminalSessions TerminalSession[]
conversationArchives ConversationArchive[]
@@index([ownerId])
@@map("workspaces")
@@ -764,6 +766,23 @@ model AgentSession {
@@map("agent_sessions")
}
model AgentMemory {
id String @id @default(uuid()) @db.Uuid
workspaceId String @map("workspace_id") @db.Uuid
agentId String @map("agent_id")
key String
value Json
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@unique([workspaceId, agentId, key])
@@index([workspaceId])
@@index([agentId])
@@map("agent_memories")
}
model WidgetDefinition {
id String @id @default(uuid()) @db.Uuid
@@ -1575,3 +1594,33 @@ model TerminalSession {
@@index([workspaceId, status])
@@map("terminal_sessions")
}
// ============================================
// CONVERSATION ARCHIVE MODULE
// ============================================
model ConversationArchive {
id String @id @default(uuid()) @db.Uuid
workspaceId String @map("workspace_id") @db.Uuid
sessionId String @map("session_id")
agentId String @map("agent_id")
messages Json
messageCount Int @map("message_count")
summary String @db.Text
// Note: vector dimension (1536) must match EMBEDDING_DIMENSION constant in @mosaic/shared
embedding Unsupported("vector(1536)")?
startedAt DateTime @map("started_at") @db.Timestamptz
endedAt DateTime? @map("ended_at") @db.Timestamptz
metadata Json @default("{}")
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
// Relations
workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@unique([workspaceId, sessionId])
@@index([workspaceId])
@@index([agentId])
@@index([startedAt])
@@map("conversation_archives")
}

View File

@@ -0,0 +1,102 @@
import { Test, TestingModule } from "@nestjs/testing";
import { AgentMemoryController } from "./agent-memory.controller";
import { AgentMemoryService } from "./agent-memory.service";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { describe, it, expect, beforeEach, vi } from "vitest";
describe("AgentMemoryController", () => {
let controller: AgentMemoryController;
const mockAgentMemoryService = {
upsert: vi.fn(),
findAll: vi.fn(),
findOne: vi.fn(),
remove: vi.fn(),
};
const mockGuard = { canActivate: vi.fn(() => true) };
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [AgentMemoryController],
providers: [
{
provide: AgentMemoryService,
useValue: mockAgentMemoryService,
},
],
})
.overrideGuard(AuthGuard)
.useValue(mockGuard)
.overrideGuard(WorkspaceGuard)
.useValue(mockGuard)
.overrideGuard(PermissionGuard)
.useValue(mockGuard)
.compile();
controller = module.get<AgentMemoryController>(AgentMemoryController);
vi.clearAllMocks();
});
const workspaceId = "workspace-1";
const agentId = "agent-1";
const key = "context";
describe("upsert", () => {
it("should upsert a memory entry", async () => {
const dto = { value: { foo: "bar" } };
const mockEntry = { id: "mem-1", workspaceId, agentId, key, value: dto.value };
mockAgentMemoryService.upsert.mockResolvedValue(mockEntry);
const result = await controller.upsert(agentId, key, dto, workspaceId);
expect(mockAgentMemoryService.upsert).toHaveBeenCalledWith(workspaceId, agentId, key, dto);
expect(result).toEqual(mockEntry);
});
});
describe("findAll", () => {
it("should list all memory entries for an agent", async () => {
const mockEntries = [
{ id: "mem-1", key: "a", value: 1 },
{ id: "mem-2", key: "b", value: 2 },
];
mockAgentMemoryService.findAll.mockResolvedValue(mockEntries);
const result = await controller.findAll(agentId, workspaceId);
expect(mockAgentMemoryService.findAll).toHaveBeenCalledWith(workspaceId, agentId);
expect(result).toEqual(mockEntries);
});
});
describe("findOne", () => {
it("should get a single memory entry", async () => {
const mockEntry = { id: "mem-1", key, value: "v" };
mockAgentMemoryService.findOne.mockResolvedValue(mockEntry);
const result = await controller.findOne(agentId, key, workspaceId);
expect(mockAgentMemoryService.findOne).toHaveBeenCalledWith(workspaceId, agentId, key);
expect(result).toEqual(mockEntry);
});
});
describe("remove", () => {
it("should delete a memory entry", async () => {
const mockResponse = { message: "Memory entry deleted successfully" };
mockAgentMemoryService.remove.mockResolvedValue(mockResponse);
const result = await controller.remove(agentId, key, workspaceId);
expect(mockAgentMemoryService.remove).toHaveBeenCalledWith(workspaceId, agentId, key);
expect(result).toEqual(mockResponse);
});
});
});

View File

@@ -0,0 +1,89 @@
import {
Controller,
Get,
Put,
Delete,
Body,
Param,
UseGuards,
HttpCode,
HttpStatus,
} from "@nestjs/common";
import { AgentMemoryService } from "./agent-memory.service";
import { UpsertAgentMemoryDto } from "./dto";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { Workspace, Permission, RequirePermission } from "../common/decorators";
/**
* Controller for per-agent key/value memory endpoints.
* All endpoints require authentication and workspace context.
*
* Guards are applied in order:
* 1. AuthGuard - Verifies user authentication
* 2. WorkspaceGuard - Validates workspace access
* 3. PermissionGuard - Checks role-based permissions
*/
@Controller("agents/:agentId/memory")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class AgentMemoryController {
constructor(private readonly agentMemoryService: AgentMemoryService) {}
/**
* PUT /api/agents/:agentId/memory/:key
* Upsert a memory entry for an agent
* Requires: MEMBER role or higher
*/
@Put(":key")
@RequirePermission(Permission.WORKSPACE_MEMBER)
async upsert(
@Param("agentId") agentId: string,
@Param("key") key: string,
@Body() dto: UpsertAgentMemoryDto,
@Workspace() workspaceId: string
) {
return this.agentMemoryService.upsert(workspaceId, agentId, key, dto);
}
/**
* GET /api/agents/:agentId/memory
* List all memory entries for an agent
* Requires: Any workspace member (including GUEST)
*/
@Get()
@RequirePermission(Permission.WORKSPACE_ANY)
async findAll(@Param("agentId") agentId: string, @Workspace() workspaceId: string) {
return this.agentMemoryService.findAll(workspaceId, agentId);
}
/**
* GET /api/agents/:agentId/memory/:key
* Get a single memory entry by key
* Requires: Any workspace member (including GUEST)
*/
@Get(":key")
@RequirePermission(Permission.WORKSPACE_ANY)
async findOne(
@Param("agentId") agentId: string,
@Param("key") key: string,
@Workspace() workspaceId: string
) {
return this.agentMemoryService.findOne(workspaceId, agentId, key);
}
/**
* DELETE /api/agents/:agentId/memory/:key
* Remove a memory entry
* Requires: MEMBER role or higher
*/
@Delete(":key")
@HttpCode(HttpStatus.OK)
@RequirePermission(Permission.WORKSPACE_MEMBER)
async remove(
@Param("agentId") agentId: string,
@Param("key") key: string,
@Workspace() workspaceId: string
) {
return this.agentMemoryService.remove(workspaceId, agentId, key);
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from "@nestjs/common";
import { AgentMemoryController } from "./agent-memory.controller";
import { AgentMemoryService } from "./agent-memory.service";
import { PrismaModule } from "../prisma/prisma.module";
import { AuthModule } from "../auth/auth.module";
@Module({
imports: [PrismaModule, AuthModule],
controllers: [AgentMemoryController],
providers: [AgentMemoryService],
exports: [AgentMemoryService],
})
export class AgentMemoryModule {}

View File

@@ -0,0 +1,126 @@
import { Test, TestingModule } from "@nestjs/testing";
import { AgentMemoryService } from "./agent-memory.service";
import { PrismaService } from "../prisma/prisma.service";
import { NotFoundException } from "@nestjs/common";
import { describe, it, expect, beforeEach, vi } from "vitest";
describe("AgentMemoryService", () => {
let service: AgentMemoryService;
const mockPrismaService = {
agentMemory: {
upsert: vi.fn(),
findMany: vi.fn(),
findUnique: vi.fn(),
delete: vi.fn(),
},
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
AgentMemoryService,
{
provide: PrismaService,
useValue: mockPrismaService,
},
],
}).compile();
service = module.get<AgentMemoryService>(AgentMemoryService);
vi.clearAllMocks();
});
const workspaceId = "workspace-1";
const agentId = "agent-1";
const key = "session-context";
describe("upsert", () => {
it("should upsert a memory entry", async () => {
const dto = { value: { data: "some context" } };
const mockEntry = {
id: "mem-1",
workspaceId,
agentId,
key,
value: dto.value,
createdAt: new Date(),
updatedAt: new Date(),
};
mockPrismaService.agentMemory.upsert.mockResolvedValue(mockEntry);
const result = await service.upsert(workspaceId, agentId, key, dto);
expect(mockPrismaService.agentMemory.upsert).toHaveBeenCalledWith({
where: { workspaceId_agentId_key: { workspaceId, agentId, key } },
create: { workspaceId, agentId, key, value: dto.value },
update: { value: dto.value },
});
expect(result).toEqual(mockEntry);
});
});
describe("findAll", () => {
it("should return all memory entries for an agent", async () => {
const mockEntries = [
{ id: "mem-1", key: "a", value: 1 },
{ id: "mem-2", key: "b", value: 2 },
];
mockPrismaService.agentMemory.findMany.mockResolvedValue(mockEntries);
const result = await service.findAll(workspaceId, agentId);
expect(mockPrismaService.agentMemory.findMany).toHaveBeenCalledWith({
where: { workspaceId, agentId },
orderBy: { key: "asc" },
});
expect(result).toEqual(mockEntries);
});
});
describe("findOne", () => {
it("should return a memory entry by key", async () => {
const mockEntry = { id: "mem-1", workspaceId, agentId, key, value: "ctx" };
mockPrismaService.agentMemory.findUnique.mockResolvedValue(mockEntry);
const result = await service.findOne(workspaceId, agentId, key);
expect(mockPrismaService.agentMemory.findUnique).toHaveBeenCalledWith({
where: { workspaceId_agentId_key: { workspaceId, agentId, key } },
});
expect(result).toEqual(mockEntry);
});
it("should throw NotFoundException when key not found", async () => {
mockPrismaService.agentMemory.findUnique.mockResolvedValue(null);
await expect(service.findOne(workspaceId, agentId, key)).rejects.toThrow(NotFoundException);
});
});
describe("remove", () => {
it("should delete a memory entry", async () => {
const mockEntry = { id: "mem-1", workspaceId, agentId, key, value: "x" };
mockPrismaService.agentMemory.findUnique.mockResolvedValue(mockEntry);
mockPrismaService.agentMemory.delete.mockResolvedValue(mockEntry);
const result = await service.remove(workspaceId, agentId, key);
expect(mockPrismaService.agentMemory.delete).toHaveBeenCalledWith({
where: { workspaceId_agentId_key: { workspaceId, agentId, key } },
});
expect(result).toEqual({ message: "Memory entry deleted successfully" });
});
it("should throw NotFoundException when key not found", async () => {
mockPrismaService.agentMemory.findUnique.mockResolvedValue(null);
await expect(service.remove(workspaceId, agentId, key)).rejects.toThrow(NotFoundException);
});
});
});

View File

@@ -0,0 +1,79 @@
import { Injectable, NotFoundException } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { Prisma } from "@prisma/client";
import type { UpsertAgentMemoryDto } from "./dto";
@Injectable()
export class AgentMemoryService {
constructor(private readonly prisma: PrismaService) {}
/**
* Upsert a memory entry for an agent.
*/
async upsert(workspaceId: string, agentId: string, key: string, dto: UpsertAgentMemoryDto) {
return this.prisma.agentMemory.upsert({
where: {
workspaceId_agentId_key: { workspaceId, agentId, key },
},
create: {
workspaceId,
agentId,
key,
value: dto.value as Prisma.InputJsonValue,
},
update: {
value: dto.value as Prisma.InputJsonValue,
},
});
}
/**
* List all memory entries for an agent in a workspace.
*/
async findAll(workspaceId: string, agentId: string) {
return this.prisma.agentMemory.findMany({
where: { workspaceId, agentId },
orderBy: { key: "asc" },
});
}
/**
* Get a single memory entry by key.
*/
async findOne(workspaceId: string, agentId: string, key: string) {
const entry = await this.prisma.agentMemory.findUnique({
where: {
workspaceId_agentId_key: { workspaceId, agentId, key },
},
});
if (!entry) {
throw new NotFoundException(`Memory key "${key}" not found for agent "${agentId}"`);
}
return entry;
}
/**
* Delete a memory entry by key.
*/
async remove(workspaceId: string, agentId: string, key: string) {
const entry = await this.prisma.agentMemory.findUnique({
where: {
workspaceId_agentId_key: { workspaceId, agentId, key },
},
});
if (!entry) {
throw new NotFoundException(`Memory key "${key}" not found for agent "${agentId}"`);
}
await this.prisma.agentMemory.delete({
where: {
workspaceId_agentId_key: { workspaceId, agentId, key },
},
});
return { message: "Memory entry deleted successfully" };
}
}

View File

@@ -0,0 +1 @@
export * from "./upsert-agent-memory.dto";

View File

@@ -0,0 +1,10 @@
import { IsNotEmpty } from "class-validator";
/**
* DTO for upserting an agent memory entry.
* The value accepts any JSON-serializable data.
*/
export class UpsertAgentMemoryDto {
@IsNotEmpty({ message: "value must not be empty" })
value!: unknown;
}

View File

@@ -28,6 +28,7 @@ import { BrainModule } from "./brain/brain.module";
import { CronModule } from "./cron/cron.module";
import { AgentTasksModule } from "./agent-tasks/agent-tasks.module";
import { FindingsModule } from "./findings/findings.module";
import { AgentMemoryModule } from "./agent-memory/agent-memory.module";
import { ValkeyModule } from "./valkey/valkey.module";
import { BullMqModule } from "./bullmq/bullmq.module";
import { StitcherModule } from "./stitcher/stitcher.module";
@@ -47,6 +48,7 @@ import { WorkspacesModule } from "./workspaces/workspaces.module";
import { AdminModule } from "./admin/admin.module";
import { TeamsModule } from "./teams/teams.module";
import { ImportModule } from "./import/import.module";
import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module";
import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor";
@Module({
@@ -102,6 +104,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce
CronModule,
AgentTasksModule,
FindingsModule,
AgentMemoryModule,
RunnerJobsModule,
JobEventsModule,
JobStepsModule,
@@ -117,6 +120,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce
AdminModule,
TeamsModule,
ImportModule,
ConversationArchiveModule,
],
controllers: [AppController, CsrfController],
providers: [

View File

@@ -0,0 +1,69 @@
import { Controller, Post, Get, Body, Param, Query, UseGuards } from "@nestjs/common";
import { AuthGuard } from "../auth/guards/auth.guard";
import { WorkspaceGuard, PermissionGuard } from "../common/guards";
import { Workspace, RequirePermission, Permission } from "../common/decorators";
import { ConversationArchiveService } from "./conversation-archive.service";
import { IngestConversationDto, SearchConversationDto, ListConversationsDto } from "./dto";
/**
* Controller for conversation archive endpoints.
* All endpoints require workspace membership.
*/
@Controller("conversations")
@UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
export class ConversationArchiveController {
constructor(private readonly service: ConversationArchiveService) {}
/**
* POST /api/conversations/ingest
* Ingest a conversation session log and auto-embed for semantic search.
* Requires: MEMBER or higher
*/
@Post("ingest")
@RequirePermission(Permission.WORKSPACE_MEMBER)
async ingest(
@Workspace() workspaceId: string,
@Body() dto: IngestConversationDto
): Promise<{ id: string }> {
return this.service.ingest(workspaceId, dto);
}
/**
* POST /api/conversations/search
* Vector similarity search across archived conversations.
* Requires: Any workspace member
*/
@Post("search")
@RequirePermission(Permission.WORKSPACE_ANY)
async search(
@Workspace() workspaceId: string,
@Body() dto: SearchConversationDto
): Promise<unknown> {
return this.service.search(workspaceId, dto);
}
/**
* GET /api/conversations
* List conversation archives with filtering and pagination.
* Requires: Any workspace member
*/
@Get()
@RequirePermission(Permission.WORKSPACE_ANY)
async findAll(
@Workspace() workspaceId: string,
@Query() query: ListConversationsDto
): Promise<unknown> {
return this.service.findAll(workspaceId, query);
}
/**
* GET /api/conversations/:id
* Get a single conversation archive by ID (includes full messages).
* Requires: Any workspace member
*/
@Get(":id")
@RequirePermission(Permission.WORKSPACE_ANY)
async findOne(@Workspace() workspaceId: string, @Param("id") id: string): Promise<unknown> {
return this.service.findOne(workspaceId, id);
}
}

View File

@@ -0,0 +1,14 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../prisma/prisma.module";
import { AuthModule } from "../auth/auth.module";
import { KnowledgeModule } from "../knowledge/knowledge.module";
import { ConversationArchiveService } from "./conversation-archive.service";
import { ConversationArchiveController } from "./conversation-archive.controller";
@Module({
imports: [PrismaModule, AuthModule, KnowledgeModule],
controllers: [ConversationArchiveController],
providers: [ConversationArchiveService],
exports: [ConversationArchiveService],
})
export class ConversationArchiveModule {}

View File

@@ -0,0 +1,149 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { ConflictException, NotFoundException } from "@nestjs/common";
import { ConversationArchiveService } from "./conversation-archive.service";
import { PrismaService } from "../prisma/prisma.service";
import { EmbeddingService } from "../knowledge/services/embedding.service";
const mockPrisma = {
conversationArchive: {
findUnique: vi.fn(),
create: vi.fn(),
count: vi.fn(),
findMany: vi.fn(),
findFirst: vi.fn(),
},
$queryRaw: vi.fn(),
$executeRaw: vi.fn(),
};
const mockEmbedding = {
isConfigured: vi.fn(),
generateEmbedding: vi.fn(),
};
describe("ConversationArchiveService", () => {
let service: ConversationArchiveService;
beforeEach(async () => {
vi.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
providers: [
ConversationArchiveService,
{ provide: PrismaService, useValue: mockPrisma },
{ provide: EmbeddingService, useValue: mockEmbedding },
],
}).compile();
service = module.get<ConversationArchiveService>(ConversationArchiveService);
});
describe("ingest", () => {
const workspaceId = "ws-1";
const dto = {
sessionId: "sess-abc",
agentId: "agent-xyz",
messages: [
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
],
summary: "A greeting conversation",
startedAt: "2026-02-28T10:00:00Z",
};
it("creates a conversation archive and returns id", async () => {
mockPrisma.conversationArchive.findUnique.mockResolvedValue(null);
mockPrisma.conversationArchive.create.mockResolvedValue({ id: "conv-1" });
mockEmbedding.isConfigured.mockReturnValue(false);
const result = await service.ingest(workspaceId, dto);
expect(result).toEqual({ id: "conv-1" });
expect(mockPrisma.conversationArchive.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
workspaceId,
sessionId: dto.sessionId,
agentId: dto.agentId,
messageCount: 2,
}),
})
);
});
it("throws ConflictException when session already exists", async () => {
mockPrisma.conversationArchive.findUnique.mockResolvedValue({ id: "existing" });
await expect(service.ingest(workspaceId, dto)).rejects.toThrow(ConflictException);
});
});
describe("findAll", () => {
const workspaceId = "ws-1";
it("returns paginated list", async () => {
mockPrisma.conversationArchive.count.mockResolvedValue(5);
mockPrisma.conversationArchive.findMany.mockResolvedValue([
{ id: "conv-1", sessionId: "sess-1" },
]);
const result = await service.findAll(workspaceId, { page: 1, limit: 10 });
expect(result.pagination.total).toBe(5);
expect(result.data).toHaveLength(1);
});
it("uses default pagination when not provided", async () => {
mockPrisma.conversationArchive.count.mockResolvedValue(0);
mockPrisma.conversationArchive.findMany.mockResolvedValue([]);
const result = await service.findAll(workspaceId, {});
expect(result.pagination.page).toBe(1);
expect(result.pagination.limit).toBe(20);
});
});
describe("findOne", () => {
const workspaceId = "ws-1";
it("returns record when found", async () => {
const record = { id: "conv-1", workspaceId, sessionId: "sess-1" };
mockPrisma.conversationArchive.findFirst.mockResolvedValue(record);
const result = await service.findOne(workspaceId, "conv-1");
expect(result).toEqual(record);
});
it("throws NotFoundException when record does not exist", async () => {
mockPrisma.conversationArchive.findFirst.mockResolvedValue(null);
await expect(service.findOne(workspaceId, "missing")).rejects.toThrow(NotFoundException);
});
});
describe("search", () => {
it("throws ConflictException when embedding is not configured", async () => {
mockEmbedding.isConfigured.mockReturnValue(false);
await expect(service.search("ws-1", { query: "test query" })).rejects.toThrow(
ConflictException
);
});
it("performs vector search when configured", async () => {
mockEmbedding.isConfigured.mockReturnValue(true);
mockEmbedding.generateEmbedding.mockResolvedValue(new Array(1536).fill(0.1));
mockPrisma.$queryRaw
.mockResolvedValueOnce([{ id: "conv-1", similarity: 0.9 }])
.mockResolvedValueOnce([{ count: BigInt(1) }]);
const result = await service.search("ws-1", { query: "greetings" });
expect(result.data).toHaveLength(1);
expect(result.pagination.total).toBe(1);
});
});
});

View File

@@ -0,0 +1,277 @@
import { Injectable, Logger, NotFoundException, ConflictException } from "@nestjs/common";
import { Prisma } from "@prisma/client";
import { EMBEDDING_DIMENSION } from "@mosaic/shared";
import { PrismaService } from "../prisma/prisma.service";
import { EmbeddingService } from "../knowledge/services/embedding.service";
import type { IngestConversationDto, SearchConversationDto, ListConversationsDto } from "./dto";
/**
* Shape of a raw conversation archive row from $queryRaw vector search
*/
interface RawConversationResult {
id: string;
workspace_id: string;
session_id: string;
agent_id: string;
messages: unknown;
message_count: number;
summary: string;
started_at: Date;
ended_at: Date | null;
metadata: unknown;
created_at: Date;
updated_at: Date;
similarity: number;
}
/**
* Paginated response wrapper
*/
export interface PaginatedConversations<T> {
data: T[];
pagination: {
page: number;
limit: number;
total: number;
totalPages: number;
};
}
@Injectable()
export class ConversationArchiveService {
private readonly logger = new Logger(ConversationArchiveService.name);
private readonly defaultSimilarityThreshold = 0.5;
constructor(
private readonly prisma: PrismaService,
private readonly embedding: EmbeddingService
) {}
/**
* Ingest a conversation session log.
* Generates a vector embedding from the summary + message content and stores it alongside the record.
*/
async ingest(workspaceId: string, dto: IngestConversationDto): Promise<{ id: string }> {
// Verify no duplicate session in this workspace
const existing = await this.prisma.conversationArchive.findUnique({
where: { workspaceId_sessionId: { workspaceId, sessionId: dto.sessionId } },
select: { id: true },
});
if (existing) {
throw new ConflictException(
`Conversation session '${dto.sessionId}' already exists in this workspace`
);
}
const messageCount = dto.messages.length;
// Create record first to get ID for embedding
const record = await this.prisma.conversationArchive.create({
data: {
workspaceId,
sessionId: dto.sessionId,
agentId: dto.agentId,
messages: dto.messages as unknown as Prisma.InputJsonValue,
messageCount,
summary: dto.summary,
startedAt: new Date(dto.startedAt),
endedAt: dto.endedAt ? new Date(dto.endedAt) : null,
metadata: (dto.metadata ?? {}) as Prisma.InputJsonValue,
},
select: { id: true },
});
// Generate and store embedding asynchronously (non-blocking for ingest)
if (this.embedding.isConfigured()) {
const textForEmbedding = this.buildEmbeddingText(dto.summary, dto.messages);
this.storeEmbedding(record.id, textForEmbedding).catch((err: unknown) => {
this.logger.error(`Failed to store embedding for conversation ${record.id}`, err);
});
}
this.logger.log(`Ingested conversation ${record.id} (session: ${dto.sessionId})`);
return { id: record.id };
}
/**
* Semantic vector search across conversation archives in a workspace.
*/
async search(
workspaceId: string,
dto: SearchConversationDto
): Promise<PaginatedConversations<RawConversationResult>> {
if (!this.embedding.isConfigured()) {
throw new ConflictException("Semantic search requires OpenAI API key to be configured");
}
const limit = dto.limit ?? 20;
const threshold = dto.similarityThreshold ?? this.defaultSimilarityThreshold;
const distanceThreshold = 1 - threshold;
const queryEmbedding = await this.embedding.generateEmbedding(dto.query);
const embeddingStr = `[${queryEmbedding.join(",")}]`;
const agentFilter = dto.agentId ? Prisma.sql`AND ca.agent_id = ${dto.agentId}` : Prisma.sql``;
const rows = await this.prisma.$queryRaw<RawConversationResult[]>`
SELECT
ca.id,
ca.workspace_id,
ca.session_id,
ca.agent_id,
ca.messages,
ca.message_count,
ca.summary,
ca.started_at,
ca.ended_at,
ca.metadata,
ca.created_at,
ca.updated_at,
(1 - (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION}))) AS similarity
FROM conversation_archives ca
WHERE ca.workspace_id = ${workspaceId}::uuid
AND ca.embedding IS NOT NULL
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
${agentFilter}
ORDER BY ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})
LIMIT ${limit}
`;
const countResult = await this.prisma.$queryRaw<[{ count: bigint }]>`
SELECT COUNT(*) AS count
FROM conversation_archives ca
WHERE ca.workspace_id = ${workspaceId}::uuid
AND ca.embedding IS NOT NULL
AND (ca.embedding <=> ${embeddingStr}::vector(${EMBEDDING_DIMENSION})) <= ${distanceThreshold}
${agentFilter}
`;
const total = Number(countResult[0].count);
return {
data: rows,
pagination: {
page: 1,
limit,
total,
totalPages: Math.ceil(total / limit),
},
};
}
/**
* List conversation archives with filtering and pagination.
*/
async findAll(
workspaceId: string,
query: ListConversationsDto
): Promise<PaginatedConversations<object>> {
const page = query.page ?? 1;
const limit = query.limit ?? 20;
const skip = (page - 1) * limit;
const where: Prisma.ConversationArchiveWhereInput = {
workspaceId,
...(query.agentId ? { agentId: query.agentId } : {}),
...(query.startedAfter || query.startedBefore
? {
startedAt: {
...(query.startedAfter ? { gte: new Date(query.startedAfter) } : {}),
...(query.startedBefore ? { lte: new Date(query.startedBefore) } : {}),
},
}
: {}),
};
const [total, records] = await Promise.all([
this.prisma.conversationArchive.count({ where }),
this.prisma.conversationArchive.findMany({
where,
select: {
id: true,
workspaceId: true,
sessionId: true,
agentId: true,
messageCount: true,
summary: true,
startedAt: true,
endedAt: true,
metadata: true,
createdAt: true,
updatedAt: true,
},
orderBy: { startedAt: "desc" },
skip,
take: limit,
}),
]);
return {
data: records,
pagination: {
page,
limit,
total,
totalPages: Math.ceil(total / limit),
},
};
}
/**
* Get a single conversation archive by ID.
*/
async findOne(workspaceId: string, id: string): Promise<object> {
const record = await this.prisma.conversationArchive.findFirst({
where: { id, workspaceId },
select: {
id: true,
workspaceId: true,
sessionId: true,
agentId: true,
messages: true,
messageCount: true,
summary: true,
startedAt: true,
endedAt: true,
metadata: true,
createdAt: true,
updatedAt: true,
},
});
if (!record) {
throw new NotFoundException(`Conversation archive '${id}' not found`);
}
return record;
}
/**
* Build text content for embedding from summary and messages.
*/
private buildEmbeddingText(
summary: string,
messages: { role: string; content: string }[]
): string {
const messageText = messages.map((m) => `${m.role}: ${m.content}`).join("\n");
return `${summary}\n\n${messageText}`.trim();
}
/**
* Generate embedding and store it on the conversation_archives row.
*/
private async storeEmbedding(id: string, text: string): Promise<void> {
const vector = await this.embedding.generateEmbedding(text);
const embeddingStr = `[${vector.join(",")}]`;
await this.prisma.$executeRaw`
UPDATE conversation_archives
SET embedding = ${embeddingStr}::vector(${EMBEDDING_DIMENSION}),
updated_at = NOW()
WHERE id = ${id}::uuid
`;
this.logger.log(`Stored embedding for conversation ${id}`);
}
}

View File

@@ -0,0 +1,3 @@
export { IngestConversationDto, ConversationMessageDto } from "./ingest-conversation.dto";
export { SearchConversationDto } from "./search-conversation.dto";
export { ListConversationsDto } from "./list-conversations.dto";

View File

@@ -0,0 +1,64 @@
import {
IsString,
IsArray,
IsOptional,
IsDateString,
MinLength,
MaxLength,
IsObject,
ValidateNested,
ArrayMinSize,
} from "class-validator";
import { Type } from "class-transformer";
/**
* Represents a single message in a conversation session
*/
export class ConversationMessageDto {
@IsString()
role!: string;
@IsString()
@MinLength(1)
content!: string;
@IsOptional()
@IsDateString()
timestamp?: string;
}
/**
* DTO for ingesting a conversation session log
*/
export class IngestConversationDto {
@IsString()
@MinLength(1)
@MaxLength(500)
sessionId!: string;
@IsString()
@MinLength(1)
@MaxLength(500)
agentId!: string;
@IsArray()
@ArrayMinSize(1)
@ValidateNested({ each: true })
@Type(() => ConversationMessageDto)
messages!: ConversationMessageDto[];
@IsString()
@MinLength(1)
summary!: string;
@IsDateString()
startedAt!: string;
@IsOptional()
@IsDateString()
endedAt?: string;
@IsOptional()
@IsObject()
metadata?: Record<string, unknown>;
}

View File

@@ -0,0 +1,33 @@
import { IsString, IsOptional, MaxLength, IsInt, Min, Max, IsDateString } from "class-validator";
import { Type } from "class-transformer";
/**
* DTO for listing/filtering conversation archives
*/
export class ListConversationsDto {
@IsOptional()
@IsString()
@MaxLength(500)
agentId?: string;
@IsOptional()
@IsDateString()
startedAfter?: string;
@IsOptional()
@IsDateString()
startedBefore?: string;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
page?: number;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(100)
limit?: number;
}

View File

@@ -0,0 +1,40 @@
import {
IsString,
IsOptional,
MinLength,
MaxLength,
IsInt,
Min,
Max,
IsNumber,
} from "class-validator";
import { Type } from "class-transformer";
/**
* DTO for semantic search across conversation archives
*/
export class SearchConversationDto {
@IsString()
@MinLength(1)
@MaxLength(1000)
query!: string;
@IsOptional()
@IsString()
@MaxLength(500)
agentId?: string;
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(100)
limit?: number;
@IsOptional()
@Type(() => Number)
@IsNumber()
@Min(0)
@Max(1)
similarityThreshold?: number;
}

View File

@@ -0,0 +1,64 @@
# MS22 Agent Memory Module
## Objective
Add per-agent key/value store: AgentMemory model + NestJS module with CRUD endpoints.
## Issues
- MS22-DB-002: Add AgentMemory schema model
- MS22-API-002: Add agent-memory NestJS module
## Plan
1. AgentMemory model → schema.prisma (after AgentSession, line 736)
2. Add `agentMemories AgentMemory[]` relation to Workspace model
3. Create apps/api/src/agent-memory/ with service, controller, DTOs, specs
4. Register in app.module.ts
5. Migrate: `prisma migrate dev --name ms22_agent_memory`
6. lint + build
7. Commit
## Endpoints
- PUT /api/agents/:agentId/memory/:key (upsert)
- GET /api/agents/:agentId/memory (list all)
- GET /api/agents/:agentId/memory/:key (get one)
- DELETE /api/agents/:agentId/memory/:key (remove)
## Auth
- @UseGuards(AuthGuard, WorkspaceGuard, PermissionGuard)
- @Workspace() decorator for workspaceId
- Permission.WORKSPACE_MEMBER for write ops
- Permission.WORKSPACE_ANY for read ops
## Schema
```prisma
model AgentMemory {
id String @id @default(uuid()) @db.Uuid
workspaceId String @map("workspace_id") @db.Uuid
agentId String @map("agent_id")
key String
value Json
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@unique([workspaceId, agentId, key])
@@index([workspaceId])
@@index([agentId])
@@map("agent_memories")
}
```
## Progress
- [ ] Schema
- [ ] Module files
- [ ] app.module.ts
- [ ] Migration
- [ ] lint/build
- [ ] Commit

View File

@@ -0,0 +1,48 @@
# MS22 — Conversation Archive Module
## Objective
Implement ConversationArchive module: ingest OpenClaw session logs, store with vector embeddings for semantic search.
## Deliverables
1. ConversationArchive Prisma model
2. NestJS module at apps/api/src/conversation-archive/
3. Endpoints: ingest, search, list, get-by-id
4. Register in app.module.ts
5. Migrate, lint, build, commit
## Plan
- Add model to schema.prisma (end of file)
- Add relation to Workspace model
- Create module structure: dto/, service, controller, spec, module
- Use EmbeddingService from knowledge module (import KnowledgeModule or just PrismaModule + embed inline)
- Follow pattern: AuthGuard + WorkspaceGuard + PermissionGuard
- Endpoint prefix: conversations (maps to /api/conversations)
- Vector search: $queryRaw with <=> operator (cosine distance)
## Assumptions
- ASSUMPTION: Embedding is stored inline on ConversationArchive (not a separate table) — simpler and sufficient for this use case, matches MemoryEmbedding pattern
- ASSUMPTION: Import KnowledgeModule to reuse EmbeddingService (it exports it)
- ASSUMPTION: messageCount computed server-side from messages array length on ingest
- ASSUMPTION: Permission level WORKSPACE_MEMBER for ingest/search, WORKSPACE_ANY for list/get
## Progress
- [ ] Schema model
- [ ] Migration
- [ ] DTOs
- [ ] Service
- [ ] Controller
- [ ] Spec
- [ ] Module
- [ ] app.module.ts registration
- [ ] Lint + build
- [ ] Commit
## Risks
- EmbeddingService exports from knowledge.module — need to import KnowledgeModule
- Migration requires live DB (may need --skip-generate flag if no DB access)