Merge pull request 'feat: storage abstraction retrofit — adapters for queue, storage, memory (phases 1-4)' (#365) from feat/storage-abstraction into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/push/publish Pipeline was successful

This commit was merged in pull request #365.
This commit is contained in:
2026-04-03 04:40:57 +00:00
38 changed files with 3243 additions and 89 deletions

View File

@@ -22,6 +22,7 @@
},
"dependencies": {
"@clack/prompts": "^0.9.0",
"@mosaic/config": "workspace:^",
"@mosaic/mosaic": "workspace:^",
"@mosaic/prdy": "workspace:^",
"@mosaic/quality-rails": "workspace:^",

View File

@@ -7,6 +7,7 @@ import { registerAgentCommand } from './commands/agent.js';
import { registerMissionCommand } from './commands/mission.js';
// prdy is registered via launch.ts
import { registerLaunchCommands } from './commands/launch.js';
import { registerGatewayCommand } from './commands/gateway.js';
const _require = createRequire(import.meta.url);
const CLI_VERSION: string = (_require('../package.json') as { version: string }).version;
@@ -290,6 +291,10 @@ sessionsCmd
}
});
// ─── gateway ──────────────────────────────────────────────────────────
registerGatewayCommand(program);
// ─── agent ─────────────────────────────────────────────────────────────
registerAgentCommand(program);

View File

@@ -0,0 +1,198 @@
import { createInterface } from 'node:readline';
import { spawn } from 'node:child_process';
import { existsSync, mkdirSync, readFileSync, unlinkSync, writeFileSync } from 'node:fs';
import { dirname, resolve } from 'node:path';
import type { Command } from 'commander';
import {
DEFAULT_LOCAL_CONFIG,
DEFAULT_TEAM_CONFIG,
loadConfig,
type MosaicConfig,
type StorageTier,
} from '@mosaic/config';
function ask(rl: ReturnType<typeof createInterface>, question: string): Promise<string> {
return new Promise((res) => rl.question(question, res));
}
async function runInit(opts: { tier?: string; output: string }): Promise<void> {
const outputPath = resolve(opts.output);
let tier: StorageTier;
if (opts.tier) {
if (opts.tier !== 'local' && opts.tier !== 'team') {
console.error(`Invalid tier "${opts.tier}" — expected "local" or "team"`);
process.exit(1);
}
tier = opts.tier;
} else {
const rl = createInterface({ input: process.stdin, output: process.stdout });
const answer = await ask(rl, 'Select tier (local/team) [local]: ');
rl.close();
const trimmed = answer.trim().toLowerCase();
tier = trimmed === 'team' ? 'team' : 'local';
}
let config: MosaicConfig;
if (tier === 'local') {
config = DEFAULT_LOCAL_CONFIG;
} else {
const rl = createInterface({ input: process.stdin, output: process.stdout });
const dbUrl = await ask(
rl,
'DATABASE_URL [postgresql://mosaic:mosaic@localhost:5432/mosaic]: ',
);
const valkeyUrl = await ask(rl, 'VALKEY_URL [redis://localhost:6379]: ');
rl.close();
config = {
...DEFAULT_TEAM_CONFIG,
storage: {
type: 'postgres',
url: dbUrl.trim() || 'postgresql://mosaic:mosaic@localhost:5432/mosaic',
},
queue: {
type: 'bullmq',
url: valkeyUrl.trim() || 'redis://localhost:6379',
},
};
}
writeFileSync(outputPath, JSON.stringify(config, null, 2) + '\n');
console.log(`\nWrote ${outputPath}`);
console.log('\nNext steps:');
console.log(' 1. Review the generated config');
console.log(' 2. Run: pnpm --filter @mosaic/gateway exec tsx src/main.ts');
}
const PID_FILE = resolve(process.cwd(), '.mosaic/gateway.pid');
function writePidFile(pid: number): void {
const dir = dirname(PID_FILE);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
writeFileSync(PID_FILE, String(pid));
}
function readPidFile(): number | null {
if (!existsSync(PID_FILE)) return null;
const raw = readFileSync(PID_FILE, 'utf-8').trim();
const pid = Number(raw);
return Number.isFinite(pid) ? pid : null;
}
function isProcessRunning(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
function printConfigSummary(config: MosaicConfig): void {
console.log(` Tier: ${config.tier}`);
console.log(` Storage: ${config.storage.type}`);
console.log(` Queue: ${config.queue.type}`);
console.log(` Memory: ${config.memory.type}`);
}
export function registerGatewayCommand(program: Command): void {
const gateway = program.command('gateway').description('Gateway management commands');
gateway
.command('init')
.description('Generate a mosaic.config.json for the gateway')
.option('--tier <tier>', 'Storage tier: local or team (skips interactive prompt)')
.option('--output <path>', 'Output file path', './mosaic.config.json')
.action(async (opts: { tier?: string; output: string }) => {
await runInit(opts);
});
gateway
.command('start')
.description('Start the Mosaic gateway process')
.option('--port <port>', 'Port to listen on (overrides config)')
.option('--daemon', 'Run in background and write PID to .mosaic/gateway.pid')
.action((opts: { port?: string; daemon?: boolean }) => {
const config = loadConfig();
const port = opts.port ?? '4000';
console.log('Starting gateway…');
printConfigSummary(config);
console.log(` Port: ${port}`);
const entryPoint = resolve(process.cwd(), 'apps/gateway/src/main.ts');
const env = { ...process.env, GATEWAY_PORT: port };
if (opts.daemon) {
const child = spawn('npx', ['tsx', entryPoint], {
env,
stdio: 'ignore',
detached: true,
});
child.unref();
if (child.pid) {
writePidFile(child.pid);
console.log(`\nGateway started in background (PID ${child.pid})`);
console.log(`PID file: ${PID_FILE}`);
}
} else {
const child = spawn('npx', ['tsx', entryPoint], {
env,
stdio: 'inherit',
});
child.on('exit', (code) => {
process.exit(code ?? 0);
});
}
});
gateway
.command('stop')
.description('Stop the running gateway process')
.action(() => {
const pid = readPidFile();
if (pid === null) {
console.error('No PID file found at', PID_FILE);
process.exit(1);
}
if (!isProcessRunning(pid)) {
console.log(`Process ${pid} is not running. Removing stale PID file.`);
unlinkSync(PID_FILE);
return;
}
process.kill(pid, 'SIGTERM');
unlinkSync(PID_FILE);
console.log(`Gateway stopped (PID ${pid})`);
});
gateway
.command('status')
.description('Show gateway process status')
.action(() => {
const config = loadConfig();
const pid = readPidFile();
if (pid !== null && isProcessRunning(pid)) {
console.log('Gateway: running');
console.log(` PID: ${pid}`);
} else {
console.log('Gateway: stopped');
if (pid !== null) {
console.log(` (stale PID file for ${pid})`);
unlinkSync(PID_FILE);
}
}
console.log('');
console.log('Config:');
printConfigSummary(config);
});
}

View File

@@ -0,0 +1,35 @@
{
"name": "@mosaic/config",
"version": "0.0.1",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"scripts": {
"build": "tsc",
"lint": "eslint src",
"typecheck": "tsc --noEmit",
"test": "vitest run --passWithNoTests"
},
"dependencies": {
"@mosaic/memory": "workspace:^",
"@mosaic/queue": "workspace:^",
"@mosaic/storage": "workspace:^"
},
"devDependencies": {
"typescript": "^5.8.0",
"vitest": "^2.0.0"
},
"publishConfig": {
"registry": "https://git.mosaicstack.dev/api/packages/mosaic/npm/",
"access": "public"
},
"files": [
"dist"
]
}

View File

@@ -0,0 +1,7 @@
export type { MosaicConfig, StorageTier, MemoryConfigRef } from './mosaic-config.js';
export {
DEFAULT_LOCAL_CONFIG,
DEFAULT_TEAM_CONFIG,
loadConfig,
validateConfig,
} from './mosaic-config.js';

View File

@@ -0,0 +1,140 @@
import { readFileSync, existsSync } from 'node:fs';
import { resolve } from 'node:path';
import type { StorageConfig } from '@mosaic/storage';
import type { QueueAdapterConfig as QueueConfig } from '@mosaic/queue';
/* ------------------------------------------------------------------ */
/* Types */
/* ------------------------------------------------------------------ */
export type StorageTier = 'local' | 'team';
export interface MemoryConfigRef {
type: 'pgvector' | 'sqlite-vec' | 'keyword';
}
export interface MosaicConfig {
tier: StorageTier;
storage: StorageConfig;
queue: QueueConfig;
memory: MemoryConfigRef;
}
/* ------------------------------------------------------------------ */
/* Defaults */
/* ------------------------------------------------------------------ */
export const DEFAULT_LOCAL_CONFIG: MosaicConfig = {
tier: 'local',
storage: { type: 'sqlite', path: '.mosaic/data.db' },
queue: { type: 'local', dataDir: '.mosaic/queue' },
memory: { type: 'keyword' },
};
export const DEFAULT_TEAM_CONFIG: MosaicConfig = {
tier: 'team',
storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@localhost:5432/mosaic' },
queue: { type: 'bullmq' },
memory: { type: 'pgvector' },
};
/* ------------------------------------------------------------------ */
/* Validation */
/* ------------------------------------------------------------------ */
const VALID_TIERS = new Set<string>(['local', 'team']);
const VALID_STORAGE_TYPES = new Set<string>(['postgres', 'sqlite', 'files']);
const VALID_QUEUE_TYPES = new Set<string>(['bullmq', 'local']);
const VALID_MEMORY_TYPES = new Set<string>(['pgvector', 'sqlite-vec', 'keyword']);
export function validateConfig(raw: unknown): MosaicConfig {
if (typeof raw !== 'object' || raw === null) {
throw new Error('MosaicConfig must be a non-null object');
}
const obj = raw as Record<string, unknown>;
// tier
const tier = obj['tier'];
if (typeof tier !== 'string' || !VALID_TIERS.has(tier)) {
throw new Error(`Invalid tier "${String(tier)}" — expected "local" or "team"`);
}
// storage
const storage = obj['storage'];
if (typeof storage !== 'object' || storage === null) {
throw new Error('config.storage must be a non-null object');
}
const storageType = (storage as Record<string, unknown>)['type'];
if (typeof storageType !== 'string' || !VALID_STORAGE_TYPES.has(storageType)) {
throw new Error(`Invalid storage.type "${String(storageType)}"`);
}
// queue
const queue = obj['queue'];
if (typeof queue !== 'object' || queue === null) {
throw new Error('config.queue must be a non-null object');
}
const queueType = (queue as Record<string, unknown>)['type'];
if (typeof queueType !== 'string' || !VALID_QUEUE_TYPES.has(queueType)) {
throw new Error(`Invalid queue.type "${String(queueType)}"`);
}
// memory
const memory = obj['memory'];
if (typeof memory !== 'object' || memory === null) {
throw new Error('config.memory must be a non-null object');
}
const memoryType = (memory as Record<string, unknown>)['type'];
if (typeof memoryType !== 'string' || !VALID_MEMORY_TYPES.has(memoryType)) {
throw new Error(`Invalid memory.type "${String(memoryType)}"`);
}
return {
tier: tier as StorageTier,
storage: storage as StorageConfig,
queue: queue as QueueConfig,
memory: memory as MemoryConfigRef,
};
}
/* ------------------------------------------------------------------ */
/* Loader */
/* ------------------------------------------------------------------ */
function detectFromEnv(): MosaicConfig {
if (process.env['DATABASE_URL']) {
return {
...DEFAULT_TEAM_CONFIG,
storage: {
type: 'postgres',
url: process.env['DATABASE_URL'],
},
queue: {
type: 'bullmq',
url: process.env['VALKEY_URL'],
},
};
}
return DEFAULT_LOCAL_CONFIG;
}
export function loadConfig(configPath?: string): MosaicConfig {
// 1. Explicit path or default location
const paths = configPath
? [resolve(configPath)]
: [
resolve(process.cwd(), 'mosaic.config.json'),
resolve(process.cwd(), '../../mosaic.config.json'), // monorepo root when cwd is apps/gateway
];
for (const p of paths) {
if (existsSync(p)) {
const raw: unknown = JSON.parse(readFileSync(p, 'utf-8'));
return validateConfig(raw);
}
}
// 2. Fall back to env-var detection
return detectFromEnv();
}

View File

@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}

View File

@@ -18,6 +18,7 @@
},
"dependencies": {
"@mosaic/db": "workspace:*",
"@mosaic/storage": "workspace:*",
"@mosaic/types": "workspace:*",
"drizzle-orm": "^0.45.1"
},

View File

@@ -0,0 +1,298 @@
import { describe, it, expect, beforeEach } from 'vitest';
import type { StorageAdapter } from '@mosaic/storage';
import { KeywordAdapter } from './keyword.js';
/* ------------------------------------------------------------------ */
/* In-memory mock StorageAdapter */
/* ------------------------------------------------------------------ */
function createMockStorage(): StorageAdapter {
const collections = new Map<string, Map<string, Record<string, unknown>>>();
let idCounter = 0;
function getCollection(name: string): Map<string, Record<string, unknown>> {
if (!collections.has(name)) collections.set(name, new Map());
return collections.get(name)!;
}
const adapter: StorageAdapter = {
name: 'mock',
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const id = String(++idCounter);
const record = { ...data, id };
getCollection(collection).set(id, record);
return record as T & { id: string };
},
async read<T extends Record<string, unknown>>(
collection: string,
id: string,
): Promise<T | null> {
const record = getCollection(collection).get(id);
return (record as T) ?? null;
},
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const col = getCollection(collection);
const existing = col.get(id);
if (!existing) return false;
col.set(id, { ...existing, ...data });
return true;
},
async delete(collection: string, id: string): Promise<boolean> {
return getCollection(collection).delete(id);
},
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
): Promise<T[]> {
const col = getCollection(collection);
const results: T[] = [];
for (const record of col.values()) {
if (filter && !matchesFilter(record, filter)) continue;
results.push(record as T);
}
return results;
},
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const col = getCollection(collection);
for (const record of col.values()) {
if (matchesFilter(record, filter)) return record as T;
}
return null;
},
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const rows = await adapter.find(collection, filter);
return rows.length;
},
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return fn(adapter);
},
async migrate(): Promise<void> {},
async close(): Promise<void> {},
};
return adapter;
}
function matchesFilter(record: Record<string, unknown>, filter: Record<string, unknown>): boolean {
for (const [key, value] of Object.entries(filter)) {
if (record[key] !== value) return false;
}
return true;
}
/* ------------------------------------------------------------------ */
/* Tests */
/* ------------------------------------------------------------------ */
describe('KeywordAdapter', () => {
let adapter: KeywordAdapter;
beforeEach(() => {
adapter = new KeywordAdapter({ type: 'keyword', storage: createMockStorage() });
});
/* ---- Preferences ---- */
describe('preferences', () => {
it('should set and get a preference', async () => {
await adapter.setPreference('u1', 'theme', 'dark');
const value = await adapter.getPreference('u1', 'theme');
expect(value).toBe('dark');
});
it('should return null for missing preference', async () => {
const value = await adapter.getPreference('u1', 'nonexistent');
expect(value).toBeNull();
});
it('should upsert an existing preference', async () => {
await adapter.setPreference('u1', 'theme', 'dark');
await adapter.setPreference('u1', 'theme', 'light');
const value = await adapter.getPreference('u1', 'theme');
expect(value).toBe('light');
});
it('should delete a preference', async () => {
await adapter.setPreference('u1', 'theme', 'dark');
const deleted = await adapter.deletePreference('u1', 'theme');
expect(deleted).toBe(true);
const value = await adapter.getPreference('u1', 'theme');
expect(value).toBeNull();
});
it('should return false when deleting nonexistent preference', async () => {
const deleted = await adapter.deletePreference('u1', 'nope');
expect(deleted).toBe(false);
});
it('should list preferences by userId', async () => {
await adapter.setPreference('u1', 'theme', 'dark', 'appearance');
await adapter.setPreference('u1', 'lang', 'en', 'locale');
await adapter.setPreference('u2', 'theme', 'light', 'appearance');
const prefs = await adapter.listPreferences('u1');
expect(prefs).toHaveLength(2);
expect(prefs.map((p) => p.key).sort()).toEqual(['lang', 'theme']);
});
it('should filter preferences by category', async () => {
await adapter.setPreference('u1', 'theme', 'dark', 'appearance');
await adapter.setPreference('u1', 'lang', 'en', 'locale');
const prefs = await adapter.listPreferences('u1', 'appearance');
expect(prefs).toHaveLength(1);
expect(prefs[0]!.key).toBe('theme');
});
});
/* ---- Insights ---- */
describe('insights', () => {
it('should store and retrieve an insight', async () => {
const insight = await adapter.storeInsight({
userId: 'u1',
content: 'TypeScript is great for type safety',
source: 'chat',
category: 'technical',
relevanceScore: 0.9,
});
expect(insight.id).toBeDefined();
expect(insight.content).toBe('TypeScript is great for type safety');
const fetched = await adapter.getInsight(insight.id);
expect(fetched).not.toBeNull();
expect(fetched!.content).toBe('TypeScript is great for type safety');
});
it('should return null for missing insight', async () => {
const result = await adapter.getInsight('nonexistent');
expect(result).toBeNull();
});
it('should delete an insight', async () => {
const insight = await adapter.storeInsight({
userId: 'u1',
content: 'test',
source: 'chat',
category: 'general',
relevanceScore: 0.5,
});
const deleted = await adapter.deleteInsight(insight.id);
expect(deleted).toBe(true);
const fetched = await adapter.getInsight(insight.id);
expect(fetched).toBeNull();
});
});
/* ---- Keyword Search ---- */
describe('searchInsights', () => {
beforeEach(async () => {
await adapter.storeInsight({
userId: 'u1',
content: 'TypeScript provides excellent type safety for JavaScript projects',
source: 'chat',
category: 'technical',
relevanceScore: 0.9,
});
await adapter.storeInsight({
userId: 'u1',
content: 'React hooks simplify state management in components',
source: 'chat',
category: 'technical',
relevanceScore: 0.8,
});
await adapter.storeInsight({
userId: 'u1',
content: 'TypeScript and React work great together for type safe components',
source: 'chat',
category: 'technical',
relevanceScore: 0.85,
});
await adapter.storeInsight({
userId: 'u2',
content: 'TypeScript is popular',
source: 'chat',
category: 'general',
relevanceScore: 0.5,
});
});
it('should find insights by exact keyword', async () => {
const results = await adapter.searchInsights('u1', 'hooks');
expect(results).toHaveLength(1);
expect(results[0]!.content).toContain('hooks');
});
it('should be case-insensitive', async () => {
const results = await adapter.searchInsights('u1', 'TYPESCRIPT');
expect(results.length).toBeGreaterThanOrEqual(1);
for (const r of results) {
expect(r.content.toLowerCase()).toContain('typescript');
}
});
it('should rank multi-word matches higher', async () => {
const results = await adapter.searchInsights('u1', 'TypeScript React');
// The insight mentioning both "TypeScript" and "React" should rank first (score=2)
expect(results[0]!.score).toBe(2);
expect(results[0]!.content).toContain('TypeScript');
expect(results[0]!.content).toContain('React');
});
it('should return empty for no matches', async () => {
const results = await adapter.searchInsights('u1', 'python django');
expect(results).toHaveLength(0);
});
it('should filter by userId', async () => {
const results = await adapter.searchInsights('u2', 'TypeScript');
expect(results).toHaveLength(1);
expect(results[0]!.content).toBe('TypeScript is popular');
});
it('should respect limit option', async () => {
const results = await adapter.searchInsights('u1', 'TypeScript', { limit: 1 });
expect(results).toHaveLength(1);
});
it('should return empty for empty query', async () => {
const results = await adapter.searchInsights('u1', ' ');
expect(results).toHaveLength(0);
});
});
/* ---- Lifecycle ---- */
describe('lifecycle', () => {
it('should have name "keyword"', () => {
expect(adapter.name).toBe('keyword');
});
it('should have null embedder', () => {
expect(adapter.embedder).toBeNull();
});
it('should close without error', async () => {
await expect(adapter.close()).resolves.toBeUndefined();
});
});
});

View File

@@ -0,0 +1,195 @@
import type { StorageAdapter } from '@mosaic/storage';
import type {
MemoryAdapter,
MemoryConfig,
NewInsight,
Insight,
InsightSearchResult,
} from '../types.js';
import type { EmbeddingProvider } from '../vector-store.js';
type KeywordConfig = Extract<MemoryConfig, { type: 'keyword' }>;
const PREFERENCES = 'preferences';
const INSIGHTS = 'insights';
type PreferenceRecord = Record<string, unknown> & {
id: string;
userId: string;
key: string;
value: unknown;
category: string;
};
type InsightRecord = Record<string, unknown> & {
id: string;
userId: string;
content: string;
source: string;
category: string;
relevanceScore: number;
metadata: Record<string, unknown>;
createdAt: string;
updatedAt?: string;
decayedAt?: string;
};
export class KeywordAdapter implements MemoryAdapter {
readonly name = 'keyword';
readonly embedder: EmbeddingProvider | null = null;
private storage: StorageAdapter;
constructor(config: KeywordConfig) {
this.storage = config.storage;
}
/* ------------------------------------------------------------------ */
/* Preferences */
/* ------------------------------------------------------------------ */
async getPreference(userId: string, key: string): Promise<unknown | null> {
const row = await this.storage.findOne<PreferenceRecord>(PREFERENCES, { userId, key });
return row?.value ?? null;
}
async setPreference(
userId: string,
key: string,
value: unknown,
category?: string,
): Promise<void> {
const existing = await this.storage.findOne<PreferenceRecord>(PREFERENCES, { userId, key });
if (existing) {
await this.storage.update(PREFERENCES, existing.id, {
value,
...(category !== undefined ? { category } : {}),
});
} else {
await this.storage.create(PREFERENCES, {
userId,
key,
value,
category: category ?? 'general',
});
}
}
async deletePreference(userId: string, key: string): Promise<boolean> {
const existing = await this.storage.findOne<PreferenceRecord>(PREFERENCES, { userId, key });
if (!existing) return false;
return this.storage.delete(PREFERENCES, existing.id);
}
async listPreferences(
userId: string,
category?: string,
): Promise<Array<{ key: string; value: unknown; category: string }>> {
const filter: Record<string, unknown> = { userId };
if (category !== undefined) filter.category = category;
const rows = await this.storage.find<PreferenceRecord>(PREFERENCES, filter);
return rows.map((r) => ({ key: r.key, value: r.value, category: r.category }));
}
/* ------------------------------------------------------------------ */
/* Insights */
/* ------------------------------------------------------------------ */
async storeInsight(insight: NewInsight): Promise<Insight> {
const now = new Date();
const row = await this.storage.create<Record<string, unknown>>(INSIGHTS, {
userId: insight.userId,
content: insight.content,
source: insight.source,
category: insight.category,
relevanceScore: insight.relevanceScore,
metadata: insight.metadata ?? {},
createdAt: now.toISOString(),
});
return {
id: row.id,
userId: insight.userId,
content: insight.content,
source: insight.source,
category: insight.category,
relevanceScore: insight.relevanceScore,
metadata: insight.metadata,
createdAt: now,
};
}
async getInsight(id: string): Promise<Insight | null> {
const row = await this.storage.read<InsightRecord>(INSIGHTS, id);
if (!row) return null;
return toInsight(row);
}
async searchInsights(
userId: string,
query: string,
opts?: { limit?: number; embedding?: number[] },
): Promise<InsightSearchResult[]> {
const limit = opts?.limit ?? 10;
const words = query
.toLowerCase()
.split(/\s+/)
.filter((w) => w.length > 0);
if (words.length === 0) return [];
const rows = await this.storage.find<InsightRecord>(INSIGHTS, { userId });
const scored: InsightSearchResult[] = [];
for (const row of rows) {
const content = row.content.toLowerCase();
let score = 0;
for (const word of words) {
if (content.includes(word)) score++;
}
if (score > 0) {
scored.push({
id: row.id,
content: row.content,
score,
metadata: row.metadata ?? undefined,
});
}
}
scored.sort((a, b) => b.score - a.score);
return scored.slice(0, limit);
}
async deleteInsight(id: string): Promise<boolean> {
return this.storage.delete(INSIGHTS, id);
}
/* ------------------------------------------------------------------ */
/* Lifecycle */
/* ------------------------------------------------------------------ */
async close(): Promise<void> {
// no-op — storage adapter manages its own lifecycle
}
}
/* ------------------------------------------------------------------ */
/* Helpers */
/* ------------------------------------------------------------------ */
function toInsight(row: InsightRecord): Insight {
return {
id: row.id,
userId: row.userId,
content: row.content,
source: row.source,
category: row.category,
relevanceScore: row.relevanceScore,
metadata: row.metadata ?? undefined,
createdAt: new Date(row.createdAt),
updatedAt: row.updatedAt ? new Date(row.updatedAt) : undefined,
decayedAt: row.decayedAt ? new Date(row.decayedAt) : undefined,
};
}

View File

@@ -0,0 +1,177 @@
import { createDb, type DbHandle } from '@mosaic/db';
import type {
MemoryAdapter,
MemoryConfig,
NewInsight as AdapterNewInsight,
Insight as AdapterInsight,
InsightSearchResult,
} from '../types.js';
import type { EmbeddingProvider } from '../vector-store.js';
import {
createPreferencesRepo,
type PreferencesRepo,
type Preference,
type NewPreference,
} from '../preferences.js';
import {
createInsightsRepo,
type InsightsRepo,
type NewInsight as DbNewInsight,
} from '../insights.js';
type PgVectorConfig = Extract<MemoryConfig, { type: 'pgvector' }>;
export class PgVectorAdapter implements MemoryAdapter {
readonly name = 'pgvector';
readonly embedder: EmbeddingProvider | null;
private handle: DbHandle;
private preferences: PreferencesRepo;
private insights: InsightsRepo;
constructor(config: PgVectorConfig) {
this.handle = createDb();
this.preferences = createPreferencesRepo(this.handle.db);
this.insights = createInsightsRepo(this.handle.db);
this.embedder = config.embedder ?? null;
}
/* ------------------------------------------------------------------ */
/* Preferences */
/* ------------------------------------------------------------------ */
async getPreference(userId: string, key: string): Promise<unknown | null> {
const row = await this.preferences.findByUserAndKey(userId, key);
return row?.value ?? null;
}
async setPreference(
userId: string,
key: string,
value: unknown,
category?: string,
): Promise<void> {
await this.preferences.upsert({
userId,
key,
value,
...(category ? { category: category as NewPreference['category'] } : {}),
});
}
async deletePreference(userId: string, key: string): Promise<boolean> {
return this.preferences.remove(userId, key);
}
async listPreferences(
userId: string,
category?: string,
): Promise<Array<{ key: string; value: unknown; category: string }>> {
const rows = category
? await this.preferences.findByUserAndCategory(userId, category as Preference['category'])
: await this.preferences.findByUser(userId);
return rows.map((r) => ({ key: r.key, value: r.value, category: r.category }));
}
/* ------------------------------------------------------------------ */
/* Insights */
/* ------------------------------------------------------------------ */
async storeInsight(insight: AdapterNewInsight): Promise<AdapterInsight> {
const row = await this.insights.create({
userId: insight.userId,
content: insight.content,
source: insight.source as DbNewInsight['source'],
category: insight.category as DbNewInsight['category'],
relevanceScore: insight.relevanceScore,
metadata: insight.metadata ?? {},
embedding: insight.embedding ?? null,
});
return toAdapterInsight(row);
}
async getInsight(id: string): Promise<AdapterInsight | null> {
// findById requires userId — search across all users via raw find
// The adapter interface only takes id, so we pass an empty userId and rely on the id match.
// Since the repo requires userId, we use a two-step approach.
const row = await this.insights.findById(id, '');
if (!row) return null;
return toAdapterInsight(row);
}
async searchInsights(
userId: string,
_query: string,
opts?: { limit?: number; embedding?: number[] },
): Promise<InsightSearchResult[]> {
if (opts?.embedding) {
const results = await this.insights.searchByEmbedding(
userId,
opts.embedding,
opts.limit ?? 10,
);
return results.map((r) => ({
id: r.insight.id,
content: r.insight.content,
score: 1 - r.distance,
metadata: (r.insight.metadata as Record<string, unknown>) ?? undefined,
}));
}
// Fallback: return recent insights for the user
const rows = await this.insights.findByUser(userId, opts?.limit ?? 10);
return rows.map((r) => ({
id: r.id,
content: r.content,
score: Number(r.relevanceScore),
metadata: (r.metadata as Record<string, unknown>) ?? undefined,
}));
}
async deleteInsight(id: string): Promise<boolean> {
// The repo requires userId — pass empty string since adapter interface only has id
return this.insights.remove(id, '');
}
/* ------------------------------------------------------------------ */
/* Lifecycle */
/* ------------------------------------------------------------------ */
async close(): Promise<void> {
await this.handle.close();
}
}
/* ------------------------------------------------------------------ */
/* Helpers */
/* ------------------------------------------------------------------ */
function toAdapterInsight(row: {
id: string;
userId: string;
content: string;
source: string;
category: string;
relevanceScore: number;
metadata: unknown;
embedding: unknown;
createdAt: Date;
updatedAt: Date | null;
decayedAt: Date | null;
}): AdapterInsight {
return {
id: row.id,
userId: row.userId,
content: row.content,
source: row.source,
category: row.category,
relevanceScore: row.relevanceScore,
metadata: (row.metadata as Record<string, unknown>) ?? undefined,
embedding: (row.embedding as number[]) ?? undefined,
createdAt: row.createdAt,
updatedAt: row.updatedAt ?? undefined,
decayedAt: row.decayedAt ?? undefined,
};
}

View File

@@ -0,0 +1,18 @@
import type { MemoryAdapter, MemoryConfig } from './types.js';
type MemoryType = MemoryConfig['type'];
const registry = new Map<MemoryType, (config: MemoryConfig) => MemoryAdapter>();
export function registerMemoryAdapter(
type: MemoryType,
factory: (config: MemoryConfig) => MemoryAdapter,
): void {
registry.set(type, factory);
}
export function createMemoryAdapter(config: MemoryConfig): MemoryAdapter {
const factory = registry.get(config.type);
if (!factory) throw new Error(`No adapter registered for type: ${config.type}`);
return factory(config);
}

View File

@@ -13,3 +13,27 @@ export {
type SearchResult,
} from './insights.js';
export type { VectorStore, VectorSearchResult, EmbeddingProvider } from './vector-store.js';
export type {
MemoryAdapter,
MemoryConfig,
NewInsight as AdapterNewInsight,
Insight as AdapterInsight,
InsightSearchResult,
} from './types.js';
export { createMemoryAdapter, registerMemoryAdapter } from './factory.js';
export { PgVectorAdapter } from './adapters/pgvector.js';
export { KeywordAdapter } from './adapters/keyword.js';
// Auto-register adapters at module load time
import { registerMemoryAdapter } from './factory.js';
import { PgVectorAdapter } from './adapters/pgvector.js';
import { KeywordAdapter } from './adapters/keyword.js';
import type { MemoryConfig } from './types.js';
registerMemoryAdapter('pgvector', (config: MemoryConfig) => {
return new PgVectorAdapter(config as Extract<MemoryConfig, { type: 'pgvector' }>);
});
registerMemoryAdapter('keyword', (config: MemoryConfig) => {
return new KeywordAdapter(config as Extract<MemoryConfig, { type: 'keyword' }>);
});

View File

@@ -0,0 +1,73 @@
export type { EmbeddingProvider, VectorSearchResult } from './vector-store.js';
import type { EmbeddingProvider } from './vector-store.js';
import type { StorageAdapter } from '@mosaic/storage';
/* ------------------------------------------------------------------ */
/* Insight types (adapter-level, decoupled from Drizzle schema) */
/* ------------------------------------------------------------------ */
export interface NewInsight {
userId: string;
content: string;
source: string;
category: string;
relevanceScore: number;
metadata?: Record<string, unknown>;
embedding?: number[];
}
export interface Insight extends NewInsight {
id: string;
createdAt: Date;
updatedAt?: Date;
decayedAt?: Date;
}
export interface InsightSearchResult {
id: string;
content: string;
score: number;
metadata?: Record<string, unknown>;
}
/* ------------------------------------------------------------------ */
/* MemoryAdapter interface */
/* ------------------------------------------------------------------ */
export interface MemoryAdapter {
readonly name: string;
// Preferences
getPreference(userId: string, key: string): Promise<unknown | null>;
setPreference(userId: string, key: string, value: unknown, category?: string): Promise<void>;
deletePreference(userId: string, key: string): Promise<boolean>;
listPreferences(
userId: string,
category?: string,
): Promise<Array<{ key: string; value: unknown; category: string }>>;
// Insights
storeInsight(insight: NewInsight): Promise<Insight>;
getInsight(id: string): Promise<Insight | null>;
searchInsights(
userId: string,
query: string,
opts?: { limit?: number; embedding?: number[] },
): Promise<InsightSearchResult[]>;
deleteInsight(id: string): Promise<boolean>;
// Embedding
readonly embedder: EmbeddingProvider | null;
// Lifecycle
close(): Promise<void>;
}
/* ------------------------------------------------------------------ */
/* MemoryConfig */
/* ------------------------------------------------------------------ */
export type MemoryConfig =
| { type: 'pgvector'; embedder?: EmbeddingProvider }
| { type: 'sqlite-vec'; embedder?: EmbeddingProvider }
| { type: 'keyword'; storage: StorageAdapter };

View File

@@ -0,0 +1,50 @@
import Redis from 'ioredis';
import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js';
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
export function createBullMQAdapter(config: QueueConfig): QueueAdapter {
if (config.type !== 'bullmq') {
throw new Error(`Expected config type "bullmq", got "${config.type}"`);
}
const url = config.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
const redis = new Redis(url, { maxRetriesPerRequest: 3 });
return {
name: 'bullmq',
async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
await redis.lpush(queueName, JSON.stringify(payload));
},
async dequeue(queueName: string): Promise<TaskPayload | null> {
const item = await redis.rpop(queueName);
if (!item) return null;
return JSON.parse(item) as TaskPayload;
},
async length(queueName: string): Promise<number> {
return redis.llen(queueName);
},
async publish(channel: string, message: string): Promise<void> {
await redis.publish(channel, message);
},
subscribe(channel: string, handler: (message: string) => void): () => void {
const sub = redis.duplicate();
sub.subscribe(channel).catch(() => {});
sub.on('message', (_ch: string, msg: string) => handler(msg));
return () => {
sub.unsubscribe(channel).catch(() => {});
sub.disconnect();
};
},
async close(): Promise<void> {
await redis.quit();
},
};
}

View File

@@ -0,0 +1,81 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import type { TaskPayload } from '../types.js';
import { createLocalAdapter } from './local.js';
function makePayload(id: string): TaskPayload {
return { id, type: 'test', data: { value: id }, createdAt: new Date().toISOString() };
}
describe('LocalAdapter', () => {
let dataDir: string;
beforeEach(() => {
dataDir = mkdtempSync(join(tmpdir(), 'mosaic-queue-test-'));
});
afterEach(() => {
rmSync(dataDir, { recursive: true, force: true });
});
it('enqueue + dequeue in FIFO order', async () => {
const adapter = createLocalAdapter({ type: 'local', dataDir });
const a = makePayload('a');
const b = makePayload('b');
const c = makePayload('c');
await adapter.enqueue('tasks', a);
await adapter.enqueue('tasks', b);
await adapter.enqueue('tasks', c);
expect(await adapter.dequeue('tasks')).toEqual(a);
expect(await adapter.dequeue('tasks')).toEqual(b);
expect(await adapter.dequeue('tasks')).toEqual(c);
expect(await adapter.dequeue('tasks')).toBeNull();
});
it('length accuracy', async () => {
const adapter = createLocalAdapter({ type: 'local', dataDir });
expect(await adapter.length('q')).toBe(0);
await adapter.enqueue('q', makePayload('1'));
await adapter.enqueue('q', makePayload('2'));
expect(await adapter.length('q')).toBe(2);
await adapter.dequeue('q');
expect(await adapter.length('q')).toBe(1);
});
it('publish + subscribe delivery', async () => {
const adapter = createLocalAdapter({ type: 'local', dataDir });
const received: string[] = [];
const unsub = adapter.subscribe('chan', (msg) => received.push(msg));
await adapter.publish('chan', 'hello');
await adapter.publish('chan', 'world');
expect(received).toEqual(['hello', 'world']);
unsub();
await adapter.publish('chan', 'after-unsub');
expect(received).toEqual(['hello', 'world']);
});
it('persistence survives close and re-create', async () => {
const p1 = makePayload('x');
const p2 = makePayload('y');
const adapter1 = createLocalAdapter({ type: 'local', dataDir });
await adapter1.enqueue('persist-q', p1);
await adapter1.enqueue('persist-q', p2);
await adapter1.close();
const adapter2 = createLocalAdapter({ type: 'local', dataDir });
expect(await adapter2.length('persist-q')).toBe(2);
expect(await adapter2.dequeue('persist-q')).toEqual(p1);
expect(await adapter2.dequeue('persist-q')).toEqual(p2);
await adapter2.close();
});
});

View File

@@ -0,0 +1,87 @@
import { mkdirSync, readFileSync, readdirSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { EventEmitter } from 'node:events';
import type { QueueAdapter, QueueConfig, TaskPayload } from '../types.js';
const DEFAULT_DATA_DIR = '.mosaic/queue';
export function createLocalAdapter(config: QueueConfig): QueueAdapter {
if (config.type !== 'local') {
throw new Error(`Expected config type "local", got "${config.type}"`);
}
const dataDir = config.dataDir ?? DEFAULT_DATA_DIR;
const queues = new Map<string, TaskPayload[]>();
const emitter = new EventEmitter();
mkdirSync(dataDir, { recursive: true });
// Load existing JSON files on startup
for (const file of readdirSync(dataDir)) {
if (!file.endsWith('.json')) continue;
const queueName = file.slice(0, -5);
try {
const raw = readFileSync(join(dataDir, file), 'utf-8');
const items = JSON.parse(raw) as TaskPayload[];
if (Array.isArray(items)) {
queues.set(queueName, items);
}
} catch {
// Ignore corrupt files
}
}
function persist(queueName: string): void {
const items = queues.get(queueName) ?? [];
writeFileSync(join(dataDir, `${queueName}.json`), JSON.stringify(items), 'utf-8');
}
function getQueue(queueName: string): TaskPayload[] {
let q = queues.get(queueName);
if (!q) {
q = [];
queues.set(queueName, q);
}
return q;
}
return {
name: 'local',
async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
getQueue(queueName).push(payload);
persist(queueName);
},
async dequeue(queueName: string): Promise<TaskPayload | null> {
const q = getQueue(queueName);
const item = q.shift() ?? null;
persist(queueName);
return item;
},
async length(queueName: string): Promise<number> {
return getQueue(queueName).length;
},
async publish(channel: string, message: string): Promise<void> {
emitter.emit(channel, message);
},
subscribe(channel: string, handler: (message: string) => void): () => void {
emitter.on(channel, handler);
return () => {
emitter.off(channel, handler);
};
},
async close(): Promise<void> {
for (const queueName of queues.keys()) {
persist(queueName);
}
queues.clear();
emitter.removeAllListeners();
},
};
}

View File

@@ -0,0 +1,18 @@
import type { QueueAdapter, QueueConfig } from './types.js';
type QueueType = QueueConfig['type'];
const registry = new Map<QueueType, (config: QueueConfig) => QueueAdapter>();
export function registerQueueAdapter(
type: QueueType,
factory: (config: QueueConfig) => QueueAdapter,
): void {
registry.set(type, factory);
}
export function createQueueAdapter(config: QueueConfig): QueueAdapter {
const factory = registry.get(config.type);
if (!factory) throw new Error(`No adapter registered for type: ${config.type}`);
return factory(config);
}

View File

@@ -6,3 +6,15 @@ export {
type QueueClient,
type TaskPayload,
} from './queue.js';
export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './types.js';
export { createQueueAdapter, registerQueueAdapter } from './factory.js';
export { createBullMQAdapter } from './adapters/bullmq.js';
export { createLocalAdapter } from './adapters/local.js';
import { registerQueueAdapter } from './factory.js';
import { createBullMQAdapter } from './adapters/bullmq.js';
import { createLocalAdapter } from './adapters/local.js';
registerQueueAdapter('bullmq', createBullMQAdapter);
registerQueueAdapter('local', createLocalAdapter);

View File

@@ -0,0 +1,18 @@
export interface TaskPayload {
id: string;
type: string;
data: Record<string, unknown>;
createdAt: string;
}
export interface QueueAdapter {
readonly name: string;
enqueue(queueName: string, payload: TaskPayload): Promise<void>;
dequeue(queueName: string): Promise<TaskPayload | null>;
length(queueName: string): Promise<number>;
publish(channel: string, message: string): Promise<void>;
subscribe(channel: string, handler: (message: string) => void): () => void;
close(): Promise<void>;
}
export type QueueConfig = { type: 'bullmq'; url?: string } | { type: 'local'; dataDir?: string };

View File

@@ -0,0 +1,35 @@
{
"name": "@mosaic/storage",
"version": "0.0.2",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"scripts": {
"build": "tsc",
"lint": "eslint src",
"typecheck": "tsc --noEmit",
"test": "vitest run --passWithNoTests"
},
"dependencies": {
"@mosaic/db": "workspace:^",
"@mosaic/types": "workspace:*",
"better-sqlite3": "^12.8.0"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"typescript": "^5.8.0",
"vitest": "^2.0.0"
},
"publishConfig": {
"registry": "https://git.mosaicstack.dev/api/packages/mosaic/npm/",
"access": "public"
},
"files": [
"dist"
]
}

View File

@@ -0,0 +1,252 @@
import {
createDb,
runMigrations,
eq,
and,
asc,
desc,
sql,
type Db,
type DbHandle,
} from '@mosaic/db';
import * as schema from '@mosaic/db';
import type { StorageAdapter, StorageConfig } from '../types.js';
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* Maps collection name → Drizzle table object.
* Typed as `any` because the generic StorageAdapter interface erases table
* types — all runtime values are still strongly-typed Drizzle table objects.
*/
const TABLE_MAP: Record<string, any> = {
users: schema.users,
sessions: schema.sessions,
accounts: schema.accounts,
verifications: schema.verifications,
teams: schema.teams,
team_members: schema.teamMembers,
projects: schema.projects,
missions: schema.missions,
tasks: schema.tasks,
mission_tasks: schema.missionTasks,
events: schema.events,
agents: schema.agents,
tickets: schema.tickets,
appreciations: schema.appreciations,
conversations: schema.conversations,
messages: schema.messages,
preferences: schema.preferences,
insights: schema.insights,
agent_logs: schema.agentLogs,
skills: schema.skills,
routing_rules: schema.routingRules,
provider_credentials: schema.providerCredentials,
summarization_jobs: schema.summarizationJobs,
};
function getTable(collection: string): any {
const table = TABLE_MAP[collection];
if (!table) throw new Error(`Unknown collection: ${collection}`);
return table;
}
function buildWhereClause(table: any, filter?: Record<string, unknown>) {
if (!filter || Object.keys(filter).length === 0) return undefined;
const conditions = Object.entries(filter).map(([key, value]) => {
const column = table[key];
if (!column) throw new Error(`Unknown column "${key}" on table`);
return eq(column, value);
});
return conditions.length === 1 ? conditions[0]! : and(...conditions);
}
export class PostgresAdapter implements StorageAdapter {
readonly name = 'postgres';
private handle: DbHandle;
private db: Db;
private url: string;
constructor(config: Extract<StorageConfig, { type: 'postgres' }>) {
this.url = config.url;
this.handle = createDb(config.url);
this.db = this.handle.db;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const table = getTable(collection);
const [row] = await (this.db as any).insert(table).values(data).returning();
return row as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const table = getTable(collection);
const [row] = await (this.db as any).select().from(table).where(eq(table.id, id));
return (row as T) ?? null;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const table = getTable(collection);
const result = await (this.db as any)
.update(table)
.set(data)
.where(eq(table.id, id))
.returning();
return result.length > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const table = getTable(collection);
const result = await (this.db as any).delete(table).where(eq(table.id, id)).returning();
return result.length > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const table = getTable(collection);
let query = (this.db as any).select().from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
if (opts?.orderBy) {
const col = table[opts.orderBy];
if (col) {
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
}
}
if (opts?.limit) query = query.limit(opts.limit);
if (opts?.offset) query = query.offset(opts.offset);
return (await query) as T[];
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const table = getTable(collection);
let query = (this.db as any).select({ count: sql<number>`count(*)::int` }).from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
const [row] = await query;
return (row as any)?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return (this.db as any).transaction(async (drizzleTx: any) => {
const txAdapter = new PostgresTxAdapter(drizzleTx, this.url);
return fn(txAdapter);
});
}
async migrate(): Promise<void> {
await runMigrations(this.url);
}
async close(): Promise<void> {
await this.handle.close();
}
}
/**
* Thin transaction wrapper — delegates to the Drizzle transaction object
* instead of the top-level db handle.
*/
class PostgresTxAdapter implements StorageAdapter {
readonly name = 'postgres';
private tx: any;
private url: string;
constructor(tx: any, url: string) {
this.tx = tx;
this.url = url;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const table = getTable(collection);
const [row] = await this.tx.insert(table).values(data).returning();
return row as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const table = getTable(collection);
const [row] = await this.tx.select().from(table).where(eq(table.id, id));
return (row as T) ?? null;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const table = getTable(collection);
const result = await this.tx.update(table).set(data).where(eq(table.id, id)).returning();
return result.length > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const table = getTable(collection);
const result = await this.tx.delete(table).where(eq(table.id, id)).returning();
return result.length > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const table = getTable(collection);
let query = this.tx.select().from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
if (opts?.orderBy) {
const col = table[opts.orderBy];
if (col) {
query = query.orderBy(opts.order === 'desc' ? desc(col) : asc(col));
}
}
if (opts?.limit) query = query.limit(opts.limit);
if (opts?.offset) query = query.offset(opts.offset);
return (await query) as T[];
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const table = getTable(collection);
let query = this.tx.select({ count: sql<number>`count(*)::int` }).from(table);
const where = buildWhereClause(table, filter);
if (where) query = query.where(where);
const [row] = await query;
return (row as any)?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return this.tx.transaction(async (nestedTx: any) => {
const nested = new PostgresTxAdapter(nestedTx, this.url);
return fn(nested);
});
}
async migrate(): Promise<void> {
await runMigrations(this.url);
}
async close(): Promise<void> {
// No-op inside a transaction
}
}

View File

@@ -0,0 +1,201 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { SqliteAdapter } from './sqlite.js';
describe('SqliteAdapter', () => {
let adapter: SqliteAdapter;
beforeEach(async () => {
adapter = new SqliteAdapter({ type: 'sqlite', path: ':memory:' });
await adapter.migrate();
});
afterEach(async () => {
await adapter.close();
});
describe('CRUD', () => {
it('creates and reads a record', async () => {
const created = await adapter.create('users', { name: 'Alice', email: 'alice@test.com' });
expect(created.id).toBeDefined();
expect(created.name).toBe('Alice');
const read = await adapter.read('users', created.id);
expect(read).not.toBeNull();
expect(read!.name).toBe('Alice');
expect(read!.email).toBe('alice@test.com');
});
it('returns null for non-existent record', async () => {
const result = await adapter.read('users', 'does-not-exist');
expect(result).toBeNull();
});
it('updates a record', async () => {
const created = await adapter.create('users', { name: 'Alice' });
const updated = await adapter.update('users', created.id, { name: 'Bob' });
expect(updated).toBe(true);
const read = await adapter.read('users', created.id);
expect(read!.name).toBe('Bob');
});
it('update returns false for non-existent record', async () => {
const result = await adapter.update('users', 'does-not-exist', { name: 'X' });
expect(result).toBe(false);
});
it('deletes a record', async () => {
const created = await adapter.create('users', { name: 'Alice' });
const deleted = await adapter.delete('users', created.id);
expect(deleted).toBe(true);
const read = await adapter.read('users', created.id);
expect(read).toBeNull();
});
it('delete returns false for non-existent record', async () => {
const result = await adapter.delete('users', 'does-not-exist');
expect(result).toBe(false);
});
});
describe('find', () => {
it('finds records with filter', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
await adapter.create('users', { name: 'Charlie', role: 'admin' });
const admins = await adapter.find('users', { role: 'admin' });
expect(admins).toHaveLength(2);
expect(admins.map((u) => u.name).sort()).toEqual(['Alice', 'Charlie']);
});
it('finds all records without filter', async () => {
await adapter.create('users', { name: 'Alice' });
await adapter.create('users', { name: 'Bob' });
const all = await adapter.find('users');
expect(all).toHaveLength(2);
});
it('supports limit and offset', async () => {
for (let i = 0; i < 5; i++) {
await adapter.create('users', { name: `User${i}`, idx: i });
}
const page = await adapter.find('users', undefined, {
limit: 2,
offset: 1,
orderBy: 'created_at',
});
expect(page).toHaveLength(2);
});
it('findOne returns first match', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
const found = await adapter.findOne('users', { role: 'user' });
expect(found).not.toBeNull();
expect(found!.name).toBe('Bob');
});
it('findOne returns null when no match', async () => {
const result = await adapter.findOne('users', { role: 'nonexistent' });
expect(result).toBeNull();
});
});
describe('count', () => {
it('counts all records', async () => {
await adapter.create('users', { name: 'Alice' });
await adapter.create('users', { name: 'Bob' });
const total = await adapter.count('users');
expect(total).toBe(2);
});
it('counts with filter', async () => {
await adapter.create('users', { name: 'Alice', role: 'admin' });
await adapter.create('users', { name: 'Bob', role: 'user' });
await adapter.create('users', { name: 'Charlie', role: 'admin' });
const adminCount = await adapter.count('users', { role: 'admin' });
expect(adminCount).toBe(2);
});
it('returns 0 for empty collection', async () => {
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
describe('transaction', () => {
it('commits on success', async () => {
await adapter.transaction(async (tx) => {
await tx.create('users', { name: 'Alice' });
await tx.create('users', { name: 'Bob' });
});
const count = await adapter.count('users');
expect(count).toBe(2);
});
it('rolls back on error', async () => {
await expect(
adapter.transaction(async (tx) => {
await tx.create('users', { name: 'Alice' });
throw new Error('rollback test');
}),
).rejects.toThrow('rollback test');
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
describe('migrate', () => {
it('creates all tables', async () => {
// migrate() was already called in beforeEach — verify tables exist
const collections = [
'users',
'sessions',
'accounts',
'projects',
'missions',
'tasks',
'agents',
'conversations',
'messages',
'preferences',
'insights',
'skills',
'events',
'routing_rules',
'provider_credentials',
'agent_logs',
'teams',
'team_members',
'mission_tasks',
'tickets',
'summarization_jobs',
'appreciations',
'verifications',
];
for (const collection of collections) {
// Should not throw
const count = await adapter.count(collection);
expect(count).toBe(0);
}
});
it('is idempotent', async () => {
await adapter.migrate();
await adapter.migrate();
// Should not throw
const count = await adapter.count('users');
expect(count).toBe(0);
});
});
});

View File

@@ -0,0 +1,283 @@
import Database from 'better-sqlite3';
import { randomUUID } from 'node:crypto';
import type { StorageAdapter, StorageConfig } from '../types.js';
/* eslint-disable @typescript-eslint/no-explicit-any */
const COLLECTIONS = [
'users',
'sessions',
'accounts',
'projects',
'missions',
'tasks',
'agents',
'conversations',
'messages',
'preferences',
'insights',
'skills',
'events',
'routing_rules',
'provider_credentials',
'agent_logs',
'teams',
'team_members',
'mission_tasks',
'tickets',
'summarization_jobs',
'appreciations',
'verifications',
] as const;
function buildFilterClause(filter?: Record<string, unknown>): {
clause: string;
params: unknown[];
} {
if (!filter || Object.keys(filter).length === 0) return { clause: '', params: [] };
const conditions: string[] = [];
const params: unknown[] = [];
for (const [key, value] of Object.entries(filter)) {
if (key === 'id') {
conditions.push('id = ?');
params.push(value);
} else {
conditions.push(`json_extract(data_json, '$.${key}') = ?`);
params.push(typeof value === 'object' ? JSON.stringify(value) : value);
}
}
return { clause: ` WHERE ${conditions.join(' AND ')}`, params };
}
export class SqliteAdapter implements StorageAdapter {
readonly name = 'sqlite';
private db: Database.Database;
constructor(config: Extract<StorageConfig, { type: 'sqlite' }>) {
this.db = new Database(config.path);
this.db.pragma('journal_mode = WAL');
this.db.pragma('foreign_keys = ON');
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const id = (data as any).id ?? randomUUID();
const now = new Date().toISOString();
const rest = Object.fromEntries(Object.entries(data).filter(([k]) => k !== 'id'));
this.db
.prepare(
`INSERT INTO ${collection} (id, data_json, created_at, updated_at) VALUES (?, ?, ?, ?)`,
)
.run(id, JSON.stringify(rest), now, now);
return { ...data, id } as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const row = this.db.prepare(`SELECT * FROM ${collection} WHERE id = ?`).get(id) as any;
if (!row) return null;
return { id: row.id, ...JSON.parse(row.data_json as string) } as T;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const existing = this.db
.prepare(`SELECT data_json FROM ${collection} WHERE id = ?`)
.get(id) as any;
if (!existing) return false;
const merged = { ...JSON.parse(existing.data_json as string), ...data };
const now = new Date().toISOString();
const result = this.db
.prepare(`UPDATE ${collection} SET data_json = ?, updated_at = ? WHERE id = ?`)
.run(JSON.stringify(merged), now, id);
return result.changes > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const result = this.db.prepare(`DELETE FROM ${collection} WHERE id = ?`).run(id);
return result.changes > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const { clause, params } = buildFilterClause(filter);
let query = `SELECT * FROM ${collection}${clause}`;
if (opts?.orderBy) {
const dir = opts.order === 'desc' ? 'DESC' : 'ASC';
const col =
opts.orderBy === 'id' || opts.orderBy === 'created_at' || opts.orderBy === 'updated_at'
? opts.orderBy
: `json_extract(data_json, '$.${opts.orderBy}')`;
query += ` ORDER BY ${col} ${dir}`;
}
if (opts?.limit) {
query += ` LIMIT ?`;
params.push(opts.limit);
}
if (opts?.offset) {
query += ` OFFSET ?`;
params.push(opts.offset);
}
const rows = this.db.prepare(query).all(...params) as any[];
return rows.map((row) => ({ id: row.id, ...JSON.parse(row.data_json as string) }) as T);
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const { clause, params } = buildFilterClause(filter);
const row = this.db
.prepare(`SELECT COUNT(*) as count FROM ${collection}${clause}`)
.get(...params) as any;
return row?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
const txAdapter = new SqliteTxAdapter(this.db);
this.db.exec('BEGIN');
try {
const result = await fn(txAdapter);
this.db.exec('COMMIT');
return result;
} catch (err) {
this.db.exec('ROLLBACK');
throw err;
}
}
async migrate(): Promise<void> {
const createTable = (name: string) =>
this.db.exec(`
CREATE TABLE IF NOT EXISTS ${name} (
id TEXT PRIMARY KEY,
data_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
`);
for (const collection of COLLECTIONS) {
createTable(collection);
}
}
async close(): Promise<void> {
this.db.close();
}
}
/**
* Transaction wrapper that uses the same db handle — better-sqlite3 transactions
* are connection-level, so all statements on the same Database instance within
* a db.transaction() callback participate in the transaction.
*/
class SqliteTxAdapter implements StorageAdapter {
readonly name = 'sqlite';
private db: Database.Database;
constructor(db: Database.Database) {
this.db = db;
}
async create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }> {
const id = (data as any).id ?? randomUUID();
const now = new Date().toISOString();
const rest = Object.fromEntries(Object.entries(data).filter(([k]) => k !== 'id'));
this.db
.prepare(
`INSERT INTO ${collection} (id, data_json, created_at, updated_at) VALUES (?, ?, ?, ?)`,
)
.run(id, JSON.stringify(rest), now, now);
return { ...data, id } as T & { id: string };
}
async read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null> {
const row = this.db.prepare(`SELECT * FROM ${collection} WHERE id = ?`).get(id) as any;
if (!row) return null;
return { id: row.id, ...JSON.parse(row.data_json as string) } as T;
}
async update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean> {
const existing = this.db
.prepare(`SELECT data_json FROM ${collection} WHERE id = ?`)
.get(id) as any;
if (!existing) return false;
const merged = { ...JSON.parse(existing.data_json as string), ...data };
const now = new Date().toISOString();
const result = this.db
.prepare(`UPDATE ${collection} SET data_json = ?, updated_at = ? WHERE id = ?`)
.run(JSON.stringify(merged), now, id);
return result.changes > 0;
}
async delete(collection: string, id: string): Promise<boolean> {
const result = this.db.prepare(`DELETE FROM ${collection} WHERE id = ?`).run(id);
return result.changes > 0;
}
async find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: { limit?: number; offset?: number; orderBy?: string; order?: 'asc' | 'desc' },
): Promise<T[]> {
const { clause, params } = buildFilterClause(filter);
let query = `SELECT * FROM ${collection}${clause}`;
if (opts?.orderBy) {
const dir = opts.order === 'desc' ? 'DESC' : 'ASC';
const col =
opts.orderBy === 'id' || opts.orderBy === 'created_at' || opts.orderBy === 'updated_at'
? opts.orderBy
: `json_extract(data_json, '$.${opts.orderBy}')`;
query += ` ORDER BY ${col} ${dir}`;
}
if (opts?.limit) {
query += ` LIMIT ?`;
params.push(opts.limit);
}
if (opts?.offset) {
query += ` OFFSET ?`;
params.push(opts.offset);
}
const rows = this.db.prepare(query).all(...params) as any[];
return rows.map((row) => ({ id: row.id, ...JSON.parse(row.data_json as string) }) as T);
}
async findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null> {
const results = await this.find<T>(collection, filter, { limit: 1 });
return results[0] ?? null;
}
async count(collection: string, filter?: Record<string, unknown>): Promise<number> {
const { clause, params } = buildFilterClause(filter);
const row = this.db
.prepare(`SELECT COUNT(*) as count FROM ${collection}${clause}`)
.get(...params) as any;
return row?.count ?? 0;
}
async transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T> {
return fn(this);
}
async migrate(): Promise<void> {
// No-op inside transaction
}
async close(): Promise<void> {
// No-op inside transaction
}
}

View File

@@ -0,0 +1,18 @@
import type { StorageAdapter, StorageConfig } from './types.js';
type StorageType = StorageConfig['type'];
const registry = new Map<StorageType, (config: StorageConfig) => StorageAdapter>();
export function registerStorageAdapter(
type: StorageType,
factory: (config: StorageConfig) => StorageAdapter,
): void {
registry.set(type, factory);
}
export function createStorageAdapter(config: StorageConfig): StorageAdapter {
const factory = registry.get(config.type);
if (!factory) throw new Error(`No adapter registered for type: ${config.type}`);
return factory(config);
}

View File

@@ -0,0 +1,17 @@
export type { StorageAdapter, StorageConfig } from './types.js';
export { createStorageAdapter, registerStorageAdapter } from './factory.js';
export { PostgresAdapter } from './adapters/postgres.js';
export { SqliteAdapter } from './adapters/sqlite.js';
import { registerStorageAdapter } from './factory.js';
import { PostgresAdapter } from './adapters/postgres.js';
import { SqliteAdapter } from './adapters/sqlite.js';
import type { StorageConfig } from './types.js';
registerStorageAdapter('postgres', (config: StorageConfig) => {
return new PostgresAdapter(config as Extract<StorageConfig, { type: 'postgres' }>);
});
registerStorageAdapter('sqlite', (config: StorageConfig) => {
return new SqliteAdapter(config as Extract<StorageConfig, { type: 'sqlite' }>);
});

View File

@@ -0,0 +1,43 @@
export interface StorageAdapter {
readonly name: string;
create<T extends Record<string, unknown>>(
collection: string,
data: T,
): Promise<T & { id: string }>;
read<T extends Record<string, unknown>>(collection: string, id: string): Promise<T | null>;
update(collection: string, id: string, data: Record<string, unknown>): Promise<boolean>;
delete(collection: string, id: string): Promise<boolean>;
find<T extends Record<string, unknown>>(
collection: string,
filter?: Record<string, unknown>,
opts?: {
limit?: number;
offset?: number;
orderBy?: string;
order?: 'asc' | 'desc';
},
): Promise<T[]>;
findOne<T extends Record<string, unknown>>(
collection: string,
filter: Record<string, unknown>,
): Promise<T | null>;
count(collection: string, filter?: Record<string, unknown>): Promise<number>;
transaction<T>(fn: (tx: StorageAdapter) => Promise<T>): Promise<T>;
migrate(): Promise<void>;
close(): Promise<void>;
}
export type StorageConfig =
| { type: 'postgres'; url: string }
| { type: 'sqlite'; path: string }
| { type: 'files'; dataDir: string; format?: 'json' | 'md' };

View File

@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}