Merge pull request 'fix: parse VALKEY_URL into RedisOptions for BullMQ — fixes ECONNREFUSED 6379' (#336) from fix/bullmq-valkey-url-port into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Some checks failed
ci/woodpecker/push/ci Pipeline failed
This commit was merged in pull request #336.
This commit is contained in:
@@ -51,16 +51,42 @@ export interface QueueHealthStatus {
|
||||
// Constants
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const QUEUE_SUMMARIZATION = 'mosaic:summarization';
|
||||
export const QUEUE_GC = 'mosaic:gc';
|
||||
export const QUEUE_TIER_MANAGEMENT = 'mosaic:tier-management';
|
||||
export const QUEUE_SUMMARIZATION = 'mosaic-summarization';
|
||||
export const QUEUE_GC = 'mosaic-gc';
|
||||
export const QUEUE_TIER_MANAGEMENT = 'mosaic-tier-management';
|
||||
|
||||
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';
|
||||
|
||||
/**
|
||||
* Parse a Redis URL string into a BullMQ-compatible ConnectionOptions object.
|
||||
*
|
||||
* BullMQ v5 does `Object.assign({ port: 6379, host: '127.0.0.1' }, opts)` in
|
||||
* its RedisConnection constructor. If opts is a URL string, Object.assign only
|
||||
* copies character-index properties and the defaults survive — so 6379 wins.
|
||||
* We must parse the URL ourselves and return a plain RedisOptions object.
|
||||
*/
|
||||
function getConnection(): ConnectionOptions {
|
||||
const url = process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
|
||||
// BullMQ ConnectionOptions accepts a URL string (ioredis-compatible)
|
||||
return url as unknown as ConnectionOptions;
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
const opts: ConnectionOptions = {
|
||||
host: parsed.hostname || '127.0.0.1',
|
||||
port: parsed.port ? parseInt(parsed.port, 10) : 6380,
|
||||
};
|
||||
if (parsed.password) {
|
||||
(opts as Record<string, unknown>)['password'] = decodeURIComponent(parsed.password);
|
||||
}
|
||||
if (parsed.pathname && parsed.pathname.length > 1) {
|
||||
const db = parseInt(parsed.pathname.slice(1), 10);
|
||||
if (!isNaN(db)) {
|
||||
(opts as Record<string, unknown>)['db'] = db;
|
||||
}
|
||||
}
|
||||
return opts;
|
||||
} catch {
|
||||
// Fallback: hope the value is already a host string ioredis understands
|
||||
return { host: '127.0.0.1', port: 6380 } as ConnectionOptions;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -9,10 +9,14 @@ const ocRequire = createRequire(import.meta.url);
|
||||
const sdkRoot = path.dirname(ocRequire.resolve('openclaw/dist/plugin-sdk/index.js'));
|
||||
|
||||
// Dynamic imports for runtime SDK functions
|
||||
const { registerAcpRuntimeBackend, unregisterAcpRuntimeBackend } = await import(
|
||||
const { registerAcpRuntimeBackend, unregisterAcpRuntimeBackend } = (await import(
|
||||
`${sdkRoot}/acp-runtime.js`
|
||||
) as {
|
||||
registerAcpRuntimeBackend: (backend: { id: string; runtime: any; healthy: () => boolean }) => void;
|
||||
)) as {
|
||||
registerAcpRuntimeBackend: (backend: {
|
||||
id: string;
|
||||
runtime: any;
|
||||
healthy: () => boolean;
|
||||
}) => void;
|
||||
unregisterAcpRuntimeBackend: (id: string) => void;
|
||||
};
|
||||
|
||||
|
||||
@@ -82,7 +82,15 @@ const MACP_CAPABILITIES: AcpRuntimeCapabilities = {
|
||||
|
||||
const DEFAULT_REPO_ROOT = '~/src/mosaic-stack';
|
||||
const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run';
|
||||
const PI_RUNNER_PATH = path.join(os.homedir(), 'src', 'mosaic-stack', 'tools', 'macp', 'dispatcher', 'pi_runner.ts');
|
||||
const PI_RUNNER_PATH = path.join(
|
||||
os.homedir(),
|
||||
'src',
|
||||
'mosaic-stack',
|
||||
'tools',
|
||||
'macp',
|
||||
'dispatcher',
|
||||
'pi_runner.ts',
|
||||
);
|
||||
|
||||
function expandHome(rawPath: string): string {
|
||||
if (rawPath === '~') {
|
||||
|
||||
Reference in New Issue
Block a user