diff --git a/packages/storage/package.json b/packages/storage/package.json index 3b233ec..36d1661 100644 --- a/packages/storage/package.json +++ b/packages/storage/package.json @@ -16,6 +16,7 @@ "test": "vitest run --passWithNoTests" }, "dependencies": { + "@mosaic/db": "workspace:^", "@mosaic/types": "workspace:*" }, "devDependencies": { diff --git a/packages/storage/src/adapters/postgres.ts b/packages/storage/src/adapters/postgres.ts new file mode 100644 index 0000000..f6c69a5 --- /dev/null +++ b/packages/storage/src/adapters/postgres.ts @@ -0,0 +1,252 @@ +import { + createDb, + runMigrations, + eq, + and, + asc, + desc, + sql, + type Db, + type DbHandle, +} from '@mosaic/db'; +import * as schema from '@mosaic/db'; +import type { StorageAdapter, StorageConfig } from '../types.js'; + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +/** + * Maps collection name → Drizzle table object. + * Typed as `any` because the generic StorageAdapter interface erases table + * types — all runtime values are still strongly-typed Drizzle table objects. + */ +const TABLE_MAP: Record = { + users: schema.users, + sessions: schema.sessions, + accounts: schema.accounts, + verifications: schema.verifications, + teams: schema.teams, + team_members: schema.teamMembers, + projects: schema.projects, + missions: schema.missions, + tasks: schema.tasks, + mission_tasks: schema.missionTasks, + events: schema.events, + agents: schema.agents, + tickets: schema.tickets, + appreciations: schema.appreciations, + conversations: schema.conversations, + messages: schema.messages, + preferences: schema.preferences, + insights: schema.insights, + agent_logs: schema.agentLogs, + skills: schema.skills, + routing_rules: schema.routingRules, + provider_credentials: schema.providerCredentials, + summarization_jobs: schema.summarizationJobs, +}; + +function getTable(collection: string): any { + const table = TABLE_MAP[collection]; + if (!table) throw new Error(`Unknown collection: ${collection}`); + return table; +} + +function buildWhereClause(table: any, filter?: Record) { + if (!filter || Object.keys(filter).length === 0) return undefined; + const conditions = Object.entries(filter).map(([key, value]) => { + const column = table[key]; + if (!column) throw new Error(`Unknown column "${key}" on table`); + return eq(column, value); + }); + return conditions.length === 1 ? conditions[0]! : and(...conditions); +} + +export class PostgresAdapter implements StorageAdapter { + readonly name = 'postgres'; + private handle: DbHandle; + private db: Db; + private url: string; + + constructor(config: Extract) { + this.url = config.url; + this.handle = createDb(config.url); + this.db = this.handle.db; + } + + async create>( + collection: string, + data: T, + ): Promise { + const table = getTable(collection); + const [row] = await (this.db as any).insert(table).values(data).returning(); + return row as T & { id: string }; + } + + async read>(collection: string, id: string): Promise { + const table = getTable(collection); + const [row] = await (this.db as any).select().from(table).where(eq(table.id, id)); + return (row as T) ?? null; + } + + async update(collection: string, id: string, data: Record): Promise { + const table = getTable(collection); + const result = await (this.db as any) + .update(table) + .set(data) + .where(eq(table.id, id)) + .returning(); + return result.length > 0; + } + + async delete(collection: string, id: string): Promise { + const table = getTable(collection); + const result = await (this.db as any).delete(table).where(eq(table.id, id)).returning(); + return result.length > 0; + } + + async find>( + collection: string, + filter?: Record, + opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' }, + ): Promise { + const table = getTable(collection); + let query = (this.db as any).select().from(table); + const where = buildWhereClause(table, filter); + if (where) query = query.where(where); + if (opts?.orderBy) { + const col = table[opts.orderBy]; + if (col) { + query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col)); + } + } + if (opts?.limit) query = query.limit(opts.limit); + if (opts?.offset) query = query.offset(opts.offset); + return (await query) as T[]; + } + + async findOne>( + collection: string, + filter: Record, + ): Promise { + const results = await this.find(collection, filter, { limit: 1 }); + return results[0] ?? null; + } + + async count(collection: string, filter?: Record): Promise { + const table = getTable(collection); + let query = (this.db as any).select({ count: sql`count(*)::int` }).from(table); + const where = buildWhereClause(table, filter); + if (where) query = query.where(where); + const [row] = await query; + return (row as any)?.count ?? 0; + } + + async transaction(fn: (tx: StorageAdapter) => Promise): Promise { + return (this.db as any).transaction(async (drizzleTx: any) => { + const txAdapter = new PostgresTxAdapter(drizzleTx, this.url); + return fn(txAdapter); + }); + } + + async migrate(): Promise { + await runMigrations(this.url); + } + + async close(): Promise { + await this.handle.close(); + } +} + +/** + * Thin transaction wrapper — delegates to the Drizzle transaction object + * instead of the top-level db handle. + */ +class PostgresTxAdapter implements StorageAdapter { + readonly name = 'postgres'; + private tx: any; + private url: string; + + constructor(tx: any, url: string) { + this.tx = tx; + this.url = url; + } + + async create>( + collection: string, + data: T, + ): Promise { + const table = getTable(collection); + const [row] = await this.tx.insert(table).values(data).returning(); + return row as T & { id: string }; + } + + async read>(collection: string, id: string): Promise { + const table = getTable(collection); + const [row] = await this.tx.select().from(table).where(eq(table.id, id)); + return (row as T) ?? null; + } + + async update(collection: string, id: string, data: Record): Promise { + const table = getTable(collection); + const result = await this.tx.update(table).set(data).where(eq(table.id, id)).returning(); + return result.length > 0; + } + + async delete(collection: string, id: string): Promise { + const table = getTable(collection); + const result = await this.tx.delete(table).where(eq(table.id, id)).returning(); + return result.length > 0; + } + + async find>( + collection: string, + filter?: Record, + opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' }, + ): Promise { + const table = getTable(collection); + let query = this.tx.select().from(table); + const where = buildWhereClause(table, filter); + if (where) query = query.where(where); + if (opts?.orderBy) { + const col = table[opts.orderBy]; + if (col) { + query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col)); + } + } + if (opts?.limit) query = query.limit(opts.limit); + if (opts?.offset) query = query.offset(opts.offset); + return (await query) as T[]; + } + + async findOne>( + collection: string, + filter: Record, + ): Promise { + const results = await this.find(collection, filter, { limit: 1 }); + return results[0] ?? null; + } + + async count(collection: string, filter?: Record): Promise { + const table = getTable(collection); + let query = this.tx.select({ count: sql`count(*)::int` }).from(table); + const where = buildWhereClause(table, filter); + if (where) query = query.where(where); + const [row] = await query; + return (row as any)?.count ?? 0; + } + + async transaction(fn: (tx: StorageAdapter) => Promise): Promise { + return this.tx.transaction(async (nestedTx: any) => { + const nested = new PostgresTxAdapter(nestedTx, this.url); + return fn(nested); + }); + } + + async migrate(): Promise { + await runMigrations(this.url); + } + + async close(): Promise { + // No-op inside a transaction + } +} diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 52f6448..6974b07 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -1,2 +1,11 @@ export type { StorageAdapter, StorageConfig } from './types.js'; export { createStorageAdapter, registerStorageAdapter } from './factory.js'; +export { PostgresAdapter } from './adapters/postgres.js'; + +import { registerStorageAdapter } from './factory.js'; +import { PostgresAdapter } from './adapters/postgres.js'; +import type { StorageConfig } from './types.js'; + +registerStorageAdapter('postgres', (config: StorageConfig) => { + return new PostgresAdapter(config as Extract); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bf7d0b9..74c8296 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -570,6 +570,22 @@ importers: specifier: ^2.0.0 version: 2.1.9(@types/node@24.12.0)(jsdom@29.0.0(@noble/hashes@2.0.1))(lightningcss@1.31.1) + packages/storage: + dependencies: + '@mosaic/db': + specifier: workspace:^ + version: link:../db + '@mosaic/types': + specifier: workspace:* + version: link:../types + devDependencies: + typescript: + specifier: ^5.8.0 + version: 5.9.3 + vitest: + specifier: ^2.0.0 + version: 2.1.9(@types/node@24.12.0)(jsdom@29.0.0(@noble/hashes@2.0.1))(lightningcss@1.31.1) + packages/types: dependencies: class-transformer: