feat(storage): implement SQLite adapter with better-sqlite3

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jarvis
2026-04-02 20:51:13 -05:00
parent 7bb878718d
commit 25383ea645
5 changed files with 768 additions and 15 deletions

View File

@@ -17,9 +17,11 @@
},
"dependencies": {
"@mosaic/db": "workspace:^",
"@mosaic/types": "workspace:*"
"@mosaic/types": "workspace:*",
"better-sqlite3": "^12.8.0"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"typescript": "^5.8.0",
"vitest": "^2.0.0"
},

View File

@@ -0,0 +1,201 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { SqliteAdapter } from './sqlite.js';
describe('SqliteAdapter', () => {
let adapter: SqliteAdapter;
beforeEach(async () => {
adapter = new SqliteAdapter({ type: 'sqlite', path: ':memory:' });
await adapter.migrate();
});
afterEach(async () => {
await adapter.close();
});
describe('CRUD', () => {
it('creates and reads a record', async () => {
const created = await adapter.create('users', { name: 'Alice', email: 'alice@test.com' });
expect(created.id).toBeDefined();
expect(created.name).toBe('Alice');
const read = await adapter.read('users', created.id);
expect(read).not.toBeNull();
expect(read!.name).toBe('Alice');
expect(read!.email).toBe('alice@test.com');
});
it('returns null for non-existent record', async () => {
const result = await adapter.read('users', 'does-not-exist');
expect(result).toBeNull();
});
it('updates a record', async () => {
const created = await adapter.create('users', { name: 'Alice' });
const updated = await adapter.update('users', created.id, { name: 'Bob' });
expect(updated).toBe(true);
const read = await adapter.read('users', created.id);
expect(read!.name).toBe('Bob');
});
it('update returns false for non-existent record', async () => {
const result = await adapter.update('users', 'does-not-exist', { name: 'X' });
expect(result).toBe(false);
});
it('deletes a record', async () => {
const created = await adapter.create('users', { name: 'Alice' });
const deleted = await adapter.delete('users', created.id);
expect(deleted).toBe(true);
const read = await adapter.read('users', created.id);
expect(read).toBeNull();
});
it('delete returns false for non-existent record', async () => {
const result = await adapter.delete('users', 'does-not-exist');
expect(result).toBe(false);
});
});
describe('find', () => {
it('finds records with filter', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
await adapter.create('users', { name: 'Charlie', role: 'admin' });
const admins = await adapter.find('users', { role: 'admin' });
expect(admins).toHaveLength(2);
expect(admins.map((u) => u.name).sort()).toEqual(['Alice', 'Charlie']);
});
it('finds all records without filter', async () => {
await adapter.create('users', { name: 'Alice' });
await adapter.create('users', { name: 'Bob' });
const all = await adapter.find('users');
expect(all).toHaveLength(2);
});
it('supports limit and offset', async () => {
for (let i = 0; i < 5; i++) {
await adapter.create('users', { name: `User${i}`, idx: i });
}
const page = await adapter.find('users', undefined, {
limit: 2,
offset: 1,
orderBy: 'created_at',
});
expect(page).toHaveLength(2);
});
it('findOne returns first match', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
const found = await adapter.findOne('users', { role: 'user' });
expect(found).not.toBeNull();
expect(found!.name).toBe('Bob');
});
it('findOne returns null when no match', async () => {
const result = await adapter.findOne('users', { role: 'nonexistent' });
expect(result).toBeNull();
});
});
describe('count', () => {
it('counts all records', async () => {
await adapter.create('users', { name: 'Alice' });
await adapter.create('users', { name: 'Bob' });
const total = await adapter.count('users');
expect(total).toBe(2);
});
it('counts with filter', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
await adapter.create('users', { name: 'Charlie', role: 'admin' });
const adminCount = await adapter.count('users', { role: 'admin' });
expect(adminCount).toBe(2);
});
it('returns 0 for empty collection', async () => {
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
describe('transaction', () => {
it('commits on success', async () => {
await adapter.transaction(async (tx) => {
await tx.create('users', { name: 'Alice' });
await tx.create('users', { name: 'Bob' });
});
const count = await adapter.count('users');
expect(count).toBe(2);
});
it('rolls back on error', async () => {
await expect(
adapter.transaction(async (tx) => {
await tx.create('users', { name: 'Alice' });
throw new Error('rollback test');
}),
).rejects.toThrow('rollback test');
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
describe('migrate', () => {
it('creates all tables', async () => {
// migrate() was already called in beforeEach — verify tables exist
const collections = [
'users',
'sessions',
'accounts',
'projects',
'missions',
'tasks',
'agents',
'conversations',
'messages',
'preferences',
'insights',
'skills',
'events',
'routing_rules',
'provider_credentials',
'agent_logs',
'teams',
'team_members',
'mission_tasks',
'tickets',
'summarization_jobs',
'appreciations',
'verifications',
];
for (const collection of collections) {
// Should not throw
const count = await adapter.count(collection);
expect(count).toBe(0);
}
});
it('is idempotent', async () => {
await adapter.migrate();
await adapter.migrate();
// Should not throw
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
});

View File

@@ -0,0 +1,283 @@
import Database from 'better-sqlite3';
import { randomUUID } from 'node:crypto';
import type { StorageAdapter, StorageConfig } from '../types.js';
/* eslint-disable @typescript-eslint/no-explicit-any */
const COLLECTIONS = [
'users',
'sessions',
'accounts',
'projects',
'missions',
'tasks',
'agents',
'conversations',
'messages',
'preferences',
'insights',
'skills',
'events',
'routing_rules',
'provider_credentials',
'agent_logs',
'teams',
'team_members',
'mission_tasks',
'tickets',
'summarization_jobs',
'appreciations',
'verifications',
] as const;
function buildFilterClause(filter?: Record<string, unknown>): {
clause: string;
params: unknown[];
} {
if (!filter || Object.keys(filter).length === 0) return { clause: '', params: [] };
const conditions: string[] = [];
const params: unknown[] = [];
for (const [key, value] of Object.entries(filter)) {
if (key === 'id') {
conditions.push('id = ?');
params.push(value);
} else {
conditions.push(`json_extract(data_json, '$.${key}') = ?`);
params.push(typeof value === 'object' ? JSON.stringify(value) : value);
}
}
return { clause: ` WHERE ${conditions.join(' AND ')}`, params };
}
export class SqliteAdapter implements StorageAdapter {
readonly name = 'sqlite';
private db: Database.Database;
constructor(config: Extract<StorageConfig, { type: 'sqlite' }>) {
this.db = new Database(config.path);
this.db.pragma('journal_mode = WAL');
this.db.pragma('foreign_keys = ON');
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const id = (data as any).id ?? randomUUID();
const now = new Date().toISOString();
const rest = Object.fromEntries(Object.entries(data).filter(([k]) => k !== 'id'));
this.db
.prepare(
`INSERT INTO ${collection} (id, data_json, created_at, updated_at) VALUES (?, ?, ?, ?)`,
)
.run(id, JSON.stringify(rest), now, now);
return { ...data, id } as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const row = this.db.prepare(`SELECT * FROM ${collection} WHERE id = ?`).get(id) as any;
if (!row) return null;
return { id: row.id, ...JSON.parse(row.data_json as string) } as T;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const existing = this.db
.prepare(`SELECT data_json FROM ${collection} WHERE id = ?`)
.get(id) as any;
if (!existing) return false;
const merged = { ...JSON.parse(existing.data_json as string), ...data };
const now = new Date().toISOString();
const result = this.db
.prepare(`UPDATE ${collection} SET data_json = ?, updated_at = ? WHERE id = ?`)
.run(JSON.stringify(merged), now, id);
return result.changes > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const result = this.db.prepare(`DELETE FROM ${collection} WHERE id = ?`).run(id);
return result.changes > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const { clause, params } = buildFilterClause(filter);
let query = `SELECT * FROM ${collection}${clause}`;
if (opts?.orderBy) {
const dir = opts.order === 'desc' ? 'DESC' : 'ASC';
const col =
opts.orderBy === 'id' || opts.orderBy === 'created_at' || opts.orderBy === 'updated_at'
? opts.orderBy
: `json_extract(data_json, '$.${opts.orderBy}')`;
query += ` ORDER BY ${col} ${dir}`;
}
if (opts?.limit) {
query += ` LIMIT ?`;
params.push(opts.limit);
}
if (opts?.offset) {
query += ` OFFSET ?`;
params.push(opts.offset);
}
const rows = this.db.prepare(query).all(...params) as any[];
return rows.map((row) => ({ id: row.id, ...JSON.parse(row.data_json as string) }) as T);
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const { clause, params } = buildFilterClause(filter);
const row = this.db
.prepare(`SELECT COUNT(*) as count FROM ${collection}${clause}`)
.get(...params) as any;
return row?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
const txAdapter = new SqliteTxAdapter(this.db);
this.db.exec('BEGIN');
try {
const result = await fn(txAdapter);
this.db.exec('COMMIT');
return result;
} catch (err) {
this.db.exec('ROLLBACK');
throw err;
}
}
async migrate(): Promise<void> {
const createTable = (name: string) =>
this.db.exec(`
CREATE TABLE IF NOT EXISTS ${name} (
id TEXT PRIMARY KEY,
data_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
`);
for (const collection of COLLECTIONS) {
createTable(collection);
}
}
async close(): Promise<void> {
this.db.close();
}
}
/**
* Transaction wrapper that uses the same db handle — better-sqlite3 transactions
* are connection-level, so all statements on the same Database instance within
* a db.transaction() callback participate in the transaction.
*/
class SqliteTxAdapter implements StorageAdapter {
readonly name = 'sqlite';
private db: Database.Database;
constructor(db: Database.Database) {
this.db = db;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const id = (data as any).id ?? randomUUID();
const now = new Date().toISOString();
const rest = Object.fromEntries(Object.entries(data).filter(([k]) => k !== 'id'));
this.db
.prepare(
`INSERT INTO ${collection} (id, data_json, created_at, updated_at) VALUES (?, ?, ?, ?)`,
)
.run(id, JSON.stringify(rest), now, now);
return { ...data, id } as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const row = this.db.prepare(`SELECT * FROM ${collection} WHERE id = ?`).get(id) as any;
if (!row) return null;
return { id: row.id, ...JSON.parse(row.data_json as string) } as T;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const existing = this.db
.prepare(`SELECT data_json FROM ${collection} WHERE id = ?`)
.get(id) as any;
if (!existing) return false;
const merged = { ...JSON.parse(existing.data_json as string), ...data };
const now = new Date().toISOString();
const result = this.db
.prepare(`UPDATE ${collection} SET data_json = ?, updated_at = ? WHERE id = ?`)
.run(JSON.stringify(merged), now, id);
return result.changes > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const result = this.db.prepare(`DELETE FROM ${collection} WHERE id = ?`).run(id);
return result.changes > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const { clause, params } = buildFilterClause(filter);
let query = `SELECT * FROM ${collection}${clause}`;
if (opts?.orderBy) {
const dir = opts.order === 'desc' ? 'DESC' : 'ASC';
const col =
opts.orderBy === 'id' || opts.orderBy === 'created_at' || opts.orderBy === 'updated_at'
? opts.orderBy
: `json_extract(data_json, '$.${opts.orderBy}')`;
query += ` ORDER BY ${col} ${dir}`;
}
if (opts?.limit) {
query += ` LIMIT ?`;
params.push(opts.limit);
}
if (opts?.offset) {
query += ` OFFSET ?`;
params.push(opts.offset);
}
const rows = this.db.prepare(query).all(...params) as any[];
return rows.map((row) => ({ id: row.id, ...JSON.parse(row.data_json as string) }) as T);
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const { clause, params } = buildFilterClause(filter);
const row = this.db
.prepare(`SELECT COUNT(*) as count FROM ${collection}${clause}`)
.get(...params) as any;
return row?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return fn(this);
}
async migrate(): Promise<void> {
// No-op inside transaction
}
async close(): Promise<void> {
// No-op inside transaction
}
}

View File

@@ -1,11 +1,17 @@
export type { StorageAdapter, StorageConfig } from './types.js';
export { createStorageAdapter, registerStorageAdapter } from './factory.js';
export { PostgresAdapter } from './adapters/postgres.js';
export { SqliteAdapter } from './adapters/sqlite.js';
import { registerStorageAdapter } from './factory.js';
import { PostgresAdapter } from './adapters/postgres.js';
import { SqliteAdapter } from './adapters/sqlite.js';
import type { StorageConfig } from './types.js';
registerStorageAdapter('postgres', (config: StorageConfig) => {
return new PostgresAdapter(config as Extract<StorageConfig, { type: 'postgres' }>);
});
registerStorageAdapter('sqlite', (config: StorageConfig) => {
return new SqliteAdapter(config as Extract<StorageConfig, { type: 'sqlite' }>);
});