feat(M6-006,M6-007,M7-001,M7-002): admin jobs API, job event logging, channel adapter interface, message protocol (#325)
Some checks failed
ci/woodpecker/push/ci Pipeline failed

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #325.
This commit is contained in:
2026-03-23 01:21:03 +00:00
committed by jason.woltje
parent 701bb69e6c
commit a532fd43b2
13 changed files with 1458 additions and 31 deletions

View File

@@ -0,0 +1,377 @@
/**
* M5-008: Session hardening verification tests.
*
* Verifies:
* 1. /model command switches model → session:info reflects updated modelId
* 2. /agent command switches agent config → system prompt / agentName changes
* 3. Session resume binds to a conversation (history injected via conversationHistory option)
* 4. Session metrics track token usage and message count correctly
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type {
AgentSession,
AgentSessionOptions,
ConversationHistoryMessage,
} from '../agent/agent.service.js';
import type { SessionInfoDto, SessionMetrics, SessionTokenMetrics } from '../agent/session.dto.js';
// ---------------------------------------------------------------------------
// Helpers — minimal AgentSession fixture
// ---------------------------------------------------------------------------
/**
 * Build a zeroed SessionMetrics fixture.
 *
 * @param overrides - top-level fields that replace the defaults (shallow merge).
 * @returns a fresh metrics object whose `lastActivityAt` is the current time.
 */
function makeMetrics(overrides?: Partial<SessionMetrics>): SessionMetrics {
  const zeroed = {
    tokens: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
    modelSwitches: 0,
    messageCount: 0,
    lastActivityAt: new Date().toISOString(),
  };
  // Later spread wins, so any override replaces the matching default wholesale.
  return { ...zeroed, ...overrides };
}
/**
 * Build a minimal AgentSession fixture for the tests below.
 *
 * @param overrides - top-level fields that replace the defaults (shallow merge).
 * @returns a session with fresh `listeners`/`channels` sets and fresh zeroed metrics.
 */
function makeSession(overrides?: Partial<AgentSession>): AgentSession {
  const defaults = {
    id: 'session-001',
    provider: 'anthropic',
    modelId: 'claude-3-5-sonnet-20241022',
    // The underlying pi session is never exercised by these tests.
    piSession: {} as AgentSession['piSession'],
    listeners: new Set(),
    unsubscribe: vi.fn(),
    createdAt: Date.now(),
    promptCount: 0,
    channels: new Set(),
    skillPromptAdditions: [],
    sandboxDir: '/tmp',
    allowedTools: null,
    metrics: makeMetrics(),
  };
  return { ...defaults, ...overrides };
}
/**
 * Project a session onto the DTO sent as `session:info`.
 *
 * `agentName` is only present when the session has a truthy agent name.
 * `metrics` is a shallow copy: the nested `tokens` object is still shared
 * with the live session.
 */
function sessionToInfo(session: AgentSession): SessionInfoDto {
  const { id, provider, modelId, agentName, createdAt, promptCount, channels, metrics } = session;
  const agentPart = agentName ? { agentName } : {};
  return {
    id,
    provider,
    modelId,
    ...agentPart,
    createdAt: new Date(createdAt).toISOString(),
    promptCount,
    channels: [...channels],
    durationMs: Date.now() - createdAt,
    metrics: { ...metrics },
  };
}
// ---------------------------------------------------------------------------
// Replicated AgentService methods (tested in isolation without full DI setup)
// ---------------------------------------------------------------------------
/**
 * Switch the session to `modelId`, count the switch, and stamp the activity time.
 */
function updateSessionModel(session: AgentSession, modelId: string): void {
  session.modelId = modelId;
  const { metrics } = session;
  metrics.modelSwitches++;
  metrics.lastActivityAt = new Date().toISOString();
}
/**
 * Bind an agent config to the session.
 *
 * Always records the config id and agent name; the model is switched (and the
 * switch counted) only when the config supplies a truthy `modelId`.
 */
function applyAgentConfig(
  session: AgentSession,
  agentConfigId: string,
  agentName: string,
  modelId?: string,
): void {
  session.agentConfigId = agentConfigId;
  session.agentName = agentName;
  if (!modelId) {
    return;
  }
  updateSessionModel(session, modelId);
}
/**
 * Add one turn's token counts onto the session's running totals and stamp
 * the activity time.
 */
function recordTokenUsage(session: AgentSession, tokens: SessionTokenMetrics): void {
  const totals = session.metrics.tokens;
  const counters = ['input', 'output', 'cacheRead', 'cacheWrite', 'total'] as const;
  for (const counter of counters) {
    totals[counter] += tokens[counter];
  }
  session.metrics.lastActivityAt = new Date().toISOString();
}
/** Count one more message on the session and stamp the activity time. */
function recordMessage(session: AgentSession): void {
  const { metrics } = session;
  metrics.messageCount++;
  metrics.lastActivityAt = new Date().toISOString();
}
// ---------------------------------------------------------------------------
// 1. /model command — switches model → session:info updated
// ---------------------------------------------------------------------------
describe('/model command — model switch reflected in session:info', () => {
  let session: AgentSession;

  beforeEach(() => {
    session = makeSession();
  });

  it('updates modelId when /model is called with a model name', () => {
    updateSessionModel(session, 'claude-opus-4-5-20251001');
    expect(session.modelId).toBe('claude-opus-4-5-20251001');
  });

  it('increments modelSwitches metric after /model command', () => {
    expect(session.metrics.modelSwitches).toBe(0);
    updateSessionModel(session, 'gpt-4o');
    expect(session.metrics.modelSwitches).toBe(1);
    updateSessionModel(session, 'claude-3-5-sonnet-20241022');
    expect(session.metrics.modelSwitches).toBe(2);
  });

  it('session:info DTO reflects the new modelId after switch', () => {
    updateSessionModel(session, 'claude-haiku-3-5-20251001');
    const info = sessionToInfo(session);
    expect(info.modelId).toBe('claude-haiku-3-5-20251001');
    expect(info.metrics.modelSwitches).toBe(1);
  });

  it('lastActivityAt is updated after model switch', () => {
    const before = session.metrics.lastActivityAt;
    // Move the mocked clock 1ms past "now" so the new ISO timestamp must differ
    // from the one captured when the fixture was built.
    vi.setSystemTime(Date.now() + 1);
    try {
      updateSessionModel(session, 'new-model');
    } finally {
      // Restore the real clock even if the call above throws; otherwise the
      // mocked Date would leak into every test that runs afterwards.
      vi.useRealTimers();
    }
    expect(session.metrics.lastActivityAt).not.toBe(before);
  });
});
// ---------------------------------------------------------------------------
// 2. /agent command — switches agent config → system prompt / agentName updated
// ---------------------------------------------------------------------------
describe('/agent command — agent config applied to session', () => {
  // Fresh fixture per test so agent bindings never carry over.
  let current: AgentSession;

  beforeEach(() => {
    current = makeSession();
  });

  it('sets agentConfigId and agentName on the session', () => {
    applyAgentConfig(current, 'agent-uuid-001', 'CodeReviewer');

    const { agentConfigId, agentName } = current;
    expect(agentConfigId).toBe('agent-uuid-001');
    expect(agentName).toBe('CodeReviewer');
  });

  it('also updates modelId when agent config carries a model', () => {
    applyAgentConfig(current, 'agent-uuid-002', 'DataAnalyst', 'gpt-4o-mini');

    expect(current.agentName).toBe('DataAnalyst');
    expect(current.modelId).toBe('gpt-4o-mini');
    expect(current.metrics.modelSwitches).toBe(1);
  });

  it('does NOT update modelId when agent config has no model', () => {
    const modelBefore = current.modelId;

    applyAgentConfig(current, 'agent-uuid-003', 'Planner', undefined);

    expect(current.modelId).toBe(modelBefore);
    expect(current.metrics.modelSwitches).toBe(0);
  });

  it('session:info DTO reflects agentName after /agent switch', () => {
    applyAgentConfig(current, 'agent-uuid-004', 'DevBot');

    expect(sessionToInfo(current).agentName).toBe('DevBot');
  });

  it('multiple /agent calls update to the latest agent', () => {
    applyAgentConfig(current, 'agent-001', 'FirstAgent');
    applyAgentConfig(current, 'agent-002', 'SecondAgent');

    // Only the most recent binding should remain visible.
    expect(current.agentConfigId).toBe('agent-002');
    expect(current.agentName).toBe('SecondAgent');
  });
});
// ---------------------------------------------------------------------------
// 3. Session resume — binds to conversation via conversationHistory
// ---------------------------------------------------------------------------
describe('Session resume — binds to conversation', () => {
  it('conversationHistory option is preserved in session options', () => {
    const userTurn: ConversationHistoryMessage = {
      role: 'user',
      content: 'Hello, what is TypeScript?',
      createdAt: new Date('2026-01-01T00:01:00Z'),
    };
    const assistantTurn: ConversationHistoryMessage = {
      role: 'assistant',
      content: 'TypeScript is a typed superset of JavaScript.',
      createdAt: new Date('2026-01-01T00:01:05Z'),
    };
    const options: AgentSessionOptions = {
      conversationHistory: [userTurn, assistantTurn],
      provider: 'anthropic',
      modelId: 'claude-3-5-sonnet-20241022',
    };

    expect(options.conversationHistory).toHaveLength(2);
    expect(options.conversationHistory![0]!.role).toBe('user');
    expect(options.conversationHistory![1]!.role).toBe('assistant');
  });

  it('session with conversationHistory option carries the conversation binding', () => {
    const CONV_ID = 'conv-resume-001';
    // Mirrors the gateway call shape: history goes in the options, and the
    // conversation id doubles as the session id.
    const options: AgentSessionOptions = {
      conversationHistory: [
        { role: 'user', content: 'Prior question', createdAt: new Date('2026-01-01T00:01:00Z') },
      ],
    };
    const resumed = makeSession({ id: CONV_ID });

    expect(resumed.id).toBe(CONV_ID);
    expect(options.conversationHistory).toHaveLength(1);
  });

  it('empty conversationHistory is valid (new conversation)', () => {
    const options: AgentSessionOptions = { conversationHistory: [] };

    expect(options.conversationHistory).toHaveLength(0);
  });

  it('resumed session preserves all message roles', () => {
    const history: ConversationHistoryMessage[] = [
      { role: 'system', content: 'You are a helpful assistant.', createdAt: new Date() },
      { role: 'user', content: 'Question 1', createdAt: new Date() },
      { role: 'assistant', content: 'Answer 1', createdAt: new Date() },
      { role: 'user', content: 'Question 2', createdAt: new Date() },
    ];

    const seenRoles = history.map(({ role }) => role);

    expect(seenRoles).toEqual(['system', 'user', 'assistant', 'user']);
  });
});
// ---------------------------------------------------------------------------
// 4. Session metrics — token usage and message count
// ---------------------------------------------------------------------------
describe('Session metrics — token usage and message count', () => {
  let session: AgentSession;

  /**
   * Run `action` with the mocked system clock moved `ms` milliseconds ahead,
   * restoring the real clock afterwards even when `action` throws, so the
   * mocked Date can never leak into later tests.
   */
  const withClockAdvanced = (ms: number, action: () => void): void => {
    vi.setSystemTime(new Date(Date.now() + ms));
    try {
      action();
    } finally {
      vi.useRealTimers();
    }
  };

  beforeEach(() => {
    session = makeSession();
  });

  it('starts with zero metrics', () => {
    expect(session.metrics.tokens.input).toBe(0);
    expect(session.metrics.tokens.output).toBe(0);
    expect(session.metrics.tokens.total).toBe(0);
    expect(session.metrics.messageCount).toBe(0);
    expect(session.metrics.modelSwitches).toBe(0);
  });

  it('accumulates token usage across multiple turns', () => {
    recordTokenUsage(session, {
      input: 100,
      output: 50,
      cacheRead: 0,
      cacheWrite: 0,
      total: 150,
    });
    recordTokenUsage(session, {
      input: 200,
      output: 80,
      cacheRead: 10,
      cacheWrite: 5,
      total: 295,
    });
    expect(session.metrics.tokens.input).toBe(300);
    expect(session.metrics.tokens.output).toBe(130);
    expect(session.metrics.tokens.cacheRead).toBe(10);
    expect(session.metrics.tokens.cacheWrite).toBe(5);
    expect(session.metrics.tokens.total).toBe(445);
  });

  it('increments message count with each recordMessage call', () => {
    expect(session.metrics.messageCount).toBe(0);
    recordMessage(session);
    expect(session.metrics.messageCount).toBe(1);
    recordMessage(session);
    recordMessage(session);
    expect(session.metrics.messageCount).toBe(3);
  });

  it('session:info DTO exposes correct metrics snapshot', () => {
    recordTokenUsage(session, {
      input: 500,
      output: 100,
      cacheRead: 20,
      cacheWrite: 10,
      total: 630,
    });
    recordMessage(session);
    recordMessage(session);
    updateSessionModel(session, 'claude-haiku-3-5-20251001');
    const info = sessionToInfo(session);
    expect(info.metrics.tokens.input).toBe(500);
    expect(info.metrics.tokens.output).toBe(100);
    expect(info.metrics.tokens.total).toBe(630);
    expect(info.metrics.messageCount).toBe(2);
    expect(info.metrics.modelSwitches).toBe(1);
  });

  it('metrics are independent per session', () => {
    const sessionA = makeSession({ id: 'session-A' });
    const sessionB = makeSession({ id: 'session-B' });
    recordTokenUsage(sessionA, { input: 100, output: 50, cacheRead: 0, cacheWrite: 0, total: 150 });
    recordMessage(sessionA);
    // Session B should remain at zero
    expect(sessionB.metrics.tokens.input).toBe(0);
    expect(sessionB.metrics.messageCount).toBe(0);
    // Session A should have updated values
    expect(sessionA.metrics.tokens.input).toBe(100);
    expect(sessionA.metrics.messageCount).toBe(1);
  });

  it('lastActivityAt is updated after recording tokens', () => {
    const before = session.metrics.lastActivityAt;
    withClockAdvanced(100, () => {
      recordTokenUsage(session, { input: 10, output: 5, cacheRead: 0, cacheWrite: 0, total: 15 });
    });
    expect(session.metrics.lastActivityAt).not.toBe(before);
  });

  it('lastActivityAt is updated after recording a message', () => {
    const before = session.metrics.lastActivityAt;
    withClockAdvanced(100, () => {
      recordMessage(session);
    });
    expect(session.metrics.lastActivityAt).not.toBe(before);
  });
});

View File

@@ -0,0 +1,128 @@
import {
Controller,
Get,
HttpCode,
HttpStatus,
Inject,
NotFoundException,
Optional,
Param,
Post,
Query,
UseGuards,
} from '@nestjs/common';
import { AdminGuard } from './admin.guard.js';
import { QueueService } from '../queue/queue.service.js';
import type { JobDto, JobListDto, JobStatus, QueueListDto } from '../queue/queue-admin.dto.js';
/**
 * Admin-only HTTP API for inspecting and controlling the job queues.
 *
 * QueueService is injected as optional: read endpoints answer with empty
 * results when it is absent, mutating endpoints answer 404.
 */
@Controller('api/admin/jobs')
@UseGuards(AdminGuard)
export class AdminJobsController {
  /** Job states accepted by the `?status=` filter of {@link listJobs}. */
  private static readonly VALID_STATUSES: readonly JobStatus[] = [
    'active',
    'completed',
    'failed',
    'waiting',
    'delayed',
  ];

  constructor(
    @Optional()
    @Inject(QueueService)
    private readonly queueService: QueueService | null,
  ) {}

  /**
   * GET /api/admin/jobs
   * List jobs across all queues. Optional ?status=active|completed|failed|waiting|delayed
   *
   * An unrecognised status yields an empty list rather than an error; an
   * empty or missing status means "no filter".
   */
  @Get()
  async listJobs(@Query('status') status?: string): Promise<JobListDto> {
    if (!this.queueService) {
      return { jobs: [], total: 0 };
    }

    let filter: JobStatus | undefined;
    if (status) {
      if (!AdminJobsController.isJobStatus(status)) {
        return { jobs: [], total: 0 };
      }
      filter = status;
    }

    const jobs: JobDto[] = await this.queueService.listJobs(filter);
    return { jobs, total: jobs.length };
  }

  /**
   * POST /api/admin/jobs/:id/retry
   * Retry a specific failed job. The id is "<queue>__<bullmq-job-id>".
   *
   * @throws NotFoundException when the queue service is absent or the service
   *   reports the retry as not ok (bad id, unknown queue/job, job not failed).
   */
  @Post(':id/retry')
  @HttpCode(HttpStatus.OK)
  async retryJob(@Param('id') id: string): Promise<{ ok: boolean; message: string }> {
    const service = this.requireQueueService();
    return AdminJobsController.unwrap(await service.retryJob(id));
  }

  /**
   * GET /api/admin/jobs/queues
   * Return status for all managed queues.
   *
   * The health status returned by QueueService carries no delayed count, so
   * `delayed` is always reported as 0 here.
   */
  @Get('queues')
  async listQueues(): Promise<QueueListDto> {
    if (!this.queueService) {
      return { queues: [] };
    }
    const health = await this.queueService.getHealthStatus();
    const queues = Object.entries(health.queues).map(([name, stats]) => ({
      name,
      waiting: stats.waiting,
      active: stats.active,
      completed: stats.completed,
      failed: stats.failed,
      delayed: 0,
      paused: stats.paused,
    }));
    return { queues };
  }

  /**
   * POST /api/admin/jobs/queues/:name/pause
   * Pause the named queue.
   *
   * @throws NotFoundException when the queue service is absent or the queue is unknown.
   */
  @Post('queues/:name/pause')
  @HttpCode(HttpStatus.OK)
  async pauseQueue(@Param('name') name: string): Promise<{ ok: boolean; message: string }> {
    const service = this.requireQueueService();
    return AdminJobsController.unwrap(await service.pauseQueue(name));
  }

  /**
   * POST /api/admin/jobs/queues/:name/resume
   * Resume the named queue.
   *
   * @throws NotFoundException when the queue service is absent or the queue is unknown.
   */
  @Post('queues/:name/resume')
  @HttpCode(HttpStatus.OK)
  async resumeQueue(@Param('name') name: string): Promise<{ ok: boolean; message: string }> {
    const service = this.requireQueueService();
    return AdminJobsController.unwrap(await service.resumeQueue(name));
  }

  /** Narrow an arbitrary query value to a known job status. */
  private static isJobStatus(value: string): value is JobStatus {
    return (AdminJobsController.VALID_STATUSES as readonly string[]).includes(value);
  }

  /** Return the injected queue service or fail the request with 404. */
  private requireQueueService(): QueueService {
    if (!this.queueService) {
      throw new NotFoundException('Queue service is not available');
    }
    return this.queueService;
  }

  /** Pass a successful service result through; turn a failed one into a 404. */
  private static unwrap(result: { ok: boolean; message: string }): {
    ok: boolean;
    message: string;
  } {
    if (!result.ok) {
      throw new NotFoundException(result.message);
    }
    return result;
  }
}

View File

@@ -1,10 +1,11 @@
import { Module } from '@nestjs/common';
import { AdminController } from './admin.controller.js';
import { AdminHealthController } from './admin-health.controller.js';
import { AdminJobsController } from './admin-jobs.controller.js';
import { AdminGuard } from './admin.guard.js';
/**
 * Admin feature module: registers the admin, health and jobs controllers and
 * provides the AdminGuard they are protected with.
 */
@Module({
  // A single `controllers` key: the pre-change list without AdminJobsController
  // was a leftover duplicate that the later key silently overrode.
  controllers: [AdminController, AdminHealthController, AdminJobsController],
  providers: [AdminGuard],
})
export class AdminModule {}

View File

@@ -22,6 +22,7 @@ import { PreferencesModule } from './preferences/preferences.module.js';
import { GCModule } from './gc/gc.module.js';
import { ReloadModule } from './reload/reload.module.js';
import { WorkspaceModule } from './workspace/workspace.module.js';
import { QueueModule } from './queue/queue.module.js';
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
@Module({
@@ -46,6 +47,7 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
PreferencesModule,
CommandsModule,
GCModule,
QueueModule,
ReloadModule,
WorkspaceModule,
],

View File

@@ -5,59 +5,72 @@ import {
type OnModuleInit,
type OnModuleDestroy,
} from '@nestjs/common';
import cron from 'node-cron';
import { SummarizationService } from './summarization.service.js';
import { SessionGCService } from '../gc/session-gc.service.js';
import {
QueueService,
QUEUE_SUMMARIZATION,
QUEUE_GC,
QUEUE_TIER_MANAGEMENT,
} from '../queue/queue.service.js';
import type { Worker } from 'bullmq';
import type { MosaicJobData } from '../queue/queue.service.js';
/**
 * Schedules the periodic maintenance jobs (summarization, tier management,
 * session GC) as BullMQ repeatable jobs and registers one worker per queue
 * to execute them.
 *
 * Schedules are cron expressions read from SUMMARIZATION_CRON,
 * TIER_MANAGEMENT_CRON and SESSION_GC_CRON, with built-in defaults.
 */
@Injectable()
export class CronService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(CronService.name);
  /** Workers created by this service; their lifecycle is owned by QueueService. */
  private readonly registeredWorkers: Worker<MosaicJobData>[] = [];

  constructor(
    @Inject(SummarizationService) private readonly summarization: SummarizationService,
    @Inject(SessionGCService) private readonly sessionGC: SessionGCService,
    @Inject(QueueService) private readonly queueService: QueueService,
  ) {}

  /**
   * Register the three repeatable jobs and their workers, in order.
   *
   * NOTE(review): each registration is awaited, so this presumably delays or
   * fails module start-up when the queue backend is unreachable — confirm
   * that is the intended boot behaviour.
   */
  async onModuleInit(): Promise<void> {
    const summarizationSchedule = process.env['SUMMARIZATION_CRON'] ?? '0 */6 * * *'; // every 6 hours
    const tierManagementSchedule = process.env['TIER_MANAGEMENT_CRON'] ?? '0 3 * * *'; // daily at 3am
    const gcSchedule = process.env['SESSION_GC_CRON'] ?? '0 4 * * *'; // daily at 4am

    // M6-003: Summarization repeatable job
    await this.scheduleQueueJob(QUEUE_SUMMARIZATION, 'summarization', summarizationSchedule, () =>
      this.summarization.runSummarization(),
    );

    // M6-005: Tier management repeatable job
    await this.scheduleQueueJob(
      QUEUE_TIER_MANAGEMENT,
      'tier-management',
      tierManagementSchedule,
      () => this.summarization.runTierManagement(),
    );

    // M6-004: GC repeatable job
    await this.scheduleQueueJob(QUEUE_GC, 'session-gc', gcSchedule, () =>
      this.sessionGC.sweepOrphans(),
    );

    this.logger.log(
      `BullMQ jobs scheduled: summarization="${summarizationSchedule}", tier="${tierManagementSchedule}", gc="${gcSchedule}"`,
    );
  }

  async onModuleDestroy(): Promise<void> {
    // Workers are closed by QueueService.onModuleDestroy — nothing extra needed here.
    this.registeredWorkers.length = 0;
    this.logger.log('CronService destroyed (workers managed by QueueService)');
  }

  /**
   * Add one repeatable job to `queueName` and register the worker that runs it.
   *
   * @param queueName - queue the job and its worker are attached to.
   * @param jobName - name of the repeatable job.
   * @param schedule - cron expression for the repeat pattern.
   * @param run - work to perform each time the job fires; its result is discarded.
   */
  private async scheduleQueueJob(
    queueName: string,
    jobName: string,
    schedule: string,
    run: () => Promise<unknown>,
  ): Promise<void> {
    await this.queueService.addRepeatableJob(queueName, jobName, {}, schedule);
    const worker = this.queueService.registerWorker(queueName, async () => {
      await run();
    });
    this.registeredWorkers.push(worker);
  }
}

View File

@@ -7,10 +7,11 @@ import { LogController } from './log.controller.js';
import { SummarizationService } from './summarization.service.js';
import { CronService } from './cron.service.js';
import { GCModule } from '../gc/gc.module.js';
import { QueueModule } from '../queue/queue.module.js';
@Global()
@Module({
imports: [GCModule],
imports: [GCModule, QueueModule],
providers: [
{
provide: LOG_SERVICE,

View File

@@ -0,0 +1,34 @@
/** Job states the admin jobs API can filter on and report. */
export type JobStatus = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed';
/** One queue job as exposed by the admin jobs API. */
export interface JobDto {
// Composite id "<queue>__<jobId>"; the queue prefix is needed because job ids are scoped to their queue.
id: string;
name: string;
// Name of the queue the job belongs to.
queue: string;
// State the job was listed under.
status: JobStatus;
// Attempts made so far.
attempts: number;
// Configured attempt limit; the queue service reports 1 when the job sets none.
maxAttempts: number;
// ISO-8601 timestamps; each is omitted when the job carries no corresponding time.
createdAt?: string;
processedAt?: string;
finishedAt?: string;
failedReason?: string;
// Job payload.
data: Record<string, unknown>;
}
/** Response body of the job listing endpoint. */
export interface JobListDto {
jobs: JobDto[];
// Number of entries in `jobs` (the controller sets it to jobs.length).
total: number;
}
/** Per-queue job counts and pause flag as exposed by the admin jobs API. */
export interface QueueStatusDto {
name: string;
waiting: number;
active: number;
completed: number;
failed: number;
// NOTE(review): the admin controller currently always reports 0 here because
// the queue health status it reads has no delayed count.
delayed: number;
paused: boolean;
}
/** Response body of the queue listing endpoint. */
export interface QueueListDto {
queues: QueueStatusDto[];
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { QueueService } from './queue.service.js';
/**
 * Provides and exports QueueService. Declared global, so the exported
 * service is injectable without importing this module in every consumer.
 */
@Global()
@Module({
providers: [QueueService],
exports: [QueueService],
})
export class QueueModule {}

View File

@@ -0,0 +1,386 @@
import {
Inject,
Injectable,
Logger,
Optional,
type OnModuleInit,
type OnModuleDestroy,
} from '@nestjs/common';
import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from '../log/log.tokens.js';
import type { JobDto, JobStatus } from './queue-admin.dto.js';
// ---------------------------------------------------------------------------
// Typed job definitions
// ---------------------------------------------------------------------------
/** Payload of a summarization job. */
export interface SummarizationJobData {
// Optional free-form label; presumably identifies who or what enqueued the job — confirm with callers.
triggeredBy?: string;
}
/** Payload of a garbage-collection job. */
export interface GCJobData {
triggeredBy?: string;
}
/** Payload of a tier-management job. */
export interface TierManagementJobData {
triggeredBy?: string;
}
/** Union of every job payload the queue service handles. */
export type MosaicJobData = SummarizationJobData | GCJobData | TierManagementJobData;
// ---------------------------------------------------------------------------
// Queue health status
// ---------------------------------------------------------------------------
/**
 * Snapshot of job counts per managed queue, keyed by queue name.
 * Note there is no delayed count in this shape.
 */
export interface QueueHealthStatus {
queues: Record<
string,
{
waiting: number;
active: number;
failed: number;
completed: number;
paused: boolean;
}
>;
// False when the counts of at least one queue could not be fetched; that queue is then reported with zeroed counts.
healthy: boolean;
}
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
// Queue names deliberately avoid ':' — BullMQ uses it as the separator in its
// Redis keys and rejects queue names that contain it.
/** Queue carrying the periodic summarization job. */
export const QUEUE_SUMMARIZATION = 'mosaic-summarization';
/** Queue carrying the periodic session garbage-collection job. */
export const QUEUE_GC = 'mosaic-gc';
/** Queue carrying the periodic tier-management job. */
export const QUEUE_TIER_MANAGEMENT = 'mosaic-tier-management';
/** Connection URL used when VALKEY_URL is unset or blank. */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';

/**
 * Build the BullMQ connection options from the VALKEY_URL environment
 * variable (falling back to {@link DEFAULT_VALKEY_URL}).
 *
 * BullMQ expects an options object (or a client instance), not a URL string:
 * a string forced through a cast is ignored and the client falls back to its
 * built-in default host/port. The URL is therefore parsed into discrete fields.
 *
 * Supported URL parts: host, port (default 6379), username, password
 * (both percent-decoded), a numeric database index as the path, and TLS when
 * the scheme is `rediss:` or `valkeys:`.
 *
 * @returns connection options for queues and workers.
 * @throws Error when the URL cannot be parsed.
 */
function getConnection(): ConnectionOptions {
  const configured = process.env['VALKEY_URL']?.trim();
  const rawUrl = configured ? configured : DEFAULT_VALKEY_URL;

  let parsed: URL;
  try {
    parsed = new URL(rawUrl);
  } catch {
    // Do not echo the URL itself: it may embed credentials.
    throw new Error('VALKEY_URL is not a valid URL (expected e.g. redis://host:6379)');
  }

  // IPv6 literals come back bracketed ("[::1]"); the client wants the bare address.
  const host = parsed.hostname.replace(/^\[|\]$/g, '') || 'localhost';
  const port = parsed.port ? Number(parsed.port) : 6379;
  const db = Number.parseInt(parsed.pathname.replace(/^\//, ''), 10);
  const useTls = parsed.protocol === 'rediss:' || parsed.protocol === 'valkeys:';

  return {
    host,
    port,
    ...(parsed.username ? { username: decodeURIComponent(parsed.username) } : {}),
    ...(parsed.password ? { password: decodeURIComponent(parsed.password) } : {}),
    ...(Number.isInteger(db) ? { db } : {}),
    ...(useTls ? { tls: {} } : {}),
  };
}
// ---------------------------------------------------------------------------
// Job handler type
// ---------------------------------------------------------------------------
/** Async callback a worker runs for each job; the job is treated as done when the promise resolves. */
export type JobHandler<T = MosaicJobData> = (job: Job<T>) => Promise<void>;
/** System session ID used for job-event log entries (no real user session). */
const SYSTEM_SESSION_ID = 'system';
// ---------------------------------------------------------------------------
// QueueService
// ---------------------------------------------------------------------------
/**
 * Owns every BullMQ queue and worker used by the application.
 *
 * Queues are created lazily by name and cached; workers are registered per
 * queue. Everything is closed when the module is destroyed. Job lifecycle
 * events are additionally written to the log service when one is injected.
 */
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(QueueService.name);
  private readonly connection: ConnectionOptions;
  private readonly queues = new Map<string, Queue<MosaicJobData>>();
  private readonly workers = new Map<string, Worker<MosaicJobData>>();

  constructor(
    @Optional()
    @Inject(LOG_SERVICE)
    private readonly logService: LogService | null,
  ) {
    this.connection = getConnection();
  }

  onModuleInit(): void {
    this.logger.log('QueueService initialised (BullMQ)');
  }

  async onModuleDestroy(): Promise<void> {
    await this.closeAll();
  }

  // -------------------------------------------------------------------------
  // Queue helpers
  // -------------------------------------------------------------------------

  /**
   * Get or create a BullMQ Queue for the given queue name.
   *
   * Only queues obtained through this method are visible to the health,
   * listing, retry and pause/resume helpers below.
   */
  getQueue<T extends MosaicJobData = MosaicJobData>(name: string): Queue<T> {
    let queue = this.queues.get(name) as Queue<T> | undefined;
    if (!queue) {
      queue = new Queue<T>(name, { connection: this.connection });
      // Connection problems surface as 'error' events; log them instead of
      // leaving the event without a listener.
      queue.on('error', (err) => {
        this.logger.error(`Queue "${name}" error: ${String(err)}`);
      });
      this.queues.set(name, queue as unknown as Queue<MosaicJobData>);
    }
    return queue;
  }

  /**
   * Add a BullMQ repeatable job (cron-style).
   *
   * Re-adding the same job with the same cron expression is a no-op on the
   * BullMQ side. Because the expression is part of the repeat key, a changed
   * expression would otherwise leave the previous schedule running next to
   * the new one, so schedules with the same job name but a different pattern
   * are removed first.
   *
   * @param queueName - queue to schedule on (created if needed).
   * @param jobName - name of the repeatable job.
   * @param data - payload attached to every occurrence.
   * @param cronExpression - repeat pattern.
   */
  async addRepeatableJob<T extends MosaicJobData>(
    queueName: string,
    jobName: string,
    data: T,
    cronExpression: string,
  ): Promise<void> {
    const queue = this.getQueue<T>(queueName);

    const existing = await queue.getRepeatableJobs();
    for (const entry of existing) {
      if (entry.name === jobName && entry.pattern !== cronExpression) {
        await queue.removeRepeatableByKey(entry.key);
        this.logger.log(
          `Removed stale schedule "${String(entry.pattern)}" of repeatable job "${jobName}" on "${queueName}"`,
        );
      }
    }

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    await (queue as Queue<any>).add(jobName, data, {
      repeat: { pattern: cronExpression },
      jobId: `${queueName}:${jobName}:repeatable`,
    });
    this.logger.log(
      `Repeatable job "${jobName}" registered on "${queueName}" (cron: ${cronExpression})`,
    );
  }

  /**
   * Register a Worker for the given queue name with lifecycle logging.
   *
   * The worker is tracked under `queueName` and closed on shutdown.
   * NOTE(review): registering a second worker for the same queue replaces the
   * tracked entry, so the first worker would no longer be closed on shutdown.
   *
   * @param queueName - queue the worker consumes.
   * @param handler - callback executed for every job.
   * @returns the created worker.
   */
  registerWorker<T extends MosaicJobData>(queueName: string, handler: JobHandler<T>): Worker<T> {
    const worker = new Worker<T>(
      queueName,
      async (job) => {
        this.logger.debug(`Processing job "${job.name}" (id=${job.id}) on queue "${queueName}"`);
        await this.logJobEvent(
          queueName,
          job.name,
          job.id ?? 'unknown',
          'started',
          job.attemptsMade + 1,
        );
        await handler(job);
      },
      {
        connection: this.connection,
        // Custom backoff: 5s base, doubling per attempt, capped at 60s.
        // BullMQ only consults this for jobs added with
        // `backoff: { type: 'custom' }` and `attempts` > 1; addRepeatableJob
        // sets neither, so those jobs are not retried automatically.
        settings: {
          backoffStrategy: (attemptsMade: number) => {
            return Math.min(5000 * Math.pow(2, attemptsMade - 1), 60_000);
          },
        },
      },
    );

    // BullMQ requires an 'error' listener on workers: without one, an emitted
    // error can stop the worker from processing further jobs.
    worker.on('error', (err) => {
      this.logger.error(`Worker error on queue "${queueName}": ${String(err)}`);
    });

    worker.on('completed', (job) => {
      this.logger.log(`Job "${job.name}" (id=${job.id}) completed on queue "${queueName}"`);
      this.logJobEvent(
        queueName,
        job.name,
        job.id ?? 'unknown',
        'completed',
        job.attemptsMade,
      ).catch((err) => this.logger.warn(`Failed to write completed job log: ${String(err)}`));
    });

    worker.on('failed', (job, err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Job "${job?.name ?? 'unknown'}" (id=${job?.id ?? 'unknown'}) failed on queue "${queueName}": ${errMsg}`,
      );
      this.logJobEvent(
        queueName,
        job?.name ?? 'unknown',
        job?.id ?? 'unknown',
        'failed',
        job?.attemptsMade ?? 0,
        errMsg,
      ).catch((e) => this.logger.warn(`Failed to write failed job log: ${String(e)}`));
    });

    this.workers.set(queueName, worker as unknown as Worker<MosaicJobData>);
    return worker;
  }

  /**
   * Return queue health statistics for all managed queues.
   *
   * A queue whose counts cannot be fetched is reported with zeroed counts and
   * flips `healthy` to false.
   */
  async getHealthStatus(): Promise<QueueHealthStatus> {
    const queues: QueueHealthStatus['queues'] = {};
    let healthy = true;
    for (const [name, queue] of this.queues) {
      try {
        const [waiting, active, failed, completed, paused] = await Promise.all([
          queue.getWaitingCount(),
          queue.getActiveCount(),
          queue.getFailedCount(),
          queue.getCompletedCount(),
          queue.isPaused(),
        ]);
        queues[name] = { waiting, active, failed, completed, paused };
      } catch (err) {
        this.logger.error(`Failed to fetch health for queue "${name}": ${String(err)}`);
        healthy = false;
        queues[name] = { waiting: 0, active: 0, failed: 0, completed: 0, paused: false };
      }
    }
    return { queues, healthy };
  }

  // -------------------------------------------------------------------------
  // Admin API helpers (M6-006)
  // -------------------------------------------------------------------------

  /**
   * List jobs across all managed queues, optionally filtered by status.
   * BullMQ jobs are fetched by state type from each queue.
   *
   * A queue that fails to list is skipped with a warning; the other queues
   * are still returned.
   */
  async listJobs(status?: JobStatus): Promise<JobDto[]> {
    const jobs: JobDto[] = [];
    const states: JobStatus[] = status
      ? [status]
      : ['active', 'completed', 'failed', 'waiting', 'delayed'];
    for (const [queueName, queue] of this.queues) {
      try {
        for (const state of states) {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const raw = await (queue as Queue<any>).getJobs([state as any]);
          for (const j of raw) {
            // A job removed between the id lookup and the data fetch comes
            // back as undefined; skip it instead of failing the whole queue.
            if (!j) continue;
            jobs.push(this.toJobDto(queueName, j, state));
          }
        }
      } catch (err) {
        this.logger.warn(`Failed to list jobs for queue "${queueName}": ${String(err)}`);
      }
    }
    return jobs;
  }

  /**
   * Retry a specific failed job.
   *
   * The caller passes "<queueName>__<jobId>" as the composite ID because BullMQ
   * job IDs are not globally unique — they are scoped to their queue. The id
   * is split at the last "__".
   *
   * @returns `ok: false` with an explanatory message when the id is malformed,
   *   the queue or job is unknown, or the job is not in the failed state.
   */
  async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> {
    const sep = compositeId.lastIndexOf('__');
    if (sep === -1) {
      return { ok: false, message: 'Invalid job id format. Expected "<queue>__<jobId>".' };
    }
    const queueName = compositeId.slice(0, sep);
    const jobId = compositeId.slice(sep + 2);
    const queue = this.queues.get(queueName);
    if (!queue) {
      return { ok: false, message: `Queue "${queueName}" not found.` };
    }
    const job = await queue.getJob(jobId);
    if (!job) {
      return { ok: false, message: `Job "${jobId}" not found in queue "${queueName}".` };
    }
    const state = await job.getState();
    if (state !== 'failed') {
      return { ok: false, message: `Job "${jobId}" is not in failed state (current: ${state}).` };
    }
    await job.retry('failed');
    await this.logJobEvent(queueName, job.name, jobId, 'retried', (job.attemptsMade ?? 0) + 1);
    return { ok: true, message: `Job "${jobId}" on queue "${queueName}" queued for retry.` };
  }

  /**
   * Pause a queue by name.
   */
  async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.pause();
    this.logger.log(`Queue paused: ${name}`);
    return { ok: true, message: `Queue "${name}" paused.` };
  }

  /**
   * Resume a paused queue by name.
   */
  async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.resume();
    this.logger.log(`Queue resumed: ${name}`);
    return { ok: true, message: `Queue "${name}" resumed.` };
  }

  /** Convert a BullMQ job into the admin API shape, prefixing the id with its queue. */
  private toJobDto(queueName: string, job: Job<MosaicJobData>, status: JobStatus): JobDto {
    return {
      id: `${queueName}__${job.id ?? 'unknown'}`,
      name: job.name,
      queue: queueName,
      status,
      attempts: job.attemptsMade,
      maxAttempts: job.opts?.attempts ?? 1,
      createdAt: job.timestamp ? new Date(job.timestamp).toISOString() : undefined,
      processedAt: job.processedOn ? new Date(job.processedOn).toISOString() : undefined,
      finishedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : undefined,
      failedReason: job.failedReason,
      data: (job.data as Record<string, unknown>) ?? {},
    };
  }

  // -------------------------------------------------------------------------
  // Job event logging (M6-007)
  // -------------------------------------------------------------------------

  /**
   * Write a log entry for a BullMQ job lifecycle event.
   *
   * Does nothing when no log service was injected. Never throws: ingestion
   * failures are downgraded to a warning.
   */
  private async logJobEvent(
    queueName: string,
    jobName: string,
    jobId: string,
    event: 'started' | 'completed' | 'retried' | 'failed',
    attempts: number,
    errorMessage?: string,
  ): Promise<void> {
    if (!this.logService) return;

    const level = event === 'failed' ? ('error' as const) : ('info' as const);
    const content =
      event === 'failed'
        ? `Job "${jobName}" (${jobId}) on queue "${queueName}" failed: ${errorMessage ?? 'unknown error'}`
        : `Job "${jobName}" (${jobId}) on queue "${queueName}" ${event} (attempt ${attempts})`;

    try {
      await this.logService.logs.ingest({
        sessionId: SYSTEM_SESSION_ID,
        userId: 'system',
        level,
        category: 'general',
        content,
        metadata: {
          jobId,
          jobName,
          queue: queueName,
          event,
          attempts,
          ...(errorMessage ? { errorMessage } : {}),
        },
      });
    } catch (err) {
      // Log errors must never crash job execution
      this.logger.warn(`Failed to write job event log for job ${jobId}: ${String(err)}`);
    }
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /** Close every tracked worker and queue; individual close failures are logged, not thrown. */
  private async closeAll(): Promise<void> {
    const workerCloses = Array.from(this.workers.values()).map((w) =>
      w.close().catch((err) => this.logger.error(`Worker close error: ${String(err)}`)),
    );
    const queueCloses = Array.from(this.queues.values()).map((q) =>
      q.close().catch((err) => this.logger.error(`Queue close error: ${String(err)}`)),
    );
    await Promise.all([...workerCloses, ...queueCloses]);
    this.workers.clear();
    this.queues.clear();
    this.logger.log('QueueService shut down');
  }
}

View File

@@ -0,0 +1,2 @@
// String tokens for the queue subsystem.
// NOTE(review): neither token is referenced in the visible source; presumably
// intended as dependency-injection tokens — confirm before relying on them.
export const QUEUE_REDIS = 'QUEUE_REDIS';
export const QUEUE_SERVICE = 'QUEUE_SERVICE';