Storage & Queue Abstraction — Middleware Architecture
Status: Design (retrofit required)
Date: 2026-04-02
Context: Agents coupled directly to infrastructure backends, bypassing intended middleware layer
The Problem
Current packages are direct adapters, not middleware:
| Package | Current State | Intended Design |
|---|---|---|
| @mosaicstack/queue | ioredis hardcoded | Interface → BullMQ OR local-files |
| @mosaicstack/db | Drizzle + Postgres hardcoded | Interface → Postgres OR SQLite OR JSON/MD |
| @mosaicstack/memory | pgvector required | Interface → pgvector OR sqlite-vec OR keyword-search |
The gateway and TUI import these packages directly, which means they are coupled to specific infrastructure. Users cannot run Mosaic Stack without Postgres + Valkey.
The Intended Architecture
┌───────────────────────────────────────────────────────────────┐
│                      Gateway / TUI / CLI                      │
│      (agnostic of storage backend, talks to middleware)       │
└───────────────────────────────┬───────────────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          │                     │                     │
          ▼                     ▼                     ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│       Queue       │ │      Storage      │ │      Memory       │
├───────────────────┤ ├───────────────────┤ ├───────────────────┤
│ BullMQ (Valkey)   │ │ Postgres          │ │ pgvector          │
│ Local (files)     │ │ SQLite            │ │ sqlite-vec        │
│                   │ │ JSON/MD           │ │ keyword           │
└───────────────────┘ └───────────────────┘ └───────────────────┘
The gateway imports the interface, not the backend. At startup it reads config and instantiates the correct adapter.
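A minimal sketch of that split, under stated assumptions: the `Queue` interface here is a cut-down stand-in for the real one, and `InMemoryQueue`, `selectQueue`, and `QueueService` are hypothetical names used only for illustration. The point is that the service class never references a concrete backend, and only the startup function does.

```typescript
// Hypothetical, reduced interface: just enough to show the dependency direction.
interface Queue {
  enqueue(name: string, payload: string): Promise<void>;
  length(name: string): Promise<number>;
}

type Backend = 'bullmq' | 'local';

// Stand-in adapter. A real 'bullmq' adapter would wrap Valkey instead.
class InMemoryQueue implements Queue {
  private readonly queues = new Map<string, string[]>();

  async enqueue(name: string, payload: string): Promise<void> {
    const items = this.queues.get(name) ?? [];
    items.push(payload);
    this.queues.set(name, items);
  }

  async length(name: string): Promise<number> {
    return this.queues.get(name)?.length ?? 0;
  }
}

// Startup-time selection: the only code that knows about concrete backends.
function selectQueue(backend: Backend): Queue {
  switch (backend) {
    case 'local':
      return new InMemoryQueue();
    case 'bullmq':
      throw new Error('bullmq adapter is not part of this sketch');
  }
}

// Gateway-side service: depends on the Queue interface only.
class QueueService {
  private readonly queue: Queue;

  constructor(queue: Queue) {
    this.queue = queue;
  }

  // Enqueues a task and reports the resulting queue depth.
  async submit(task: string): Promise<number> {
    await this.queue.enqueue('tasks', task);
    return this.queue.length('tasks');
  }
}
```

With this shape, `new QueueService(selectQueue('local'))` works without Valkey, and swapping the backend does not touch `QueueService`.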
The Drift
// What should have happened:
gateway/queue.service.ts → @mosaicstack/queue (interface) → queue.adapter.ts
// What actually happened:
gateway/queue.service.ts → @mosaicstack/queue → ioredis (hardcoded)
The Current State Analysis
@mosaicstack/queue (packages/queue/src/queue.ts)
import Redis from 'ioredis'; // ← Direct import of backend

export function createQueue(config?: QueueConfig): QueueHandle {
  const url = config?.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  const redis = new Redis(url, { maxRetriesPerRequest: 3 });
  // ...queue ops directly on redis...
}
Problem: ioredis is imported directly by the package instead of sitting behind an adapter interface, so consumers cannot swap backends.
@mosaicstack/db (packages/db/src/client.ts)
import { drizzle, type PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';

export function createDb(url?: string): DbHandle {
  const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
  const sql = postgres(connectionString, { max: 20, idle_timeout: 30, connect_timeout: 5 });
  const db = drizzle(sql, { schema });
  // ...
}
Problem: Drizzle + Postgres is hardcoded. No SQLite, JSON, or file-based options.
@mosaicstack/memory (packages/memory/src/memory.ts)
import type { Db } from '@mosaicstack/db'; // ← Depends on Drizzle/PG

export function createMemory(db: Db): Memory {
  return {
    preferences: createPreferencesRepo(db),
    insights: createInsightsRepo(db),
  };
}
Problem: Memory package is tightly coupled to @mosaicstack/db (which is Postgres-only). No alternative storage backends.
The Target Interfaces
Queue Interface
// packages/queue/src/types.ts
export interface QueueAdapter {
  readonly name: string;
  enqueue(queueName: string, payload: TaskPayload): Promise<void>;
  dequeue(queueName: string): Promise<TaskPayload | null>;
  length(queueName: string): Promise<number>;
  publish(channel: string, message: string): Promise<void>;
  subscribe(channel: string, handler: (message: string) => void): () => void;
  close(): Promise<void>;
}

export interface TaskPayload {
  id: string;
  type: string;
  data: Record<string, unknown>;
  createdAt: string;
}

export interface QueueConfig {
  type: 'bullmq' | 'local';
  url?: string; // For bullmq: Valkey/Redis URL
  dataDir?: string; // For local: directory for JSON persistence
}
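To make the contract concrete, here is a rough in-memory implementation of the interface above. It is a sketch only: the class name `InMemoryQueueAdapter` is hypothetical, and the JSON-file persistence planned for the local tier is deliberately left out, so all state is lost on restart.

```typescript
// Types repeated from the interface definition so the sketch is self-contained.
interface TaskPayload {
  id: string;
  type: string;
  data: Record<string, unknown>;
  createdAt: string;
}

interface QueueAdapter {
  readonly name: string;
  enqueue(queueName: string, payload: TaskPayload): Promise<void>;
  dequeue(queueName: string): Promise<TaskPayload | null>;
  length(queueName: string): Promise<number>;
  publish(channel: string, message: string): Promise<void>;
  subscribe(channel: string, handler: (message: string) => void): () => void;
  close(): Promise<void>;
}

type Handler = (message: string) => void;

// Hypothetical adapter: FIFO queues and pub/sub held in process memory only.
class InMemoryQueueAdapter implements QueueAdapter {
  readonly name = 'local-memory';
  private readonly queues = new Map<string, TaskPayload[]>();
  private readonly subscribers = new Map<string, Set<Handler>>();

  async enqueue(queueName: string, payload: TaskPayload): Promise<void> {
    const items = this.queues.get(queueName) ?? [];
    items.push(payload);
    this.queues.set(queueName, items);
  }

  // Returns the oldest payload, or null when the queue is empty or unknown.
  async dequeue(queueName: string): Promise<TaskPayload | null> {
    return this.queues.get(queueName)?.shift() ?? null;
  }

  async length(queueName: string): Promise<number> {
    return this.queues.get(queueName)?.length ?? 0;
  }

  async publish(channel: string, message: string): Promise<void> {
    this.subscribers.get(channel)?.forEach((handler) => handler(message));
  }

  // Returns an unsubscribe function, matching the interface signature.
  subscribe(channel: string, handler: Handler): () => void {
    const handlers = this.subscribers.get(channel) ?? new Set<Handler>();
    handlers.add(handler);
    this.subscribers.set(channel, handlers);
    return () => {
      handlers.delete(handler);
    };
  }

  async close(): Promise<void> {
    this.queues.clear();
    this.subscribers.clear();
  }
}
```

A BullMQ-backed adapter would expose the same seven members, which is what lets the gateway treat the two interchangeably.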
Storage Interface
// packages/storage/src/types.ts
export interface StorageAdapter {
  readonly name: string;

  // Entity CRUD
  // O is the input shape for writes; defaulting it to T is an assumption.
  create<T, O = T>(collection: string, data: O): Promise<T>;
  read<T>(collection: string, id: string): Promise<T | null>;
  update<T, O = T>(collection: string, id: string, data: Partial<O>): Promise<T | null>;
  delete(collection: string, id: string): Promise<boolean>;

  // Queries
  find<T>(collection: string, filter: Record<string, unknown>): Promise<T[]>;
  findOne<T>(collection: string, filter: Record<string, unknown>): Promise<T | null>;

  // Bulk operations
  createMany<T, O = T>(collection: string, items: O[]): Promise<T[]>;
  updateMany<T, O = T>(collection: string, ids: string[], data: Partial<O>): Promise<number>;
  deleteMany(collection: string, ids: string[]): Promise<number>;

  // Raw queries (for complex queries)
  query<T>(collection: string, query: string, params?: unknown[]): Promise<T[]>;

  // Transaction support
  transaction<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T>;

  close(): Promise<void>;
}

export interface StorageTransaction {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

export interface StorageConfig {
  type: 'postgres' | 'sqlite' | 'files';
  url?: string; // For postgres
  path?: string; // For sqlite/files
}
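The interface does not say who calls `commit()` and `rollback()` when `transaction(fn)` is used. One plausible reading, sketched below, is that the adapter owns both calls: it commits when `fn` resolves and rolls back when `fn` rejects. The helper `runInTransaction` and its `begin` parameter are hypothetical and only illustrate that reading.

```typescript
interface StorageTransaction {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// Hypothetical helper an adapter could use to implement transaction(fn).
// `begin` opens a backend-specific transaction (for example BEGIN in SQL).
async function runInTransaction<T>(
  begin: () => Promise<StorageTransaction>,
  fn: (tx: StorageTransaction) => Promise<T>,
): Promise<T> {
  const tx = await begin();
  let result: T;
  try {
    result = await fn(tx);
  } catch (err) {
    // The callback failed: undo its writes and surface the original error.
    await tx.rollback();
    throw err;
  }
  // The callback succeeded: make its writes durable.
  await tx.commit();
  return result;
}
```

Under this reading, callers would not normally call `commit()` or `rollback()` themselves; if the design intends caller-driven commits instead, the helper would shrink to just `begin` and `fn`.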
Memory Interface (Vector + Preferences)
// packages/memory/src/types.ts
export interface MemoryAdapter {
  readonly name: string;

  // Preferences (key-value storage)
  getPreference(userId: string, key: string): Promise<unknown | null>;
  setPreference(userId: string, key: string, value: unknown): Promise<void>;
  deletePreference(userId: string, key: string): Promise<boolean>;
  listPreferences(
    userId: string,
    category?: string,
  ): Promise<Array<{ key: string; value: unknown }>>;

  // Insights (with optional vector search)
  storeInsight(insight: NewInsight): Promise<Insight>;
  getInsight(id: string): Promise<Insight | null>;
  searchInsights(query: string, limit?: number, filter?: InsightFilter): Promise<SearchResult[]>;
  deleteInsight(id: string): Promise<boolean>;

  // Embedding provider (optional, null = no vector search)
  readonly embedder?: EmbeddingProvider | null;

  close(): Promise<void>;
}

export interface NewInsight {
  id: string;
  userId: string;
  content: string;
  embedding?: number[]; // If embedder is available
  source: 'agent' | 'user' | 'summarization' | 'system';
  category: 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
  relevanceScore: number;
  metadata?: Record<string, unknown>;
  createdAt: Date;
  decayedAt?: Date;
}

export interface InsightFilter {
  userId?: string;
  category?: string;
  source?: string;
  minRelevance?: number;
  fromDate?: Date;
  toDate?: Date;
}

export interface SearchResult {
  documentId: string;
  content: string;
  distance: number;
  metadata?: Record<string, unknown>;
}

export interface MemoryConfig {
  type: 'pgvector' | 'sqlite-vec' | 'keyword';
  storage: StorageAdapter;
  embedder?: EmbeddingProvider;
}

export interface EmbeddingProvider {
  embed(text: string): Promise<number[]>;
  embedBatch(texts: string[]): Promise<number[][]>;
  readonly dimensions: number;
}
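For the `keyword` memory type there is no embedder, so `searchInsights` has to rank on text alone. The sketch below shows one way that could look using a simple TF-IDF score. The function names, the tokenizer, the smoothed IDF formula, and the `1 / (1 + score)` mapping onto `distance` are all assumptions made for illustration, not part of the design.

```typescript
interface Doc {
  id: string;
  content: string;
}

// Subset of the SearchResult shape defined above.
interface SearchResult {
  documentId: string;
  content: string;
  distance: number;
}

// Lowercases and splits on anything that is not a letter or digit.
function tokenize(text: string): string[] {
  return text.toLowerCase().split(/[^a-z0-9]+/).filter((t) => t.length > 0);
}

// Hypothetical TF-IDF ranking for a keyword-only memory adapter.
function keywordSearch(docs: Doc[], query: string, limit = 10): SearchResult[] {
  const indexed = docs.map((doc) => ({ doc, tokens: tokenize(doc.content) }));
  const queryTerms = Array.from(new Set(tokenize(query)));

  const scored = indexed.map(({ doc, tokens }) => {
    let score = 0;
    for (const term of queryTerms) {
      // Term frequency: share of this document's tokens equal to the term.
      const tf = tokens.filter((t) => t === term).length / Math.max(tokens.length, 1);
      // Document frequency: how many documents contain the term at all.
      const df = indexed.filter((entry) => entry.tokens.indexOf(term) !== -1).length;
      // Smoothed inverse document frequency (always >= 1).
      const idf = Math.log((1 + indexed.length) / (1 + df)) + 1;
      score += tf * idf;
    }
    return { doc, score };
  });

  return scored
    .filter((entry) => entry.score > 0)
    .sort((a, b) => b.score - a.score)
    .slice(0, limit)
    // Map score to a distance so that lower means closer, as with vector search.
    .map((entry) => ({
      documentId: entry.doc.id,
      content: entry.doc.content,
      distance: 1 / (1 + entry.score),
    }));
}
```

Keeping the result shape identical to the vector-backed adapters means callers do not need to know which tier produced the ranking.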
Three Tiers
Tier 1: Local (Zero Dependencies)
Target: Single user, single machine, no external services
| Component | Backend | Storage |
|---|---|---|
| Queue | In-process + JSON files | ~/.mosaic/queue/ |
| Storage | SQLite (better-sqlite3) | ~/.mosaic/data.db |
| Memory | Keyword search | SQLite table |
| Vector | None | N/A |
Dependencies:
- better-sqlite3 (bundled)
- No Postgres, no Valkey, no pgvector
Upgrade path:
- Run mosaic gateway configure → select "local" tier
- Gateway starts with SQLite database
- Optional: run mosaic gateway upgrade --tier team to migrate to Postgres
Tier 2: Team (Postgres + Valkey)
Target: Multiple users, shared server, CI/CD environments
| Component | Backend | Storage |
|---|---|---|
| Queue | BullMQ | Valkey |
| Storage | Postgres | Shared PG instance |
| Memory | pgvector | Postgres with vector extension |
| Vector | LLM embeddings | Configured provider |
Dependencies:
- PostgreSQL 17+ with pgvector extension
- Valkey (Redis-compatible)
- LLM provider for embeddings
Migration from Local → Team:
- mosaic gateway backup → creates a dump of the SQLite database
- mosaic gateway upgrade --tier team → restores the dump into Postgres
- Queue replays from BullMQ (may need manual reconciliation for in-flight jobs)
- Memory embeddings are generated if vector search was not previously enabled
Tier 3: Enterprise (Clustered)
Target: Large teams, multi-region, high availability
| Component | Backend | Storage |
|---|---|---|
| Queue | BullMQ cluster | Multiple Valkey nodes |
| Storage | Postgres cluster | Primary + replicas |
| Memory | Dedicated vector DB | Qdrant, Pinecone, or pgvector |
| Vector | Dedicated embedding service | Separate microservice |
MarkdownDB Integration
For file-based storage, we use MarkdownDB to parse MD files into queryable data.
What it provides:
- Parses frontmatter (YAML/JSON/TOML)
- Extracts links, tags, metadata
- Builds index in JSON or SQLite
- Queryable via SQL-like interface
Usage in Mosaic:
// Local tier with MD files for documents
const storage = createStorageAdapter({
  type: 'files',
  path: path.join(mosaicHome, 'docs'),
  markdowndb: {
    parseFrontmatter: true,
    extractLinks: true,
    indexFile: 'index.json',
  },
});
Dream Mode — Memory Consolidation
Automated equivalent to Claude Code's "Dream: Memory Consolidation" cycle
Trigger: Every 24 hours (if 5+ sessions active)
Phases:
- Orient: what happened, what's the current state
  - Scan recent session logs
  - Identify active tasks, missions, conversations
  - Calculate time window (last 24h)
- Gather: pull in relevant context
  - Load conversations, decisions, agent logs
  - Extract key interactions and outcomes
  - Identify patterns and learnings
- Consolidate: summarize and compress
  - Generate summary of the last 24h
  - Extract key decisions and their rationale
  - Identify recurring patterns
  - Compress verbose logs into concise insights
- Prune: archive and cleanup
  - Archive raw session files to dated folders
  - Delete redundant/temporary data
  - Update MEMORY.md with consolidated content
  - Update insight relevance scores
Implementation:
// In @mosaicstack/dream (new package)
export async function runDreamCycle(config: DreamConfig): Promise<DreamResult> {
  const memory = await loadMemoryAdapter(config.storage);

  // Orient
  // Assumes getRecentSessions() is added to MemoryAdapter; it is not in the interface above.
  const sessions = await memory.getRecentSessions(24 * 60 * 60 * 1000);
  if (sessions.length < 5) return { skipped: true, reason: 'insufficient_sessions' };

  // Gather
  const context = await gatherContext(memory, sessions);

  // Consolidate
  const consolidated = await consolidateWithLLM(context, config.llm);

  // Prune
  await pruneArchivedData(memory, config.retention);

  // Store consolidated insights one at a time via MemoryAdapter.storeInsight()
  for (const insight of consolidated.insights) {
    await memory.storeInsight(insight);
  }

  return {
    sessionsProcessed: sessions.length,
    insightsCreated: consolidated.insights.length,
    bytesPruned: consolidated.bytesRemoved,
  };
}
Retrofit Plan
Phase 1: Interface Extraction (2-3 days)
Goal: Define interfaces without changing existing behavior
- Create packages/queue/src/types.ts with QueueAdapter interface
- Create packages/storage/src/types.ts with StorageAdapter interface
- Create packages/memory/src/types.ts with MemoryAdapter interface (refactor existing)
- Add adapter registry pattern to each package
- No breaking changes: existing code continues to work
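The "adapter registry pattern" is not spelled out in this document. The sketch below shows one common shape for it, assuming each config carries a `type` discriminator as the config interfaces above do. `AdapterRegistry`, `NamedAdapter`, and `queueAdapters` are hypothetical names, and the registered factories return placeholder objects, not real adapters.

```typescript
type AdapterFactory<C, A> = (config: C) => A;

// Hypothetical generic registry: maps a config "type" string to a factory.
class AdapterRegistry<C extends { type: string }, A> {
  private readonly factories = new Map<string, AdapterFactory<C, A>>();

  register(type: string, factory: AdapterFactory<C, A>): void {
    if (this.factories.has(type)) {
      throw new Error(`Adapter "${type}" is already registered`);
    }
    this.factories.set(type, factory);
  }

  // Looks up the factory for config.type and builds the adapter.
  create(config: C): A {
    const factory = this.factories.get(config.type);
    if (factory === undefined) {
      const known = Array.from(this.factories.keys()).join(', ');
      throw new Error(`Unknown adapter type "${config.type}" (registered: ${known})`);
    }
    return factory(config);
  }
}

// Example wiring for the queue package, with placeholder adapters.
interface QueueConfig {
  type: 'bullmq' | 'local';
  url?: string;
  dataDir?: string;
}

interface NamedAdapter {
  readonly name: string;
}

const queueAdapters = new AdapterRegistry<QueueConfig, NamedAdapter>();
queueAdapters.register('local', () => ({ name: 'local' }));
queueAdapters.register('bullmq', (config) => ({ name: `bullmq@${config.url ?? 'default'}` }));
```

Because existing entry points such as `createQueue` can keep working while the registry is introduced beside them, this step fits the "no breaking changes" goal of Phase 1.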
Phase 2: Refactor Existing to Adapters (3-5 days)
Goal: Move existing implementations behind adapters
2.1 Queue Refactor
- Rename packages/queue/src/queue.ts → packages/queue/src/adapters/bullmq.ts
- Create packages/queue/src/index.ts to export factory function
- Factory function reads config, instantiates correct adapter
- Update gateway imports to use factory
2.2 Storage Refactor
- Create packages/storage/ (new package)
- Move Drizzle logic to packages/storage/src/adapters/postgres.ts
- Create SQLite adapter in packages/storage/src/adapters/sqlite.ts
- Update gateway to use storage factory
- Deprecate direct @mosaicstack/db imports
2.3 Memory Refactor
- Extract existing logic to packages/memory/src/adapters/pgvector.ts
- Create keyword adapter in packages/memory/src/adapters/keyword.ts
- Update vector-store.ts to be adapter-agnostic
Phase 3: Local Tier Implementation (2-3 days)
Goal: Zero-dependency baseline
- Implement packages/queue/src/adapters/local.ts (in-process + JSON persistence)
- Implement packages/storage/src/adapters/files.ts (JSON + MD via MarkdownDB)
- Implement packages/memory/src/adapters/keyword.ts (TF-IDF search)
- Add packages/dream/ for consolidation cycle
- Wire up local tier in gateway startup
Phase 4: Configuration System (1-2 days)
Goal: Runtime backend selection
- Create packages/config/src/storage.ts for storage configuration
- Add mosaic.config.ts schema with storage tier settings
- Update gateway to read config on startup
- Add mosaic gateway configure CLI command
- Add tier migration commands (mosaic gateway upgrade)
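As a rough illustration of what "storage tier settings" could resolve to, the sketch below maps a tier name onto the backend `type` values used by the config interfaces in this document. The names `MosaicConfig`, `TIER_DEFAULTS`, and `resolveBackends`, the `overrides` field, and the pgvector validation rule are assumptions. The enterprise tier is left out because the config types above define no value for a dedicated vector DB.

```typescript
type Tier = 'local' | 'team';

// Backend identifiers taken from QueueConfig, StorageConfig, and MemoryConfig.
interface TierBackends {
  queue: 'bullmq' | 'local';
  storage: 'postgres' | 'sqlite' | 'files';
  memory: 'pgvector' | 'sqlite-vec' | 'keyword';
}

// Defaults per tier, following the tier tables in this document.
const TIER_DEFAULTS: Record<Tier, TierBackends> = {
  local: { queue: 'local', storage: 'sqlite', memory: 'keyword' },
  team: { queue: 'bullmq', storage: 'postgres', memory: 'pgvector' },
};

// Hypothetical shape for the tier section of mosaic.config.ts.
interface MosaicConfig {
  tier: Tier;
  overrides?: Partial<TierBackends>;
}

// Merges tier defaults with per-component overrides and rejects one
// combination that cannot work: pgvector without Postgres storage.
function resolveBackends(config: MosaicConfig): TierBackends {
  const resolved: TierBackends = { ...TIER_DEFAULTS[config.tier], ...config.overrides };
  if (resolved.memory === 'pgvector' && resolved.storage !== 'postgres') {
    throw new Error('memory "pgvector" requires storage "postgres"');
  }
  return resolved;
}
```

The gateway could call something like `resolveBackends` once at startup and hand each resulting `type` to the matching package factory.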
Phase 5: Testing & Documentation (2-3 days)
- Unit tests for each adapter
- Integration tests for factory pattern
- Migration tests (local → team)
- Update README and architecture docs
- Add configuration guide
File Changes Summary
New Files
packages/
├── config/
│   └── src/
│       ├── storage.ts           # Storage config schema
│       └── index.ts
├── dream/                       # NEW: Dream mode consolidation
│   ├── src/
│   │   ├── index.ts
│   │   ├── orient.ts
│   │   ├── gather.ts
│   │   ├── consolidate.ts
│   │   └── prune.ts
│   └── package.json
├── queue/
│   └── src/
│       ├── types.ts             # NEW: QueueAdapter interface
│       ├── index.ts             # NEW: Factory function
│       └── adapters/
│           ├── bullmq.ts        # MOVED from queue.ts
│           └── local.ts         # NEW: In-process adapter
├── storage/                     # NEW: Storage abstraction
│   ├── src/
│   │   ├── types.ts             # StorageAdapter interface
│   │   ├── index.ts             # Factory function
│   │   └── adapters/
│   │       ├── postgres.ts      # MOVED from @mosaicstack/db
│   │       ├── sqlite.ts        # NEW: SQLite adapter
│   │       └── files.ts         # NEW: JSON/MD adapter
│   └── package.json
└── memory/
    └── src/
        ├── types.ts             # UPDATED: MemoryAdapter interface
        ├── index.ts             # UPDATED: Factory function
        └── adapters/
            ├── pgvector.ts      # EXTRACTED from existing code
            ├── sqlite-vec.ts    # NEW: SQLite with vectors
            └── keyword.ts       # NEW: TF-IDF search
Modified Files
packages/
├── db/                              # DEPRECATED: Logic moved to storage adapters
├── queue/
│   └── src/
│       └── queue.ts                 # → adapters/bullmq.ts
├── memory/
│   ├── src/
│   │   ├── memory.ts                # → use factory
│   │   ├── insights.ts              # → use factory
│   │   └── preferences.ts           # → use factory
│   └── package.json                 # Remove pgvector from dependencies
└── gateway/
    └── src/
        ├── database/
        │   └── database.module.ts   # Update to use storage factory
        ├── memory/
        │   └── memory.module.ts     # Update to use memory factory
        └── queue/
            └── queue.module.ts      # Update to use queue factory
Breaking Changes
- @mosaicstack/db → @mosaicstack/storage (with migration guide)
- Direct ioredis imports → Use @mosaicstack/queue factory
- Direct pgvector queries → Use @mosaicstack/memory factory
- Gateway startup now requires storage config (defaults to local)
Non-Breaking Migration Path
- Existing deployments with Postgres/Valkey continue to work (default config)
- New deployments can choose local tier
- Migration commands available when ready to upgrade
Success Criteria
- Local tier runs with zero external dependencies
- All three tiers (local, team, enterprise) work correctly
- Factory pattern correctly selects backend at runtime
- Migration from local → team preserves all data
- Dream mode consolidates 24h of sessions
- Documentation covers all three tiers and migration paths
- All existing tests pass
- New adapters have >80% coverage