refactor(storage): replace better-sqlite3 with PGlite adapter (#378)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful

This commit was merged in pull request #378.
This commit is contained in:
2026-04-04 21:58:14 +00:00
parent 30c0fb1308
commit c0d0fd44b7
13 changed files with 356 additions and 326 deletions

View File

@@ -0,0 +1,290 @@
import { PGlite } from '@electric-sql/pglite';
import { randomUUID } from 'node:crypto';
import type { StorageAdapter, StorageConfig } from '../types.js';
/* eslint-disable @typescript-eslint/no-explicit-any */
// Logical collections persisted by this adapter. migrate() creates one
// table per entry, each with a TEXT primary key and a JSONB `data`
// document (see PgliteAdapter.migrate below).
const COLLECTIONS = [
  'users',
  'sessions',
  'accounts',
  'projects',
  'missions',
  'tasks',
  'agents',
  'conversations',
  'messages',
  'preferences',
  'insights',
  'skills',
  'events',
  'routing_rules',
  'provider_credentials',
  'agent_logs',
  'teams',
  'team_members',
  'mission_tasks',
  'tickets',
  'summarization_jobs',
  'appreciations',
  'verifications',
] as const;
/**
 * Builds a SQL WHERE clause (with $n placeholders) from a simple
 * equality filter. The key `id` matches the primary-key column; every
 * other key matches a top-level JSONB field via `data->>'key'`.
 *
 * Filter keys are spliced into the SQL text, so they are validated
 * against an identifier pattern to prevent SQL injection; an invalid
 * key throws. Values always travel as bind parameters. Object values
 * are serialized to JSON so they compare against the stored text form.
 *
 * @param filter - Optional map of field name to expected value.
 * @returns The clause (leading ` WHERE ...`, or '' when the filter is
 *   empty) and the positional parameters, in order.
 * @throws Error if a filter key is not a plain identifier.
 */
function buildFilterClause(filter?: Record<string, unknown>): {
  clause: string;
  params: unknown[];
} {
  if (!filter || Object.keys(filter).length === 0) return { clause: '', params: [] };
  const conditions: string[] = [];
  const params: unknown[] = [];
  let paramIdx = 1;
  for (const [key, value] of Object.entries(filter)) {
    if (key === 'id') {
      conditions.push(`id = $${paramIdx.toString()}`);
      params.push(value);
      paramIdx++;
    } else {
      // The key is interpolated into the SQL string below, so restrict
      // it to a safe identifier shape before use.
      if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(key)) {
        throw new Error(`Invalid filter key: ${key}`);
      }
      conditions.push(`data->>'${key}' = $${paramIdx.toString()}`);
      params.push(typeof value === 'object' ? JSON.stringify(value) : value);
      paramIdx++;
    }
  }
  return { clause: ` WHERE ${conditions.join(' AND ')}`, params };
}
/**
 * Minimal query surface shared by a full PGlite instance and the
 * connection object PGlite hands to transaction callbacks, so the
 * pg* helpers below work in both contexts.
 */
type PgClient = PGlite | { query: PGlite['query'] };
/**
 * Inserts a record into `collection`: the id becomes the primary key
 * (a fresh random UUID when the record has none) and every remaining
 * field is stored inside the JSONB `data` column.
 *
 * @returns The input record with its id filled in.
 */
async function pgCreate<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  data: T,
): Promise<T & { id: string }> {
  const id = (data as any).id ?? randomUUID();
  // Copy everything except the id into the JSONB payload.
  const payload: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    if (key !== 'id') payload[key] = value;
  }
  await pg.query(`INSERT INTO ${collection} (id, data) VALUES ($1, $2::jsonb)`, [
    id,
    JSON.stringify(payload),
  ]);
  return { ...data, id } as T & { id: string };
}
/**
 * Loads one row by primary key and flattens its JSONB `data` column
 * back into a plain record alongside the id.
 *
 * @returns The reconstructed record, or null when no row matches.
 */
async function pgRead<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  id: string,
): Promise<T | null> {
  const result = await pg.query<{ id: string; data: Record<string, unknown> }>(
    `SELECT id, data FROM ${collection} WHERE id = $1`,
    [id],
  );
  const [row] = result.rows;
  if (row === undefined) return null;
  return { id: row.id, ...(row.data as object) } as unknown as T;
}
/**
 * Shallow-merges `data` into the stored JSONB document for `id` and
 * bumps the row's `updated_at` timestamp.
 *
 * @returns true when a row was updated, false when `id` was absent.
 */
async function pgUpdate(
  pg: PgClient,
  collection: string,
  id: string,
  data: Record<string, unknown>,
): Promise<boolean> {
  // Read the current document first so the update is a merge, not a
  // wholesale replacement.
  const current = await pg.query<{ data: Record<string, unknown> }>(
    `SELECT data FROM ${collection} WHERE id = $1`,
    [id],
  );
  const [row] = current.rows;
  if (row === undefined) return false;
  const merged = Object.assign({}, row.data, data);
  const updated = await pg.query(
    `UPDATE ${collection} SET data = $1::jsonb, updated_at = now() WHERE id = $2`,
    [JSON.stringify(merged), id],
  );
  return (updated.affectedRows ?? 0) > 0;
}
/**
 * Removes the row with the given primary key.
 *
 * @returns true when a row was actually deleted.
 */
async function pgDelete(pg: PgClient, collection: string, id: string): Promise<boolean> {
  const outcome = await pg.query(`DELETE FROM ${collection} WHERE id = $1`, [id]);
  const affected = outcome.affectedRows ?? 0;
  return affected > 0;
}
/**
 * Queries `collection` with an optional equality filter, ordering, and
 * pagination, reconstructing each row from its JSONB `data` column.
 *
 * `orderBy` names either a real column (`id`, `created_at`,
 * `updated_at`) or a top-level JSONB field. Because it is spliced into
 * the SQL text, it is validated against an identifier pattern to
 * prevent SQL injection. `limit`/`offset` travel as bind parameters,
 * numbered after the filter's parameters.
 *
 * @throws Error if `opts.orderBy` is not a plain identifier, or (via
 *   buildFilterClause) if a filter key is unsafe.
 */
async function pgFind<T extends Record<string, unknown>>(
  pg: PgClient,
  collection: string,
  filter?: Record<string, unknown>,
  opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
  const { clause, params } = buildFilterClause(filter);
  let paramIdx = params.length + 1;
  let query = `SELECT id, data FROM ${collection}${clause}`;
  if (opts?.orderBy) {
    // orderBy is interpolated into the SQL string below, so restrict
    // it to a safe identifier shape before use.
    if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(opts.orderBy)) {
      throw new Error(`Invalid orderBy column: ${opts.orderBy}`);
    }
    const dir = opts.order === 'desc' ? 'DESC' : 'ASC';
    const col =
      opts.orderBy === 'id'
        ? 'id'
        : opts.orderBy === 'created_at' || opts.orderBy === 'updated_at'
          ? opts.orderBy
          : `data->>'${opts.orderBy}'`;
    query += ` ORDER BY ${col} ${dir}`;
  }
  if (opts?.limit !== undefined) {
    query += ` LIMIT $${paramIdx.toString()}`;
    params.push(opts.limit);
    paramIdx++;
  }
  if (opts?.offset !== undefined) {
    query += ` OFFSET $${paramIdx.toString()}`;
    params.push(opts.offset);
    paramIdx++;
  }
  const result = await pg.query<{ id: string; data: Record<string, unknown> }>(query, params);
  return result.rows.map((row) => ({ id: row.id, ...(row.data as object) }) as unknown as T);
}
/**
 * Counts the rows in `collection` matching the optional equality
 * filter. COUNT(*) arrives from Postgres as text, hence the parse.
 */
async function pgCount(
  pg: PgClient,
  collection: string,
  filter?: Record<string, unknown>,
): Promise<number> {
  const { clause, params } = buildFilterClause(filter);
  const sql = `SELECT COUNT(*) as count FROM ${collection}${clause}`;
  const result = await pg.query<{ count: string }>(sql, params);
  const raw = result.rows[0]?.count ?? '0';
  return parseInt(raw, 10);
}
/**
 * PGlite-backed StorageAdapter. Each collection is a table with a TEXT
 * primary key and a JSONB `data` document; `created_at`/`updated_at`
 * are maintained by table defaults and by pgUpdate. All CRUD methods
 * delegate to the shared pg* helpers so transactions can reuse them.
 */
export class PgliteAdapter implements StorageAdapter {
  readonly name = 'pglite';
  private readonly pg: PGlite;

  constructor(config: Extract<StorageConfig, { type: 'pglite' }>) {
    this.pg = new PGlite(config.dataDir);
  }

  /** Inserts a record, generating an id when none is supplied. */
  create<T extends Record<string, unknown>>(
    collection: string,
    data: T,
  ): Promise<T & { id: string }> {
    return pgCreate(this.pg, collection, data);
  }

  /** Loads a record by id, or null when absent. */
  read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
    return pgRead(this.pg, collection, id);
  }

  /** Shallow-merges fields into an existing record. */
  update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
    return pgUpdate(this.pg, collection, id, data);
  }

  /** Deletes a record by id; true when a row was removed. */
  delete(collection: string, id: string): Promise<boolean> {
    return pgDelete(this.pg, collection, id);
  }

  /** Lists records matching an equality filter with optional paging. */
  find<T extends Record<string, unknown>>(
    collection: string,
    filter?: Record<string, unknown>,
    opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
  ): Promise<T[]> {
    return pgFind(this.pg, collection, filter, opts);
  }

  /** Returns the first record matching the filter, or null. */
  async findOne<T extends Record<string, unknown>>(
    collection: string,
    filter: Record<string, unknown>,
  ): Promise<T | null> {
    const [first] = await this.find<T>(collection, filter, { limit: 1 });
    return first ?? null;
  }

  /** Counts records matching an equality filter. */
  count(collection: string, filter?: Record<string, unknown>): Promise<number> {
    return pgCount(this.pg, collection, filter);
  }

  /** Runs `fn` inside a database transaction via a tx-scoped adapter. */
  async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
    return this.pg.transaction((tx) => fn(new PgliteTxAdapter(tx as unknown as PgClient)));
  }

  /** Creates any missing collection tables (idempotent). */
  async migrate(): Promise<void> {
    for (const table of COLLECTIONS) {
      await this.pg.query(`
      CREATE TABLE IF NOT EXISTS ${table} (
        id TEXT PRIMARY KEY,
        data JSONB NOT NULL DEFAULT '{}'::jsonb,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
      )
      `);
    }
  }

  /** Shuts down the embedded database. */
  close(): Promise<void> {
    return this.pg.close();
  }
}
/**
 * StorageAdapter bound to a single PGlite transaction connection.
 * Delegates CRUD to the shared pg* helpers; nested transaction() calls
 * run in the already-open transaction, and migrate()/close() are
 * no-ops because the outer adapter owns schema and lifecycle.
 */
class PgliteTxAdapter implements StorageAdapter {
  readonly name = 'pglite';

  constructor(private readonly pg: PgClient) {}

  /** Inserts a record, generating an id when none is supplied. */
  create<T extends Record<string, unknown>>(
    collection: string,
    data: T,
  ): Promise<T & { id: string }> {
    return pgCreate(this.pg, collection, data);
  }

  /** Loads a record by id, or null when absent. */
  read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
    return pgRead(this.pg, collection, id);
  }

  /** Shallow-merges fields into an existing record. */
  update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
    return pgUpdate(this.pg, collection, id, data);
  }

  /** Deletes a record by id; true when a row was removed. */
  delete(collection: string, id: string): Promise<boolean> {
    return pgDelete(this.pg, collection, id);
  }

  /** Lists records matching an equality filter with optional paging. */
  find<T extends Record<string, unknown>>(
    collection: string,
    filter?: Record<string, unknown>,
    opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
  ): Promise<T[]> {
    return pgFind(this.pg, collection, filter, opts);
  }

  /** Returns the first record matching the filter, or null. */
  async findOne<T extends Record<string, unknown>>(
    collection: string,
    filter: Record<string, unknown>,
  ): Promise<T | null> {
    const [first] = await this.find<T>(collection, filter, { limit: 1 });
    return first ?? null;
  }

  /** Counts records matching an equality filter. */
  count(collection: string, filter?: Record<string, unknown>): Promise<number> {
    return pgCount(this.pg, collection, filter);
  }

  /** Already inside a transaction — run the callback directly. */
  async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
    return fn(this);
  }

  /** No-op: schema management belongs to the outer adapter. */
  async migrate(): Promise<void> {}

  /** No-op: the outer adapter owns the connection lifecycle. */
  async close(): Promise<void> {}
}