feat(M4-009,M4-010,M4-011): routing rules CRUD, per-user overrides, agent capabilities (#320)
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #320.
This commit is contained in:
@@ -13,12 +13,18 @@ import { Server, Socket } from 'socket.io';
|
||||
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
|
||||
import type { Auth } from '@mosaic/auth';
|
||||
import type { Brain } from '@mosaic/brain';
|
||||
import type { SetThinkingPayload, SlashCommandPayload, SystemReloadPayload } from '@mosaic/types';
|
||||
import type {
|
||||
SetThinkingPayload,
|
||||
SlashCommandPayload,
|
||||
SystemReloadPayload,
|
||||
RoutingDecisionInfo,
|
||||
} from '@mosaic/types';
|
||||
import { AgentService, type ConversationHistoryMessage } from '../agent/agent.service.js';
|
||||
import { AUTH } from '../auth/auth.tokens.js';
|
||||
import { BRAIN } from '../brain/brain.tokens.js';
|
||||
import { CommandRegistryService } from '../commands/command-registry.service.js';
|
||||
import { CommandExecutorService } from '../commands/command-executor.service.js';
|
||||
import { RoutingEngineService } from '../agent/routing/routing-engine.service.js';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { ChatSocketMessageDto } from './chat.dto.js';
|
||||
import { validateSocketSession } from './chat.gateway-auth.js';
|
||||
@@ -33,8 +39,16 @@ interface ClientSession {
|
||||
toolCalls: Array<{ toolCallId: string; toolName: string; args: unknown; isError: boolean }>;
|
||||
/** Tool calls in-flight (started but not ended yet). */
|
||||
pendingToolCalls: Map<string, { toolName: string; args: unknown }>;
|
||||
/** Last routing decision made for this session (M4-008) */
|
||||
lastRoutingDecision?: RoutingDecisionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-conversation model overrides set via /model command (M4-007).
|
||||
* Keyed by conversationId, value is the model name to use.
|
||||
*/
|
||||
const modelOverrides = new Map<string, string>();
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: {
|
||||
origin: process.env['GATEWAY_CORS_ORIGIN'] ?? 'http://localhost:3000',
|
||||
@@ -54,6 +68,7 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
@Inject(BRAIN) private readonly brain: Brain,
|
||||
@Inject(CommandRegistryService) private readonly commandRegistry: CommandRegistryService,
|
||||
@Inject(CommandExecutorService) private readonly commandExecutor: CommandExecutorService,
|
||||
@Inject(RoutingEngineService) private readonly routingEngine: RoutingEngineService,
|
||||
) {}
|
||||
|
||||
afterInit(): void {
|
||||
@@ -97,15 +112,50 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
this.logger.log(`Message from ${client.id} in conversation ${conversationId}`);
|
||||
|
||||
// Ensure agent session exists for this conversation
|
||||
let sessionRoutingDecision: RoutingDecisionInfo | undefined;
|
||||
try {
|
||||
let agentSession = this.agentService.getSession(conversationId);
|
||||
if (!agentSession) {
|
||||
// When resuming an existing conversation, load prior messages to inject as context (M1-004)
|
||||
const conversationHistory = await this.loadConversationHistory(conversationId, userId);
|
||||
|
||||
// Determine provider/model via routing engine or per-session /model override (M4-012 / M4-007)
|
||||
let resolvedProvider = data.provider;
|
||||
let resolvedModelId = data.modelId;
|
||||
|
||||
const modelOverride = modelOverrides.get(conversationId);
|
||||
if (modelOverride) {
|
||||
// /model override bypasses routing engine (M4-007)
|
||||
resolvedModelId = modelOverride;
|
||||
this.logger.log(
|
||||
`Using /model override "${modelOverride}" for conversation=${conversationId}`,
|
||||
);
|
||||
} else if (!resolvedProvider && !resolvedModelId) {
|
||||
// No explicit provider/model from client — use routing engine (M4-012)
|
||||
try {
|
||||
const routingDecision = await this.routingEngine.resolve(data.content, userId);
|
||||
resolvedProvider = routingDecision.provider;
|
||||
resolvedModelId = routingDecision.model;
|
||||
sessionRoutingDecision = {
|
||||
model: routingDecision.model,
|
||||
provider: routingDecision.provider,
|
||||
ruleName: routingDecision.ruleName,
|
||||
reason: routingDecision.reason,
|
||||
};
|
||||
this.logger.log(
|
||||
`Routing decision for conversation=${conversationId}: ${routingDecision.provider}/${routingDecision.model} (rule="${routingDecision.ruleName}")`,
|
||||
);
|
||||
} catch (routingErr) {
|
||||
this.logger.warn(
|
||||
`Routing engine failed for conversation=${conversationId}, using defaults`,
|
||||
routingErr instanceof Error ? routingErr.message : String(routingErr),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
agentSession = await this.agentService.createSession(conversationId, {
|
||||
provider: data.provider,
|
||||
modelId: data.modelId,
|
||||
provider: resolvedProvider,
|
||||
modelId: resolvedModelId,
|
||||
agentConfigId: data.agentId,
|
||||
userId,
|
||||
conversationHistory: conversationHistory.length > 0 ? conversationHistory : undefined,
|
||||
@@ -167,18 +217,23 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
this.relayEvent(client, conversationId, event);
|
||||
});
|
||||
|
||||
// Preserve routing decision from the existing client session if we didn't get a new one
|
||||
const prevClientSession = this.clientSessions.get(client.id);
|
||||
const routingDecisionToStore = sessionRoutingDecision ?? prevClientSession?.lastRoutingDecision;
|
||||
|
||||
this.clientSessions.set(client.id, {
|
||||
conversationId,
|
||||
cleanup,
|
||||
assistantText: '',
|
||||
toolCalls: [],
|
||||
pendingToolCalls: new Map(),
|
||||
lastRoutingDecision: routingDecisionToStore,
|
||||
});
|
||||
|
||||
// Track channel connection
|
||||
this.agentService.addChannel(conversationId, `websocket:${client.id}`);
|
||||
|
||||
// Send session info so the client knows the model/provider
|
||||
// Send session info so the client knows the model/provider (M4-008: include routing decision)
|
||||
{
|
||||
const agentSession = this.agentService.getSession(conversationId);
|
||||
if (agentSession) {
|
||||
@@ -189,6 +244,7 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
modelId: agentSession.modelId,
|
||||
thinkingLevel: piSession.thinkingLevel,
|
||||
availableThinkingLevels: piSession.getAvailableThinkingLevels(),
|
||||
...(routingDecisionToStore ? { routingDecision: routingDecisionToStore } : {}),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -263,6 +319,28 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
this.logger.log('Broadcasted system:reload to all connected clients');
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a per-conversation model override (M4-007).
|
||||
* When set, the routing engine is bypassed and the specified model is used.
|
||||
* Pass null to clear the override and resume automatic routing.
|
||||
*/
|
||||
setModelOverride(conversationId: string, modelName: string | null): void {
|
||||
if (modelName) {
|
||||
modelOverrides.set(conversationId, modelName);
|
||||
this.logger.log(`Model override set: conversation=${conversationId} model="${modelName}"`);
|
||||
} else {
|
||||
modelOverrides.delete(conversationId);
|
||||
this.logger.log(`Model override cleared: conversation=${conversationId}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the active model override for a conversation, or undefined if none.
|
||||
*/
|
||||
getModelOverride(conversationId: string): string | undefined {
|
||||
return modelOverrides.get(conversationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure a conversation record exists in the DB.
|
||||
* Creates it if absent — safe to call concurrently since a duplicate insert
|
||||
|
||||
Reference in New Issue
Block a user