Files
stack/packages/storage/src/adapters/postgres.ts
2026-04-02 20:44:10 -05:00

253 lines
8.0 KiB
TypeScript

import {
createDb,
runMigrations,
eq,
and,
asc,
desc,
sql,
type Db,
type DbHandle,
} from '@mosaic/db';
import * as schema from '@mosaic/db';
import type { StorageAdapter, StorageConfig } from '../types.js';
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* Maps collection name → Drizzle table object.
* Typed as `any` because the generic StorageAdapter interface erases table
* types — all runtime values are still strongly-typed Drizzle table objects.
*/
const TABLE_MAP: Record<string, any> = {
users: schema.users,
sessions: schema.sessions,
accounts: schema.accounts,
verifications: schema.verifications,
teams: schema.teams,
team_members: schema.teamMembers,
projects: schema.projects,
missions: schema.missions,
tasks: schema.tasks,
mission_tasks: schema.missionTasks,
events: schema.events,
agents: schema.agents,
tickets: schema.tickets,
appreciations: schema.appreciations,
conversations: schema.conversations,
messages: schema.messages,
preferences: schema.preferences,
insights: schema.insights,
agent_logs: schema.agentLogs,
skills: schema.skills,
routing_rules: schema.routingRules,
provider_credentials: schema.providerCredentials,
summarization_jobs: schema.summarizationJobs,
};
function getTable(collection: string): any {
const table = TABLE_MAP[collection];
if (!table) throw new Error(`Unknown collection: ${collection}`);
return table;
}
function buildWhereClause(table: any, filter?: Record<string, unknown>) {
if (!filter || Object.keys(filter).length === 0) return undefined;
const conditions = Object.entries(filter).map(([key, value]) => {
const column = table[key];
if (!column) throw new Error(`Unknown column "${key}" on table`);
return eq(column, value);
});
return conditions.length === 1 ? conditions[0]! : and(...conditions);
}
export class PostgresAdapter implements StorageAdapter {
readonly name = 'postgres';
private handle: DbHandle;
private db: Db;
private url: string;
constructor(config: Extract<StorageConfig, { type: 'postgres' }>) {
this.url = config.url;
this.handle = createDb(config.url);
this.db = this.handle.db;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const table = getTable(collection);
const [row] = await (this.db as any).insert(table).values(data).returning();
return row as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const table = getTable(collection);
const [row] = await (this.db as any).select().from(table).where(eq(table.id, id));
return (row as T) ?? null;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const table = getTable(collection);
const result = await (this.db as any)
.update(table)
.set(data)
.where(eq(table.id, id))
.returning();
return result.length > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const table = getTable(collection);
const result = await (this.db as any).delete(table).where(eq(table.id, id)).returning();
return result.length > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const table = getTable(collection);
let query = (this.db as any).select().from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
if (opts?.orderBy) {
const col = table[opts.orderBy];
if (col) {
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
}
}
if (opts?.limit) query = query.limit(opts.limit);
if (opts?.offset) query = query.offset(opts.offset);
return (await query) as T[];
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const table = getTable(collection);
let query = (this.db as any).select({ count: sql<number>`count(*)::int` }).from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
const [row] = await query;
return (row as any)?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return (this.db as any).transaction(async (drizzleTx: any) => {
const txAdapter = new PostgresTxAdapter(drizzleTx, this.url);
return fn(txAdapter);
});
}
async migrate(): Promise<void> {
await runMigrations(this.url);
}
async close(): Promise<void> {
await this.handle.close();
}
}
/**
* Thin transaction wrapper — delegates to the Drizzle transaction object
* instead of the top-level db handle.
*/
class PostgresTxAdapter implements StorageAdapter {
readonly name = 'postgres';
private tx: any;
private url: string;
constructor(tx: any, url: string) {
this.tx = tx;
this.url = url;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const table = getTable(collection);
const [row] = await this.tx.insert(table).values(data).returning();
return row as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const table = getTable(collection);
const [row] = await this.tx.select().from(table).where(eq(table.id, id));
return (row as T) ?? null;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const table = getTable(collection);
const result = await this.tx.update(table).set(data).where(eq(table.id, id)).returning();
return result.length > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const table = getTable(collection);
const result = await this.tx.delete(table).where(eq(table.id, id)).returning();
return result.length > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const table = getTable(collection);
let query = this.tx.select().from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
if (opts?.orderBy) {
const col = table[opts.orderBy];
if (col) {
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
}
}
if (opts?.limit) query = query.limit(opts.limit);
if (opts?.offset) query = query.offset(opts.offset);
return (await query) as T[];
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const table = getTable(collection);
let query = this.tx.select({ count: sql<number>`count(*)::int` }).from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
const [row] = await query;
return (row as any)?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return this.tx.transaction(async (nestedTx: any) => {
const nested = new PostgresTxAdapter(nestedTx, this.url);
return fn(nested);
});
}
async migrate(): Promise<void> {
await runMigrations(this.url);
}
async close(): Promise<void> {
// No-op inside a transaction
}
}