chore(orchestrator): Bootstrap storage abstraction retrofit

Mission: Decouple gateway from hardcoded Postgres/Valkey backends.
20 tasks across 5 phases. Estimated total: ~214K tokens.

Phase 1: Interface extraction (4 tasks)
Phase 2: Wrap existing backends as adapters (5 tasks)
Phase 3: Local tier implementation (4 tasks)
Phase 4: Config + CLI commands (4 tasks)
Phase 5: Migration + docs (3 tasks)
This commit is contained in:
Jarvis
2026-04-02 20:15:57 -05:00
parent 0be9729e40
commit 73043773d8
2 changed files with 580 additions and 68 deletions

View File

@@ -0,0 +1,555 @@
# Storage & Queue Abstraction — Middleware Architecture
Design
Status: Design (retrofit required)
date: 2026-04-02
context: Agents coupled directly to infrastructure backends, bypassing intended middleware layer
---
## The Problem
Current packages are **direct adapters**, not **middleware**:
| Package | Current State | Intended Design |
|---------|---------------|-----------------|
| `@mosaic/queue` | `ioredis` hardcoded | Interface → BullMQ OR local-files |
| `@mosaic/db` | Drizzle + Postgres hardcoded | Interface → Postgres OR SQLite OR JSON/MD |
| `@mosaic/memory` | pgvector required | Interface → pgvector OR sqlite-vec OR keyword-search |
## The gateway and TUI import these packages directly, which means they they're coupled to specific infrastructure. Users cannot run Mosaic Stack without Postgres + Valkey.
## The Intended Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Gateway / TUI / CLI │
│ (agnostic of storage backend, talks to middleware) │
└───────────────────────────┬─────────────────────────────────────┘
┌───────────────────┼───────────────────┐
│ │ │
▼─────────────────┴─────────────────┴─────────────────┘
| | | |
▼─────────────────┴───────────────────┴─────────────────┘
| | | |
Queue Storage Memory
| | | |
┌─────────┬─────────┬─────────┬─────────────────────────────────┐
| BullMQ | | Local | | Postgres | SQLite | JSON/MD | pgvector | sqlite-vec | keyword |
|(Valkey)| |(files) | | | | | |
└─────────┴─────────┴─────────┴─────────────────────────────────┘
```
The gateway imports the interface, not the backend. At startup it reads config and instantiates the correct adapter.
## The Drift
```typescript
// What should have happened:
gateway/queue.service.ts @mosaic/queue (interface) queue.adapter.ts
// What actually happened:
gateway/queue.service.ts @mosaic/queue ioredis (hardcoded)
```
## The Current State Analysis
### `@mosaic/queue` (packages/queue/src/queue.ts)
```typescript
import Redis from 'ioredis'; // ← Direct import of backend
export function createQueue(config?: QueueConfig): QueueHandle {
const url = config?.url ?? process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
const redis = new Redis(url, { maxRetriesPerRequest: 3 });
// ...queue ops directly on redis...
}
```
**Problem:** `ioredis` is imported in the package, not the adapter interface. Consumers cannot swap backends.
### `@mosaic/db` (packages/db/src/client.ts)
```typescript
import { drizzle, type PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
export function createDb(url?: string): DbHandle {
const connectionString = url ?? process.env['DATABASE_URL'] ?? DEFAULT_DATABASE_URL;
const sql = postgres(connectionString, { max: 20, idle_timeout: 30, connect_timeout: 5 });
const db = drizzle(sql, { schema });
// ...
}
```
**Problem:** Drizzle + Postgres is hardcoded. No SQLite, JSON, or file-based options.
### `@mosaic/memory` (packages/memory/src/memory.ts)
```typescript
import type { Db } from '@mosaic/db'; // ← Depends on Drizzle/PG
export function createMemory(db: Db): Memory {
return {
preferences: createPreferencesRepo(db),
insights: createInsightsRepo(db),
};
}
```
**Problem:** Memory package is tightly coupled to `@mosaic/db` (which is Postgres-only). No alternative storage backends.
## The Target Interfaces
### Queue Interface
```typescript
// packages/queue/src/types.ts
export interface QueueAdapter {
readonly name: string;
enqueue(queueName: string, payload: TaskPayload): Promise<void>;
dequeue(queueName: string): Promise<TaskPayload | null>;
length(queueName: string): Promise<number>;
publish(channel: string, message: string): Promise<void>;
subscribe(channel: string, handler: (message: string) => void): () => void;
close(): Promise<void>;
}
export interface TaskPayload {
id: string;
type: string;
data: Record<string, unknown>;
createdAt: string;
}
export interface QueueConfig {
type: 'bullmq' | 'local';
url?: string; // For bullmq: Valkey/Redis URL
dataDir?: string; // For local: directory for JSON persistence
}
```
### Storage Interface
```typescript
// packages/storage/src/types.ts
export interface StorageAdapter {
readonly name: string;
// Entity CRUD
create<T>(collection: string, data: O): Promise<T>;
read<T>(collection: string, id: string): Promise<T | null>;
update<T>(collection: string, id: string, data: Partial<O>): Promise<T | null>;
delete(collection: string, id: string): Promise<boolean>;
// Queries
find<T>(collection: string, filter: Record<string, unknown>): Promise<T[]>;
findOne<T>(collection: string, filter: Record<string, unknown): Promise<T | null>;
// Bulk operations
createMany<T>(collection: string, items: O[]): Promise<T[]>;
updateMany<T>(collection: string, ids: string[], data: Partial<O>): Promise<number>;
deleteMany(collection: string, ids: string[]): Promise<number>;
// Raw queries (for complex queries)
query<T>(collection: string, query: string, params?: unknown[]): Promise<T[]>;
// Transaction support
transaction<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T>;
close(): Promise<void>;
}
export interface StorageTransaction {
commit(): Promise<void>;
rollback(): Promise<void>;
}
export interface StorageConfig {
type: 'postgres' | 'sqlite' | 'files';
url?: string; // For postgres
path?: string; // For sqlite/files
}
```
### Memory Interface (Vector + Preferences)
```typescript
// packages/memory/src/types.ts
export interface MemoryAdapter {
readonly name: string;
// Preferences (key-value storage)
getPreference(userId: string, key: string): Promise<unknown | null>;
setPreference(userId: string, key: string, value: unknown): Promise<void>;
deletePreference(userId: string, key: string): Promise<boolean>;
listPreferences(
userId: string,
category?: string,
): Promise<Array<{ key: string; value: unknown }>>;
// Insights (with optional vector search)
storeInsight(insight: NewInsight): Promise<Insight>;
getInsight(id: string): Promise<Insight | null>;
searchInsights(query: string, limit?: number, filter?: InsightFilter): Promise<SearchResult[]>;
deleteInsight(id: string): Promise<boolean>;
// Embedding provider (optional, null = no vector search)
readonly embedder?: EmbeddingProvider | null;
close(): Promise<void>;
}
export interface NewInsight {
id: string;
userId: string;
content: string;
embedding?: number[]; // If embedder is available
source: 'agent' | 'user' | 'summarization' | 'system';
category: 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
relevanceScore: number;
metadata?: Record<string, unknown>;
createdAt: Date;
decayedAt?: Date;
}
export interface InsightFilter {
userId?: string;
category?: string;
source?: string;
minRelevance?: number;
fromDate?: Date;
toDate?: Date;
}
export interface SearchResult {
documentId: string;
content: string;
distance: number;
metadata?: Record<string, unknown>;
}
export interface MemoryConfig {
type: 'pgvector' | 'sqlite-vec' | 'keyword';
storage: StorageAdapter;
embedder?: EmbeddingProvider;
}
export interface EmbeddingProvider {
embed(text: string): Promise<number[]>;
embedBatch(texts: string[]): Promise<number[][]>;
readonly dimensions: number;
}
```
## Three Tiers
### Tier 1: Local (Zero Dependencies)
**Target:** Single user, single machine, no external services
| Component | Backend | Storage |
| --------- | --------------------------------------------- | ------------ |
| Queue | In-process + JSON files in `~/.mosaic/queue/` |
| Storage | SQLite (better-sqlite3) `~/.mosaic/data.db` |
| Memory | Keyword search | SQLite table |
| Vector | None | N/A |
**Dependencies:**
- `better-sqlite3` (bundled)
- No Postgres, No Valkey, No pgvector
**Upgrade path:**
1. Run `mosaic gateway configure` → select "local" tier
2. Gateway starts with SQLite database
3. Optional: run `mosaic gateway upgrade --tier team` to migrate to Postgres
### Tier 2: Team (Postgres + Valkey)
**Target:** Multiple users, shared server, CI/CD environments
| Component | Backend | Storage |
| --------- | -------------- | ------------------------------ |
| Queue | BullMQ | Valkey |
| Storage | Postgres | Shared PG instance |
| Memory | pgvector | Postgres with vector extension |
| Vector | LLM embeddings | Configured provider |
**Dependencies:**
- PostgreSQL 17+ with pgvector extension
- Valkey (Redis-compatible)
- LLM provider for embeddings
**Migration from Local → Team:**
1. `mosaic gateway backup` → creates dump of SQLite database
2. `mosaic gateway upgrade --tier team` → restores to Postgres
3. Queue replays from BullMQ (may need manual reconciliation for in-flight jobs)
4. Memory embeddings regenerated if vector search was new
### Tier 3: Enterprise (Clustered)
**Target:** Large teams, multi-region, high availability
| Component | Backend | Storage |
| --------- | --------------------------- | ----------------------------- |
| Queue | BullMQ cluster | Multiple Valkey nodes |
| Storage | Postgres cluster | Primary + replicas |
| Memory | Dedicated vector DB | Qdrant, Pinecone, or pgvector |
| Vector | Dedicated embedding service | Separate microservice |
## MarkdownDB Integration
For file-based storage, we use [MarkdownDB](https://markdowndb.com) to parse MD files into queryable data.
**What it provides:**
- Parses frontmatter (YAML/JSON/TOML)
- Extracts links, tags, metadata
- Builds index in JSON or SQLite
- Queryable via SQL-like interface
**Usage in Mosaic:**
```typescript
// Local tier with MD files for documents
const storage = createStorageAdapter({
type: 'files',
path: path.join(mosaicHome, 'docs'),
markdowndb: {
parseFrontmatter: true,
extractLinks: true,
indexFile: 'index.json',
},
});
```
## Dream Mode — Memory Consolidation
Automated equivalent to Claude Code's "Dream: Memory Consolidation" cycle
**Trigger:** Every 24 hours (if 5+ sessions active)
**Phases:**
1. **Orient** — What happened, what's the current state
- Scan recent session logs
- Identify active tasks, missions, conversations
- Calculate time window (last 24h)
2. **Gather** — Pull in relevant context
- Load conversations, decisions, agent logs
- Extract key interactions and outcomes
- Identify patterns and learnings
3. **Consolidate** — Summarize and compress
- Generate summary of the last 24h
- Extract key decisions and their rationale
- Identify recurring patterns
- Compress verbose logs into concise insights
4. **Prune** — Archive and cleanup
- Archive raw session files to dated folders
- Delete redundant/temporary data
- Update MEMORY.md with consolidated content
- Update insight relevance scores
**Implementation:**
```typescript
// In @mosaic/dream (new package)
export async function runDreamCycle(config: DreamConfig): Promise<DreamResult> {
const memory = await loadMemoryAdapter(config.storage);
// Orient
const sessions = await memory.getRecentSessions(24 * 60 * 60 * 1000);
if (sessions.length < 5) return { skipped: true, reason: 'insufficient_sessions' };
// Gather
const context = await gatherContext(memory, sessions);
// Consolidate
const consolidated = await consolidateWithLLM(context, config.llm);
// Prune
await pruneArchivedData(memory, config.retention);
// Store consolidated insights
await memory.storeInsights(consolidated.insights);
return {
sessionsProcessed: sessions.length,
insightsCreated: consolidated.insights.length,
bytesPruned: consolidated.bytesRemoved,
};
}
```
---
## Retrofit Plan
### Phase 1: Interface Extraction (2-3 days)
**Goal:** Define interfaces without changing existing behavior
1. Create `packages/queue/src/types.ts` with `QueueAdapter` interface
2. Create `packages/storage/src/types.ts` with `StorageAdapter` interface
3. Create `packages/memory/src/types.ts` with `MemoryAdapter` interface (refactor existing)
4. Add adapter registry pattern to each package
5. No breaking changes — existing code continues to work
### Phase 2: Refactor Existing to Adapters (3-5 days)
**Goal:** Move existing implementations behind adapters
#### 2.1 Queue Refactor
1. Rename `packages/queue/src/queue.ts``packages/queue/src/adapters/bullmq.ts`
2. Create `packages/queue/src/index.ts` to export factory function
3. Factory function reads config, instantiates correct adapter
4. Update gateway imports to use factory
#### 2.2 Storage Refactor
1. Create `packages/storage/` (new package)
2. Move Drizzle logic to `packages/storage/src/adapters/postgres.ts`
3. Create SQLite adapter in `packages/storage/src/adapters/sqlite.ts`
4. Update gateway to use storage factory
5. Deprecate direct `@mosaic/db` imports
#### 2.3 Memory Refactor
1. Extract existing logic to `packages/memory/src/adapters/pgvector.ts`
2. Create keyword adapter in `packages/memory/src/adapters/keyword.ts`
3. Update vector-store.ts to be adapter-agnostic
### Phase 3: Local Tier Implementation (2-3 days)
**Goal:** Zero-dependency baseline
1. Implement `packages/queue/src/adapters/local.ts` (in-process + JSON persistence)
2. Implement `packages/storage/src/adapters/files.ts` (JSON + MD via MarkdownDB)
3. Implement `packages/memory/src/adapters/keyword.ts` (TF-IDF search)
4. Add `packages/dream/` for consolidation cycle
5. Wire up local tier in gateway startup
### Phase 4: Configuration System (1-2 days)
**Goal:** Runtime backend selection
1. Create `packages/config/src/storage.ts` for storage configuration
2. Add `mosaic.config.ts` schema with storage tier settings
3. Update gateway to read config on startup
4. Add `mosaic gateway configure` CLI command
5. Add tier migration commands (`mosaic gateway upgrade`)
### Phase 5: Testing & Documentation (2-3 days)
1. Unit tests for each adapter
2. Integration tests for factory pattern
3. Migration tests (local → team)
4. Update README and architecture docs
5. Add configuration guide
---
## File Changes Summary
### New Files
```
packages/
├── config/
│ └── src/
│ ├── storage.ts # Storage config schema
│ └── index.ts
├── dream/ # NEW: Dream mode consolidation
│ ├── src/
│ │ ├── index.ts
│ │ ├── orient.ts
│ │ ├── gather.ts
│ │ ├── consolidate.ts
│ │ └── prune.ts
│ └── package.json
├── queue/
│ └── src/
│ ├── types.ts # NEW: QueueAdapter interface
│ ├── index.ts # NEW: Factory function
│ └── adapters/
│ ├── bullmq.ts # MOVED from queue.ts
│ └── local.ts # NEW: In-process adapter
├── storage/ # NEW: Storage abstraction
│ ├── src/
│ │ ├── types.ts # StorageAdapter interface
│ │ ├── index.ts # Factory function
│ │ └── adapters/
│ │ ├── postgres.ts # MOVED from @mosaic/db
│ │ ├── sqlite.ts # NEW: SQLite adapter
│ │ └── files.ts # NEW: JSON/MD adapter
│ └── package.json
└── memory/
└── src/
├── types.ts # UPDATED: MemoryAdapter interface
├── index.ts # UPDATED: Factory function
└── adapters/
├── pgvector.ts # EXTRACTED from existing code
├── sqlite-vec.ts # NEW: SQLite with vectors
└── keyword.ts # NEW: TF-IDF search
```
### Modified Files
```
packages/
├── db/ # DEPRECATED: Logic moved to storage adapters
├── queue/
│ └── src/
│ └── queue.ts # → adapters/bullmq.ts
├── memory/
│ ├── src/
│ │ ├── memory.ts # → use factory
│ │ ├── insights.ts # → use factory
│ │ └── preferences.ts # → use factory
│ └── package.json # Remove pgvector from dependencies
└── gateway/
└── src/
├── database/
│ └── database.module.ts # Update to use storage factory
├── memory/
│ └── memory.module.ts # Update to use memory factory
└── queue/
└── queue.module.ts # Update to use queue factory
```
---
## Breaking Changes
1. **`@mosaic/db`** → **`@mosaic/storage`** (with migration guide)
2. Direct `ioredis` imports → Use `@mosaic/queue` factory
3. Direct `pgvector` queries → Use `@mosaic/memory` factory
4. Gateway startup now requires storage config (defaults to local)
## Non-Breaking Migration Path
1. Existing deployments with Postgres/Valkey continue to work (default config)
2. New deployments can choose local tier
3. Migration commands available when ready to upgrade
---
## Success Criteria
- [ ] Local tier runs with zero external dependencies
- [ ] All three tiers (local, team, enterprise) work correctly
- [ ] Factory pattern correctly selects backend at runtime
- [ ] Migration from local → team preserves all data
- [ ] Dream mode consolidates 24h of sessions
- [ ] Documentation covers all three tiers and migration paths
- [ ] All existing tests pass
- [ ] New adapters have >80% coverage