feat(storage): implement Postgres adapter wrapping Drizzle + @mosaic/db
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
"test": "vitest run --passWithNoTests"
|
||||
},
|
||||
"dependencies": {
|
||||
"@mosaic/db": "workspace:^",
|
||||
"@mosaic/types": "workspace:*"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
252
packages/storage/src/adapters/postgres.ts
Normal file
252
packages/storage/src/adapters/postgres.ts
Normal file
@@ -0,0 +1,252 @@
|
||||
import {
|
||||
createDb,
|
||||
runMigrations,
|
||||
eq,
|
||||
and,
|
||||
asc,
|
||||
desc,
|
||||
sql,
|
||||
type Db,
|
||||
type DbHandle,
|
||||
} from '@mosaic/db';
|
||||
import * as schema from '@mosaic/db';
|
||||
import type { StorageAdapter, StorageConfig } from '../types.js';
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
|
||||
/**
|
||||
* Maps collection name → Drizzle table object.
|
||||
* Typed as `any` because the generic StorageAdapter interface erases table
|
||||
* types — all runtime values are still strongly-typed Drizzle table objects.
|
||||
*/
|
||||
const TABLE_MAP: Record<string, any> = {
|
||||
users: schema.users,
|
||||
sessions: schema.sessions,
|
||||
accounts: schema.accounts,
|
||||
verifications: schema.verifications,
|
||||
teams: schema.teams,
|
||||
team_members: schema.teamMembers,
|
||||
projects: schema.projects,
|
||||
missions: schema.missions,
|
||||
tasks: schema.tasks,
|
||||
mission_tasks: schema.missionTasks,
|
||||
events: schema.events,
|
||||
agents: schema.agents,
|
||||
tickets: schema.tickets,
|
||||
appreciations: schema.appreciations,
|
||||
conversations: schema.conversations,
|
||||
messages: schema.messages,
|
||||
preferences: schema.preferences,
|
||||
insights: schema.insights,
|
||||
agent_logs: schema.agentLogs,
|
||||
skills: schema.skills,
|
||||
routing_rules: schema.routingRules,
|
||||
provider_credentials: schema.providerCredentials,
|
||||
summarization_jobs: schema.summarizationJobs,
|
||||
};
|
||||
|
||||
function getTable(collection: string): any {
|
||||
const table = TABLE_MAP[collection];
|
||||
if (!table) throw new Error(`Unknown collection: ${collection}`);
|
||||
return table;
|
||||
}
|
||||
|
||||
function buildWhereClause(table: any, filter?: Record<string, unknown>) {
|
||||
if (!filter || Object.keys(filter).length === 0) return undefined;
|
||||
const conditions = Object.entries(filter).map(([key, value]) => {
|
||||
const column = table[key];
|
||||
if (!column) throw new Error(`Unknown column "${key}" on table`);
|
||||
return eq(column, value);
|
||||
});
|
||||
return conditions.length === 1 ? conditions[0]! : and(...conditions);
|
||||
}
|
||||
|
||||
export class PostgresAdapter implements StorageAdapter {
|
||||
readonly name = 'postgres';
|
||||
private handle: DbHandle;
|
||||
private db: Db;
|
||||
private url: string;
|
||||
|
||||
constructor(config: Extract<StorageConfig, { type: 'postgres' }>) {
|
||||
this.url = config.url;
|
||||
this.handle = createDb(config.url);
|
||||
this.db = this.handle.db;
|
||||
}
|
||||
|
||||
async create<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
data: T,
|
||||
): Promise<T & { id: string }> {
|
||||
const table = getTable(collection);
|
||||
const [row] = await (this.db as any).insert(table).values(data).returning();
|
||||
return row as T & { id: string };
|
||||
}
|
||||
|
||||
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
|
||||
const table = getTable(collection);
|
||||
const [row] = await (this.db as any).select().from(table).where(eq(table.id, id));
|
||||
return (row as T) ?? null;
|
||||
}
|
||||
|
||||
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
|
||||
const table = getTable(collection);
|
||||
const result = await (this.db as any)
|
||||
.update(table)
|
||||
.set(data)
|
||||
.where(eq(table.id, id))
|
||||
.returning();
|
||||
return result.length > 0;
|
||||
}
|
||||
|
||||
async delete(collection: string, id: string): Promise<boolean> {
|
||||
const table = getTable(collection);
|
||||
const result = await (this.db as any).delete(table).where(eq(table.id, id)).returning();
|
||||
return result.length > 0;
|
||||
}
|
||||
|
||||
async find<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
filter?: Record<string, unknown>,
|
||||
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
|
||||
): Promise<T[]> {
|
||||
const table = getTable(collection);
|
||||
let query = (this.db as any).select().from(table);
|
||||
const where = buildWhereClause(table, filter);
|
||||
if (where) query = query.where(where);
|
||||
if (opts?.orderBy) {
|
||||
const col = table[opts.orderBy];
|
||||
if (col) {
|
||||
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
|
||||
}
|
||||
}
|
||||
if (opts?.limit) query = query.limit(opts.limit);
|
||||
if (opts?.offset) query = query.offset(opts.offset);
|
||||
return (await query) as T[];
|
||||
}
|
||||
|
||||
async findOne<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
filter: Record<string, unknown>,
|
||||
): Promise<T | null> {
|
||||
const results = await this.find<T>(collection, filter, { limit: 1 });
|
||||
return results[0] ?? null;
|
||||
}
|
||||
|
||||
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
|
||||
const table = getTable(collection);
|
||||
let query = (this.db as any).select({ count: sql<number>`count(*)::int` }).from(table);
|
||||
const where = buildWhereClause(table, filter);
|
||||
if (where) query = query.where(where);
|
||||
const [row] = await query;
|
||||
return (row as any)?.count ?? 0;
|
||||
}
|
||||
|
||||
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
|
||||
return (this.db as any).transaction(async (drizzleTx: any) => {
|
||||
const txAdapter = new PostgresTxAdapter(drizzleTx, this.url);
|
||||
return fn(txAdapter);
|
||||
});
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
await runMigrations(this.url);
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
await this.handle.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thin transaction wrapper — delegates to the Drizzle transaction object
|
||||
* instead of the top-level db handle.
|
||||
*/
|
||||
class PostgresTxAdapter implements StorageAdapter {
|
||||
readonly name = 'postgres';
|
||||
private tx: any;
|
||||
private url: string;
|
||||
|
||||
constructor(tx: any, url: string) {
|
||||
this.tx = tx;
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
async create<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
data: T,
|
||||
): Promise<T & { id: string }> {
|
||||
const table = getTable(collection);
|
||||
const [row] = await this.tx.insert(table).values(data).returning();
|
||||
return row as T & { id: string };
|
||||
}
|
||||
|
||||
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
|
||||
const table = getTable(collection);
|
||||
const [row] = await this.tx.select().from(table).where(eq(table.id, id));
|
||||
return (row as T) ?? null;
|
||||
}
|
||||
|
||||
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
|
||||
const table = getTable(collection);
|
||||
const result = await this.tx.update(table).set(data).where(eq(table.id, id)).returning();
|
||||
return result.length > 0;
|
||||
}
|
||||
|
||||
async delete(collection: string, id: string): Promise<boolean> {
|
||||
const table = getTable(collection);
|
||||
const result = await this.tx.delete(table).where(eq(table.id, id)).returning();
|
||||
return result.length > 0;
|
||||
}
|
||||
|
||||
async find<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
filter?: Record<string, unknown>,
|
||||
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
|
||||
): Promise<T[]> {
|
||||
const table = getTable(collection);
|
||||
let query = this.tx.select().from(table);
|
||||
const where = buildWhereClause(table, filter);
|
||||
if (where) query = query.where(where);
|
||||
if (opts?.orderBy) {
|
||||
const col = table[opts.orderBy];
|
||||
if (col) {
|
||||
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
|
||||
}
|
||||
}
|
||||
if (opts?.limit) query = query.limit(opts.limit);
|
||||
if (opts?.offset) query = query.offset(opts.offset);
|
||||
return (await query) as T[];
|
||||
}
|
||||
|
||||
async findOne<T extends Record<string, unknown>>(
|
||||
collection: string,
|
||||
filter: Record<string, unknown>,
|
||||
): Promise<T | null> {
|
||||
const results = await this.find<T>(collection, filter, { limit: 1 });
|
||||
return results[0] ?? null;
|
||||
}
|
||||
|
||||
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
|
||||
const table = getTable(collection);
|
||||
let query = this.tx.select({ count: sql<number>`count(*)::int` }).from(table);
|
||||
const where = buildWhereClause(table, filter);
|
||||
if (where) query = query.where(where);
|
||||
const [row] = await query;
|
||||
return (row as any)?.count ?? 0;
|
||||
}
|
||||
|
||||
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
|
||||
return this.tx.transaction(async (nestedTx: any) => {
|
||||
const nested = new PostgresTxAdapter(nestedTx, this.url);
|
||||
return fn(nested);
|
||||
});
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
await runMigrations(this.url);
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
// No-op inside a transaction
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,11 @@
|
||||
export type { StorageAdapter, StorageConfig } from './types.js';
|
||||
export { createStorageAdapter, registerStorageAdapter } from './factory.js';
|
||||
export { PostgresAdapter } from './adapters/postgres.js';
|
||||
|
||||
import { registerStorageAdapter } from './factory.js';
|
||||
import { PostgresAdapter } from './adapters/postgres.js';
|
||||
import type { StorageConfig } from './types.js';
|
||||
|
||||
registerStorageAdapter('postgres', (config: StorageConfig) => {
|
||||
return new PostgresAdapter(config as Extract<StorageConfig, { type: 'postgres' }>);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user