Files
stack/docs/design/storage-abstraction-middleware.md
Jarvis 774b76447d
Some checks failed
ci/woodpecker/pr/ci Pipeline failed
ci/woodpecker/push/ci Pipeline failed
fix: rename all packages from @mosaic/* to @mosaicstack/*
- Updated all package.json name fields and dependency references
- Updated all TypeScript/JavaScript imports
- Updated .woodpecker/publish.yml filters and registry paths
- Updated tools/install.sh scope default
- Updated .npmrc registry paths (worktree + host)
- Enhanced update-checker.ts with checkForAllUpdates() multi-package support
- Updated CLI update command to show table of all packages
- Added KNOWN_PACKAGES, formatAllPackagesTable, getInstallAllCommand
- Marked checkForUpdate() with @deprecated JSDoc

Closes #391
2026-04-04 21:43:23 -05:00

19 KiB

Storage & Queue Abstraction — Middleware Architecture

Design Status: Design (retrofit required) date: 2026-04-02 context: Agents coupled directly to infrastructure backends, bypassing intended middleware layer


The Problem

Current packages are direct adapters, not middleware:

Package Current State Intended Design
@mosaicstack/queue ioredis hardcoded Interface → BullMQ OR local-files
@mosaicstack/db Drizzle + Postgres hardcoded Interface → Postgres OR SQLite OR JSON/MD
@mosaicstack/memory pgvector required Interface → pgvector OR sqlite-vec OR keyword-search

The gateway and TUI import these packages directly, which means they they're coupled to specific infrastructure. Users cannot run Mosaic Stack without Postgres + Valkey.

The Intended Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Gateway / TUI / CLI                          │
│            (agnostic of storage backend, talks to middleware)    │
└───────────────────────────┬─────────────────────────────────────┘
                            │
        ┌───────────────────┼───────────────────┐
        │                 │                       │
        ▼─────────────────┴─────────────────┴─────────────────┘
        |                 |                     |                 |
        ▼─────────────────┴───────────────────┴─────────────────┘
        |                 |                     |                 |
   Queue               Storage             Memory
        |                 |                     |                 |
   ┌─────────┬─────────┬─────────┬─────────────────────────────────┐
   | BullMQ | | Local  | | Postgres | SQLite | JSON/MD | pgvector | sqlite-vec | keyword |
   |(Valkey)| |(files) |         |             |         |          |          |
   └─────────┴─────────┴─────────┴─────────────────────────────────┘

The gateway imports the interface, not the backend. At startup it reads config and instantiates the correct adapter.

The Drift

// What should have happened:
gateway/queue.service.ts  @mosaicstack/queue (interface)  queue.adapter.ts

// What actually happened:
gateway/queue.service.ts  @mosaicstack/queue  ioredis (hardcoded)

The Current State Analysis

@mosaicstack/queue (packages/queue/src/queue.ts)

import Redis from 'ioredis'; // ← Direct import of backend

export function createQueue(config?: QueueConfig): QueueHandle {
  const url = config?.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  const redis = new Redis(url, { maxRetriesPerRequest: 3 });
  // ...queue ops directly on redis...
}

Problem: ioredis is imported in the package, not the adapter interface. Consumers cannot swap backends.

@mosaicstack/db (packages/db/src/client.ts)

import { drizzle, type PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';

export function createDb(url?: string): DbHandle {
  const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
  const sql = postgres(connectionString, { max: 20, idle_timeout: 30, connect_timeout: 5 });
  const db = drizzle(sql, { schema });
  // ...
}

Problem: Drizzle + Postgres is hardcoded. No SQLite, JSON, or file-based options.

@mosaicstack/memory (packages/memory/src/memory.ts)

import type { Db } from '@mosaicstack/db'; // ← Depends on Drizzle/PG

export function createMemory(db: Db): Memory {
  return {
    preferences: createPreferencesRepo(db),
    insights: createInsightsRepo(db),
  };
}

Problem: Memory package is tightly coupled to @mosaicstack/db (which is Postgres-only). No alternative storage backends.

The Target Interfaces

Queue Interface

// packages/queue/src/types.ts
export interface QueueAdapter {
  readonly name: string;

  enqueue(queueName: string, payload: TaskPayload): Promise<void>;
  dequeue(queueName: string): Promise<TaskPayload | null>;
  length(queueName: string): Promise<number>;
  publish(channel: string, message: string): Promise<void>;
  subscribe(channel: string, handler: (message: string) => void): () => void;
  close(): Promise<void>;
}

export interface TaskPayload {
  id: string;
  type: string;
  data: Record<string, unknown>;
  createdAt: string;
}

export interface QueueConfig {
  type: 'bullmq' | 'local';
  url?: string; // For bullmq: Valkey/Redis URL
  dataDir?: string; // For local: directory for JSON persistence
}

Storage Interface

// packages/storage/src/types.ts
export interface StorageAdapter {
  readonly name: string;

  // Entity CRUD
  create<T>(collection: string, data: O): Promise<T>;
  read<T>(collection: string, id: string): Promise<T | null>;
  update<T>(collection: string, id: string, data: Partial<O>): Promise<T | null>;
  delete(collection: string, id: string): Promise<boolean>;

  // Queries
  find<T>(collection: string, filter: Record<string, unknown>): Promise<T[]>;
  findOne<T>(collection: string, filter: Record<string, unknown): Promise<T | null>;

  // Bulk operations
  createMany<T>(collection: string, items: O[]): Promise<T[]>;
  updateMany<T>(collection: string, ids: string[], data: Partial<O>): Promise<number>;
  deleteMany(collection: string, ids: string[]): Promise<number>;

  // Raw queries (for complex queries)
  query<T>(collection: string, query: string, params?: unknown[]): Promise<T[]>;

  // Transaction support
  transaction<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T>;

  close(): Promise<void>;
}

export interface StorageTransaction {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

export interface StorageConfig {
  type: 'postgres' | 'sqlite' | 'files';
  url?: string;      // For postgres
  path?: string;      // For sqlite/files
}

Memory Interface (Vector + Preferences)

// packages/memory/src/types.ts
export interface MemoryAdapter {
  readonly name: string;

  // Preferences (key-value storage)
  getPreference(userId: string, key: string): Promise<unknown | null>;
  setPreference(userId: string, key: string, value: unknown): Promise<void>;
  deletePreference(userId: string, key: string): Promise<boolean>;
  listPreferences(
    userId: string,
    category?: string,
  ): Promise<Array<{ key: string; value: unknown }>>;

  // Insights (with optional vector search)
  storeInsight(insight: NewInsight): Promise<Insight>;
  getInsight(id: string): Promise<Insight | null>;
  searchInsights(query: string, limit?: number, filter?: InsightFilter): Promise<SearchResult[]>;
  deleteInsight(id: string): Promise<boolean>;

  // Embedding provider (optional, null = no vector search)
  readonly embedder?: EmbeddingProvider | null;

  close(): Promise<void>;
}

export interface NewInsight {
  id: string;
  userId: string;
  content: string;
  embedding?: number[]; // If embedder is available
  source: 'agent' | 'user' | 'summarization' | 'system';
  category: 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
  relevanceScore: number;
  metadata?: Record<string, unknown>;
  createdAt: Date;
  decayedAt?: Date;
}

export interface InsightFilter {
  userId?: string;
  category?: string;
  source?: string;
  minRelevance?: number;
  fromDate?: Date;
  toDate?: Date;
}

export interface SearchResult {
  documentId: string;
  content: string;
  distance: number;
  metadata?: Record<string, unknown>;
}

export interface MemoryConfig {
  type: 'pgvector' | 'sqlite-vec' | 'keyword';
  storage: StorageAdapter;
  embedder?: EmbeddingProvider;
}

export interface EmbeddingProvider {
  embed(text: string): Promise<number[]>;
  embedBatch(texts: string[]): Promise<number[][]>;
  readonly dimensions: number;
}

Three Tiers

Tier 1: Local (Zero Dependencies)

Target: Single user, single machine, no external services

Component Backend Storage
Queue In-process + JSON files in ~/.mosaic/queue/
Storage SQLite (better-sqlite3) ~/.mosaic/data.db
Memory Keyword search SQLite table
Vector None N/A

Dependencies:

  • better-sqlite3 (bundled)
  • No Postgres, No Valkey, No pgvector

Upgrade path:

  1. Run mosaic gateway configure → select "local" tier
  2. Gateway starts with SQLite database
  3. Optional: run mosaic gateway upgrade --tier team to migrate to Postgres

Tier 2: Team (Postgres + Valkey)

Target: Multiple users, shared server, CI/CD environments

Component Backend Storage
Queue BullMQ Valkey
Storage Postgres Shared PG instance
Memory pgvector Postgres with vector extension
Vector LLM embeddings Configured provider

Dependencies:

  • PostgreSQL 17+ with pgvector extension
  • Valkey (Redis-compatible)
  • LLM provider for embeddings

Migration from Local → Team:

  1. mosaic gateway backup → creates dump of SQLite database
  2. mosaic gateway upgrade --tier team → restores to Postgres
  3. Queue replays from BullMQ (may need manual reconciliation for in-flight jobs)
  4. Memory embeddings regenerated if vector search was new

Tier 3: Enterprise (Clustered)

Target: Large teams, multi-region, high availability

Component Backend Storage
Queue BullMQ cluster Multiple Valkey nodes
Storage Postgres cluster Primary + replicas
Memory Dedicated vector DB Qdrant, Pinecone, or pgvector
Vector Dedicated embedding service Separate microservice

MarkdownDB Integration

For file-based storage, we use MarkdownDB to parse MD files into queryable data.

What it provides:

  • Parses frontmatter (YAML/JSON/TOML)
  • Extracts links, tags, metadata
  • Builds index in JSON or SQLite
  • Queryable via SQL-like interface

Usage in Mosaic:

// Local tier with MD files for documents
const storage = createStorageAdapter({
  type: 'files',
  path: path.join(mosaicHome, 'docs'),
  markdowndb: {
    parseFrontmatter: true,
    extractLinks: true,
    indexFile: 'index.json',
  },
});

Dream Mode — Memory Consolidation

Automated equivalent to Claude Code's "Dream: Memory Consolidation" cycle

Trigger: Every 24 hours (if 5+ sessions active)

Phases:

  1. Orient — What happened, what's the current state

    • Scan recent session logs
    • Identify active tasks, missions, conversations
    • Calculate time window (last 24h)
  2. Gather — Pull in relevant context

    • Load conversations, decisions, agent logs
    • Extract key interactions and outcomes
    • Identify patterns and learnings
  3. Consolidate — Summarize and compress

    • Generate summary of the last 24h
    • Extract key decisions and their rationale
    • Identify recurring patterns
    • Compress verbose logs into concise insights
  4. Prune — Archive and cleanup

    • Archive raw session files to dated folders
    • Delete redundant/temporary data
    • Update MEMORY.md with consolidated content
    • Update insight relevance scores

Implementation:

// In @mosaicstack/dream (new package)
export async function runDreamCycle(config: DreamConfig): Promise<DreamResult> {
  const memory = await loadMemoryAdapter(config.storage);

  // Orient
  const sessions = await memory.getRecentSessions(24 * 60 * 60 * 1000);
  if (sessions.length < 5) return { skipped: true, reason: 'insufficient_sessions' };

  // Gather
  const context = await gatherContext(memory, sessions);

  // Consolidate
  const consolidated = await consolidateWithLLM(context, config.llm);

  // Prune
  await pruneArchivedData(memory, config.retention);

  // Store consolidated insights
  await memory.storeInsights(consolidated.insights);

  return {
    sessionsProcessed: sessions.length,
    insightsCreated: consolidated.insights.length,
    bytesPruned: consolidated.bytesRemoved,
  };
}

Retrofit Plan

Phase 1: Interface Extraction (2-3 days)

Goal: Define interfaces without changing existing behavior

  1. Create packages/queue/src/types.ts with QueueAdapter interface
  2. Create packages/storage/src/types.ts with StorageAdapter interface
  3. Create packages/memory/src/types.ts with MemoryAdapter interface (refactor existing)
  4. Add adapter registry pattern to each package
  5. No breaking changes — existing code continues to work

Phase 2: Refactor Existing to Adapters (3-5 days)

Goal: Move existing implementations behind adapters

2.1 Queue Refactor

  1. Rename packages/queue/src/queue.tspackages/queue/src/adapters/bullmq.ts
  2. Create packages/queue/src/index.ts to export factory function
  3. Factory function reads config, instantiates correct adapter
  4. Update gateway imports to use factory

2.2 Storage Refactor

  1. Create packages/storage/ (new package)
  2. Move Drizzle logic to packages/storage/src/adapters/postgres.ts
  3. Create SQLite adapter in packages/storage/src/adapters/sqlite.ts
  4. Update gateway to use storage factory
  5. Deprecate direct @mosaicstack/db imports

2.3 Memory Refactor

  1. Extract existing logic to packages/memory/src/adapters/pgvector.ts
  2. Create keyword adapter in packages/memory/src/adapters/keyword.ts
  3. Update vector-store.ts to be adapter-agnostic

Phase 3: Local Tier Implementation (2-3 days)

Goal: Zero-dependency baseline

  1. Implement packages/queue/src/adapters/local.ts (in-process + JSON persistence)
  2. Implement packages/storage/src/adapters/files.ts (JSON + MD via MarkdownDB)
  3. Implement packages/memory/src/adapters/keyword.ts (TF-IDF search)
  4. Add packages/dream/ for consolidation cycle
  5. Wire up local tier in gateway startup

Phase 4: Configuration System (1-2 days)

Goal: Runtime backend selection

  1. Create packages/config/src/storage.ts for storage configuration
  2. Add mosaic.config.ts schema with storage tier settings
  3. Update gateway to read config on startup
  4. Add mosaic gateway configure CLI command
  5. Add tier migration commands (mosaic gateway upgrade)

Phase 5: Testing & Documentation (2-3 days)

  1. Unit tests for each adapter
  2. Integration tests for factory pattern
  3. Migration tests (local → team)
  4. Update README and architecture docs
  5. Add configuration guide

File Changes Summary

New Files

packages/
├── config/
│   └── src/
│       ├── storage.ts          # Storage config schema
│       └── index.ts
├── dream/                    # NEW: Dream mode consolidation
│   ├── src/
│   │   ├── index.ts
│   │   ├── orient.ts
│   │   ├── gather.ts
│   │   ├── consolidate.ts
│   │   └── prune.ts
│   └── package.json
├── queue/
│   └── src/
│       ├── types.ts           # NEW: QueueAdapter interface
│       ├── index.ts           # NEW: Factory function
│       └── adapters/
│           ├── bullmq.ts      # MOVED from queue.ts
│           └── local.ts       # NEW: In-process adapter
├── storage/                  # NEW: Storage abstraction
│   ├── src/
│   │   ├── types.ts           # StorageAdapter interface
│   │   ├── index.ts           # Factory function
│   │   └── adapters/
│   │       ├── postgres.ts    # MOVED from @mosaicstack/db
│   │       ├── sqlite.ts      # NEW: SQLite adapter
│   │       └── files.ts       # NEW: JSON/MD adapter
│   └── package.json
└── memory/
    └── src/
        ├── types.ts           # UPDATED: MemoryAdapter interface
        ├── index.ts           # UPDATED: Factory function
        └── adapters/
            ├── pgvector.ts    # EXTRACTED from existing code
            ├── sqlite-vec.ts  # NEW: SQLite with vectors
            └── keyword.ts     # NEW: TF-IDF search

Modified Files

packages/
├── db/                       # DEPRECATED: Logic moved to storage adapters
├── queue/
│   └── src/
│       └── queue.ts          # → adapters/bullmq.ts
├── memory/
│   ├── src/
│   │   ├── memory.ts         # → use factory
│   │   ├── insights.ts       # → use factory
│   │   └── preferences.ts    # → use factory
│   └── package.json          # Remove pgvector from dependencies
└── gateway/
    └── src/
        ├── database/
        │   └── database.module.ts  # Update to use storage factory
        ├── memory/
        │   └── memory.module.ts  # Update to use memory factory
        └── queue/
            └── queue.module.ts  # Update to use queue factory

Breaking Changes

  1. @mosaicstack/db@mosaicstack/storage (with migration guide)
  2. Direct ioredis imports → Use @mosaicstack/queue factory
  3. Direct pgvector queries → Use @mosaicstack/memory factory
  4. Gateway startup now requires storage config (defaults to local)

Non-Breaking Migration Path

  1. Existing deployments with Postgres/Valkey continue to work (default config)
  2. New deployments can choose local tier
  3. Migration commands available when ready to upgrade

Success Criteria

  • Local tier runs with zero external dependencies
  • All three tiers (local, team, enterprise) work correctly
  • Factory pattern correctly selects backend at runtime
  • Migration from local → team preserves all data
  • Dream mode consolidates 24h of sessions
  • Documentation covers all three tiers and migration paths
  • All existing tests pass
  • New adapters have >80% coverage