diff --git a/apps/gateway/src/admin/admin-health.controller.ts b/apps/gateway/src/admin/admin-health.controller.ts index 2331143..99fd93f 100644 --- a/apps/gateway/src/admin/admin-health.controller.ts +++ b/apps/gateway/src/admin/admin-health.controller.ts @@ -1,9 +1,11 @@ -import { Controller, Get, Inject, UseGuards } from '@nestjs/common'; +import { Controller, Get, Inject, Optional, UseGuards } from '@nestjs/common'; import { sql, type Db } from '@mosaicstack/db'; import { createQueue } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; import { DB } from '../database/database.module.js'; import { AgentService } from '../agent/agent.service.js'; import { ProviderService } from '../agent/provider.service.js'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { AdminGuard } from './admin.guard.js'; import type { HealthStatusDto, ServiceStatusDto } from './admin.dto.js'; @@ -14,6 +16,9 @@ export class AdminHealthController { @Inject(DB) private readonly db: Db, @Inject(AgentService) private readonly agentService: AgentService, @Inject(ProviderService) private readonly providerService: ProviderService, + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, ) {} @Get() @@ -55,6 +60,14 @@ export class AdminHealthController { } private async checkCache(): Promise { + // On Local tier there is no Redis. The cache is intentionally absent, which + // is a healthy state for this tier — report 'ok' rather than opening a new + // ioredis connection on every admin health check (which would spam + // ECONNREFUSED and create/destroy a connection per request). latencyMs 0 + // signals "no cache backend to measure" for this tier. + if (this.mosaicConfig?.queue?.type === 'local') { + return { status: 'ok', latencyMs: 0 }; + } const start = Date.now(); const handle = createQueue(); try { diff --git a/apps/gateway/src/commands/command-executor.service.ts b/apps/gateway/src/commands/command-executor.service.ts index 678f0c1..b8c1ce1 100644 --- a/apps/gateway/src/commands/command-executor.service.ts +++ b/apps/gateway/src/commands/command-executor.service.ts @@ -21,7 +21,10 @@ export class CommandExecutorService { @Inject(AgentService) private readonly agentService: AgentService, @Inject(SystemOverrideService) private readonly systemOverride: SystemOverrideService, @Inject(SessionGCService) private readonly sessionGC: SessionGCService, - @Inject(COMMANDS_REDIS) private readonly redis: QueueHandle['redis'], + // On Local tier COMMANDS_REDIS is null — provider login caching is skipped. + @Optional() + @Inject(COMMANDS_REDIS) + private readonly redis: QueueHandle['redis'] | null, @Inject(BRAIN) private readonly brain: Brain, @Optional() @Inject(forwardRef(() => ReloadService)) @@ -403,14 +406,16 @@ export class CommandExecutorService { }; } const pollToken = crypto.randomUUID(); - const key = `mosaic:auth:poll:${pollToken}`; - // Store pending state in Valkey (TTL 5 minutes) - await this.redis.set( - key, - JSON.stringify({ status: 'pending', provider: providerName, userId }), - 'EX', - 300, - ); + const pollKey = `mosaic:auth:poll:${pollToken}`; + if (this.redis) { + // Store pending state in Valkey (TTL 5 minutes) + await this.redis.set( + pollKey, + JSON.stringify({ status: 'pending', provider: providerName, userId }), + 'EX', + 300, + ); + } // In production this would construct an OAuth URL const loginUrl = `${process.env['MOSAIC_BASE_URL'] ?? 'http://localhost:3000'}/auth/provider/${providerName}?token=${pollToken}`; return { diff --git a/apps/gateway/src/commands/commands.module.ts b/apps/gateway/src/commands/commands.module.ts index 1c3a82c..1d38faa 100644 --- a/apps/gateway/src/commands/commands.module.ts +++ b/apps/gateway/src/commands/commands.module.ts @@ -1,5 +1,7 @@ -import { forwardRef, Inject, Module, type OnApplicationShutdown } from '@nestjs/common'; +import { forwardRef, Inject, Module, Optional, type OnApplicationShutdown } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { ChatModule } from '../chat/chat.module.js'; import { GCModule } from '../gc/gc.module.js'; import { ReloadModule } from '../reload/reload.module.js'; @@ -14,13 +16,17 @@ const COMMANDS_QUEUE_HANDLE = 'COMMANDS_QUEUE_HANDLE'; providers: [ { provide: COMMANDS_QUEUE_HANDLE, - useFactory: (): QueueHandle => { + useFactory: (config: MosaicConfig | null): QueueHandle | null => { + // On Local tier there is no Redis — skip the ioredis connection. + // CommandExecutorService falls back to no-cache for /provider login on local. + if (config?.queue?.type === 'local') return null; return createQueue(); }, + inject: [MOSAIC_CONFIG], }, { provide: COMMANDS_REDIS, - useFactory: (handle: QueueHandle) => handle.redis, + useFactory: (handle: QueueHandle | null) => handle?.redis ?? null, inject: [COMMANDS_QUEUE_HANDLE], }, CommandRegistryService, @@ -29,9 +35,13 @@ const COMMANDS_QUEUE_HANDLE = 'COMMANDS_QUEUE_HANDLE'; exports: [CommandRegistryService, CommandExecutorService], }) export class CommandsModule implements OnApplicationShutdown { - constructor(@Inject(COMMANDS_QUEUE_HANDLE) private readonly handle: QueueHandle) {} + constructor( + @Optional() + @Inject(COMMANDS_QUEUE_HANDLE) + private readonly handle: QueueHandle | null, + ) {} async onApplicationShutdown(): Promise { - await this.handle.close().catch(() => {}); + await this.handle?.close().catch(() => {}); } } diff --git a/apps/gateway/src/gc/gc.module.ts b/apps/gateway/src/gc/gc.module.ts index 1f426d1..bacc058 100644 --- a/apps/gateway/src/gc/gc.module.ts +++ b/apps/gateway/src/gc/gc.module.ts @@ -1,5 +1,7 @@ -import { Module, type OnApplicationShutdown, Inject } from '@nestjs/common'; +import { Module, type OnApplicationShutdown, Inject, Optional } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import { SessionGCService } from './session-gc.service.js'; import { REDIS } from './gc.tokens.js'; @@ -9,13 +11,17 @@ const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE'; providers: [ { provide: GC_QUEUE_HANDLE, - useFactory: (): QueueHandle => { + useFactory: (config: MosaicConfig | null): QueueHandle | null => { + // On Local tier there is no Redis — skip the ioredis connection entirely. + // The Valkey GC sweep is a no-op on Local (no session keys stored there). + if (config?.queue?.type === 'local') return null; return createQueue(); }, + inject: [MOSAIC_CONFIG], }, { provide: REDIS, - useFactory: (handle: QueueHandle) => handle.redis, + useFactory: (handle: QueueHandle | null) => handle?.redis ?? null, inject: [GC_QUEUE_HANDLE], }, SessionGCService, @@ -23,9 +29,13 @@ const GC_QUEUE_HANDLE = 'GC_QUEUE_HANDLE'; exports: [SessionGCService], }) export class GCModule implements OnApplicationShutdown { - constructor(@Inject(GC_QUEUE_HANDLE) private readonly handle: QueueHandle) {} + constructor( + @Optional() + @Inject(GC_QUEUE_HANDLE) + private readonly handle: QueueHandle | null, + ) {} async onApplicationShutdown(): Promise { - await this.handle.close().catch(() => {}); + await this.handle?.close().catch(() => {}); } } diff --git a/apps/gateway/src/gc/session-gc.service.ts b/apps/gateway/src/gc/session-gc.service.ts index 18d1e39..00282c1 100644 --- a/apps/gateway/src/gc/session-gc.service.ts +++ b/apps/gateway/src/gc/session-gc.service.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Logger, type OnModuleInit } from '@nestjs/common'; +import { Inject, Injectable, Logger, Optional, type OnModuleInit } from '@nestjs/common'; import type { QueueHandle } from '@mosaicstack/queue'; import type { LogService } from '@mosaicstack/log'; import { LOG_SERVICE } from '../log/log.tokens.js'; @@ -32,11 +32,21 @@ export class SessionGCService implements OnModuleInit { private readonly logger = new Logger(SessionGCService.name); constructor( - @Inject(REDIS) private readonly redis: QueueHandle['redis'], + // On Local tier there is no Redis — the GC module provides null for this token. + // NOTE: if a future feature stores Redis-backed state on Local tier, this guard + // would silently skip GC for those keys. Revisit when that happens. + @Optional() + @Inject(REDIS) + private readonly redis: QueueHandle['redis'] | null, @Inject(LOG_SERVICE) private readonly logService: LogService, ) {} onModuleInit(): void { + if (!this.redis) { + // Local tier: no Valkey — skip cold-start GC entirely (correct no-op). + this.logger.log('SessionGCService: Valkey GC skipped on local tier (no Redis configured)'); + return; + } // Fire-and-forget: run full GC asynchronously so it does not block the // NestJS bootstrap chain. Cold-start GC typically takes 100–500 ms // depending on Valkey key count; deferring it removes that latency from @@ -60,8 +70,10 @@ export class SessionGCService implements OnModuleInit { * Scan Valkey for all keys matching a pattern using SCAN (non-blocking). * KEYS is avoided because it blocks the Valkey event loop for the full scan * duration, which can cause latency spikes under production key volumes. + * Returns empty array when Redis is not available (Local tier). */ private async scanKeys(pattern: string): Promise { + if (!this.redis) return []; const collected: string[] = []; let cursor = '0'; do { @@ -78,12 +90,14 @@ export class SessionGCService implements OnModuleInit { async collect(sessionId: string): Promise { const result: GCResult = { sessionId, cleaned: {} }; - // 1. Valkey: delete all session-scoped keys - const pattern = `mosaic:session:${sessionId}:*`; - const valkeyKeys = await this.scanKeys(pattern); - if (valkeyKeys.length > 0) { - await this.redis.del(...valkeyKeys); - result.cleaned.valkeyKeys = valkeyKeys.length; + // 1. Valkey: delete all session-scoped keys (skipped on Local tier) + if (this.redis) { + const pattern = `mosaic:session:${sessionId}:*`; + const valkeyKeys = await this.scanKeys(pattern); + if (valkeyKeys.length > 0) { + await this.redis.del(...valkeyKeys); + result.cleaned.valkeyKeys = valkeyKeys.length; + } } // 2. PG: demote hot-tier agent_logs for this session to warm @@ -106,6 +120,7 @@ export class SessionGCService implements OnModuleInit { const cleaned: GCResult[] = []; // 1. Find all session-scoped Valkey keys (non-blocking SCAN) + // Returns empty on Local tier — no Valkey session keys exist there. const allSessionKeys = await this.scanKeys('mosaic:session:*'); // Extract unique session IDs from keys @@ -136,11 +151,15 @@ export class SessionGCService implements OnModuleInit { */ async fullCollect(): Promise { const start = Date.now(); + let valkeyKeysCount = 0; - // 1. Valkey: delete ALL session-scoped keys (non-blocking SCAN) - const sessionKeys = await this.scanKeys('mosaic:session:*'); - if (sessionKeys.length > 0) { - await this.redis.del(...sessionKeys); + if (this.redis) { + // 1. Valkey: delete ALL session-scoped keys (non-blocking SCAN) + const sessionKeys = await this.scanKeys('mosaic:session:*'); + if (sessionKeys.length > 0) { + await this.redis.del(...sessionKeys); + } + valkeyKeysCount = sessionKeys.length; } // 2. NOTE: channel keys are NOT collected on cold start @@ -154,7 +173,7 @@ export class SessionGCService implements OnModuleInit { const jobsPurged = 0; return { - valkeyKeys: sessionKeys.length, + valkeyKeys: valkeyKeysCount, logsDemoted, jobsPurged, tempFilesRemoved: 0, diff --git a/apps/gateway/src/log/cron.service.ts b/apps/gateway/src/log/cron.service.ts index aa9b82f..4b5ccb8 100644 --- a/apps/gateway/src/log/cron.service.ts +++ b/apps/gateway/src/log/cron.service.ts @@ -19,7 +19,7 @@ import type { MosaicJobData } from '../queue/queue.service.js'; @Injectable() export class CronService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CronService.name); - private readonly registeredWorkers: Worker[] = []; + private readonly registeredWorkers: Array> = []; constructor( @Inject(SummarizationService) private readonly summarization: SummarizationService, @@ -28,6 +28,16 @@ export class CronService implements OnModuleInit, OnModuleDestroy { ) {} async onModuleInit(): Promise { + // On Local tier BullMQ is disabled — skip all job scheduling. + // NOTE: this means summarization, tier management, and Valkey GC jobs do not + // run on Local installs. For a single-user local install this is acceptable. + // If periodic background work is needed on Local in the future, add a + // setInterval-based scheduler here. + if (!this.queueService.isEnabled()) { + this.logger.log('CronService: BullMQ disabled on local tier — no jobs will be scheduled'); + return; + } + const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am @@ -42,7 +52,7 @@ export class CronService implements OnModuleInit, OnModuleDestroy { const summarizationWorker = this.queueService.registerWorker(QUEUE_SUMMARIZATION, async () => { await this.summarization.runSummarization(); }); - this.registeredWorkers.push(summarizationWorker); + if (summarizationWorker) this.registeredWorkers.push(summarizationWorker); // M6-005: Tier management repeatable job await this.queueService.addRepeatableJob( @@ -54,14 +64,14 @@ export class CronService implements OnModuleInit, OnModuleDestroy { const tierWorker = this.queueService.registerWorker(QUEUE_TIER_MANAGEMENT, async () => { await this.summarization.runTierManagement(); }); - this.registeredWorkers.push(tierWorker); + if (tierWorker) this.registeredWorkers.push(tierWorker); // M6-004: GC repeatable job await this.queueService.addRepeatableJob(QUEUE_GC, 'session-gc', {}, gcSchedule); const gcWorker = this.queueService.registerWorker(QUEUE_GC, async () => { await this.sessionGC.sweepOrphans(); }); - this.registeredWorkers.push(gcWorker); + if (gcWorker) this.registeredWorkers.push(gcWorker); this.logger.log( `BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`, diff --git a/apps/gateway/src/preferences/system-override.service.ts b/apps/gateway/src/preferences/system-override.service.ts index 5fa48da..e35ed63 100644 --- a/apps/gateway/src/preferences/system-override.service.ts +++ b/apps/gateway/src/preferences/system-override.service.ts @@ -1,5 +1,7 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger, Optional, type OnApplicationShutdown } from '@nestjs/common'; import { createQueue, type QueueHandle } from '@mosaicstack/queue'; +import type { MosaicConfig } from '@mosaicstack/config'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; const SESSION_SYSTEM_KEY = (sessionId: string) => `mosaic:session:${sessionId}:system`; const SESSION_SYSTEM_FRAGMENTS_KEY = (sessionId: string) => @@ -11,16 +13,54 @@ interface OverrideFragment { addedAt: number; } -@Injectable() -export class SystemOverrideService { - private readonly logger = new Logger(SystemOverrideService.name); - private readonly handle: QueueHandle; +interface LocalOverrideEntry { + condensed: string; + fragments: OverrideFragment[]; +} - constructor() { - this.handle = createQueue(); +@Injectable() +export class SystemOverrideService implements OnApplicationShutdown { + private readonly logger = new Logger(SystemOverrideService.name); + private readonly handle: QueueHandle | null; + /** + * In-memory fallback used on Local tier (no Redis). + * NOTE: state is ephemeral — lost on restart. For Local single-user installs + * this is acceptable; system overrides are re-applied at the next session. + * This is a deliberate behavior change from the Redis-backed 7-day TTL. + */ + private readonly localStore = new Map(); + + constructor( + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, + ) { + if (this.mosaicConfig?.queue?.type === 'local') { + this.handle = null; + } else { + this.handle = createQueue(); + } + } + + async onApplicationShutdown(): Promise { + // On non-local tiers the constructor opens an ioredis connection; close it + // on graceful shutdown to avoid leaking the handle (local tier is null). + await this.handle?.close().catch(() => {}); } async set(sessionId: string, override: string): Promise { + if (!this.handle) { + // Local tier: in-memory path + const entry = this.localStore.get(sessionId) ?? { condensed: '', fragments: [] }; + entry.fragments.push({ text: override, addedAt: Date.now() }); + entry.condensed = await this.condenseOverrides(entry.fragments.map((f) => f.text)); + this.localStore.set(sessionId, entry); + this.logger.debug( + `Set system override for session ${sessionId} (local, ${entry.fragments.length} fragment(s))`, + ); + return; + } + // Load existing fragments const existing = await this.handle.redis.get(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId)); const fragments: OverrideFragment[] = existing @@ -50,10 +90,17 @@ export class SystemOverrideService { } async get(sessionId: string): Promise { + if (!this.handle) { + return this.localStore.get(sessionId)?.condensed ?? null; + } return this.handle.redis.get(SESSION_SYSTEM_KEY(sessionId)); } async renew(sessionId: string): Promise { + if (!this.handle) { + // Local tier: no TTL to renew; entry persists until restart + return; + } const pipeline = this.handle.redis.pipeline(); pipeline.expire(SESSION_SYSTEM_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS); pipeline.expire(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS); @@ -61,6 +108,11 @@ export class SystemOverrideService { } async clear(sessionId: string): Promise { + if (!this.handle) { + this.localStore.delete(sessionId); + this.logger.debug(`Cleared system override for session ${sessionId} (local)`); + return; + } await this.handle.redis.del( SESSION_SYSTEM_KEY(sessionId), SESSION_SYSTEM_FRAGMENTS_KEY(sessionId), diff --git a/apps/gateway/src/queue/queue.service.ts b/apps/gateway/src/queue/queue.service.ts index a50a773..6e84340 100644 --- a/apps/gateway/src/queue/queue.service.ts +++ b/apps/gateway/src/queue/queue.service.ts @@ -8,7 +8,9 @@ import { } from '@nestjs/common'; import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq'; import type { LogService } from '@mosaicstack/log'; +import type { MosaicConfig } from '@mosaicstack/config'; import { LOG_SERVICE } from '../log/log.tokens.js'; +import { MOSAIC_CONFIG } from '../config/config.module.js'; import type { JobDto, JobStatus } from './queue-admin.dto.js'; // --------------------------------------------------------------------------- @@ -108,21 +110,42 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { private readonly connection: ConnectionOptions; private readonly queues = new Map>(); private readonly workers = new Map>(); + /** False on Local tier — BullMQ/Redis operations become no-ops. */ + private readonly enabled: boolean; constructor( @Optional() @Inject(LOG_SERVICE) private readonly logService: LogService | null, + @Optional() + @Inject(MOSAIC_CONFIG) + private readonly mosaicConfig: MosaicConfig | null, ) { - this.connection = getConnection(); + this.enabled = this.mosaicConfig?.queue?.type !== 'local'; + this.connection = this.enabled + ? getConnection() + : ({ host: '127.0.0.1', port: 6380 } as ConnectionOptions); + } + + /** Returns true when BullMQ/Redis is active (Standalone and Federated tiers). */ + isEnabled(): boolean { + return this.enabled; } onModuleInit(): void { - this.logger.log('QueueService initialised (BullMQ)'); + if (this.enabled) { + this.logger.log('QueueService initialised (BullMQ)'); + } else { + this.logger.log( + 'QueueService: BullMQ disabled for local tier — no Redis connections will be opened', + ); + } } async onModuleDestroy(): Promise { - await this.closeAll(); + if (this.enabled) { + await this.closeAll(); + } } // ------------------------------------------------------------------------- @@ -131,8 +154,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Get or create a BullMQ Queue for the given queue name. + * Returns null on Local tier where BullMQ is disabled. */ - getQueue(name: string): Queue { + getQueue(name: string): Queue | null { + if (!this.enabled) return null; let queue = this.queues.get(name) as Queue | undefined; if (!queue) { queue = new Queue(name, { connection: this.connection }); @@ -144,6 +169,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Add a BullMQ repeatable job (cron-style). * Uses `jobId` as a deterministic key so duplicate registrations are idempotent. + * No-op on Local tier. */ async addRepeatableJob( queueName: string, @@ -151,7 +177,13 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { data: T, cronExpression: string, ): Promise { - const queue = this.getQueue(queueName); + if (!this.enabled) { + this.logger.debug( + `Skipping repeatable job "${jobName}" on "${queueName}" (local tier — BullMQ disabled)`, + ); + return; + } + const queue = this.getQueue(queueName)!; // eslint-disable-next-line @typescript-eslint/no-explicit-any await (queue as Queue).add(jobName, data, { repeat: { pattern: cronExpression }, @@ -165,8 +197,18 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Register a Worker for the given queue name with error handling and * exponential backoff. + * Returns null on Local tier where BullMQ is disabled. */ - registerWorker(queueName: string, handler: JobHandler): Worker { + registerWorker( + queueName: string, + handler: JobHandler, + ): Worker | null { + if (!this.enabled) { + this.logger.debug( + `Skipping worker registration for "${queueName}" (local tier — BullMQ disabled)`, + ); + return null; + } const worker = new Worker( queueName, async (job) => { @@ -223,8 +265,12 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * Return queue health statistics for all managed queues. + * Returns an empty healthy result on Local tier. */ async getHealthStatus(): Promise { + if (!this.enabled) { + return { queues: {}, healthy: true }; + } const queues: QueueHealthStatus['queues'] = {}; let healthy = true; @@ -255,8 +301,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { /** * List jobs across all managed queues, optionally filtered by status. * BullMQ jobs are fetched by state type from each queue. + * Returns empty array on Local tier. */ async listJobs(status?: JobStatus): Promise { + if (!this.enabled) return []; const jobs: JobDto[] = []; const states: JobStatus[] = status ? [status] @@ -283,8 +331,10 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Retry a specific failed job by its BullMQ job ID (format: "queueName:id"). * The caller passes "__" as the composite ID because BullMQ * job IDs are not globally unique — they are scoped to their queue. + * Returns an error on Local tier. */ async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const sep = compositeId.lastIndexOf('__'); if (sep === -1) { return { ok: false, message: 'Invalid job id format. Expected "__".' }; @@ -316,6 +366,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Pause a queue by name. */ async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const queue = this.queues.get(name); if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; await queue.pause(); @@ -327,6 +378,7 @@ export class QueueService implements OnModuleInit, OnModuleDestroy { * Resume a paused queue by name. */ async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> { + if (!this.enabled) return { ok: false, message: 'BullMQ is disabled on local tier.' }; const queue = this.queues.get(name); if (!queue) return { ok: false, message: `Queue "${name}" not found.` }; await queue.resume(); diff --git a/packages/mosaic/__tests__/integration/unified-wizard.test.ts b/packages/mosaic/__tests__/integration/unified-wizard.test.ts index 169f945..b755625 100644 --- a/packages/mosaic/__tests__/integration/unified-wizard.test.ts +++ b/packages/mosaic/__tests__/integration/unified-wizard.test.ts @@ -69,6 +69,8 @@ describe('Unified wizard (runWizard with default skipGateway)', () => { const prompter = new HeadlessPrompter({ 'Installation mode': 'quick', + 'Select your LLM provider': 'anthropic', + 'Anthropic API key': 'sk-ant-api03-test', 'What name should agents use?': 'TestBot', 'Communication style': 'direct', 'Your name': 'Tester', @@ -103,6 +105,8 @@ describe('Unified wizard (runWizard with default skipGateway)', () => { const prompter = new HeadlessPrompter({ 'Installation mode': 'quick', + 'Select your LLM provider': 'anthropic', + 'Anthropic API key': 'sk-ant-api03-test', 'What name should agents use?': 'TestBot', 'Communication style': 'direct', 'Your name': 'Tester', @@ -125,6 +129,8 @@ describe('Unified wizard (runWizard with default skipGateway)', () => { it('respects skipGateway: true', async () => { const prompter = new HeadlessPrompter({ 'Installation mode': 'quick', + 'Select your LLM provider': 'anthropic', + 'Anthropic API key': 'sk-ant-api03-test', 'What name should agents use?': 'TestBot', 'Communication style': 'direct', 'Your name': 'Tester', diff --git a/packages/mosaic/src/errors.ts b/packages/mosaic/src/errors.ts index d73f9ee..522be12 100644 --- a/packages/mosaic/src/errors.ts +++ b/packages/mosaic/src/errors.ts @@ -1,7 +1,7 @@ export class WizardCancelledError extends Error { override name = 'WizardCancelledError'; - constructor() { - super('Wizard cancelled by user'); + constructor(message = 'Wizard cancelled by user') { + super(message); } } diff --git a/packages/mosaic/src/stages/finalize.ts b/packages/mosaic/src/stages/finalize.ts index 4835f6e..01b9575 100644 --- a/packages/mosaic/src/stages/finalize.ts +++ b/packages/mosaic/src/stages/finalize.ts @@ -207,9 +207,16 @@ export async function finalizeStage( : 'none selected' : `install failed — ${skillsResult.failureReason ?? 'unknown error'}`; + const providerConfigured = + state.providerType && state.providerType !== 'none' && state.providerKey; + const providerSummary = providerConfigured + ? `Provider: ${state.providerType} (configured)` + : 'Provider: NONE — agent has no brain'; + const summary: string[] = [ `Agent: ${state.soul.agentName ?? 'Assistant'}`, `Style: ${state.soul.communicationStyle ?? 'direct'}`, + providerSummary, `Runtimes: ${state.runtimes.detected.join(', ') || 'none detected'}`, `Skills: ${skillsSummary}`, `Config: ${state.mosaicHome}`, @@ -239,5 +246,12 @@ export async function finalizeStage( p.note(nextSteps.map((s, i) => `${(i + 1).toString()}. ${s}`).join('\n'), 'Next Steps'); - p.outro('Mosaic is ready.'); + if (!providerConfigured) { + p.warn( + 'Installation complete, but no LLM provider is configured. ' + + 'Run `mosaic wizard` or `mosaic gateway install` to add an API key before using the agent.', + ); + } else { + p.outro('Mosaic is ready.'); + } } diff --git a/packages/mosaic/src/stages/provider-setup.spec.ts b/packages/mosaic/src/stages/provider-setup.spec.ts index 07ddea0..58bb5df 100644 --- a/packages/mosaic/src/stages/provider-setup.spec.ts +++ b/packages/mosaic/src/stages/provider-setup.spec.ts @@ -78,7 +78,7 @@ describe('providerSetupStage', () => { expect(state.providerType).toBe('none'); }); - it('prompts for key in interactive mode', async () => { + it('prompts for provider then key in interactive mode', async () => { delete process.env['MOSAIC_ASSUME_YES']; // Simulate a TTY const origIsTTY = process.stdin.isTTY; @@ -86,11 +86,13 @@ describe('providerSetupStage', () => { const state = makeState(); const p = buildPrompter({ + select: vi.fn().mockResolvedValue('anthropic'), text: vi.fn().mockResolvedValue('sk-ant-api03-interactive'), }); await providerSetupStage(p, state); + expect(p.select).toHaveBeenCalled(); expect(p.text).toHaveBeenCalled(); expect(state.providerKey).toBe('sk-ant-api03-interactive'); expect(state.providerType).toBe('anthropic'); @@ -98,20 +100,57 @@ describe('providerSetupStage', () => { Object.defineProperty(process.stdin, 'isTTY', { value: origIsTTY, configurable: true }); }); - it('handles empty key in interactive mode', async () => { + it('rejects empty and mismatched keys via the validate callback (Anthropic)', async () => { delete process.env['MOSAIC_ASSUME_YES']; const origIsTTY = process.stdin.isTTY; Object.defineProperty(process.stdin, 'isTTY', { value: true, configurable: true }); + let capturedValidate: ((v: string) => string | void) | undefined; const state = makeState(); const p = buildPrompter({ - text: vi.fn().mockResolvedValue(''), + select: vi.fn().mockResolvedValue('anthropic'), + text: vi + .fn() + .mockImplementation(async (opts: { validate?: (v: string) => string | void }) => { + capturedValidate = opts.validate; + return 'sk-ant-api03-ok'; + }), }); await providerSetupStage(p, state); - expect(state.providerType).toBe('none'); - expect(state.providerKey).toBeUndefined(); + expect(capturedValidate).toBeDefined(); + expect(capturedValidate?.('')).toBe('API key is required'); + expect(capturedValidate?.(' ')).toBe('API key is required'); + expect(capturedValidate?.('not-a-key')).toBe('Anthropic keys start with sk-ant-'); + expect(capturedValidate?.('sk-ant-valid')).toBeUndefined(); + expect(state.providerType).toBe('anthropic'); + + Object.defineProperty(process.stdin, 'isTTY', { value: origIsTTY, configurable: true }); + }); + + it('rejects an Anthropic key when OpenAI is selected', async () => { + delete process.env['MOSAIC_ASSUME_YES']; + const origIsTTY = process.stdin.isTTY; + Object.defineProperty(process.stdin, 'isTTY', { value: true, configurable: true }); + + let capturedValidate: ((v: string) => string | void) | undefined; + const state = makeState(); + const p = buildPrompter({ + select: vi.fn().mockResolvedValue('openai'), + text: vi + .fn() + .mockImplementation(async (opts: { validate?: (v: string) => string | void }) => { + capturedValidate = opts.validate; + return 'sk-proj-ok'; + }), + }); + + await providerSetupStage(p, state); + + expect(capturedValidate?.('sk-ant-api03-xyz')).toBe('OpenAI keys start with sk- (not sk-ant-)'); + expect(capturedValidate?.('sk-proj-xyz')).toBeUndefined(); + expect(state.providerType).toBe('openai'); Object.defineProperty(process.stdin, 'isTTY', { value: origIsTTY, configurable: true }); }); diff --git a/packages/mosaic/src/stages/provider-setup.ts b/packages/mosaic/src/stages/provider-setup.ts index 7917935..10b557f 100644 --- a/packages/mosaic/src/stages/provider-setup.ts +++ b/packages/mosaic/src/stages/provider-setup.ts @@ -1,12 +1,13 @@ import type { WizardPrompter } from '../prompter/interface.js'; import type { WizardState } from '../types.js'; -import { detectProviderType } from '../constants.js'; +import type { ProviderType } from '../types.js'; /** - * Provider setup stage — collects the user's LLM API key and detects the + * Provider setup stage — collects the user's LLM API key and validates the * provider type from the key prefix. * * In headless mode, reads from `MOSAIC_ANTHROPIC_API_KEY` or `MOSAIC_OPENAI_API_KEY`. + * Interactive mode requires the user to select a provider and enter a valid key. */ export async function providerSetupStage(p: WizardPrompter, state: WizardState): Promise { const isHeadless = process.env['MOSAIC_ASSUME_YES'] === '1' || !process.stdin.isTTY; @@ -16,39 +17,57 @@ export async function providerSetupStage(p: WizardPrompter, state: WizardState): const openaiKey = process.env['MOSAIC_OPENAI_API_KEY'] ?? ''; const key = anthropicKey || openaiKey; state.providerKey = key || undefined; - state.providerType = detectProviderType(key); + if (anthropicKey) { + state.providerType = 'anthropic'; + } else if (openaiKey) { + state.providerType = 'openai'; + } else { + state.providerType = 'none'; + p.warn( + 'No API key found (MOSAIC_ANTHROPIC_API_KEY / MOSAIC_OPENAI_API_KEY). ' + + 'Run `mosaic gateway install` to configure a key before using the agent.', + ); + } return; } p.separator(); p.note( 'Configure your LLM provider so the agent has a brain.\n' + - 'Anthropic (Claude) and OpenAI are supported.\n' + - 'You can skip this and add a key later via `mosaic configure`.', + 'Anthropic (Claude) and OpenAI are supported. You will need an API key to continue.', 'LLM Provider', ); - const key = await p.text({ - message: 'API key (paste your Anthropic or OpenAI key, or press Enter to skip)', - defaultValue: '', - placeholder: 'sk-ant-api03-... or sk-...', + const providerType = await p.select({ + message: 'Select your LLM provider', + options: [ + { value: 'anthropic', label: 'Anthropic (Claude)', hint: 'Keys start with sk-ant-' }, + { value: 'openai', label: 'OpenAI', hint: 'Keys start with sk-' }, + ], + initialValue: 'anthropic', }); - if (key) { - const provider = detectProviderType(key); - state.providerKey = key; - state.providerType = provider; + const key = await p.text({ + message: providerType === 'anthropic' ? 'Anthropic API key' : 'OpenAI API key', + placeholder: providerType === 'anthropic' ? 'sk-ant-api03-...' : 'sk-...', + validate: (value: string): string | void => { + if (!value || value.trim().length === 0) { + return 'API key is required'; + } + const trimmed = value.trim(); + if (providerType === 'anthropic' && !trimmed.startsWith('sk-ant-')) { + return 'Anthropic keys start with sk-ant-'; + } + if ( + providerType === 'openai' && + (!trimmed.startsWith('sk-') || trimmed.startsWith('sk-ant-')) + ) { + return 'OpenAI keys start with sk- (not sk-ant-)'; + } + }, + }); - if (provider === 'anthropic') { - p.log('Detected provider: Anthropic (Claude)'); - } else if (provider === 'openai') { - p.log('Detected provider: OpenAI'); - } else { - p.log('Provider auto-detection failed. Key will be stored as ANTHROPIC_API_KEY.'); - state.providerType = 'anthropic'; - } - } else { - state.providerType = 'none'; - p.log('No API key provided. You can add one later with `mosaic configure`.'); - } + state.providerKey = key.trim(); + state.providerType = providerType; + p.log(`Provider configured: ${providerType === 'anthropic' ? 'Anthropic (Claude)' : 'OpenAI'}`); } diff --git a/packages/mosaic/src/stages/quick-start.ts b/packages/mosaic/src/stages/quick-start.ts index b97d1a4..f3d6e15 100644 --- a/packages/mosaic/src/stages/quick-start.ts +++ b/packages/mosaic/src/stages/quick-start.ts @@ -2,6 +2,7 @@ import type { WizardPrompter } from '../prompter/interface.js'; import type { ConfigService } from '../config/config-service.js'; import type { WizardState } from '../types.js'; import { DEFAULTS } from '../constants.js'; +import { WizardCancelledError } from '../errors.js'; import { providerSetupStage } from './provider-setup.js'; import { runtimeSetupStage } from './runtime-setup.js'; import { hooksPreviewStage } from './hooks-preview.js'; @@ -38,6 +39,25 @@ export async function quickStartPath( // 1. Provider setup (first question) await providerSetupStage(prompter, state); + // Belt-and-suspenders guard: ensure a provider key was set before proceeding. + // The interactive path in providerSetupStage always requires a key, so this + // guard is effectively unreachable interactively. The headless path may + // produce providerType='none' when no env var is present: there we warn (the + // operator can configure a key later via `mosaic gateway install`) and let + // the scripted install continue — finalize.ts will NOT print "Mosaic is + // ready" without a configured provider, so no false-green is possible. + if (state.providerType === 'none' || !state.providerKey) { + const headlessRun = process.env['MOSAIC_ASSUME_YES'] === '1' || !process.stdin.isTTY; + if (!headlessRun) { + prompter.warn( + 'A provider API key is required to continue. ' + + 'Set MOSAIC_ANTHROPIC_API_KEY or MOSAIC_OPENAI_API_KEY and run the wizard again, ' + + 'or run `mosaic gateway install` to configure one after installation.', + ); + throw new WizardCancelledError('No LLM provider configured'); + } + } + // Apply sensible defaults for everything else state.soul.agentName ??= 'Mosaic'; state.soul.roleDescription ??= DEFAULTS.roleDescription;