import { createDb, runMigrations, eq, and, asc, desc, sql, type Db, type DbHandle, } from '@mosaic/db'; import * as schema from '@mosaic/db'; import type { StorageAdapter, StorageConfig } from '../types.js'; /* eslint-disable @typescript-eslint/no-explicit-any */ /** * Maps collection name → Drizzle table object. * Typed as `any` because the generic StorageAdapter interface erases table * types — all runtime values are still strongly-typed Drizzle table objects. */ const TABLE_MAP: Record = { users: schema.users, sessions: schema.sessions, accounts: schema.accounts, verifications: schema.verifications, teams: schema.teams, team_members: schema.teamMembers, projects: schema.projects, missions: schema.missions, tasks: schema.tasks, mission_tasks: schema.missionTasks, events: schema.events, agents: schema.agents, tickets: schema.tickets, appreciations: schema.appreciations, conversations: schema.conversations, messages: schema.messages, preferences: schema.preferences, insights: schema.insights, agent_logs: schema.agentLogs, skills: schema.skills, routing_rules: schema.routingRules, provider_credentials: schema.providerCredentials, summarization_jobs: schema.summarizationJobs, }; function getTable(collection: string): any { const table = TABLE_MAP[collection]; if (!table) throw new Error(`Unknown collection: ${collection}`); return table; } function buildWhereClause(table: any, filter?: Record) { if (!filter || Object.keys(filter).length === 0) return undefined; const conditions = Object.entries(filter).map(([key, value]) => { const column = table[key]; if (!column) throw new Error(`Unknown column "${key}" on table`); return eq(column, value); }); return conditions.length === 1 ? conditions[0]! : and(...conditions); } export class PostgresAdapter implements StorageAdapter { readonly name = 'postgres'; private handle: DbHandle; private db: Db; private url: string; constructor(config: Extract) { this.url = config.url; this.handle = createDb(config.url); this.db = this.handle.db; } async create>( collection: string, data: T, ): Promise { const table = getTable(collection); const [row] = await (this.db as any).insert(table).values(data).returning(); return row as T & { id: string }; } async read>(collection: string, id: string): Promise { const table = getTable(collection); const [row] = await (this.db as any).select().from(table).where(eq(table.id, id)); return (row as T) ?? null; } async update(collection: string, id: string, data: Record): Promise { const table = getTable(collection); const result = await (this.db as any) .update(table) .set(data) .where(eq(table.id, id)) .returning(); return result.length > 0; } async delete(collection: string, id: string): Promise { const table = getTable(collection); const result = await (this.db as any).delete(table).where(eq(table.id, id)).returning(); return result.length > 0; } async find>( collection: string, filter?: Record, opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' }, ): Promise { const table = getTable(collection); let query = (this.db as any).select().from(table); const where = buildWhereClause(table, filter); if (where) query = query.where(where); if (opts?.orderBy) { const col = table[opts.orderBy]; if (col) { query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col)); } } if (opts?.limit) query = query.limit(opts.limit); if (opts?.offset) query = query.offset(opts.offset); return (await query) as T[]; } async findOne>( collection: string, filter: Record, ): Promise { const results = await this.find(collection, filter, { limit: 1 }); return results[0] ?? null; } async count(collection: string, filter?: Record): Promise { const table = getTable(collection); let query = (this.db as any).select({ count: sql`count(*)::int` }).from(table); const where = buildWhereClause(table, filter); if (where) query = query.where(where); const [row] = await query; return (row as any)?.count ?? 0; } async transaction(fn: (tx: StorageAdapter) => Promise): Promise { return (this.db as any).transaction(async (drizzleTx: any) => { const txAdapter = new PostgresTxAdapter(drizzleTx, this.url); return fn(txAdapter); }); } async migrate(): Promise { await runMigrations(this.url); } async close(): Promise { await this.handle.close(); } } /** * Thin transaction wrapper — delegates to the Drizzle transaction object * instead of the top-level db handle. */ class PostgresTxAdapter implements StorageAdapter { readonly name = 'postgres'; private tx: any; private url: string; constructor(tx: any, url: string) { this.tx = tx; this.url = url; } async create>( collection: string, data: T, ): Promise { const table = getTable(collection); const [row] = await this.tx.insert(table).values(data).returning(); return row as T & { id: string }; } async read>(collection: string, id: string): Promise { const table = getTable(collection); const [row] = await this.tx.select().from(table).where(eq(table.id, id)); return (row as T) ?? null; } async update(collection: string, id: string, data: Record): Promise { const table = getTable(collection); const result = await this.tx.update(table).set(data).where(eq(table.id, id)).returning(); return result.length > 0; } async delete(collection: string, id: string): Promise { const table = getTable(collection); const result = await this.tx.delete(table).where(eq(table.id, id)).returning(); return result.length > 0; } async find>( collection: string, filter?: Record, opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' }, ): Promise { const table = getTable(collection); let query = this.tx.select().from(table); const where = buildWhereClause(table, filter); if (where) query = query.where(where); if (opts?.orderBy) { const col = table[opts.orderBy]; if (col) { query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col)); } } if (opts?.limit) query = query.limit(opts.limit); if (opts?.offset) query = query.offset(opts.offset); return (await query) as T[]; } async findOne>( collection: string, filter: Record, ): Promise { const results = await this.find(collection, filter, { limit: 1 }); return results[0] ?? null; } async count(collection: string, filter?: Record): Promise { const table = getTable(collection); let query = this.tx.select({ count: sql`count(*)::int` }).from(table); const where = buildWhereClause(table, filter); if (where) query = query.where(where); const [row] = await query; return (row as any)?.count ?? 0; } async transaction(fn: (tx: StorageAdapter) => Promise): Promise { return this.tx.transaction(async (nestedTx: any) => { const nested = new PostgresTxAdapter(nestedTx, this.url); return fn(nested); }); } async migrate(): Promise { await runMigrations(this.url); } async close(): Promise { // No-op inside a transaction } }