feat(#2): Implement PostgreSQL 17 + pgvector database schema
Establishes multi-tenant database layer with vector similarity search for AI-powered memory features. Includes Docker infrastructure, Prisma ORM integration, NestJS services, and shared types across the monorepo. Key changes: - Docker: PostgreSQL 17 + pgvector v0.7.4, Valkey cache - Schema: 8 models (User, Workspace, Task, Event, Project, ActivityLog, MemoryEmbedding) with RLS preparation - NestJS: PrismaModule, DatabaseModule, EmbeddingsService - Shared: Type-safe enums, constants, and database types Fixes #2 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,15 @@
|
||||
import { Controller, Get } from "@nestjs/common";
|
||||
import { AppService } from "./app.service";
|
||||
import { PrismaService } from "./prisma/prisma.service";
|
||||
import type { ApiResponse, HealthStatus } from "@mosaic/shared";
|
||||
import { successResponse } from "@mosaic/shared";
|
||||
|
||||
@Controller()
|
||||
export class AppController {
|
||||
constructor(private readonly appService: AppService) {}
|
||||
constructor(
|
||||
private readonly appService: AppService,
|
||||
private readonly prisma: PrismaService,
|
||||
) {}
|
||||
|
||||
@Get()
|
||||
getHello(): string {
|
||||
@@ -13,10 +17,25 @@ export class AppController {
|
||||
}
|
||||
|
||||
@Get("health")
|
||||
getHealth(): ApiResponse<HealthStatus> {
|
||||
async getHealth(): Promise<ApiResponse<HealthStatus>> {
|
||||
const dbHealthy = await this.prisma.isHealthy();
|
||||
const dbInfo = await this.prisma.getConnectionInfo();
|
||||
|
||||
const overallStatus = dbHealthy ? "healthy" : "degraded";
|
||||
const packageVersion = process.env.npm_package_version;
|
||||
|
||||
return successResponse({
|
||||
status: "healthy",
|
||||
status: overallStatus,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(packageVersion && { version: packageVersion }),
|
||||
checks: {
|
||||
database: {
|
||||
status: dbHealthy ? "healthy" : "unhealthy",
|
||||
message: dbInfo.connected
|
||||
? `Connected to ${dbInfo.database} (${dbInfo.version})`
|
||||
: "Database connection failed",
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { AppController } from "./app.controller";
|
||||
import { AppService } from "./app.service";
|
||||
import { PrismaModule } from "./prisma/prisma.module";
|
||||
import { DatabaseModule } from "./database/database.module";
|
||||
|
||||
@Module({
|
||||
imports: [],
|
||||
imports: [PrismaModule, DatabaseModule],
|
||||
controllers: [AppController],
|
||||
providers: [AppService],
|
||||
})
|
||||
|
||||
12
apps/api/src/database/database.module.ts
Normal file
12
apps/api/src/database/database.module.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { EmbeddingsService } from "./embeddings.service";
|
||||
|
||||
/**
|
||||
* Database utilities module
|
||||
* Provides services for specialized database operations
|
||||
*/
|
||||
@Module({
|
||||
providers: [EmbeddingsService],
|
||||
exports: [EmbeddingsService],
|
||||
})
|
||||
export class DatabaseModule {}
|
||||
262
apps/api/src/database/embeddings.service.ts
Normal file
262
apps/api/src/database/embeddings.service.ts
Normal file
@@ -0,0 +1,262 @@
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { EntityType } from "@prisma/client";
|
||||
import { EMBEDDING_DIMENSION } from "@mosaic/shared";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
|
||||
/**
|
||||
* Result from similarity search
|
||||
*/
|
||||
export interface SimilarEmbedding {
|
||||
id: string;
|
||||
content: string;
|
||||
similarity: number;
|
||||
entityType: EntityType | null;
|
||||
entityId: string | null;
|
||||
metadata: Record<string, unknown>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Service for managing vector embeddings using pgvector
|
||||
* Uses raw SQL for vector operations since Prisma doesn't support vector types natively
|
||||
*/
|
||||
@Injectable()
|
||||
export class EmbeddingsService {
|
||||
private readonly logger = new Logger(EmbeddingsService.name);
|
||||
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
/**
|
||||
* Validate that an embedding array contains only finite numbers
|
||||
* @param embedding Array to validate
|
||||
* @throws Error if validation fails
|
||||
*/
|
||||
private validateEmbedding(embedding: number[]): void {
|
||||
if (!Array.isArray(embedding)) {
|
||||
throw new Error("Embedding must be an array");
|
||||
}
|
||||
|
||||
if (
|
||||
!embedding.every((val) => typeof val === "number" && Number.isFinite(val))
|
||||
) {
|
||||
throw new Error("Embedding array must contain only finite numbers");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store an embedding vector for content
|
||||
* @param params Embedding parameters
|
||||
* @returns ID of the created embedding
|
||||
*/
|
||||
async storeEmbedding(params: {
|
||||
workspaceId: string;
|
||||
content: string;
|
||||
embedding: number[];
|
||||
entityType?: EntityType;
|
||||
entityId?: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
}): Promise<string> {
|
||||
const { workspaceId, content, embedding, entityType, entityId, metadata } =
|
||||
params;
|
||||
|
||||
// Validate embedding array
|
||||
this.validateEmbedding(embedding);
|
||||
|
||||
if (embedding.length !== EMBEDDING_DIMENSION) {
|
||||
throw new Error(
|
||||
`Invalid embedding dimension: expected EMBEDDING_DIMENSION, got ${embedding.length}`
|
||||
);
|
||||
}
|
||||
|
||||
const vectorString = `[${embedding.join(",")}]`;
|
||||
|
||||
try {
|
||||
const result = await this.prisma.$queryRaw<Array<{ id: string }>>`
|
||||
INSERT INTO memory_embeddings (
|
||||
id, workspace_id, content, embedding, entity_type, entity_id, metadata, created_at, updated_at
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(),
|
||||
${workspaceId}::uuid,
|
||||
${content},
|
||||
${vectorString}::vector,
|
||||
${entityType ?? null}::"EntityType",
|
||||
${entityId ?? null}::uuid,
|
||||
${JSON.stringify(metadata ?? {})}::jsonb,
|
||||
NOW(),
|
||||
NOW()
|
||||
)
|
||||
RETURNING id::text
|
||||
`;
|
||||
|
||||
const embeddingId = result[0]?.id;
|
||||
if (!embeddingId) {
|
||||
throw new Error("Failed to get embedding ID from insert result");
|
||||
}
|
||||
this.logger.debug(
|
||||
`Stored embedding ${embeddingId} for workspace ${workspaceId}`
|
||||
);
|
||||
return embeddingId;
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to store embedding", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find similar embeddings using cosine similarity
|
||||
* @param params Search parameters
|
||||
* @returns Array of similar embeddings sorted by similarity (descending)
|
||||
*/
|
||||
async findSimilar(params: {
|
||||
workspaceId: string;
|
||||
embedding: number[];
|
||||
limit?: number;
|
||||
threshold?: number;
|
||||
entityType?: EntityType;
|
||||
}): Promise<SimilarEmbedding[]> {
|
||||
const {
|
||||
workspaceId,
|
||||
embedding,
|
||||
limit = 10,
|
||||
threshold = 0.7,
|
||||
entityType,
|
||||
} = params;
|
||||
|
||||
// Validate embedding array
|
||||
this.validateEmbedding(embedding);
|
||||
|
||||
if (embedding.length !== EMBEDDING_DIMENSION) {
|
||||
throw new Error(
|
||||
`Invalid embedding dimension: expected EMBEDDING_DIMENSION, got ${embedding.length}`
|
||||
);
|
||||
}
|
||||
|
||||
const vectorString = `[${embedding.join(",")}]`;
|
||||
|
||||
try {
|
||||
let results: SimilarEmbedding[];
|
||||
|
||||
if (entityType) {
|
||||
results = await this.prisma.$queryRaw<SimilarEmbedding[]>`
|
||||
SELECT
|
||||
id::text,
|
||||
content,
|
||||
1 - (embedding <=> ${vectorString}::vector) as similarity,
|
||||
entity_type as "entityType",
|
||||
entity_id::text as "entityId",
|
||||
metadata
|
||||
FROM memory_embeddings
|
||||
WHERE workspace_id = ${workspaceId}::uuid
|
||||
AND embedding IS NOT NULL
|
||||
AND 1 - (embedding <=> ${vectorString}::vector) >= ${threshold}
|
||||
AND entity_type = ${entityType}::"EntityType"
|
||||
ORDER BY embedding <=> ${vectorString}::vector
|
||||
LIMIT ${limit}
|
||||
`;
|
||||
} else {
|
||||
results = await this.prisma.$queryRaw<SimilarEmbedding[]>`
|
||||
SELECT
|
||||
id::text,
|
||||
content,
|
||||
1 - (embedding <=> ${vectorString}::vector) as similarity,
|
||||
entity_type as "entityType",
|
||||
entity_id::text as "entityId",
|
||||
metadata
|
||||
FROM memory_embeddings
|
||||
WHERE workspace_id = ${workspaceId}::uuid
|
||||
AND embedding IS NOT NULL
|
||||
AND 1 - (embedding <=> ${vectorString}::vector) >= ${threshold}
|
||||
ORDER BY embedding <=> ${vectorString}::vector
|
||||
LIMIT ${limit}
|
||||
`;
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Found ${results.length} similar embeddings for workspace ${workspaceId}`
|
||||
);
|
||||
return results;
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to find similar embeddings", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete embeddings for a specific entity
|
||||
* @param params Entity identifiers
|
||||
* @returns Number of embeddings deleted
|
||||
*/
|
||||
async deleteByEntity(params: {
|
||||
workspaceId: string;
|
||||
entityType: EntityType;
|
||||
entityId: string;
|
||||
}): Promise<number> {
|
||||
const { workspaceId, entityType, entityId } = params;
|
||||
|
||||
try {
|
||||
const result = await this.prisma.$executeRaw`
|
||||
DELETE FROM memory_embeddings
|
||||
WHERE workspace_id = ${workspaceId}::uuid
|
||||
AND entity_type = ${entityType}::"EntityType"
|
||||
AND entity_id = ${entityId}::uuid
|
||||
`;
|
||||
|
||||
this.logger.debug(
|
||||
`Deleted ${result} embeddings for ${entityType}:${entityId} in workspace ${workspaceId}`
|
||||
);
|
||||
return result;
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to delete embeddings", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all embeddings for a workspace
|
||||
* @param workspaceId Workspace ID
|
||||
* @returns Number of embeddings deleted
|
||||
*/
|
||||
async deleteByWorkspace(workspaceId: string): Promise<number> {
|
||||
try {
|
||||
const result = await this.prisma.$executeRaw`
|
||||
DELETE FROM memory_embeddings
|
||||
WHERE workspace_id = ${workspaceId}::uuid
|
||||
`;
|
||||
|
||||
this.logger.debug(
|
||||
`Deleted ${result} embeddings for workspace ${workspaceId}`
|
||||
);
|
||||
return result;
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to delete workspace embeddings", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get embedding by ID
|
||||
* @param id Embedding ID
|
||||
* @returns Embedding or null if not found
|
||||
*/
|
||||
async getById(id: string): Promise<SimilarEmbedding | null> {
|
||||
try {
|
||||
const results = await this.prisma.$queryRaw<SimilarEmbedding[]>`
|
||||
SELECT
|
||||
id::text,
|
||||
content,
|
||||
0 as similarity,
|
||||
entity_type as "entityType",
|
||||
entity_id::text as "entityId",
|
||||
metadata
|
||||
FROM memory_embeddings
|
||||
WHERE id = ${id}::uuid
|
||||
LIMIT 1
|
||||
`;
|
||||
|
||||
return results.length > 0 ? (results[0] ?? null) : null;
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to get embedding ${id}`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
2
apps/api/src/database/index.ts
Normal file
2
apps/api/src/database/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from "./database.module";
|
||||
export * from "./embeddings.service";
|
||||
13
apps/api/src/prisma/prisma.module.ts
Normal file
13
apps/api/src/prisma/prisma.module.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { Global, Module } from "@nestjs/common";
|
||||
import { PrismaService } from "./prisma.service";
|
||||
|
||||
/**
|
||||
* Global Prisma module providing database access throughout the application
|
||||
* Marked as @Global() so PrismaService is available in all modules without importing
|
||||
*/
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [PrismaService],
|
||||
exports: [PrismaService],
|
||||
})
|
||||
export class PrismaModule {}
|
||||
95
apps/api/src/prisma/prisma.service.ts
Normal file
95
apps/api/src/prisma/prisma.service.ts
Normal file
@@ -0,0 +1,95 @@
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
OnModuleDestroy,
|
||||
OnModuleInit,
|
||||
} from "@nestjs/common";
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
|
||||
/**
|
||||
* Prisma service that manages database connection lifecycle
|
||||
* Extends PrismaClient to provide connection management and health checks
|
||||
*/
|
||||
@Injectable()
|
||||
export class PrismaService
|
||||
extends PrismaClient
|
||||
implements OnModuleInit, OnModuleDestroy
|
||||
{
|
||||
private readonly logger = new Logger(PrismaService.name);
|
||||
|
||||
constructor() {
|
||||
super({
|
||||
log:
|
||||
process.env.NODE_ENV === "development"
|
||||
? ["query", "info", "warn", "error"]
|
||||
: ["error"],
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to database when NestJS module initializes
|
||||
*/
|
||||
async onModuleInit() {
|
||||
try {
|
||||
await this.$connect();
|
||||
this.logger.log("Database connection established");
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to connect to database", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from database when NestJS module is destroyed
|
||||
*/
|
||||
async onModuleDestroy() {
|
||||
await this.$disconnect();
|
||||
this.logger.log("Database connection closed");
|
||||
}
|
||||
|
||||
/**
|
||||
* Health check for database connectivity
|
||||
* @returns true if database is accessible, false otherwise
|
||||
*/
|
||||
async isHealthy(): Promise<boolean> {
|
||||
try {
|
||||
await this.$queryRaw`SELECT 1`;
|
||||
return true;
|
||||
} catch (error) {
|
||||
this.logger.error("Database health check failed", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get database connection info for debugging
|
||||
* @returns Connection status and basic info
|
||||
*/
|
||||
async getConnectionInfo(): Promise<{
|
||||
connected: boolean;
|
||||
database?: string;
|
||||
version?: string;
|
||||
}> {
|
||||
try {
|
||||
const result = await this.$queryRaw<
|
||||
Array<{ current_database: string; version: string }>
|
||||
>`
|
||||
SELECT current_database(), version()
|
||||
`;
|
||||
|
||||
if (result && result.length > 0 && result[0]) {
|
||||
const dbVersion = result[0].version?.split(" ")[0];
|
||||
return {
|
||||
connected: true,
|
||||
database: result[0].current_database,
|
||||
...(dbVersion && { version: dbVersion }),
|
||||
};
|
||||
}
|
||||
|
||||
return { connected: false };
|
||||
} catch (error) {
|
||||
this.logger.error("Failed to get connection info", error);
|
||||
return { connected: false };
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user