# Storage & Queue Abstraction: Middleware Architecture Design

- Status: Design (retrofit required)
- Date: 2026-04-02
- Context: Agents coupled directly to infrastructure backends, bypassing intended middleware layer

---

## The Problem

Current packages are **direct adapters**, not **middleware**:

| Package | Current State | Intended Design |
|---------|---------------|-----------------|
| `@mosaicstack/queue` | `ioredis` hardcoded | Interface → BullMQ OR local-files |
| `@mosaicstack/db` | Drizzle + Postgres hardcoded | Interface → Postgres OR SQLite OR JSON/MD |
| `@mosaicstack/memory` | pgvector required | Interface → pgvector OR sqlite-vec OR keyword-search |

The gateway and TUI import these packages directly, which means they are coupled to specific infrastructure. Users cannot run Mosaic Stack without Postgres + Valkey.

## The Intended Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                       Gateway / TUI / CLI                       │
│       (agnostic of storage backend, talks to middleware)        │
└────────────────────────────────┬────────────────────────────────┘
                                 │
          ┌──────────────────────┼──────────────────────┐
          │                      │                      │
          ▼                      ▼                      ▼
        Queue                 Storage                Memory
          │                      │                      │
┌─────────┴──────────┬───────────┴──────────┬───────────┴─────────┐
│ BullMQ (Valkey)    │ Postgres             │ pgvector            │
│ Local (files)      │ SQLite               │ sqlite-vec          │
│                    │ JSON/MD              │ keyword             │
└────────────────────┴──────────────────────┴─────────────────────┘
```

The gateway imports the interface, not the backend. At startup it reads config and instantiates the correct adapter.
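
The startup step described above could look roughly like the following. This is a minimal sketch, assuming a registry keyed by backend type. `registerQueueAdapter`, `createQueueAdapter`, and the trimmed-down `QueueAdapter` shape are illustrative names rather than existing Mosaic Stack APIs, and the in-memory `local` adapter is only a stand-in for a real backend.

```typescript
// Trimmed-down adapter contract, for illustration only (hypothetical).
interface QueueAdapter {
  readonly name: string;
  enqueue(queueName: string, payload: string): Promise<void>;
  dequeue(queueName: string): Promise<string | null>;
  close(): Promise<void>;
}

interface QueueConfig {
  type: 'bullmq' | 'local';
  url?: string;
}

type QueueAdapterFactory = (config: QueueConfig) => QueueAdapter;

// Registry mapping a backend type to the factory that builds its adapter.
const registry = new Map<QueueConfig['type'], QueueAdapterFactory>();

function registerQueueAdapter(type: QueueConfig['type'], factory: QueueAdapterFactory): void {
  registry.set(type, factory);
}

// The gateway would call this once at startup with the parsed config.
function createQueueAdapter(config: QueueConfig): QueueAdapter {
  const factory = registry.get(config.type);
  if (!factory) {
    throw new Error(`No queue adapter registered for type "${config.type}"`);
  }
  return factory(config);
}

// In-memory stand-in for the local backend (no file persistence in this sketch).
registerQueueAdapter('local', (): QueueAdapter => {
  const queues = new Map<string, string[]>();
  return {
    name: 'local',
    async enqueue(queueName: string, payload: string): Promise<void> {
      const items = queues.get(queueName) ?? [];
      items.push(payload);
      queues.set(queueName, items);
    },
    async dequeue(queueName: string): Promise<string | null> {
      return queues.get(queueName)?.shift() ?? null;
    },
    async close(): Promise<void> {
      queues.clear();
    },
  };
});

const queue = createQueueAdapter({ type: 'local' });
```

Because callers only see `QueueAdapter`, swapping the backend becomes a config change rather than an import change. A backend that was never registered fails fast at startup instead of at first use.
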
## The Drift

```typescript
// What should have happened:
//   gateway/queue.service.ts → @mosaicstack/queue (interface) → queue.adapter.ts

// What actually happened:
//   gateway/queue.service.ts → @mosaicstack/queue → ioredis (hardcoded)
```

## The Current State Analysis

### `@mosaicstack/queue` (packages/queue/src/queue.ts)

```typescript
import Redis from 'ioredis'; // ← Direct import of backend

export function createQueue(config?: QueueConfig): QueueHandle {
  const url = config?.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  const redis = new Redis(url, { maxRetriesPerRequest: 3 });
  // ...queue ops directly on redis...
}
```

**Problem:** `ioredis` is imported directly by the package instead of sitting behind an adapter interface, so consumers cannot swap backends.

### `@mosaicstack/db` (packages/db/src/client.ts)

```typescript
import { drizzle, type PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';

export function createDb(url?: string): DbHandle {
  const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
  const sql = postgres(connectionString, { max: 20, idle_timeout: 30, connect_timeout: 5 });
  const db = drizzle(sql, { schema });
  // ...
}
```

**Problem:** Drizzle + Postgres is hardcoded. There are no SQLite, JSON, or file-based options.

### `@mosaicstack/memory` (packages/memory/src/memory.ts)

```typescript
import type { Db } from '@mosaicstack/db'; // ← Depends on Drizzle/PG

export function createMemory(db: Db): Memory {
  return {
    preferences: createPreferencesRepo(db),
    insights: createInsightsRepo(db),
  };
}
```

**Problem:** The memory package is tightly coupled to `@mosaicstack/db` (which is Postgres-only), so it has no alternative storage backends.
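
One way to loosen the memory coupling described above is to have the factory accept a narrow persistence contract instead of a concrete `Db`. The sketch below assumes that approach. `PreferenceStore` and `createInMemoryPreferenceStore` are hypothetical names introduced for illustration, and this `createMemory` is a simplified variant, not the existing function.

```typescript
// Hypothetical narrow contract: the only persistence operations this
// sketch of the memory package needs. Not an existing Mosaic Stack type.
interface PreferenceStore {
  get(userId: string, key: string): Promise<unknown>;
  set(userId: string, key: string, value: unknown): Promise<void>;
}

interface Memory {
  preferences: PreferenceStore;
}

// The factory receives the contract, so it no longer imports Drizzle/Postgres.
function createMemory(preferences: PreferenceStore): Memory {
  return { preferences };
}

// Illustrative backend with no external services: a Map keyed by user and key.
function createInMemoryPreferenceStore(): PreferenceStore {
  const data = new Map<string, unknown>();
  const compositeKey = (userId: string, key: string): string =>
    JSON.stringify([userId, key]);
  return {
    async get(userId: string, key: string): Promise<unknown> {
      // Missing entries resolve to null rather than undefined.
      return data.get(compositeKey(userId, key)) ?? null;
    },
    async set(userId: string, key: string, value: unknown): Promise<void> {
      data.set(compositeKey(userId, key), value);
    },
  };
}

const memory = createMemory(createInMemoryPreferenceStore());
```

A Postgres-backed store would implement the same two methods, so the choice of backend moves to whoever calls `createMemory`.
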
## The Target Interfaces

### Queue Interface

```typescript
// packages/queue/src/types.ts
export interface QueueAdapter {
  readonly name: string;

  enqueue(queueName: string, payload: TaskPayload): Promise<void>;
  dequeue(queueName: string): Promise<TaskPayload | null>;
  length(queueName: string): Promise<number>;
  publish(channel: string, message: string): Promise<void>;
  subscribe(channel: string, handler: (message: string) => void): () => void;
  close(): Promise<void>;
}

export interface TaskPayload {
  id: string;
  type: string;
  data: Record<string, unknown>;
  createdAt: string;
}

export interface QueueConfig {
  type: 'bullmq' | 'local';
  url?: string; // For bullmq: Valkey/Redis URL
  dataDir?: string; // For local: directory for JSON persistence
}
```

### Storage Interface

```typescript
// packages/storage/src/types.ts
// T = stored entity type, O = input shape. The generic parameters and the
// return types of update/delete are assumed; they may change during Phase 1.
export interface StorageAdapter {
  readonly name: string;

  // Entity CRUD
  create<T, O = T>(collection: string, data: O): Promise<T>;
  read<T>(collection: string, id: string): Promise<T | null>;
  update<T, O = T>(collection: string, id: string, data: Partial<O>): Promise<T | null>;
  delete(collection: string, id: string): Promise<boolean>;

  // Queries
  find<T>(collection: string, filter: Record<string, unknown>): Promise<T[]>;
  findOne<T>(collection: string, filter: Record<string, unknown>): Promise<T | null>;

  // Bulk operations
  createMany<T, O = T>(collection: string, items: O[]): Promise<T[]>;
  updateMany<O>(collection: string, ids: string[], data: Partial<O>): Promise<number>;
  deleteMany(collection: string, ids: string[]): Promise<number>;

  // Raw queries (for complex queries)
  query<T>(collection: string, query: string, params?: unknown[]): Promise<T[]>;

  // Transaction support
  transaction<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T>;

  close(): Promise<void>;
}

export interface StorageTransaction {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

export interface StorageConfig {
  type: 'postgres' | 'sqlite' | 'files';
  url?: string; // For postgres
  path?: string; // For sqlite/files
}
```

### Memory Interface (Vector + Preferences)

```typescript
// packages/memory/src/types.ts
export interface MemoryAdapter {
  readonly name: string;

  // Preferences (key-value storage)
  getPreference(userId: string, key: string): Promise<unknown>;
  setPreference(userId: string, key: string, value: unknown): Promise<void>;
  deletePreference(userId: string, key: string): Promise<boolean>;
  listPreferences(
    userId: string,
    category?: string,
  ): Promise<Record<string, unknown>>; // assumed shape: key → value

  // Insights (with optional vector search)
  storeInsight(insight: NewInsight): Promise<void>;
  getInsight(id: string): Promise<NewInsight | null>;
  searchInsights(query: string, limit?: number, filter?: InsightFilter): Promise<SearchResult[]>;
  deleteInsight(id: string): Promise<boolean>;

  // Embedding provider (optional, null = no vector search)
  readonly embedder?: EmbeddingProvider | null;

  close(): Promise<void>;
}

export interface NewInsight {
  id: string;
  userId: string;
  content: string;
  embedding?: number[]; // If embedder is available
  source: 'agent' | 'user' | 'summarization' | 'system';
  category: 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
  relevanceScore: number;
  metadata?: Record<string, unknown>;
  createdAt: Date;
  decayedAt?: Date;
}

export interface InsightFilter {
  userId?: string;
  category?: string;
  source?: string;
  minRelevance?: number;
  fromDate?: Date;
  toDate?: Date;
}

export interface SearchResult {
  documentId: string;
  content: string;
  distance: number;
  metadata?: Record<string, unknown>;
}

export interface MemoryConfig {
  type: 'pgvector' | 'sqlite-vec' | 'keyword';
  storage: StorageAdapter;
  embedder?: EmbeddingProvider;
}

export interface EmbeddingProvider {
  embed(text: string): Promise<number[]>;
  embedBatch(texts: string[]): Promise<number[][]>;
  readonly dimensions: number;
}
```

## Three Tiers

### Tier 1: Local (Zero Dependencies)

**Target:** Single user, single machine, no external services

| Component | Backend                 | Storage                          |
| --------- | ----------------------- | -------------------------------- |
| Queue     | In-process              | JSON files in `~/.mosaic/queue/` |
| Storage   | SQLite (better-sqlite3) | `~/.mosaic/data.db`              |
| Memory    | Keyword search          | SQLite table                     |
| Vector    | None                    | N/A                              |

**Dependencies:**

- `better-sqlite3` (bundled)
- No Postgres, no Valkey, no pgvector

**Upgrade path:**

1. Run `mosaic gateway configure` → select "local" tier
2. Gateway starts with SQLite database
3. Optional: run `mosaic gateway upgrade --tier team` to migrate to Postgres

### Tier 2: Team (Postgres + Valkey)

**Target:** Multiple users, shared server, CI/CD environments

| Component | Backend        | Storage                        |
| --------- | -------------- | ------------------------------ |
| Queue     | BullMQ         | Valkey                         |
| Storage   | Postgres       | Shared PG instance             |
| Memory    | pgvector       | Postgres with vector extension |
| Vector    | LLM embeddings | Configured provider            |

**Dependencies:**

- PostgreSQL 17+ with pgvector extension
- Valkey (Redis-compatible)
- LLM provider for embeddings

**Migration from Local → Team:**

1. `mosaic gateway backup` → creates a dump of the SQLite database
2. `mosaic gateway upgrade --tier team` → restores it to Postgres
3. Queue replays from BullMQ (in-flight jobs may need manual reconciliation)
4. Memory embeddings are regenerated if vector search was not previously enabled

### Tier 3: Enterprise (Clustered)

**Target:** Large teams, multi-region, high availability

| Component | Backend                     | Storage                       |
| --------- | --------------------------- | ----------------------------- |
| Queue     | BullMQ cluster              | Multiple Valkey nodes         |
| Storage   | Postgres cluster            | Primary + replicas            |
| Memory    | Dedicated vector DB         | Qdrant, Pinecone, or pgvector |
| Vector    | Dedicated embedding service | Separate microservice         |

## MarkdownDB Integration

For file-based storage, we use [MarkdownDB](https://markdowndb.com) to parse MD files into queryable data.


**What it provides:**

- Parses frontmatter (YAML/JSON/TOML)
- Extracts links, tags, metadata
- Builds index in JSON or SQLite
- Queryable via SQL-like interface

**Usage in Mosaic:**

```typescript
import path from 'node:path';

// Local tier with MD files for documents
const storage = createStorageAdapter({
  type: 'files',
  path: path.join(mosaicHome, 'docs'),
  markdowndb: {
    parseFrontmatter: true,
    extractLinks: true,
    indexFile: 'index.json',
  },
});
```

## Dream Mode: Memory Consolidation

Automated equivalent to Claude Code's "Dream: Memory Consolidation" cycle.

**Trigger:** Every 24 hours (if 5+ sessions active)

**Phases:**

1. **Orient**: What happened, what's the current state
   - Scan recent session logs
   - Identify active tasks, missions, conversations
   - Calculate time window (last 24h)
2. **Gather**: Pull in relevant context
   - Load conversations, decisions, agent logs
   - Extract key interactions and outcomes
   - Identify patterns and learnings
3. **Consolidate**: Summarize and compress
   - Generate summary of the last 24h
   - Extract key decisions and their rationale
   - Identify recurring patterns
   - Compress verbose logs into concise insights
4. **Prune**: Archive and cleanup
   - Archive raw session files to dated folders
   - Delete redundant/temporary data
   - Update MEMORY.md with consolidated content
   - Update insight relevance scores

**Implementation:**

```typescript
// In @mosaicstack/dream (new package)
// DreamResult is an assumed name for the return type.
export async function runDreamCycle(config: DreamConfig): Promise<DreamResult> {
  const memory = await loadMemoryAdapter(config.storage);

  // Orient
  const sessions = await memory.getRecentSessions(24 * 60 * 60 * 1000);
  if (sessions.length < 5) return { skipped: true, reason: 'insufficient_sessions' };

  // Gather
  const context = await gatherContext(memory, sessions);

  // Consolidate
  const consolidated = await consolidateWithLLM(context, config.llm);

  // Prune
  await pruneArchivedData(memory, config.retention);

  // Store consolidated insights
  await memory.storeInsights(consolidated.insights);

  return {
    sessionsProcessed: sessions.length,
    insightsCreated: consolidated.insights.length,
    bytesPruned: consolidated.bytesRemoved,
  };
}
```

---

## Retrofit Plan

### Phase 1: Interface Extraction (2-3 days)

**Goal:** Define interfaces without changing existing behavior

1. Create `packages/queue/src/types.ts` with `QueueAdapter` interface
2. Create `packages/storage/src/types.ts` with `StorageAdapter` interface
3. Create `packages/memory/src/types.ts` with `MemoryAdapter` interface (refactor existing)
4. Add adapter registry pattern to each package
5. No breaking changes: existing code continues to work

### Phase 2: Refactor Existing to Adapters (3-5 days)

**Goal:** Move existing implementations behind adapters

#### 2.1 Queue Refactor

1. Rename `packages/queue/src/queue.ts` → `packages/queue/src/adapters/bullmq.ts`
2. Create `packages/queue/src/index.ts` to export factory function
3. Factory function reads config, instantiates correct adapter
4. Update gateway imports to use factory

#### 2.2 Storage Refactor

1. Create `packages/storage/` (new package)
2. Move Drizzle logic to `packages/storage/src/adapters/postgres.ts`
3. Create SQLite adapter in `packages/storage/src/adapters/sqlite.ts`
4. Update gateway to use storage factory
5. Deprecate direct `@mosaicstack/db` imports

#### 2.3 Memory Refactor

1. Extract existing logic to `packages/memory/src/adapters/pgvector.ts`
2. Create keyword adapter in `packages/memory/src/adapters/keyword.ts`
3. Update vector-store.ts to be adapter-agnostic

### Phase 3: Local Tier Implementation (2-3 days)

**Goal:** Zero-dependency baseline

1. Implement `packages/queue/src/adapters/local.ts` (in-process + JSON persistence)
2. Implement `packages/storage/src/adapters/files.ts` (JSON + MD via MarkdownDB)
3. Implement `packages/memory/src/adapters/keyword.ts` (TF-IDF search)
4. Add `packages/dream/` for consolidation cycle
5. Wire up local tier in gateway startup

### Phase 4: Configuration System (1-2 days)

**Goal:** Runtime backend selection

1. Create `packages/config/src/storage.ts` for storage configuration
2. Add `mosaic.config.ts` schema with storage tier settings
3. Update gateway to read config on startup
4. Add `mosaic gateway configure` CLI command
5. Add tier migration commands (`mosaic gateway upgrade`)

### Phase 5: Testing & Documentation (2-3 days)

1. Unit tests for each adapter
2. Integration tests for factory pattern
3. Migration tests (local → team)
4. Update README and architecture docs
5. Add configuration guide

---

## File Changes Summary

### New Files

```
packages/
├── config/
│   └── src/
│       ├── storage.ts         # Storage config schema
│       └── index.ts
├── dream/                     # NEW: Dream mode consolidation
│   ├── src/
│   │   ├── index.ts
│   │   ├── orient.ts
│   │   ├── gather.ts
│   │   ├── consolidate.ts
│   │   └── prune.ts
│   └── package.json
├── queue/
│   └── src/
│       ├── types.ts           # NEW: QueueAdapter interface
│       ├── index.ts           # NEW: Factory function
│       └── adapters/
│           ├── bullmq.ts      # MOVED from queue.ts
│           └── local.ts       # NEW: In-process adapter
├── storage/                   # NEW: Storage abstraction
│   ├── src/
│   │   ├── types.ts           # StorageAdapter interface
│   │   ├── index.ts           # Factory function
│   │   └── adapters/
│   │       ├── postgres.ts    # MOVED from @mosaicstack/db
│   │       ├── sqlite.ts      # NEW: SQLite adapter
│   │       └── files.ts       # NEW: JSON/MD adapter
│   └── package.json
└── memory/
    └── src/
        ├── types.ts           # UPDATED: MemoryAdapter interface
        ├── index.ts           # UPDATED: Factory function
        └── adapters/
            ├── pgvector.ts    # EXTRACTED from existing code
            ├── sqlite-vec.ts  # NEW: SQLite with vectors
            └── keyword.ts     # NEW: TF-IDF search
```

### Modified Files

```
packages/
├── db/                              # DEPRECATED: Logic moved to storage adapters
├── queue/
│   └── src/
│       └── queue.ts                 # → adapters/bullmq.ts
├── memory/
│   ├── src/
│   │   ├── memory.ts                # → use factory
│   │   ├── insights.ts              # → use factory
│   │   └── preferences.ts           # → use factory
│   └── package.json                 # Remove pgvector from dependencies
└── gateway/
    └── src/
        ├── database/
        │   └── database.module.ts   # Update to use storage factory
        ├── memory/
        │   └── memory.module.ts     # Update to use memory factory
        └── queue/
            └── queue.module.ts      # Update to use queue factory
```

---

## Breaking Changes

1. **`@mosaicstack/db`** → **`@mosaicstack/storage`** (with migration guide)
2. Direct `ioredis` imports → Use `@mosaicstack/queue` factory
3. Direct `pgvector` queries → Use `@mosaicstack/memory` factory
4. Gateway startup now requires storage config (defaults to local)

## Non-Breaking Migration Path

1. Existing deployments with Postgres/Valkey continue to work (default config)
2. New deployments can choose local tier
3. Migration commands available when ready to upgrade

---

## Success Criteria

- [ ] Local tier runs with zero external dependencies
- [ ] All three tiers (local, team, enterprise) work correctly
- [ ] Factory pattern correctly selects backend at runtime
- [ ] Migration from local → team preserves all data
- [ ] Dream mode consolidates 24h of sessions
- [ ] Documentation covers all three tiers and migration paths
- [ ] All existing tests pass
- [ ] New adapters have >80% coverage