feat(gateway): MosaicPlugin lifecycle + ReloadService + hot reload (P8-013) (#182)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #182.
This commit is contained in:
@@ -20,6 +20,7 @@ import { AdminModule } from './admin/admin.module.js';
|
||||
import { CommandsModule } from './commands/commands.module.js';
|
||||
import { PreferencesModule } from './preferences/preferences.module.js';
|
||||
import { GCModule } from './gc/gc.module.js';
|
||||
import { ReloadModule } from './reload/reload.module.js';
|
||||
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
||||
|
||||
@Module({
|
||||
@@ -44,6 +45,7 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
||||
PreferencesModule,
|
||||
CommandsModule,
|
||||
GCModule,
|
||||
ReloadModule,
|
||||
],
|
||||
controllers: [HealthController],
|
||||
providers: [
|
||||
|
||||
@@ -12,7 +12,7 @@ import {
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
|
||||
import type { Auth } from '@mosaic/auth';
|
||||
import type { SetThinkingPayload, SlashCommandPayload } from '@mosaic/types';
|
||||
import type { SetThinkingPayload, SlashCommandPayload, SystemReloadPayload } from '@mosaic/types';
|
||||
import { AgentService } from '../agent/agent.service.js';
|
||||
import { AUTH } from '../auth/auth.tokens.js';
|
||||
import { CommandRegistryService } from '../commands/command-registry.service.js';
|
||||
@@ -203,6 +203,11 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
||||
client.emit('command:result', result);
|
||||
}
|
||||
|
||||
broadcastReload(payload: SystemReloadPayload): void {
|
||||
this.server.emit('system:reload', payload);
|
||||
this.logger.log('Broadcasted system:reload to all connected clients');
|
||||
}
|
||||
|
||||
private relayEvent(client: Socket, conversationId: string, event: AgentSessionEvent): void {
|
||||
if (!client.connected) {
|
||||
this.logger.warn(
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { forwardRef, Module } from '@nestjs/common';
|
||||
import { CommandsModule } from '../commands/commands.module.js';
|
||||
import { ChatGateway } from './chat.gateway.js';
|
||||
import { ChatController } from './chat.controller.js';
|
||||
|
||||
@Module({
|
||||
imports: [CommandsModule],
|
||||
imports: [forwardRef(() => CommandsModule)],
|
||||
controllers: [ChatController],
|
||||
providers: [ChatGateway],
|
||||
exports: [ChatGateway],
|
||||
})
|
||||
export class ChatModule {}
|
||||
|
||||
@@ -45,6 +45,8 @@ function buildService(): CommandExecutorService {
|
||||
mockSystemOverride as never,
|
||||
mockSessionGC as never,
|
||||
mockRedis as never,
|
||||
null,
|
||||
null,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { forwardRef, Inject, Injectable, Logger, Optional } from '@nestjs/common';
|
||||
import type { QueueHandle } from '@mosaic/queue';
|
||||
import type { SlashCommandPayload, SlashCommandResultPayload } from '@mosaic/types';
|
||||
import { AgentService } from '../agent/agent.service.js';
|
||||
import { CommandRegistryService } from './command-registry.service.js';
|
||||
import { SystemOverrideService } from '../preferences/system-override.service.js';
|
||||
import { ChatGateway } from '../chat/chat.gateway.js';
|
||||
import { SessionGCService } from '../gc/session-gc.service.js';
|
||||
import { SystemOverrideService } from '../preferences/system-override.service.js';
|
||||
import { ReloadService } from '../reload/reload.service.js';
|
||||
import { COMMANDS_REDIS } from './commands.tokens.js';
|
||||
import { CommandRegistryService } from './command-registry.service.js';
|
||||
|
||||
@Injectable()
|
||||
export class CommandExecutorService {
|
||||
@@ -17,6 +19,12 @@ export class CommandExecutorService {
|
||||
@Inject(SystemOverrideService) private readonly systemOverride: SystemOverrideService,
|
||||
@Inject(SessionGCService) private readonly sessionGC: SessionGCService,
|
||||
@Inject(COMMANDS_REDIS) private readonly redis: QueueHandle['redis'],
|
||||
@Optional()
|
||||
@Inject(forwardRef(() => ReloadService))
|
||||
private readonly reloadService: ReloadService | null,
|
||||
@Optional()
|
||||
@Inject(forwardRef(() => ChatGateway))
|
||||
private readonly chatGateway: ChatGateway | null,
|
||||
) {}
|
||||
|
||||
async execute(payload: SlashCommandPayload, userId: string): Promise<SlashCommandResultPayload> {
|
||||
@@ -94,6 +102,24 @@ export class CommandExecutorService {
|
||||
};
|
||||
case 'tools':
|
||||
return await this.handleTools(conversationId, userId);
|
||||
case 'reload': {
|
||||
if (!this.reloadService) {
|
||||
return {
|
||||
command: 'reload',
|
||||
conversationId,
|
||||
success: false,
|
||||
message: 'ReloadService is not available.',
|
||||
};
|
||||
}
|
||||
const reloadResult = await this.reloadService.reload('command');
|
||||
this.chatGateway?.broadcastReload(reloadResult);
|
||||
return {
|
||||
command: 'reload',
|
||||
success: true,
|
||||
message: reloadResult.message,
|
||||
conversationId,
|
||||
};
|
||||
}
|
||||
default:
|
||||
return {
|
||||
command,
|
||||
|
||||
@@ -260,6 +260,14 @@ export class CommandRegistryService implements OnModuleInit {
|
||||
execution: 'socket',
|
||||
available: true,
|
||||
},
|
||||
{
|
||||
name: 'reload',
|
||||
description: 'Soft-reload gateway plugins and command manifest (admin)',
|
||||
aliases: [],
|
||||
scope: 'admin',
|
||||
execution: 'socket',
|
||||
available: true,
|
||||
},
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
import { Module, type OnApplicationShutdown, Inject } from '@nestjs/common';
|
||||
import { forwardRef, Inject, Module, type OnApplicationShutdown } from '@nestjs/common';
|
||||
import { createQueue, type QueueHandle } from '@mosaic/queue';
|
||||
import { CommandRegistryService } from './command-registry.service.js';
|
||||
import { CommandExecutorService } from './command-executor.service.js';
|
||||
import { ChatModule } from '../chat/chat.module.js';
|
||||
import { GCModule } from '../gc/gc.module.js';
|
||||
import { ReloadModule } from '../reload/reload.module.js';
|
||||
import { CommandExecutorService } from './command-executor.service.js';
|
||||
import { CommandRegistryService } from './command-registry.service.js';
|
||||
import { COMMANDS_REDIS } from './commands.tokens.js';
|
||||
|
||||
const COMMANDS_QUEUE_HANDLE = 'COMMANDS_QUEUE_HANDLE';
|
||||
|
||||
@Module({
|
||||
imports: [GCModule],
|
||||
imports: [GCModule, forwardRef(() => ReloadModule), forwardRef(() => ChatModule)],
|
||||
providers: [
|
||||
{
|
||||
provide: COMMANDS_QUEUE_HANDLE,
|
||||
|
||||
20
apps/gateway/src/reload/mosaic-plugin.interface.ts
Normal file
20
apps/gateway/src/reload/mosaic-plugin.interface.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
export interface MosaicPlugin {
|
||||
/** Called when the plugin is loaded/reloaded */
|
||||
onLoad(): Promise<void>;
|
||||
|
||||
/** Called before the plugin is unloaded during reload */
|
||||
onUnload(): Promise<void>;
|
||||
|
||||
/** Plugin identifier for registry */
|
||||
readonly pluginName: string;
|
||||
}
|
||||
|
||||
export function isMosaicPlugin(obj: unknown): obj is MosaicPlugin {
|
||||
return (
|
||||
typeof obj === 'object' &&
|
||||
obj !== null &&
|
||||
typeof (obj as MosaicPlugin).onLoad === 'function' &&
|
||||
typeof (obj as MosaicPlugin).onUnload === 'function' &&
|
||||
typeof (obj as MosaicPlugin).pluginName === 'string'
|
||||
);
|
||||
}
|
||||
22
apps/gateway/src/reload/reload.controller.ts
Normal file
22
apps/gateway/src/reload/reload.controller.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { Controller, HttpCode, HttpStatus, Inject, Post, UseGuards } from '@nestjs/common';
|
||||
import type { SystemReloadPayload } from '@mosaic/types';
|
||||
import { AdminGuard } from '../admin/admin.guard.js';
|
||||
import { ChatGateway } from '../chat/chat.gateway.js';
|
||||
import { ReloadService } from './reload.service.js';
|
||||
|
||||
@Controller('api/admin')
|
||||
@UseGuards(AdminGuard)
|
||||
export class ReloadController {
|
||||
constructor(
|
||||
@Inject(ReloadService) private readonly reloadService: ReloadService,
|
||||
@Inject(ChatGateway) private readonly chatGateway: ChatGateway,
|
||||
) {}
|
||||
|
||||
@Post('reload')
|
||||
@HttpCode(HttpStatus.OK)
|
||||
async triggerReload(): Promise<SystemReloadPayload> {
|
||||
const result = await this.reloadService.reload('rest');
|
||||
this.chatGateway.broadcastReload(result);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
14
apps/gateway/src/reload/reload.module.ts
Normal file
14
apps/gateway/src/reload/reload.module.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { forwardRef, Module } from '@nestjs/common';
|
||||
import { AdminGuard } from '../admin/admin.guard.js';
|
||||
import { ChatModule } from '../chat/chat.module.js';
|
||||
import { CommandsModule } from '../commands/commands.module.js';
|
||||
import { ReloadController } from './reload.controller.js';
|
||||
import { ReloadService } from './reload.service.js';
|
||||
|
||||
@Module({
|
||||
imports: [forwardRef(() => CommandsModule), forwardRef(() => ChatModule)],
|
||||
controllers: [ReloadController],
|
||||
providers: [ReloadService, AdminGuard],
|
||||
exports: [ReloadService],
|
||||
})
|
||||
export class ReloadModule {}
|
||||
106
apps/gateway/src/reload/reload.service.spec.ts
Normal file
106
apps/gateway/src/reload/reload.service.spec.ts
Normal file
@@ -0,0 +1,106 @@
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { ReloadService } from './reload.service.js';
|
||||
|
||||
function createMockCommandRegistry() {
|
||||
return {
|
||||
getManifest: vi.fn().mockReturnValue({
|
||||
version: 1,
|
||||
commands: [],
|
||||
skills: [],
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function createService() {
|
||||
const registry = createMockCommandRegistry();
|
||||
const service = new ReloadService(registry as never);
|
||||
return { service, registry };
|
||||
}
|
||||
|
||||
describe('ReloadService', () => {
|
||||
it('reload() calls onUnload then onLoad for registered MosaicPlugin', async () => {
|
||||
const { service } = createService();
|
||||
|
||||
const callOrder: string[] = [];
|
||||
const mockPlugin = {
|
||||
pluginName: 'test-plugin',
|
||||
onLoad: vi.fn().mockImplementation(() => {
|
||||
callOrder.push('onLoad');
|
||||
return Promise.resolve();
|
||||
}),
|
||||
onUnload: vi.fn().mockImplementation(() => {
|
||||
callOrder.push('onUnload');
|
||||
return Promise.resolve();
|
||||
}),
|
||||
};
|
||||
|
||||
service.registerPlugin('test-plugin', mockPlugin);
|
||||
const result = await service.reload('command');
|
||||
|
||||
expect(mockPlugin.onUnload).toHaveBeenCalledOnce();
|
||||
expect(mockPlugin.onLoad).toHaveBeenCalledOnce();
|
||||
expect(callOrder).toEqual(['onUnload', 'onLoad']);
|
||||
expect(result.message).toContain('test-plugin');
|
||||
});
|
||||
|
||||
it('reload() continues if one plugin throws during onUnload', async () => {
|
||||
const { service } = createService();
|
||||
|
||||
const badPlugin = {
|
||||
pluginName: 'bad-plugin',
|
||||
onLoad: vi.fn().mockResolvedValue(undefined),
|
||||
onUnload: vi.fn().mockRejectedValue(new Error('unload failed')),
|
||||
};
|
||||
|
||||
service.registerPlugin('bad-plugin', badPlugin);
|
||||
const result = await service.reload('command');
|
||||
|
||||
expect(result.message).toContain('bad-plugin');
|
||||
expect(result.message).toContain('unload failed');
|
||||
});
|
||||
|
||||
it('reload() skips non-MosaicPlugin objects', async () => {
|
||||
const { service } = createService();
|
||||
|
||||
const notAPlugin = { foo: 'bar' };
|
||||
service.registerPlugin('not-a-plugin', notAPlugin);
|
||||
|
||||
// Should not throw
|
||||
const result = await service.reload('command');
|
||||
expect(result).toBeDefined();
|
||||
expect(result.message).not.toContain('not-a-plugin');
|
||||
});
|
||||
|
||||
it('reload() returns SystemReloadPayload with commands, skills, providers, message', async () => {
|
||||
const { service, registry } = createService();
|
||||
registry.getManifest.mockReturnValue({
|
||||
version: 1,
|
||||
commands: [
|
||||
{
|
||||
name: 'test',
|
||||
description: 'test cmd',
|
||||
aliases: [],
|
||||
scope: 'core',
|
||||
execution: 'socket',
|
||||
available: true,
|
||||
},
|
||||
],
|
||||
skills: [],
|
||||
});
|
||||
|
||||
const result = await service.reload('rest');
|
||||
|
||||
expect(result).toHaveProperty('commands');
|
||||
expect(result).toHaveProperty('skills');
|
||||
expect(result).toHaveProperty('providers');
|
||||
expect(result).toHaveProperty('message');
|
||||
expect(result.commands).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('registerPlugin() logs plugin registration', () => {
|
||||
const { service } = createService();
|
||||
|
||||
// Should not throw and should register
|
||||
expect(() => service.registerPlugin('my-plugin', {})).not.toThrow();
|
||||
});
|
||||
});
|
||||
92
apps/gateway/src/reload/reload.service.ts
Normal file
92
apps/gateway/src/reload/reload.service.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import {
|
||||
Inject,
|
||||
Injectable,
|
||||
Logger,
|
||||
type OnApplicationBootstrap,
|
||||
type OnApplicationShutdown,
|
||||
} from '@nestjs/common';
|
||||
import type { SystemReloadPayload } from '@mosaic/types';
|
||||
import { CommandRegistryService } from '../commands/command-registry.service.js';
|
||||
import { isMosaicPlugin } from './mosaic-plugin.interface.js';
|
||||
|
||||
@Injectable()
|
||||
export class ReloadService implements OnApplicationBootstrap, OnApplicationShutdown {
|
||||
private readonly logger = new Logger(ReloadService.name);
|
||||
private readonly plugins: Map<string, unknown> = new Map();
|
||||
private shutdownHandlerAttached = false;
|
||||
|
||||
constructor(
|
||||
@Inject(CommandRegistryService) private readonly commandRegistry: CommandRegistryService,
|
||||
) {}
|
||||
|
||||
onApplicationBootstrap(): void {
|
||||
if (!this.shutdownHandlerAttached) {
|
||||
process.on('SIGHUP', () => {
|
||||
this.logger.log('SIGHUP received — triggering soft reload');
|
||||
this.reload('sighup').catch((err: unknown) => {
|
||||
this.logger.error(`SIGHUP reload failed: ${err}`);
|
||||
});
|
||||
});
|
||||
this.shutdownHandlerAttached = true;
|
||||
}
|
||||
}
|
||||
|
||||
onApplicationShutdown(): void {
|
||||
process.removeAllListeners('SIGHUP');
|
||||
}
|
||||
|
||||
registerPlugin(name: string, plugin: unknown): void {
|
||||
this.plugins.set(name, plugin);
|
||||
this.logger.log(`Plugin registered: ${name}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Soft reload — unload plugins, reload plugins, broadcast.
|
||||
* Does NOT restart the HTTP server or drop connections.
|
||||
*/
|
||||
async reload(
|
||||
trigger: 'command' | 'rest' | 'sighup' | 'file-watch',
|
||||
): Promise<SystemReloadPayload> {
|
||||
this.logger.log(`Soft reload triggered by: ${trigger}`);
|
||||
const reloaded: string[] = [];
|
||||
const errors: string[] = [];
|
||||
|
||||
// 1. Unload all registered MosaicPlugin instances
|
||||
for (const [name, plugin] of this.plugins) {
|
||||
if (isMosaicPlugin(plugin)) {
|
||||
try {
|
||||
await plugin.onUnload();
|
||||
reloaded.push(name);
|
||||
} catch (err) {
|
||||
errors.push(`${name}: unload failed — ${err}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Reload all MosaicPlugin instances
|
||||
for (const [name, plugin] of this.plugins) {
|
||||
if (isMosaicPlugin(plugin)) {
|
||||
try {
|
||||
await plugin.onLoad();
|
||||
} catch (err) {
|
||||
errors.push(`${name}: load failed — ${err}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const manifest = this.commandRegistry.getManifest();
|
||||
|
||||
const errorSuffix = errors.length > 0 ? ` Errors: ${errors.join(', ')}` : '';
|
||||
const payload: SystemReloadPayload = {
|
||||
commands: manifest.commands,
|
||||
skills: manifest.skills,
|
||||
providers: [],
|
||||
message: `Reload complete (trigger=${trigger}). Plugins reloaded: [${reloaded.join(', ')}].${errorSuffix}`,
|
||||
};
|
||||
|
||||
this.logger.log(
|
||||
`Reload complete. Reloaded: [${reloaded.join(', ')}]. Errors: ${errors.length}`,
|
||||
);
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user