feat(routing): implement routing decision pipeline — M4-006 (#318)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #318.
This commit is contained in:
216
apps/gateway/src/agent/routing/routing-engine.service.ts
Normal file
216
apps/gateway/src/agent/routing/routing-engine.service.ts
Normal file
@@ -0,0 +1,216 @@
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { routingRules, type Db, and, asc, eq, or } from '@mosaic/db';
|
||||
import { DB } from '../../database/database.module.js';
|
||||
import { ProviderService } from '../provider.service.js';
|
||||
import { classifyTask } from './task-classifier.js';
|
||||
import type {
|
||||
RoutingCondition,
|
||||
RoutingRule,
|
||||
RoutingDecision,
|
||||
TaskClassification,
|
||||
} from './routing.types.js';
|
||||
|
||||
// ─── Injection tokens ────────────────────────────────────────────────────────
|
||||
|
||||
export const PROVIDER_SERVICE = Symbol('ProviderService');
|
||||
|
||||
// ─── Fallback chain ──────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Ordered fallback providers tried when no rule matches or all matched
|
||||
* providers are unhealthy.
|
||||
*/
|
||||
const FALLBACK_CHAIN: Array<{ provider: string; model: string }> = [
|
||||
{ provider: 'anthropic', model: 'claude-sonnet-4-6' },
|
||||
{ provider: 'anthropic', model: 'claude-haiku-4-5' },
|
||||
{ provider: 'ollama', model: 'llama3.2' },
|
||||
];
|
||||
|
||||
// ─── Service ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@Injectable()
|
||||
export class RoutingEngineService {
|
||||
private readonly logger = new Logger(RoutingEngineService.name);
|
||||
|
||||
constructor(
|
||||
@Inject(DB) private readonly db: Db,
|
||||
@Inject(ProviderService) private readonly providerService: ProviderService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Classify the message, evaluate routing rules in priority order, and return
|
||||
* the best routing decision.
|
||||
*
|
||||
* @param message - Raw user message text used for classification.
|
||||
* @param userId - Optional user ID for loading user-scoped rules.
|
||||
* @param availableProviders - Optional pre-fetched provider health map to
|
||||
* avoid redundant health checks inside tight loops.
|
||||
*/
|
||||
async resolve(
|
||||
message: string,
|
||||
userId?: string,
|
||||
availableProviders?: Record<string, { status: string }>,
|
||||
): Promise<RoutingDecision> {
|
||||
const classification = classifyTask(message);
|
||||
this.logger.debug(
|
||||
`Classification: taskType=${classification.taskType} complexity=${classification.complexity} domain=${classification.domain}`,
|
||||
);
|
||||
|
||||
// Load health data once (re-use caller-supplied map if provided)
|
||||
const health = availableProviders ?? (await this.providerService.healthCheckAll());
|
||||
|
||||
// Load all applicable rules ordered by priority
|
||||
const rules = await this.loadRules(userId);
|
||||
|
||||
// Evaluate rules in priority order
|
||||
for (const rule of rules) {
|
||||
if (!rule.enabled) continue;
|
||||
|
||||
if (!this.matchConditions(rule, classification)) continue;
|
||||
|
||||
const providerStatus = health[rule.action.provider]?.status;
|
||||
const isHealthy = providerStatus === 'up' || providerStatus === 'ok';
|
||||
|
||||
if (!isHealthy) {
|
||||
this.logger.debug(
|
||||
`Rule "${rule.name}" matched but provider "${rule.action.provider}" is unhealthy (status: ${providerStatus ?? 'unknown'})`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Rule matched: "${rule.name}" → ${rule.action.provider}/${rule.action.model}`,
|
||||
);
|
||||
|
||||
return {
|
||||
provider: rule.action.provider,
|
||||
model: rule.action.model,
|
||||
agentConfigId: rule.action.agentConfigId,
|
||||
ruleName: rule.name,
|
||||
reason: `Matched routing rule "${rule.name}"`,
|
||||
};
|
||||
}
|
||||
|
||||
// No rule matched (or all matched providers were unhealthy) — apply fallback chain
|
||||
this.logger.debug('No rule matched; applying fallback chain');
|
||||
return this.applyFallbackChain(health);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether all conditions of a rule match the given task classification.
|
||||
* An empty conditions array always matches (catch-all / fallback rule).
|
||||
*/
|
||||
matchConditions(
|
||||
rule: Pick<RoutingRule, 'conditions'>,
|
||||
classification: TaskClassification,
|
||||
): boolean {
|
||||
if (rule.conditions.length === 0) return true;
|
||||
|
||||
return rule.conditions.every((condition) => this.evaluateCondition(condition, classification));
|
||||
}
|
||||
|
||||
// ─── Private helpers ───────────────────────────────────────────────────────
|
||||
|
||||
private evaluateCondition(
|
||||
condition: RoutingCondition,
|
||||
classification: TaskClassification,
|
||||
): boolean {
|
||||
// `costTier` is a valid condition field but is not part of TaskClassification
|
||||
// (it is supplied via userOverrides / request context). Treat unknown fields as
|
||||
// undefined so conditions referencing them simply do not match.
|
||||
const fieldValue = (classification as unknown as Record<string, unknown>)[condition.field];
|
||||
|
||||
switch (condition.operator) {
|
||||
case 'eq': {
|
||||
// Scalar equality: field value must equal condition value (string)
|
||||
if (typeof condition.value !== 'string') return false;
|
||||
return fieldValue === condition.value;
|
||||
}
|
||||
|
||||
case 'in': {
|
||||
// Set membership: condition value (array) contains field value
|
||||
if (!Array.isArray(condition.value)) return false;
|
||||
return condition.value.includes(fieldValue as string);
|
||||
}
|
||||
|
||||
case 'includes': {
|
||||
// Array containment: field value (array) includes condition value (string)
|
||||
if (!Array.isArray(fieldValue)) return false;
|
||||
if (typeof condition.value !== 'string') return false;
|
||||
return (fieldValue as string[]).includes(condition.value);
|
||||
}
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load routing rules from the database.
|
||||
* System rules + user-scoped rules (when userId is provided) are returned,
|
||||
* ordered by priority ascending.
|
||||
*/
|
||||
private async loadRules(userId?: string): Promise<RoutingRule[]> {
|
||||
const whereClause = userId
|
||||
? or(
|
||||
eq(routingRules.scope, 'system'),
|
||||
and(eq(routingRules.scope, 'user'), eq(routingRules.userId, userId)),
|
||||
)
|
||||
: eq(routingRules.scope, 'system');
|
||||
|
||||
const rows = await this.db
|
||||
.select()
|
||||
.from(routingRules)
|
||||
.where(whereClause)
|
||||
.orderBy(asc(routingRules.priority));
|
||||
|
||||
return rows.map((row) => ({
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
priority: row.priority,
|
||||
scope: row.scope as 'system' | 'user',
|
||||
userId: row.userId ?? undefined,
|
||||
conditions: (row.conditions as unknown as RoutingCondition[]) ?? [],
|
||||
action: row.action as unknown as {
|
||||
provider: string;
|
||||
model: string;
|
||||
agentConfigId?: string;
|
||||
systemPromptOverride?: string;
|
||||
toolAllowlist?: string[];
|
||||
},
|
||||
enabled: row.enabled,
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Walk the fallback chain and return the first healthy provider/model pair.
|
||||
* If none are healthy, return the first entry unconditionally (last resort).
|
||||
*/
|
||||
private applyFallbackChain(health: Record<string, { status: string }>): RoutingDecision {
|
||||
for (const candidate of FALLBACK_CHAIN) {
|
||||
const providerStatus = health[candidate.provider]?.status;
|
||||
const isHealthy = providerStatus === 'up' || providerStatus === 'ok';
|
||||
if (isHealthy) {
|
||||
this.logger.debug(`Fallback resolved: ${candidate.provider}/${candidate.model}`);
|
||||
return {
|
||||
provider: candidate.provider,
|
||||
model: candidate.model,
|
||||
ruleName: 'fallback',
|
||||
reason: `Fallback chain — no matching rule; selected ${candidate.provider}/${candidate.model}`,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// All providers in the fallback chain are unhealthy — use the first entry
|
||||
const lastResort = FALLBACK_CHAIN[0]!;
|
||||
this.logger.warn(
|
||||
`All fallback providers unhealthy; using last resort: ${lastResort.provider}/${lastResort.model}`,
|
||||
);
|
||||
return {
|
||||
provider: lastResort.provider,
|
||||
model: lastResort.model,
|
||||
ruleName: 'fallback',
|
||||
reason: `Fallback chain exhausted (all providers unhealthy); using ${lastResort.provider}/${lastResort.model}`,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user