import { PGlite } from '@electric-sql/pglite';
import { randomUUID } from 'node:crypto';
import type { StorageAdapter, StorageConfig } from '../types.js';

/** Collections for which `migrate()` creates a backing table. */
const COLLECTIONS = [
  'users',
  'sessions',
  'accounts',
  'projects',
  'missions',
  'tasks',
  'agents',
  'conversations',
  'messages',
  'preferences',
  'insights',
  'skills',
  'events',
  'routing_rules',
  'provider_credentials',
  'agent_logs',
  'teams',
  'team_members',
  'mission_tasks',
  'tickets',
  'summarization_jobs',
  'appreciations',
  'verifications',
] as const;

/** Anything exposing PGlite's `query` method: the database itself or a transaction handle. */
type PgClient = PGlite | { query: PGlite['query'] };

/** Paging and ordering options accepted by `find`. */
type FindOptions = {
  limit?: number;
  offset?: number;
  orderBy?: string;
  order?: 'asc' | 'desc';
};

/** Shape of a row selected as `SELECT id, data FROM <collection>`. */
type StoredRow = { id: string; data: Record<string, unknown> };

/** Plain (unquoted) SQL identifier; the only form accepted for collection names. */
const SQL_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/;

/**
 * Validates a collection name before it is spliced into SQL as a table name.
 *
 * Table names cannot be bound as query parameters, so anything that is not a
 * plain identifier is rejected instead of being interpolated.
 *
 * @throws Error when `collection` is not a plain SQL identifier.
 */
function tableName(collection: string): string {
  if (!SQL_IDENTIFIER.test(collection)) {
    throw new Error(`Invalid collection name: ${JSON.stringify(collection)}`);
  }
  return collection;
}

/**
 * Builds a `data-><key>` / `data->><key>` expression with the key embedded as
 * a SQL string literal. Single quotes in the key are doubled so the key can
 * never terminate the literal.
 */
function jsonField(key: string, operator: '->' | '->>'): string {
  return `data${operator}'${key.replace(/'/g, "''")}'`;
}

/**
 * Converts a primitive filter value to the text form that `data->>'key'`
 * yields for it. Strings (and `undefined`) are passed through unchanged.
 */
function toTextParam(value: unknown): unknown {
  return typeof value === 'number' || typeof value === 'boolean' || typeof value === 'bigint'
    ? String(value)
    : value;
}

/**
 * Maps a stored row to the record shape handed back to callers: the JSON
 * fields plus `id`. The `id` column always wins over an `id` key that may be
 * present inside `data`.
 */
function rowToRecord<T>(row: StoredRow): T {
  return Object.assign({ id: row.id }, row.data, { id: row.id }) as unknown as T;
}

/**
 * Translates an equality filter into a parameterised `WHERE` clause.
 *
 * - `id` is matched against the `id` column, every other key against the
 *   corresponding field of the `data` JSONB column.
 * - `null` compiles to `IS NULL` (a `= NULL` comparison never matches).
 * - Objects and arrays are compared as JSONB, primitives by their text form.
 *
 * @returns The clause (with a leading space, or empty when there is nothing to
 *   filter on) and the positional parameters, numbered from `$1`.
 */
function buildFilterClause(filter?: Record<string, unknown>): {
  clause: string;
  params: unknown[];
} {
  if (!filter || Object.keys(filter).length === 0) return { clause: '', params: [] };

  const conditions: string[] = [];
  const params: unknown[] = [];

  for (const [key, value] of Object.entries(filter)) {
    const isId = key === 'id';

    if (value === null) {
      conditions.push(`${isId ? 'id' : jsonField(key, '->>')} IS NULL`);
      continue;
    }

    // Placeholders are numbered by how many parameters were bound so far.
    const placeholder = `$${(params.length + 1).toString()}`;
    if (isId) {
      conditions.push(`id = ${placeholder}`);
      params.push(value);
    } else if (typeof value === 'object') {
      // Compare structurally: the text rendering of JSONB is not JSON.stringify output.
      conditions.push(`${jsonField(key, '->')} = ${placeholder}::jsonb`);
      params.push(JSON.stringify(value));
    } else {
      conditions.push(`${jsonField(key, '->>')} = ${placeholder}`);
      params.push(toTextParam(value));
    }
  }

  return { clause: ` WHERE ${conditions.join(' AND ')}`, params };
}

/**
 * Inserts a record. `data.id` is used as the primary key when present,
 * otherwise a random UUID is generated; all other fields go into `data`.
 *
 * @returns The input record with its `id`.
 */
async function pgCreate<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  data: T,
): Promise<T & { id: string }> {
  const table = tableName(collection);
  const id = (data.id ?? randomUUID()) as string;
  const rest = Object.fromEntries(Object.entries(data).filter(([key]) => key !== 'id'));

  await pg.query(`INSERT INTO ${table} (id, data) VALUES ($1, $2::jsonb)`, [
    id,
    JSON.stringify(rest),
  ]);

  return { ...data, id } as T & { id: string };
}

/** Loads one record by primary key, or `null` when no row has that id. */
async function pgRead<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  id: string,
): Promise<T | null> {
  const table = tableName(collection);
  const result = await pg.query<StoredRow>(`SELECT id, data FROM ${table} WHERE id = $1`, [id]);
  const row = result.rows[0];
  return row ? rowToRecord<T>(row) : null;
}

/**
 * Shallow-merges `data` into the stored record and bumps `updated_at`.
 *
 * The merge runs inside a single `UPDATE`, so concurrent updates to the same
 * row cannot overwrite each other's fields. A field whose value
 * `JSON.stringify` would omit (`undefined`, function, symbol) removes that key
 * from the stored record. An `id` key in `data` is ignored: the primary key is
 * not changed and is never copied into the JSON payload.
 *
 * @returns `true` when a row with the given id existed and was updated.
 */
async function pgUpdate(
  pg: PgClient,
  collection: string,
  id: string,
  data: Record<string, unknown>,
): Promise<boolean> {
  const table = tableName(collection);
  const entries = Object.entries(data).filter(([key]) => key !== 'id');
  const removedKeys = entries
    .filter(
      ([, value]) =>
        value === undefined || typeof value === 'function' || typeof value === 'symbol',
    )
    .map(([key]) => key);

  // $1 = fields to set (JSON.stringify drops the removed ones), $2 = keys to delete.
  const result = await pg.query(
    `UPDATE ${table}
        SET data = (data - ARRAY(SELECT jsonb_array_elements_text($2::jsonb))) || $1::jsonb,
            updated_at = now()
      WHERE id = $3`,
    [JSON.stringify(Object.fromEntries(entries)), JSON.stringify(removedKeys), id],
  );

  return (result.affectedRows ?? 0) > 0;
}

/** Deletes one record by primary key; returns `true` when a row was removed. */
async function pgDelete(pg: PgClient, collection: string, id: string): Promise<boolean> {
  const table = tableName(collection);
  const result = await pg.query(`DELETE FROM ${table} WHERE id = $1`, [id]);
  return (result.affectedRows ?? 0) > 0;
}

/**
 * Lists records matching an equality filter.
 *
 * `orderBy` may name the `id`, `created_at` or `updated_at` column; any other
 * value orders by the text form of that field inside `data`. `limit` and
 * `offset` are bound as query parameters.
 */
async function pgFind<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  filter?: Record<string, unknown>,
  opts?: FindOptions,
): Promise<T[]> {
  const table = tableName(collection);
  const { clause, params } = buildFilterClause(filter);
  let query = `SELECT id, data FROM ${table}${clause}`;

  if (opts?.orderBy) {
    const direction = opts.order === 'desc' ? 'DESC' : 'ASC';
    const isColumn =
      opts.orderBy === 'id' || opts.orderBy === 'created_at' || opts.orderBy === 'updated_at';
    const sortExpr = isColumn ? opts.orderBy : jsonField(opts.orderBy, '->>');
    query += ` ORDER BY ${sortExpr} ${direction}`;
  }

  if (opts?.limit !== undefined) {
    params.push(opts.limit);
    query += ` LIMIT $${params.length.toString()}`;
  }

  if (opts?.offset !== undefined) {
    params.push(opts.offset);
    query += ` OFFSET $${params.length.toString()}`;
  }

  const result = await pg.query<StoredRow>(query, params);
  return result.rows.map((row) => rowToRecord<T>(row));
}

/** Counts the records matching an equality filter (all records when omitted). */
async function pgCount(
  pg: PgClient,
  collection: string,
  filter?: Record<string, unknown>,
): Promise<number> {
  const table = tableName(collection);
  const { clause, params } = buildFilterClause(filter);
  const result = await pg.query<{ count: string | number | bigint }>(
    `SELECT COUNT(*) as count FROM ${table}${clause}`,
    params,
  );
  // Number() accepts whichever representation the driver hands back for the count.
  return Number(result.rows[0]?.count ?? 0);
}

/**
 * Collection operations shared by the top-level adapter and the
 * in-transaction adapter; subclasses only decide how transactions, migration
 * and shutdown behave for their kind of client.
 */
abstract class PgliteCollectionStore<C extends PgClient> implements StorageAdapter {
  readonly name = 'pglite';

  constructor(protected readonly pg: C) {}

  async create<T extends Record<string, unknown>>(
    collection: string,
    data: T,
  ): Promise<T & { id: string }> {
    return pgCreate(this.pg, collection, data);
  }

  async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
    return pgRead<T>(this.pg, collection, id);
  }

  async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
    return pgUpdate(this.pg, collection, id, data);
  }

  async delete(collection: string, id: string): Promise<boolean> {
    return pgDelete(this.pg, collection, id);
  }

  async find<T extends Record<string, unknown>>(
    collection: string,
    filter?: Record<string, unknown>,
    opts?: FindOptions,
  ): Promise<T[]> {
    return pgFind<T>(this.pg, collection, filter, opts);
  }

  /** Returns the first record matching `filter`, or `null` when none does. */
  async findOne<T extends Record<string, unknown>>(
    collection: string,
    filter: Record<string, unknown>,
  ): Promise<T | null> {
    const results = await this.find<T>(collection, filter, { limit: 1 });
    return results[0] ?? null;
  }

  async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
    return pgCount(this.pg, collection, filter);
  }

  abstract transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T>;
  abstract migrate(): Promise<void>;
  abstract close(): Promise<void>;
}

/**
 * Storage adapter backed by PGlite. Each collection is a table with an `id`
 * primary key, a JSONB `data` column and `created_at` / `updated_at`
 * timestamps (see `migrate`).
 */
export class PgliteAdapter extends PgliteCollectionStore<PGlite> {
  /** Opens a PGlite instance on `config.dataDir`. */
  constructor(config: Extract<StorageConfig, { type: 'pglite' }>) {
    super(new PGlite(config.dataDir));
  }

  /**
   * Runs `fn` through PGlite's `transaction()`, handing it an adapter bound to
   * the transaction connection.
   */
  async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
    return this.pg.transaction(async (tx) => {
      const txAdapter = new PgliteTxAdapter(tx as unknown as PgClient);
      return fn(txAdapter);
    });
  }

  /** Creates the table of every known collection if it does not exist yet. */
  async migrate(): Promise<void> {
    for (const name of COLLECTIONS) {
      await this.pg.query(`
        CREATE TABLE IF NOT EXISTS ${name} (
          id TEXT PRIMARY KEY,
          data JSONB NOT NULL DEFAULT '{}'::jsonb,
          created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
          updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
      `);
    }
  }

  /** Closes the underlying PGlite instance. */
  async close(): Promise<void> {
    await this.pg.close();
  }
}

/**
 * Transaction wrapper that delegates to the PGlite transaction connection.
 */
class PgliteTxAdapter extends PgliteCollectionStore<PgClient> {
  async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
    // Already inside a transaction — run directly
    return fn(this);
  }

  async migrate(): Promise<void> {
    // No-op inside transaction
  }

  async close(): Promise<void> {
    // No-op inside transaction
  }
}