feat(storage): pgvector adapter support gated on tier=federated (FED-M1-03)
Adds pgvector support to the postgres storage adapter without affecting local or standalone tiers: - `StorageConfig` postgres variant gains optional `enableVector: boolean` - `PostgresAdapter.migrate()` runs `CREATE EXTENSION IF NOT EXISTS vector` via `db.execute(sql)` BEFORE migrations when `enableVector === true` (so vector-typed columns are creatable in the same migration pass). Idempotent — safe to re-run on already-installed extension. - `vector` custom type in `@mosaicstack/db/schema` is now exported so downstream packages can declare vector columns in their own schemas. - `DEFAULT_FEDERATED_CONFIG.storage.enableVector = true` so the federated default flows through the adapter. - `detectFromEnv()` restructured: `MOSAIC_STORAGE_TIER` is now checked BEFORE the `DATABASE_URL` guard so `MOSAIC_STORAGE_TIER=federated` alone returns the federated default config instead of silently misrouting to local. Same applies to `=standalone`. With `DATABASE_URL` set, the URL is honored and `enableVector` is preserved on federated. - `detectFromEnv` is now exported for direct test access. Tests: - 4 PostgresAdapter unit tests cover: extension SQL issued when enabled, not issued when disabled or unset, ordering (extension before runMigrations). Assertion uses Drizzle's `toSQL()` with documented fallback for older versions. - 4 detectFromEnv tests cover the four env-var permutations. - 1 federated default constant test. No behavior change for local or standalone tier deployments. Refs #460 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -5,4 +5,5 @@ export {
|
|||||||
DEFAULT_FEDERATED_CONFIG,
|
DEFAULT_FEDERATED_CONFIG,
|
||||||
loadConfig,
|
loadConfig,
|
||||||
validateConfig,
|
validateConfig,
|
||||||
|
detectFromEnv,
|
||||||
} from './mosaic-config.js';
|
} from './mosaic-config.js';
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||||
import {
|
import {
|
||||||
validateConfig,
|
validateConfig,
|
||||||
|
detectFromEnv,
|
||||||
DEFAULT_LOCAL_CONFIG,
|
DEFAULT_LOCAL_CONFIG,
|
||||||
DEFAULT_STANDALONE_CONFIG,
|
DEFAULT_STANDALONE_CONFIG,
|
||||||
DEFAULT_FEDERATED_CONFIG,
|
DEFAULT_FEDERATED_CONFIG,
|
||||||
@@ -106,4 +107,64 @@ describe('DEFAULT_* config constants', () => {
|
|||||||
const url = (DEFAULT_FEDERATED_CONFIG.storage as { url: string }).url;
|
const url = (DEFAULT_FEDERATED_CONFIG.storage as { url: string }).url;
|
||||||
expect(url).toContain('5433');
|
expect(url).toContain('5433');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('DEFAULT_FEDERATED_CONFIG has enableVector=true on storage', () => {
|
||||||
|
const storage = DEFAULT_FEDERATED_CONFIG.storage as {
|
||||||
|
type: string;
|
||||||
|
url: string;
|
||||||
|
enableVector?: boolean;
|
||||||
|
};
|
||||||
|
expect(storage.enableVector).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('detectFromEnv — tier env-var routing', () => {
|
||||||
|
const originalEnv = process.env;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// Work on a fresh copy so individual tests can set/delete keys freely.
|
||||||
|
process.env = { ...originalEnv };
|
||||||
|
delete process.env['MOSAIC_STORAGE_TIER'];
|
||||||
|
delete process.env['DATABASE_URL'];
|
||||||
|
delete process.env['VALKEY_URL'];
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
process.env = originalEnv;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('no env vars → returns local config', () => {
|
||||||
|
const config = detectFromEnv();
|
||||||
|
expect(config.tier).toBe('local');
|
||||||
|
expect(config.storage.type).toBe('pglite');
|
||||||
|
expect(config.memory.type).toBe('keyword');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('MOSAIC_STORAGE_TIER=federated alone → returns federated config with enableVector=true', () => {
|
||||||
|
process.env['MOSAIC_STORAGE_TIER'] = 'federated';
|
||||||
|
const config = detectFromEnv();
|
||||||
|
expect(config.tier).toBe('federated');
|
||||||
|
expect(config.memory.type).toBe('pgvector');
|
||||||
|
const storage = config.storage as { type: string; enableVector?: boolean };
|
||||||
|
expect(storage.enableVector).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('MOSAIC_STORAGE_TIER=federated + DATABASE_URL → uses the URL and still has enableVector=true', () => {
|
||||||
|
process.env['MOSAIC_STORAGE_TIER'] = 'federated';
|
||||||
|
process.env['DATABASE_URL'] = 'postgresql://custom:pass@db.example.com:5432/mydb';
|
||||||
|
const config = detectFromEnv();
|
||||||
|
expect(config.tier).toBe('federated');
|
||||||
|
const storage = config.storage as { type: string; url: string; enableVector?: boolean };
|
||||||
|
expect(storage.url).toBe('postgresql://custom:pass@db.example.com:5432/mydb');
|
||||||
|
expect(storage.enableVector).toBe(true);
|
||||||
|
expect(config.memory.type).toBe('pgvector');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('MOSAIC_STORAGE_TIER=standalone alone → returns standalone-shaped config (not local)', () => {
|
||||||
|
process.env['MOSAIC_STORAGE_TIER'] = 'standalone';
|
||||||
|
const config = detectFromEnv();
|
||||||
|
expect(config.tier).toBe('standalone');
|
||||||
|
expect(config.storage.type).toBe('postgres');
|
||||||
|
expect(config.memory.type).toBe('keyword');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -40,7 +40,11 @@ export const DEFAULT_STANDALONE_CONFIG: MosaicConfig = {
|
|||||||
|
|
||||||
export const DEFAULT_FEDERATED_CONFIG: MosaicConfig = {
|
export const DEFAULT_FEDERATED_CONFIG: MosaicConfig = {
|
||||||
tier: 'federated',
|
tier: 'federated',
|
||||||
storage: { type: 'postgres', url: 'postgresql://mosaic:mosaic@localhost:5433/mosaic' },
|
storage: {
|
||||||
|
type: 'postgres',
|
||||||
|
url: 'postgresql://mosaic:mosaic@localhost:5433/mosaic',
|
||||||
|
enableVector: true,
|
||||||
|
},
|
||||||
queue: { type: 'bullmq' },
|
queue: { type: 'bullmq' },
|
||||||
memory: { type: 'pgvector' },
|
memory: { type: 'pgvector' },
|
||||||
};
|
};
|
||||||
@@ -119,7 +123,49 @@ export function validateConfig(raw: unknown): MosaicConfig {
|
|||||||
/* Loader */
|
/* Loader */
|
||||||
/* ------------------------------------------------------------------ */
|
/* ------------------------------------------------------------------ */
|
||||||
|
|
||||||
function detectFromEnv(): MosaicConfig {
|
export function detectFromEnv(): MosaicConfig {
|
||||||
|
const tier = process.env['MOSAIC_STORAGE_TIER'];
|
||||||
|
|
||||||
|
if (tier === 'federated') {
|
||||||
|
if (process.env['DATABASE_URL']) {
|
||||||
|
return {
|
||||||
|
...DEFAULT_FEDERATED_CONFIG,
|
||||||
|
storage: {
|
||||||
|
type: 'postgres',
|
||||||
|
url: process.env['DATABASE_URL'],
|
||||||
|
enableVector: true,
|
||||||
|
},
|
||||||
|
queue: {
|
||||||
|
type: 'bullmq',
|
||||||
|
url: process.env['VALKEY_URL'],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
// MOSAIC_STORAGE_TIER=federated without DATABASE_URL — use the default
|
||||||
|
// federated config (port 5433, enableVector: true, pgvector memory).
|
||||||
|
return DEFAULT_FEDERATED_CONFIG;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tier === 'standalone') {
|
||||||
|
if (process.env['DATABASE_URL']) {
|
||||||
|
return {
|
||||||
|
...DEFAULT_STANDALONE_CONFIG,
|
||||||
|
storage: {
|
||||||
|
type: 'postgres',
|
||||||
|
url: process.env['DATABASE_URL'],
|
||||||
|
},
|
||||||
|
queue: {
|
||||||
|
type: 'bullmq',
|
||||||
|
url: process.env['VALKEY_URL'],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
// MOSAIC_STORAGE_TIER=standalone without DATABASE_URL — use the default
|
||||||
|
// standalone config instead of silently falling back to local.
|
||||||
|
return DEFAULT_STANDALONE_CONFIG;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Legacy: DATABASE_URL set without MOSAIC_STORAGE_TIER — treat as standalone.
|
||||||
if (process.env['DATABASE_URL']) {
|
if (process.env['DATABASE_URL']) {
|
||||||
return {
|
return {
|
||||||
...DEFAULT_STANDALONE_CONFIG,
|
...DEFAULT_STANDALONE_CONFIG,
|
||||||
@@ -133,6 +179,7 @@ function detectFromEnv(): MosaicConfig {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
return DEFAULT_LOCAL_CONFIG;
|
return DEFAULT_LOCAL_CONFIG;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -372,7 +372,11 @@ export const messages = pgTable(
|
|||||||
|
|
||||||
// ─── pgvector custom type ───────────────────────────────────────────────────
|
// ─── pgvector custom type ───────────────────────────────────────────────────
|
||||||
|
|
||||||
const vector = customType<{ data: number[]; driverParam: string; config: { dimensions: number } }>({
|
export const vector = customType<{
|
||||||
|
data: number[];
|
||||||
|
driverParam: string;
|
||||||
|
config: { dimensions: number };
|
||||||
|
}>({
|
||||||
dataType(config) {
|
dataType(config) {
|
||||||
return `vector(${config?.dimensions ?? 1536})`;
|
return `vector(${config?.dimensions ?? 1536})`;
|
||||||
},
|
},
|
||||||
|
|||||||
107
packages/storage/src/adapters/postgres.spec.ts
Normal file
107
packages/storage/src/adapters/postgres.spec.ts
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||||
|
import type { DbHandle } from '@mosaicstack/db';
|
||||||
|
|
||||||
|
// Mock @mosaicstack/db before importing the adapter
|
||||||
|
vi.mock('@mosaicstack/db', async (importOriginal) => {
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
const actual = await importOriginal<Record<string, any>>();
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
createDb: vi.fn(),
|
||||||
|
runMigrations: vi.fn().mockResolvedValue(undefined),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
import { createDb, runMigrations } from '@mosaicstack/db';
|
||||||
|
import { PostgresAdapter } from './postgres.js';
|
||||||
|
|
||||||
|
describe('PostgresAdapter — vector extension gating', () => {
|
||||||
|
let mockExecute: ReturnType<typeof vi.fn>;
|
||||||
|
let mockDb: { execute: ReturnType<typeof vi.fn> };
|
||||||
|
let mockHandle: Pick<DbHandle, 'close'> & { db: typeof mockDb };
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
mockExecute = vi.fn().mockResolvedValue(undefined);
|
||||||
|
mockDb = { execute: mockExecute };
|
||||||
|
mockHandle = { db: mockDb, close: vi.fn().mockResolvedValue(undefined) };
|
||||||
|
vi.mocked(createDb).mockReturnValue(mockHandle as unknown as DbHandle);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('calls db.execute with CREATE EXTENSION IF NOT EXISTS vector when enableVector=true', async () => {
|
||||||
|
const adapter = new PostgresAdapter({
|
||||||
|
type: 'postgres',
|
||||||
|
url: 'postgresql://test:test@localhost:5432/test',
|
||||||
|
enableVector: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
await adapter.migrate();
|
||||||
|
|
||||||
|
// Should have called execute
|
||||||
|
expect(mockExecute).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Verify the SQL contains the extension creation statement.
|
||||||
|
// Prefer Drizzle's public toSQL() API; fall back to queryChunks if unavailable.
|
||||||
|
// NOTE: queryChunks is an undocumented Drizzle internal (drizzle-orm ^0.45.x).
|
||||||
|
// toSQL() was not present on the raw sql`` result in this version — if a future
|
||||||
|
// Drizzle upgrade adds it, remove the fallback path and delete this comment.
|
||||||
|
const sqlObj = mockExecute.mock.calls[0]![0] as {
|
||||||
|
toSQL?: () => { sql: string; params: unknown[] };
|
||||||
|
queryChunks?: Array<{ value: string[] }>;
|
||||||
|
};
|
||||||
|
const sqlText = sqlObj.toSQL
|
||||||
|
? sqlObj.toSQL().sql.toLowerCase()
|
||||||
|
: (sqlObj.queryChunks ?? [])
|
||||||
|
.flatMap((chunk) => chunk.value)
|
||||||
|
.join('')
|
||||||
|
.toLowerCase();
|
||||||
|
expect(sqlText).toContain('create extension if not exists vector');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT call db.execute for extension when enableVector is false', async () => {
|
||||||
|
const adapter = new PostgresAdapter({
|
||||||
|
type: 'postgres',
|
||||||
|
url: 'postgresql://test:test@localhost:5432/test',
|
||||||
|
enableVector: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await adapter.migrate();
|
||||||
|
|
||||||
|
expect(mockExecute).not.toHaveBeenCalled();
|
||||||
|
expect(vi.mocked(runMigrations)).toHaveBeenCalledOnce();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT call db.execute for extension when enableVector is unset', async () => {
|
||||||
|
const adapter = new PostgresAdapter({
|
||||||
|
type: 'postgres',
|
||||||
|
url: 'postgresql://test:test@localhost:5432/test',
|
||||||
|
});
|
||||||
|
|
||||||
|
await adapter.migrate();
|
||||||
|
|
||||||
|
expect(mockExecute).not.toHaveBeenCalled();
|
||||||
|
expect(vi.mocked(runMigrations)).toHaveBeenCalledOnce();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('calls runMigrations after the extension is created', async () => {
|
||||||
|
const callOrder: string[] = [];
|
||||||
|
mockExecute.mockImplementation(() => {
|
||||||
|
callOrder.push('execute');
|
||||||
|
return Promise.resolve(undefined);
|
||||||
|
});
|
||||||
|
vi.mocked(runMigrations).mockImplementation(() => {
|
||||||
|
callOrder.push('runMigrations');
|
||||||
|
return Promise.resolve();
|
||||||
|
});
|
||||||
|
|
||||||
|
const adapter = new PostgresAdapter({
|
||||||
|
type: 'postgres',
|
||||||
|
url: 'postgresql://test:test@localhost:5432/test',
|
||||||
|
enableVector: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
await adapter.migrate();
|
||||||
|
|
||||||
|
expect(callOrder).toEqual(['execute', 'runMigrations']);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -66,13 +66,19 @@ export class PostgresAdapter implements StorageAdapter {
|
|||||||
private handle: DbHandle;
|
private handle: DbHandle;
|
||||||
private db: Db;
|
private db: Db;
|
||||||
private url: string;
|
private url: string;
|
||||||
|
private enableVector: boolean;
|
||||||
|
|
||||||
constructor(config: Extract<StorageConfig, { type: 'postgres' }>) {
|
constructor(config: Extract<StorageConfig, { type: 'postgres' }>) {
|
||||||
this.url = config.url;
|
this.url = config.url;
|
||||||
|
this.enableVector = config.enableVector ?? false;
|
||||||
this.handle = createDb(config.url);
|
this.handle = createDb(config.url);
|
||||||
this.db = this.handle.db;
|
this.db = this.handle.db;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async ensureVectorExtension(): Promise<void> {
|
||||||
|
await this.db.execute(sql`CREATE EXTENSION IF NOT EXISTS vector`);
|
||||||
|
}
|
||||||
|
|
||||||
async create<T extends Record<string, unknown>>(
|
async create<T extends Record<string, unknown>>(
|
||||||
collection: string,
|
collection: string,
|
||||||
data: T,
|
data: T,
|
||||||
@@ -149,6 +155,9 @@ export class PostgresAdapter implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async migrate(): Promise<void> {
|
async migrate(): Promise<void> {
|
||||||
|
if (this.enableVector) {
|
||||||
|
await this.ensureVectorExtension();
|
||||||
|
}
|
||||||
await runMigrations(this.url);
|
await runMigrations(this.url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,6 @@ export interface StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export type StorageConfig =
|
export type StorageConfig =
|
||||||
| { type: 'postgres'; url: string }
|
| { type: 'postgres'; url: string; enableVector?: boolean }
|
||||||
| { type: 'pglite'; dataDir?: string }
|
| { type: 'pglite'; dataDir?: string }
|
||||||
| { type: 'files'; dataDir: string; format?: 'json' | 'md' };
|
| { type: 'files'; dataDir: string; format?: 'json' | 'md' };
|
||||||
|
|||||||
Reference in New Issue
Block a user