feat(queue): add MCP server tools for queue operations (MQ-006)

This commit is contained in:
2026-03-06 09:36:18 -06:00
parent 9b2db93afc
commit 869f9b49df
6 changed files with 1121 additions and 1 deletions

View File

@@ -14,7 +14,9 @@
"test": "vitest run"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.27.1",
"commander": "^14.0.3",
"ioredis": "^5.10.0"
"ioredis": "^5.10.0",
"zod": "^4.3.6"
}
}

View File

@@ -51,3 +51,23 @@ export type {
QueueRepository,
QueueRepositorySession,
} from './cli.js';
export {
QUEUE_MCP_TOOL_DEFINITIONS,
buildQueueMcpServer,
startQueueMcpServer,
} from './mcp-server.js';
export type {
QueueMcpDependencies,
QueueMcpRepository,
QueueMcpSession,
} from './mcp-server.js';
export {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';

View File

@@ -0,0 +1,308 @@
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import type { CallToolResult, Implementation } from '@modelcontextprotocol/sdk/types.js';
import type { z } from 'zod';
import {
assertRedisHealthy,
createRedisClient,
runRedisHealthCheck,
type RedisHealthCheck,
} from './redis-connection.js';
import {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';
import {
RedisTaskRepository,
type RedisTaskClient,
} from './task-repository.js';
import { TASK_STATUSES, type Task, type TaskStatus } from './task.js';
export type QueueMcpRepository = Pick<
RedisTaskRepository,
'list' | 'get' | 'claim' | 'heartbeat' | 'release' | 'complete' | 'fail'
>;
export interface QueueMcpSession {
readonly repository: QueueMcpRepository;
readonly checkHealth: () => Promise<RedisHealthCheck>;
readonly close: () => Promise<void>;
}
export interface QueueMcpDependencies {
readonly openSession: () => Promise<QueueMcpSession>;
readonly serverInfo: Implementation;
}
type ToolSchema = z.ZodObject<Record<string, z.ZodTypeAny>>;
interface QueueMcpToolDefinition<TArgs extends z.ZodTypeAny> {
readonly name: string;
readonly description: string;
readonly inputSchema: TArgs;
readonly execute: (
session: QueueMcpSession,
input: z.output<TArgs>,
) => Promise<unknown>;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
quit(): Promise<string>;
}
const DEFAULT_SERVER_INFO: Implementation = {
name: 'mosaic-queue',
version: '0.0.1',
};
const DEFAULT_DEPENDENCIES: QueueMcpDependencies = {
openSession: openRedisMcpSession,
serverInfo: DEFAULT_SERVER_INFO,
};
export const QUEUE_MCP_TOOL_DEFINITIONS = [
{
name: 'queue_list',
description: 'List queue tasks with optional project/mission/status filters',
inputSchema: queueListToolInputSchema,
execute: async (session, input) => {
const tasks = await session.repository.list(input);
return {
tasks,
};
},
},
{
name: 'queue_get',
description: 'Get a single queue task by taskId',
inputSchema: queueGetToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.get(input.taskId);
return {
task,
};
},
},
{
name: 'queue_claim',
description: 'Atomically claim a task for an agent',
inputSchema: queueClaimToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.claim(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_heartbeat',
description: 'Refresh claim ownership TTL for a task',
inputSchema: queueHeartbeatToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.heartbeat(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_release',
description: 'Release a claimed task back to pending',
inputSchema: queueReleaseToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.release(input.taskId, {
agentId: input.agentId,
});
return {
task,
};
},
},
{
name: 'queue_complete',
description: 'Mark a claimed task as completed',
inputSchema: queueCompleteToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.complete(input.taskId, {
agentId: input.agentId,
summary: input.summary,
});
return {
task,
};
},
},
{
name: 'queue_fail',
description: 'Mark a claimed task as failed with a reason',
inputSchema: queueFailToolInputSchema,
execute: async (session, input) => {
const task = await session.repository.fail(input.taskId, {
agentId: input.agentId,
reason: input.reason,
});
return {
task,
};
},
},
{
name: 'queue_status',
description: 'Return queue health and task status counters',
inputSchema: queueStatusToolInputSchema,
execute: async (session) => {
const tasks = await session.repository.list({});
const health = await session.checkHealth();
const counts = countStatuses(tasks);
return {
health,
counts,
total: tasks.length,
};
},
},
] as const satisfies readonly QueueMcpToolDefinition<ToolSchema>[];
export function buildQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): McpServer {
const dependencies = resolveDependencies(dependencyOverrides);
const server = new McpServer(dependencies.serverInfo);
for (const definition of QUEUE_MCP_TOOL_DEFINITIONS) {
server.registerTool(
definition.name,
{
description: definition.description,
inputSchema: definition.inputSchema,
},
async (args) => {
return withSession(dependencies, async (session) => {
try {
const parsedArgs = definition.inputSchema.parse(args);
const response = await definition.execute(session, parsedArgs);
return toToolResult(response);
} catch (error) {
return toToolErrorResult(error);
}
});
},
);
}
return server;
}
export async function startQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): Promise<McpServer> {
const server = buildQueueMcpServer(dependencyOverrides);
const transport = new StdioServerTransport();
await server.connect(transport);
return server;
}
function resolveDependencies(
overrides: Partial<QueueMcpDependencies>,
): QueueMcpDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const serverInfo = overrides.serverInfo ?? DEFAULT_DEPENDENCIES.serverInfo;
return {
openSession: () => openSession(),
serverInfo,
};
}
async function withSession(
dependencies: QueueMcpDependencies,
handler: (session: QueueMcpSession) => Promise<CallToolResult>,
): Promise<CallToolResult> {
const session = await dependencies.openSession();
try {
return await handler(session);
} finally {
await session.close();
}
}
async function openRedisMcpSession(): Promise<QueueMcpSession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
checkHealth: async () => runRedisHealthCheck(redisClient),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
function toToolResult(payload: unknown): CallToolResult {
return {
content: [
{
type: 'text',
text: JSON.stringify(payload, null, 2),
},
],
};
}
function toToolErrorResult(error: unknown): CallToolResult {
return {
isError: true,
content: [
{
type: 'text',
text: formatError(error),
},
],
};
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function countStatuses(tasks: readonly Task[]): Record<TaskStatus, number> {
const counts = Object.fromEntries(TASK_STATUSES.map((status) => [status, 0])) as Record<
TaskStatus,
number
>;
for (const task of tasks) {
counts[task.status] += 1;
}
return counts;
}

View File

@@ -0,0 +1,44 @@
import { z } from 'zod';
import { TASK_STATUSES } from './task.js';
export const queueListToolInputSchema = z.object({
project: z.string().min(1).optional(),
mission: z.string().min(1).optional(),
status: z.enum(TASK_STATUSES).optional(),
});
export const queueGetToolInputSchema = z.object({
taskId: z.string().min(1),
});
export const queueClaimToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1),
ttlSeconds: z.number().int().positive(),
});
export const queueHeartbeatToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
ttlSeconds: z.number().int().positive().optional(),
});
export const queueReleaseToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
});
export const queueCompleteToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
summary: z.string().min(1).optional(),
});
export const queueFailToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
reason: z.string().min(1),
});
export const queueStatusToolInputSchema = z.object({});

View File

@@ -0,0 +1,50 @@
import { describe, expect, it } from 'vitest';
import {
QUEUE_MCP_TOOL_DEFINITIONS,
buildQueueMcpServer,
} from '../src/mcp-server.js';
describe('queue MCP server', () => {
it('declares all required phase-1 tools', () => {
const toolNames = QUEUE_MCP_TOOL_DEFINITIONS.map((tool) => tool.name).sort();
expect(toolNames).toEqual([
'queue_claim',
'queue_complete',
'queue_fail',
'queue_get',
'queue_heartbeat',
'queue_list',
'queue_release',
'queue_status',
]);
});
it('builds an MCP server instance', () => {
const server = buildQueueMcpServer({
openSession: () =>
Promise.resolve({
repository: {
list: () => Promise.resolve([]),
get: () => Promise.resolve(null),
claim: () => Promise.reject(new Error('not implemented')),
heartbeat: () => Promise.reject(new Error('not implemented')),
release: () => Promise.reject(new Error('not implemented')),
complete: () => Promise.reject(new Error('not implemented')),
fail: () => Promise.reject(new Error('not implemented')),
},
checkHealth: () =>
Promise.resolve({
checkedAt: 1,
latencyMs: 0,
ok: true,
response: 'PONG',
}),
close: () => Promise.resolve(),
}),
});
expect(server).toBeDefined();
});
});

696
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff