Bug #1 — quick-start wizard let users skip the LLM provider/API key yet reported "Mosaic is ready". provider-setup now requires an interactive provider select + a validated key; quick-start guards the headless path; finalize won't print "Mosaic is ready" without a configured provider; removed references to the non-existent `mosaic configure` command. Bug #2 — the "local" storage tier still opened ioredis/BullMQ connections at bootstrap (ECONNREFUSED, gateway never healthy). Every Redis consumer (queue.service, gc.module, session-gc.service, commands.module, command-executor.service, system-override.service, cron.service, admin-health.controller) is now tier-aware and degrades gracefully on local tier. Standalone/Federated unaffected. Also fixed a pre-existing SystemOverrideService ioredis handle leak (added shutdown hook). Refs #675 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01RMoEx7hfdFGjUiCHuN1RRi
184 lines
6.4 KiB
TypeScript
184 lines
6.4 KiB
TypeScript
import { Inject, Injectable, Logger, Optional, type OnApplicationShutdown } from '@nestjs/common';
|
|
import { createQueue, type QueueHandle } from '@mosaicstack/queue';
|
|
import type { MosaicConfig } from '@mosaicstack/config';
|
|
import { MOSAIC_CONFIG } from '../config/config.module.js';
|
|
|
|
/** Builds the Redis key under which a session's condensed system override is stored. */
const SESSION_SYSTEM_KEY = (sessionId: string): string => {
  return `mosaic:session:${sessionId}:system`;
};

/**
 * Builds the Redis key under which a session's raw override fragments
 * (a JSON-serialized array) are stored. It is the condensed-override key
 * with a `:fragments` suffix.
 */
const SESSION_SYSTEM_FRAGMENTS_KEY = (sessionId: string): string => {
  return `${SESSION_SYSTEM_KEY(sessionId)}:fragments`;
};

/** Lifetime, in seconds, applied to both Redis keys: 7 days. */
const SYSTEM_OVERRIDE_TTL_SECONDS = 7 * 24 * 60 * 60;
|
|
|
|
/** One override instruction exactly as it was submitted, plus when it was recorded. */
interface OverrideFragment {
  /** The override text passed to `set()`. */
  text: string;
  /** `Date.now()` value (milliseconds since the Unix epoch) captured when the fragment was appended. */
  addedAt: number;
}
|
|
|
|
/** Per-session record held in the in-memory store that is used when no Redis handle exists. */
interface LocalOverrideEntry {
  /** The merged override text that `get()` returns for the session. */
  condensed: string;
  /** Every fragment added so far, in insertion order (oldest first). */
  fragments: OverrideFragment[];
}
|
|
|
|
/**
 * Stores, merges and retrieves per-session system-prompt overrides.
 *
 * Two storage paths exist, chosen once in the constructor:
 *  - queue type `'local'`: an in-process `Map` (no Redis connection is opened);
 *  - anything else: Redis, via the handle returned by `createQueue()`, with
 *    both keys expiring after `SYSTEM_OVERRIDE_TTL_SECONDS`.
 *
 * Each `set()` appends a fragment and re-condenses all fragments of the
 * session into a single override string, which is what `get()` returns.
 */
@Injectable()
export class SystemOverrideService implements OnApplicationShutdown {
  /** Upper bound for the whole condensation HTTP exchange (request + body read). */
  private static readonly CONDENSE_TIMEOUT_MS = 15000;

  private readonly logger = new Logger(SystemOverrideService.name);

  /** Redis-backed handle, or `null` on the local tier. */
  private readonly handle: QueueHandle | null;

  /**
   * In-memory fallback used on Local tier (no Redis).
   * NOTE: state is ephemeral — lost on restart. For Local single-user installs
   * this is acceptable; system overrides are re-applied at the next session.
   * This is a deliberate behavior change from the Redis-backed 7-day TTL.
   */
  private readonly localStore = new Map<string, LocalOverrideEntry>();

  /**
   * @param mosaicConfig Injected configuration; optional, so it may be absent.
   *   Only `queue.type === 'local'` disables Redis — a missing config opens a
   *   Redis handle just like any non-local tier.
   */
  constructor(
    @Optional()
    @Inject(MOSAIC_CONFIG)
    private readonly mosaicConfig: MosaicConfig | null,
  ) {
    if (this.mosaicConfig?.queue?.type === 'local') {
      this.handle = null;
    } else {
      this.handle = createQueue();
    }
  }

  /**
   * Closes the Redis handle on graceful shutdown so the connection opened in
   * the constructor is not leaked. A no-op on the local tier. Close failures
   * are logged rather than rethrown so they cannot block shutdown.
   */
  async onApplicationShutdown(): Promise<void> {
    if (!this.handle) return;
    try {
      await this.handle.close();
    } catch (err) {
      this.logger.warn(`Failed to close Redis handle on shutdown: ${String(err)}`);
    }
  }

  /**
   * Appends `override` to the session's fragments, re-condenses all fragments
   * and stores both the fragment list and the condensed result.
   *
   * @param sessionId Session whose override is being extended.
   * @param override  New instruction text to add.
   */
  async set(sessionId: string, override: string): Promise<void> {
    if (!this.handle) {
      // Local tier: in-memory path
      const entry = this.localStore.get(sessionId) ?? { condensed: '', fragments: [] };
      entry.fragments.push({ text: override, addedAt: Date.now() });
      entry.condensed = await this.condenseOverrides(entry.fragments.map((f) => f.text));
      this.localStore.set(sessionId, entry);
      this.logger.debug(
        `Set system override for session ${sessionId} (local, ${entry.fragments.length} fragment(s))`,
      );
      return;
    }

    // Load existing fragments; malformed stored data is discarded instead of
    // throwing, otherwise one bad value would make every later set() fail.
    const existing = await this.handle.redis.get(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId));
    const fragments = this.parseStoredFragments(sessionId, existing);

    // Append new fragment
    fragments.push({ text: override, addedAt: Date.now() });

    // Condense fragments into one coherent override
    const texts = fragments.map((f) => f.text);
    const condensed = await this.condenseOverrides(texts);

    // Store both: fragments array and condensed result
    const pipeline = this.handle.redis.pipeline();
    pipeline.setex(
      SESSION_SYSTEM_FRAGMENTS_KEY(sessionId),
      SYSTEM_OVERRIDE_TTL_SECONDS,
      JSON.stringify(fragments),
    );
    pipeline.setex(SESSION_SYSTEM_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS, condensed);
    await pipeline.exec();

    this.logger.debug(
      `Set system override for session ${sessionId} (${fragments.length} fragment(s), TTL=${SYSTEM_OVERRIDE_TTL_SECONDS}s)`,
    );
  }

  /**
   * Returns the condensed override for the session.
   *
   * @returns The stored string, or `null` when nothing is stored.
   */
  async get(sessionId: string): Promise<string | null> {
    if (!this.handle) {
      return this.localStore.get(sessionId)?.condensed ?? null;
    }
    return this.handle.redis.get(SESSION_SYSTEM_KEY(sessionId));
  }

  /**
   * Resets the expiry of both Redis keys of the session to the full TTL.
   * Does nothing on the local tier.
   */
  async renew(sessionId: string): Promise<void> {
    if (!this.handle) {
      // Local tier: no TTL to renew; entry persists until restart
      return;
    }
    const pipeline = this.handle.redis.pipeline();
    pipeline.expire(SESSION_SYSTEM_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS);
    pipeline.expire(SESSION_SYSTEM_FRAGMENTS_KEY(sessionId), SYSTEM_OVERRIDE_TTL_SECONDS);
    await pipeline.exec();
  }

  /** Removes the session's condensed override and its fragments. */
  async clear(sessionId: string): Promise<void> {
    if (!this.handle) {
      this.localStore.delete(sessionId);
      this.logger.debug(`Cleared system override for session ${sessionId} (local)`);
      return;
    }
    await this.handle.redis.del(
      SESSION_SYSTEM_KEY(sessionId),
      SESSION_SYSTEM_FRAGMENTS_KEY(sessionId),
    );
    this.logger.debug(`Cleared system override for session ${sessionId}`);
  }

  /**
   * Merge an array of override fragments into one coherent string.
   *
   * - No fragments: returns `''`.
   * - One fragment: returns it as-is.
   * - Several fragments: asks the Anthropic Messages API (Haiku model) for a
   *   merged instruction.
   *
   * Falls back to newline concatenation when `ANTHROPIC_API_KEY` is unset, the
   * call fails, exceeds the timeout, or yields no non-empty text. Never throws.
   *
   * @param fragments Override texts, oldest first.
   * @returns The merged override text.
   */
  async condenseOverrides(fragments: string[]): Promise<string> {
    if (fragments.length === 0) return '';
    if (fragments.length === 1) return fragments[0]!;

    const numbered = fragments.map((f, i) => `${i + 1}. ${f}`).join('\n');
    const prompt =
      `Merge these system prompt instructions into one coherent paragraph. ` +
      `If instructions conflict, favor the most recently added (last in the list). ` +
      `Be concise — output only the merged instruction, nothing else.\n\n` +
      `Instructions (oldest first):\n${numbered}`;

    const apiKey = process.env['ANTHROPIC_API_KEY'];
    if (!apiKey) {
      this.logger.warn('ANTHROPIC_API_KEY not set — falling back to newline concatenation');
      return fragments.join('\n');
    }

    // Without a deadline a stalled connection would block set() indefinitely.
    // The timer stays armed until the body has been read, so it covers the
    // whole exchange; aborting rejects the pending fetch/body read.
    const controller = new AbortController();
    const timer = setTimeout(
      () => controller.abort(),
      SystemOverrideService.CONDENSE_TIMEOUT_MS,
    );

    try {
      const response = await fetch('https://api.anthropic.com/v1/messages', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'x-api-key': apiKey,
          'anthropic-version': '2023-06-01',
        },
        body: JSON.stringify({
          model: 'claude-haiku-4-5-20251001',
          max_tokens: 1024,
          messages: [{ role: 'user', content: prompt }],
        }),
        signal: controller.signal,
      });

      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`Anthropic API error ${response.status}: ${errorText}`);
      }

      const data = (await response.json()) as {
        content: Array<{ type: string; text: string }>;
      };

      const textBlock = data.content.find((c) => c.type === 'text');
      if (!textBlock || typeof textBlock.text !== 'string') {
        throw new Error('No text block in Anthropic response');
      }

      const merged = textBlock.text.trim();
      if (merged === '') {
        // Storing an empty string would silently erase the user's overrides.
        throw new Error('Empty text block in Anthropic response');
      }
      return merged;
    } catch (err) {
      this.logger.error(
        `Condensation LLM call failed — falling back to newline concatenation: ${String(err)}`,
      );
      return fragments.join('\n');
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Decodes the fragment list read from Redis.
   *
   * @param sessionId Used only for log context.
   * @param raw       Value read from the fragments key (`null`/empty when absent).
   * @returns The well-formed fragments; `[]` when the value is absent, is not
   *   valid JSON, or is not an array. Malformed items are dropped with a warning.
   */
  private parseStoredFragments(sessionId: string, raw: string | null): OverrideFragment[] {
    if (!raw) return [];

    try {
      const parsed: unknown = JSON.parse(raw);
      if (!Array.isArray(parsed)) {
        throw new Error('stored value is not an array');
      }

      const valid = parsed.filter(SystemOverrideService.isOverrideFragment);
      if (valid.length !== parsed.length) {
        this.logger.warn(
          `Dropped ${parsed.length - valid.length} malformed override fragment(s) for session ${sessionId}`,
        );
      }
      return valid;
    } catch (err) {
      this.logger.warn(
        `Discarding unreadable override fragments for session ${sessionId}: ${String(err)}`,
      );
      return [];
    }
  }

  /** Runtime check that a decoded JSON value has the `OverrideFragment` shape. */
  private static isOverrideFragment(value: unknown): value is OverrideFragment {
    if (typeof value !== 'object' || value === null) return false;
    const candidate = value as { text?: unknown; addedAt?: unknown };
    return typeof candidate.text === 'string' && typeof candidate.addedAt === 'number';
  }
}
|