5 Commits

Author SHA1 Message Date
af514893cd feat(registry): Gitea npm registry publish config + Changesets
- .npmrc: @mosaic scope points to Gitea registry
- Changesets initialized with initial minor release
- publishConfig added to all packages
- Woodpecker publish step on main branch
2026-03-06 18:39:48 -06:00
04d13e510c ci: add Woodpecker pipeline with Valkey service for queue tests 2026-03-06 18:36:02 -06:00
d7f200edd6 feat(wave2): @mosaic/openclaw-context plugin migrated to monorepo (#3)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 00:33:20 +00:00
2828a83b66 feat(wave1): @mosaic/types populated + @mosaic/queue migrated (#2)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-07 00:31:39 +00:00
8a2fb6c1ec feat(wave1): @mosaic/types populated + @mosaic/queue migrated to use it (#1)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-06 22:44:05 +00:00
43 changed files with 7551 additions and 21 deletions

8
.changeset/README.md Normal file
View File

@@ -0,0 +1,8 @@
# Changesets
Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works
with multi-package repos, or single-package repos to help you version and publish your code. You can
find the full documentation for it [in our repository](https://github.com/changesets/changesets).
We have a quick list of common questions to get you started engaging with this project in
[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md).

11
.changeset/config.json Normal file
View File

@@ -0,0 +1,11 @@
{
"$schema": "https://unpkg.com/@changesets/config@3.1.3/schema.json",
"changelog": "@changesets/cli/changelog",
"commit": false,
"fixed": [],
"linked": [],
"access": "restricted",
"baseBranch": "main",
"updateInternalDependencies": "patch",
"ignore": []
}

View File

@@ -0,0 +1,7 @@
---
"@mosaic/types": minor
"@mosaic/queue": minor
"@mosaic/openclaw-context": minor
---
Initial release of the @mosaic/* monorepo packages.

1
.npmrc
View File

@@ -1 +1,2 @@
@mosaic:registry=https://git.mosaicstack.dev/api/packages/mosaic/npm
//git.mosaicstack.dev/api/packages/mosaic/npm/:_authToken=${GITEA_NPM_TOKEN}

View File

@@ -5,26 +5,45 @@ steps:
- corepack enable
- pnpm install --frozen-lockfile
- name: lint
image: node:22-alpine
commands:
- pnpm turbo lint
- name: typecheck
image: node:22-alpine
depends_on: [install]
commands:
- pnpm turbo typecheck
- name: lint
image: node:22-alpine
depends_on: [install]
commands:
- pnpm turbo lint
- name: build
image: node:22-alpine
depends_on: [typecheck]
commands:
- pnpm turbo build
- name: test
image: node:22-alpine
depends_on: [build]
commands:
- pnpm turbo test
services:
- name: valkey
image: valkey/valkey:8-alpine
ports: ["6379:6379"]
environment:
- ALLOW_EMPTY_PASSWORD=yes
- name: publish
image: node:22-alpine
depends_on: [test]
when:
branch: main
event: push
environment:
GITEA_NPM_TOKEN:
from_secret: gitea_npm_token
commands:
- corepack enable
- pnpm changeset version || true
- pnpm changeset publish

14
packages/queue/README.md Normal file
View File

@@ -0,0 +1,14 @@
# @mosaic/queue
Valkey-backed task queue package for Mosaic monorepo migration.
## Exports
- Queue repository logic and Redis connection helpers
- CLI runner (`mosaic-queue`)
- MCP server runner (`mosaic-queue-mcp`)
- Runtime task constants (`TASK_STATUSES`, `TASK_PRIORITIES`, `TASK_LANES`)
## Note
Task type definitions are expected to come from `@mosaic/types`.

View File

@@ -0,0 +1,11 @@
#!/usr/bin/env node
import { startQueueMcpServer } from '../src/mcp-server.js';
try {
await startQueueMcpServer();
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(message);
process.exitCode = 1;
}

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env node
import { runQueueCli } from '../src/cli.js';
const exitCode = await runQueueCli(process.argv);
process.exitCode = exitCode;

View File

@@ -0,0 +1,46 @@
{
"name": "@mosaic/queue",
"version": "0.1.0",
"description": "Valkey-backed task queue exposed via CLI and MCP",
"license": "MIT",
"type": "module",
"main": "dist/src/index.js",
"types": "dist/src/index.d.ts",
"exports": {
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
}
},
"bin": {
"mosaic-queue": "dist/bin/mosaic-queue.js",
"mosaic-queue-mcp": "dist/bin/mosaic-queue-mcp.js"
},
"files": [
"dist"
],
"scripts": {
"lint": "eslint \"src/**/*.ts\" \"tests/**/*.ts\" \"bin/**/*.ts\" \"vitest.config.ts\"",
"build": "tsc -p tsconfig.build.json",
"typecheck": "tsc -p tsconfig.json --noEmit",
"test": "vitest run",
"prepublishOnly": "pnpm lint && pnpm test && pnpm build"
},
"engines": {
"node": ">=20.0.0"
},
"publishConfig": {
"registry": "https://git.mosaicstack.dev/api/packages/mosaic/npm",
"access": "public"
},
"dependencies": {
"@mosaic/types": "workspace:*",
"@modelcontextprotocol/sdk": "^1.27.1",
"commander": "^14.0.3",
"ioredis": "^5.10.0",
"zod": "^4.3.6"
},
"devDependencies": {
"vitest": "^3"
}
}

344
packages/queue/src/cli.ts Normal file
View File

@@ -0,0 +1,344 @@
import {
Command,
CommanderError,
InvalidArgumentError,
Option,
} from 'commander';
import { assertRedisHealthy, createRedisClient } from './redis-connection.js';
import {
RedisTaskRepository,
type ClaimTaskInput,
type CompleteTaskInput,
type RedisTaskClient,
} from './task-repository.js';
import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
import type {
CreateTaskInput,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
} from '@mosaic/types';
export type QueueRepository = Pick<
RedisTaskRepository,
'create' | 'list' | 'get' | 'claim' | 'release' | 'complete'
>;
export interface QueueRepositorySession {
readonly repository: QueueRepository;
readonly close: () => Promise<void>;
}
export interface QueueCliDependencies {
readonly openSession: () => Promise<QueueRepositorySession>;
readonly stdout: (line: string) => void;
readonly stderr: (line: string) => void;
}
interface CreateCommandOptions {
readonly title: string;
readonly description?: string;
readonly priority?: TaskPriority;
readonly lane?: TaskLane;
readonly dependency?: string[];
}
interface ListCommandOptions {
readonly project?: string;
readonly mission?: string;
readonly status?: TaskStatus;
}
interface ClaimCommandOptions {
readonly agent: string;
readonly ttl: number;
}
interface ReleaseCommandOptions {
readonly agent?: string;
}
interface CompleteCommandOptions {
readonly agent?: string;
readonly summary?: string;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
ping(): Promise<string>;
quit(): Promise<string>;
}
const DEFAULT_DEPENDENCIES: QueueCliDependencies = {
openSession: openRedisSession,
stdout: (line: string) => {
console.log(line);
},
stderr: (line: string) => {
console.error(line);
},
};
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
const LANE_SET = new Set<TaskLane>(TASK_LANES);
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
export function buildQueueCli(
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Command {
const dependencies = resolveDependencies(dependencyOverrides);
const program = new Command();
program
.name('mosaic')
.description('mosaic queue command line interface')
.exitOverride();
program.configureOutput({
writeOut: (output: string) => dependencies.stdout(output.trimEnd()),
writeErr: (output: string) => dependencies.stderr(output.trimEnd()),
});
const queue = program.command('queue').description('Manage queue tasks');
queue
.command('create <project> <mission> <taskId>')
.description('Create a queue task')
.requiredOption('--title <title>', 'Task title')
.option('--description <description>', 'Task description')
.addOption(
new Option('--priority <priority>', 'Task priority')
.choices(TASK_PRIORITIES)
.argParser(parsePriority),
)
.addOption(
new Option('--lane <lane>', 'Task lane').choices(TASK_LANES).argParser(parseLane),
)
.option('--dependency <taskIds...>', 'Task dependencies')
.action(
async (
project: string,
mission: string,
taskId: string,
options: CreateCommandOptions,
) => {
await withSession(dependencies, async (repository) => {
const payload: CreateTaskInput = {
project,
mission,
taskId,
title: options.title,
description: options.description,
priority: options.priority,
dependencies: options.dependency,
lane: options.lane,
};
const task = await repository.create(payload);
dependencies.stdout(JSON.stringify(task, null, 2));
});
},
);
queue
.command('list')
.description('List queue tasks')
.option('--project <project>', 'Filter by project')
.option('--mission <mission>', 'Filter by mission')
.addOption(
new Option('--status <status>', 'Filter by status')
.choices(TASK_STATUSES)
.argParser(parseStatus),
)
.action(async (options: ListCommandOptions) => {
await withSession(dependencies, async (repository) => {
const filters: TaskListFilters = {
project: options.project,
mission: options.mission,
status: options.status,
};
const tasks = await repository.list(filters);
dependencies.stdout(JSON.stringify(tasks, null, 2));
});
});
queue
.command('show <taskId>')
.description('Show a single queue task')
.action(async (taskId: string) => {
await withSession(dependencies, async (repository) => {
const task = await repository.get(taskId);
if (task === null) {
throw new Error(`Task ${taskId} was not found.`);
}
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('claim <taskId>')
.description('Claim a pending task')
.requiredOption('--agent <agentId>', 'Agent identifier')
.requiredOption('--ttl <seconds>', 'Claim TTL in seconds', parsePositiveInteger)
.action(async (taskId: string, options: ClaimCommandOptions) => {
await withSession(dependencies, async (repository) => {
const claimInput: ClaimTaskInput = {
agentId: options.agent,
ttlSeconds: options.ttl,
};
const task = await repository.claim(taskId, claimInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('release <taskId>')
.description('Release a claimed task back to pending')
.option('--agent <agentId>', 'Expected owner agent id')
.action(async (taskId: string, options: ReleaseCommandOptions) => {
await withSession(dependencies, async (repository) => {
const task = await repository.release(taskId, {
agentId: options.agent,
});
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('complete <taskId>')
.description('Complete a claimed task')
.option('--agent <agentId>', 'Expected owner agent id')
.option('--summary <summary>', 'Optional completion summary')
.action(async (taskId: string, options: CompleteCommandOptions) => {
await withSession(dependencies, async (repository) => {
const completeInput: CompleteTaskInput = {
agentId: options.agent,
summary: options.summary,
};
const task = await repository.complete(taskId, completeInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
return program;
}
export async function runQueueCli(
argv: string[] = process.argv,
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Promise<number> {
const dependencies = resolveDependencies(dependencyOverrides);
const program = buildQueueCli(dependencies);
try {
await program.parseAsync(argv, {
from: 'node',
});
return 0;
} catch (error) {
if (error instanceof CommanderError) {
if (error.code === 'commander.helpDisplayed') {
return 0;
}
if (error.code.startsWith('commander.')) {
return error.exitCode;
}
}
dependencies.stderr(formatError(error));
return 1;
}
}
async function openRedisSession(): Promise<QueueRepositorySession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
async function withSession(
dependencies: QueueCliDependencies,
action: (repository: QueueRepository) => Promise<void>,
): Promise<void> {
const session = await dependencies.openSession();
try {
await action(session.repository);
} finally {
await session.close();
}
}
function resolveDependencies(
overrides: Partial<QueueCliDependencies>,
): QueueCliDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const stdout = overrides.stdout ?? DEFAULT_DEPENDENCIES.stdout;
const stderr = overrides.stderr ?? DEFAULT_DEPENDENCIES.stderr;
return {
openSession: () => openSession(),
stdout: (line: string) => stdout(line),
stderr: (line: string) => stderr(line),
};
}
function parsePositiveInteger(value: string): number {
const parsed = Number.parseInt(value, 10);
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new InvalidArgumentError(`Expected a positive integer, received "${value}"`);
}
return parsed;
}
function parsePriority(value: string): TaskPriority {
if (!PRIORITY_SET.has(value as TaskPriority)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_PRIORITIES.join(', ')}, received "${value}"`,
);
}
return value as TaskPriority;
}
function parseLane(value: string): TaskLane {
if (!LANE_SET.has(value as TaskLane)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_LANES.join(', ')}, received "${value}"`,
);
}
return value as TaskLane;
}
function parseStatus(value: string): TaskStatus {
if (!STATUS_SET.has(value as TaskStatus)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_STATUSES.join(', ')}, received "${value}"`,
);
}
return value as TaskStatus;
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -0,0 +1,69 @@
export const packageVersion = '0.1.0';
export {
assertRedisHealthy,
createRedisClient,
resolveRedisUrl,
runRedisHealthCheck,
} from './redis-connection.js';
export type {
CreateRedisClientOptions,
RedisClientConstructor,
RedisHealthCheck,
RedisPingClient,
} from './redis-connection.js';
export {
RedisTaskRepository,
TaskAlreadyExistsError,
TaskAtomicConflictError,
TaskNotFoundError,
TaskOwnershipError,
TaskSerializationError,
TaskTransitionError,
} from './task-repository.js';
export type {
ClaimTaskInput,
CompleteTaskInput,
FailTaskInput,
HeartbeatTaskInput,
RedisTaskClient,
RedisTaskRepositoryOptions,
RedisTaskTransaction,
ReleaseTaskInput,
} from './task-repository.js';
export { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
export type {
CreateTaskInput,
Task,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
TaskUpdateInput,
} from '@mosaic/types';
export { buildQueueCli, runQueueCli } from './cli.js';
export type {
QueueCliDependencies,
QueueRepository,
QueueRepositorySession,
} from './cli.js';
export {
QUEUE_MCP_TOOL_DEFINITIONS,
buildQueueMcpServer,
startQueueMcpServer,
} from './mcp-server.js';
export type {
QueueMcpDependencies,
QueueMcpRepository,
QueueMcpSession,
} from './mcp-server.js';
export {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';

View File

@@ -0,0 +1,310 @@
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import type { CallToolResult, Implementation } from '@modelcontextprotocol/sdk/types.js';
import type { z } from 'zod';
import {
assertRedisHealthy,
createRedisClient,
runRedisHealthCheck,
type RedisHealthCheck,
} from './redis-connection.js';
import {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';
import {
RedisTaskRepository,
type RedisTaskClient,
} from './task-repository.js';
import { TASK_STATUSES } from './task.js';
import type { Task, TaskStatus } from '@mosaic/types';
export type QueueMcpRepository = Pick<
RedisTaskRepository,
'list' | 'get' | 'claim' | 'heartbeat' | 'release' | 'complete' | 'fail'
>;
export interface QueueMcpSession {
readonly repository: QueueMcpRepository;
readonly checkHealth: () => Promise<RedisHealthCheck>;
readonly close: () => Promise<void>;
}
export interface QueueMcpDependencies {
readonly openSession: () => Promise<QueueMcpSession>;
readonly serverInfo: Implementation;
}
type ToolSchema = z.ZodTypeAny;
interface QueueMcpToolDefinition<TArgs extends z.ZodTypeAny> {
readonly name: string;
readonly description: string;
readonly inputSchema: TArgs;
readonly execute: (
session: QueueMcpSession,
input: z.output<TArgs>,
) => Promise<unknown>;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
ping(): Promise<string>;
quit(): Promise<string>;
}
const DEFAULT_SERVER_INFO: Implementation = {
name: 'mosaic-queue',
version: '0.1.0',
};
const DEFAULT_DEPENDENCIES: QueueMcpDependencies = {
openSession: openRedisMcpSession,
serverInfo: DEFAULT_SERVER_INFO,
};
export const QUEUE_MCP_TOOL_DEFINITIONS = [
{
name: 'queue_list',
description: 'List queue tasks with optional project/mission/status filters',
inputSchema: queueListToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueListToolInputSchema>) => {
const tasks = await session.repository.list(input);
return {
tasks,
};
},
},
{
name: 'queue_get',
description: 'Get a single queue task by taskId',
inputSchema: queueGetToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueGetToolInputSchema>) => {
const task = await session.repository.get(input.taskId);
return {
task,
};
},
},
{
name: 'queue_claim',
description: 'Atomically claim a task for an agent',
inputSchema: queueClaimToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueClaimToolInputSchema>) => {
const task = await session.repository.claim(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_heartbeat',
description: 'Refresh claim ownership TTL for a task',
inputSchema: queueHeartbeatToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueHeartbeatToolInputSchema>) => {
const task = await session.repository.heartbeat(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_release',
description: 'Release a claimed task back to pending',
inputSchema: queueReleaseToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueReleaseToolInputSchema>) => {
const task = await session.repository.release(input.taskId, {
agentId: input.agentId,
});
return {
task,
};
},
},
{
name: 'queue_complete',
description: 'Mark a claimed task as completed',
inputSchema: queueCompleteToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueCompleteToolInputSchema>) => {
const task = await session.repository.complete(input.taskId, {
agentId: input.agentId,
summary: input.summary,
});
return {
task,
};
},
},
{
name: 'queue_fail',
description: 'Mark a claimed task as failed with a reason',
inputSchema: queueFailToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueFailToolInputSchema>) => {
const task = await session.repository.fail(input.taskId, {
agentId: input.agentId,
reason: input.reason,
});
return {
task,
};
},
},
{
name: 'queue_status',
description: 'Return queue health and task status counters',
inputSchema: queueStatusToolInputSchema,
execute: async (session: QueueMcpSession) => {
const tasks = await session.repository.list({});
const health = await session.checkHealth();
const counts = countStatuses(tasks);
return {
health,
counts,
total: tasks.length,
};
},
},
] as const;
export function buildQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): McpServer {
const dependencies = resolveDependencies(dependencyOverrides);
const server = new McpServer(dependencies.serverInfo);
for (const definition of QUEUE_MCP_TOOL_DEFINITIONS) {
server.registerTool(
definition.name,
{
description: definition.description,
inputSchema: definition.inputSchema,
},
async (args: unknown) => {
return withSession(dependencies, async (session) => {
try {
const parsedArgs = definition.inputSchema.parse(args);
const response = await definition.execute(session, parsedArgs as never);
return toToolResult(response);
} catch (error) {
return toToolErrorResult(error);
}
});
},
);
}
return server;
}
export async function startQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): Promise<McpServer> {
const server = buildQueueMcpServer(dependencyOverrides);
const transport = new StdioServerTransport();
await server.connect(transport);
return server;
}
function resolveDependencies(
overrides: Partial<QueueMcpDependencies>,
): QueueMcpDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const serverInfo = overrides.serverInfo ?? DEFAULT_DEPENDENCIES.serverInfo;
return {
openSession: () => openSession(),
serverInfo,
};
}
async function withSession(
dependencies: QueueMcpDependencies,
handler: (session: QueueMcpSession) => Promise<CallToolResult>,
): Promise<CallToolResult> {
const session = await dependencies.openSession();
try {
return await handler(session);
} finally {
await session.close();
}
}
async function openRedisMcpSession(): Promise<QueueMcpSession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
checkHealth: async () => runRedisHealthCheck(redisClient),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
function toToolResult(payload: unknown): CallToolResult {
return {
content: [
{
type: 'text',
text: JSON.stringify(payload, null, 2),
},
],
};
}
function toToolErrorResult(error: unknown): CallToolResult {
return {
isError: true,
content: [
{
type: 'text',
text: formatError(error),
},
],
};
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function countStatuses(tasks: readonly Task[]): Record<TaskStatus, number> {
const counts = Object.fromEntries(TASK_STATUSES.map((status) => [status, 0])) as Record<
TaskStatus,
number
>;
for (const task of tasks) {
counts[task.status] += 1;
}
return counts;
}

View File

@@ -0,0 +1,44 @@
import { z } from 'zod';
import { TASK_STATUSES } from './task.js';
export const queueListToolInputSchema = z.object({
project: z.string().min(1).optional(),
mission: z.string().min(1).optional(),
status: z.enum(TASK_STATUSES).optional(),
});
export const queueGetToolInputSchema = z.object({
taskId: z.string().min(1),
});
export const queueClaimToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1),
ttlSeconds: z.number().int().positive(),
});
export const queueHeartbeatToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
ttlSeconds: z.number().int().positive().optional(),
});
export const queueReleaseToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
});
export const queueCompleteToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
summary: z.string().min(1).optional(),
});
export const queueFailToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
reason: z.string().min(1),
});
export const queueStatusToolInputSchema = z.object({});

View File

@@ -0,0 +1,95 @@
import Redis, { type Redis as RedisClient, type RedisOptions } from 'ioredis';
const ERR_MISSING_REDIS_URL =
'Missing required Valkey/Redis connection URL. Set VALKEY_URL or REDIS_URL.';
export interface RedisHealthCheck {
readonly checkedAt: number;
readonly latencyMs: number;
readonly ok: boolean;
readonly response?: string;
readonly error?: string;
}
export interface RedisPingClient {
ping(): Promise<string>;
}
export type RedisClientConstructor<TClient> = new (
url: string,
options?: RedisOptions,
) => TClient;
export interface CreateRedisClientOptions<TClient> {
readonly env?: NodeJS.ProcessEnv;
readonly redisConstructor?: RedisClientConstructor<TClient>;
readonly redisOptions?: RedisOptions;
}
export function resolveRedisUrl(env: NodeJS.ProcessEnv = process.env): string {
const resolvedUrl = env.VALKEY_URL ?? env.REDIS_URL;
if (typeof resolvedUrl !== 'string' || resolvedUrl.trim().length === 0) {
throw new Error(ERR_MISSING_REDIS_URL);
}
return resolvedUrl;
}
export function createRedisClient<TClient = RedisClient>(
options: CreateRedisClientOptions<TClient> = {},
): TClient {
const redisUrl = resolveRedisUrl(options.env);
const RedisCtor =
options.redisConstructor ??
(Redis as unknown as RedisClientConstructor<TClient>);
return new RedisCtor(redisUrl, {
maxRetriesPerRequest: null,
...options.redisOptions,
});
}
export async function runRedisHealthCheck(
client: RedisPingClient,
): Promise<RedisHealthCheck> {
const startedAt = process.hrtime.bigint();
try {
const response = await client.ping();
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
return {
checkedAt: Date.now(),
latencyMs: elapsedMs,
ok: true,
response,
};
} catch (error) {
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
const message =
error instanceof Error ? error.message : 'Unknown redis health check error';
return {
checkedAt: Date.now(),
latencyMs: elapsedMs,
ok: false,
error: message,
};
}
}
export async function assertRedisHealthy(
client: RedisPingClient,
): Promise<RedisHealthCheck> {
const health = await runRedisHealthCheck(client);
if (!health.ok) {
throw new Error(
`Redis health check failed after ${health.latencyMs}ms: ${health.error ?? 'unknown error'}`,
);
}
return health;
}

View File

@@ -0,0 +1,638 @@
import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
import type {
CreateTaskInput,
Task,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
TaskUpdateInput,
} from '@mosaic/types';
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
const LANE_SET = new Set<TaskLane>(TASK_LANES);
const DEFAULT_KEY_PREFIX = 'mosaic:queue';
const MAX_ATOMIC_RETRIES = 8;
const UPDATE_ALLOWED_STATUS_TRANSITIONS: Readonly<Record<TaskStatus, readonly TaskStatus[]>> = {
pending: ['blocked'],
blocked: ['pending'],
claimed: ['in-progress'],
'in-progress': ['claimed'],
completed: [],
failed: [],
};
interface RepositoryKeys {
readonly taskIds: string;
task(taskId: string): string;
}
export interface RedisTaskClient {
get(key: string): Promise<string | null>;
mget(...keys: string[]): Promise<(string | null)[]>;
set(key: string, value: string, mode?: 'NX' | 'XX'): Promise<'OK' | null>;
smembers(key: string): Promise<string[]>;
sadd(key: string, member: string): Promise<number>;
watch(...keys: string[]): Promise<'OK'>;
unwatch(): Promise<'OK'>;
multi(): RedisTaskTransaction;
}
export interface RedisTaskTransaction {
set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction;
sadd(key: string, member: string): RedisTaskTransaction;
exec(): Promise<readonly (readonly [Error | null, unknown])[] | null>;
}
export interface RedisTaskRepositoryOptions {
readonly client: RedisTaskClient;
readonly keyPrefix?: string;
readonly now?: () => number;
}
export interface ClaimTaskInput {
readonly agentId: string;
readonly ttlSeconds: number;
}
export interface ReleaseTaskInput {
readonly agentId?: string;
}
export interface HeartbeatTaskInput {
readonly agentId?: string;
readonly ttlSeconds?: number;
}
export interface CompleteTaskInput {
readonly agentId?: string;
readonly summary?: string;
}
export interface FailTaskInput {
readonly agentId?: string;
readonly reason: string;
}
export class TaskAlreadyExistsError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} already exists.`);
this.name = 'TaskAlreadyExistsError';
}
}
export class TaskNotFoundError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} was not found.`);
this.name = 'TaskNotFoundError';
}
}
export class TaskSerializationError extends Error {
public constructor(taskId: string, message: string) {
super(`Unable to deserialize task ${taskId}: ${message}`);
this.name = 'TaskSerializationError';
}
}
export class TaskTransitionError extends Error {
public constructor(taskId: string, status: TaskStatus, action: string) {
super(`Task ${taskId} cannot transition from ${status} via ${action}.`);
this.name = 'TaskTransitionError';
}
}
export class TaskOwnershipError extends Error {
public constructor(taskId: string, expectedAgentId: string, actualAgentId: string) {
super(
`Task ${taskId} is owned by ${actualAgentId}, not ${expectedAgentId}.`,
);
this.name = 'TaskOwnershipError';
}
}
export class TaskAtomicConflictError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} could not be updated atomically after multiple retries.`);
this.name = 'TaskAtomicConflictError';
}
}
export class RedisTaskRepository {
private readonly client: RedisTaskClient;
private readonly keys: RepositoryKeys;
private readonly now: () => number;
public constructor(options: RedisTaskRepositoryOptions) {
this.client = options.client;
this.keys = buildRepositoryKeys(options.keyPrefix ?? DEFAULT_KEY_PREFIX);
this.now = options.now ?? Date.now;
}
public async create(input: CreateTaskInput): Promise<Task> {
const timestamp = this.now();
const task: Task = {
id: input.taskId,
project: input.project,
mission: input.mission,
taskId: input.taskId,
title: input.title,
description: input.description,
status: 'pending',
priority: input.priority ?? 'medium',
dependencies: [...(input.dependencies ?? [])],
lane: input.lane ?? 'any',
retryCount: 0,
metadata: input.metadata,
createdAt: timestamp,
updatedAt: timestamp,
};
const taskKey = this.keys.task(task.taskId);
const serializedTask = JSON.stringify(task);
for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) {
await this.client.watch(taskKey);
try {
const transaction = this.client.multi();
transaction.set(taskKey, serializedTask, 'NX');
transaction.sadd(this.keys.taskIds, task.taskId);
const execResult = await transaction.exec();
if (execResult === null) {
continue;
}
const setResult = execResult[0];
if (setResult === undefined) {
throw new TaskAtomicConflictError(task.taskId);
}
const [setError, setReply] = setResult;
if (setError !== null) {
throw setError;
}
if (setReply !== 'OK') {
throw new TaskAlreadyExistsError(task.taskId);
}
const saddResult = execResult[1];
if (saddResult !== undefined && saddResult[0] !== null) {
throw saddResult[0];
}
return task;
} finally {
await this.client.unwatch();
}
}
throw new TaskAlreadyExistsError(task.taskId);
}
public async get(taskId: string): Promise<Task | null> {
const raw = await this.client.get(this.keys.task(taskId));
if (raw === null) {
return null;
}
return deserializeTask(taskId, raw);
}
public async list(filters: TaskListFilters = {}): Promise<Task[]> {
const taskIds = await this.client.smembers(this.keys.taskIds);
if (taskIds.length === 0) {
return [];
}
const taskKeys = taskIds.map((taskId) => this.keys.task(taskId));
const records = await this.client.mget(...taskKeys);
const tasks: Task[] = [];
for (const [index, rawTask] of records.entries()) {
if (rawTask === null || rawTask === undefined) {
continue;
}
const taskId = taskIds[index];
if (taskId === undefined) {
continue;
}
tasks.push(deserializeTask(taskId, rawTask));
}
return tasks
.filter((task) =>
matchesFilters(task, {
project: filters.project,
mission: filters.mission,
status: filters.status,
}),
)
.sort((left, right) => left.createdAt - right.createdAt);
}
public async update(taskId: string, patch: TaskUpdateInput): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
assertUpdatePatchIsAllowed(taskId, existing, patch);
return {
...existing,
...patch,
dependencies:
patch.dependencies === undefined ? existing.dependencies : [...patch.dependencies],
updatedAt: now,
};
});
}
public async claim(taskId: string, input: ClaimTaskInput): Promise<Task> {
if (input.ttlSeconds <= 0) {
throw new Error(`Task ${taskId} claim ttl must be greater than 0 seconds.`);
}
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!canClaimTask(existing, now)) {
throw new TaskTransitionError(taskId, existing.status, 'claim');
}
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'claimed',
claimedBy: input.agentId,
claimedAt: now,
claimTTL: input.ttlSeconds,
updatedAt: now,
};
});
}
public async release(taskId: string, input: ReleaseTaskInput = {}): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'release');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutClaimFields(existing);
return {
...base,
status: 'pending',
updatedAt: now,
};
});
}
public async heartbeat(
taskId: string,
input: HeartbeatTaskInput = {},
): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
if (isClaimExpired(existing, now)) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
assertTaskOwnership(taskId, existing, input.agentId);
const ttl = input.ttlSeconds ?? existing.claimTTL;
if (ttl === undefined || ttl <= 0) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
return {
...existing,
claimedAt: now,
claimTTL: ttl,
updatedAt: now,
};
});
}
public async complete(
taskId: string,
input: CompleteTaskInput = {},
): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'complete');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'completed',
completedAt: now,
...(input.summary === undefined ? {} : { completionSummary: input.summary }),
updatedAt: now,
};
});
}
public async fail(taskId: string, input: FailTaskInput): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'fail');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'failed',
failedAt: now,
failureReason: input.reason,
retryCount: existing.retryCount + 1,
updatedAt: now,
};
});
}
private async mutateTaskAtomically(
taskId: string,
mutation: (existing: Task, now: number) => Task,
): Promise<Task> {
const taskKey = this.keys.task(taskId);
for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) {
await this.client.watch(taskKey);
try {
const raw = await this.client.get(taskKey);
if (raw === null) {
throw new TaskNotFoundError(taskId);
}
const existing = deserializeTask(taskId, raw);
const updated = mutation(existing, this.now());
const transaction = this.client.multi();
transaction.set(taskKey, JSON.stringify(updated), 'XX');
transaction.sadd(this.keys.taskIds, taskId);
const execResult = await transaction.exec();
if (execResult === null) {
continue;
}
const setResult = execResult[0];
if (setResult === undefined) {
throw new TaskAtomicConflictError(taskId);
}
const [setError, setReply] = setResult;
if (setError !== null) {
throw setError;
}
if (setReply !== 'OK') {
throw new TaskNotFoundError(taskId);
}
const saddResult = execResult[1];
if (saddResult !== undefined && saddResult[0] !== null) {
throw saddResult[0];
}
return updated;
} finally {
await this.client.unwatch();
}
}
throw new TaskAtomicConflictError(taskId);
}
}
function matchesFilters(task: Task, filters: TaskListFilters): boolean {
if (filters.project !== undefined && task.project !== filters.project) {
return false;
}
if (filters.mission !== undefined && task.mission !== filters.mission) {
return false;
}
if (filters.status !== undefined && task.status !== filters.status) {
return false;
}
return true;
}
function assertUpdatePatchIsAllowed(taskId: string, task: Task, patch: TaskUpdateInput): void {
if (patch.status !== undefined && !canTransitionStatusViaUpdate(task.status, patch.status)) {
throw new TaskTransitionError(taskId, task.status, 'update');
}
if (
patch.claimedBy !== undefined ||
patch.claimedAt !== undefined ||
patch.claimTTL !== undefined ||
patch.completedAt !== undefined ||
patch.failedAt !== undefined ||
patch.failureReason !== undefined ||
patch.completionSummary !== undefined ||
patch.retryCount !== undefined
) {
throw new TaskTransitionError(taskId, task.status, 'update');
}
}
function canTransitionStatusViaUpdate(from: TaskStatus, to: TaskStatus): boolean {
if (from === to) {
return true;
}
return UPDATE_ALLOWED_STATUS_TRANSITIONS[from].includes(to);
}
function canClaimTask(task: Task, now: number): boolean {
if (task.status === 'pending') {
return true;
}
if (!isClaimedLikeStatus(task.status)) {
return false;
}
return isClaimExpired(task, now);
}
function isClaimedLikeStatus(status: TaskStatus): boolean {
return status === 'claimed' || status === 'in-progress';
}
function isClaimExpired(task: Task, now: number): boolean {
if (task.claimedAt === undefined || task.claimTTL === undefined) {
return false;
}
return task.claimedAt + task.claimTTL * 1000 <= now;
}
function assertTaskOwnership(
taskId: string,
task: Task,
expectedAgentId: string | undefined,
): void {
if (expectedAgentId === undefined || task.claimedBy === undefined) {
return;
}
if (task.claimedBy !== expectedAgentId) {
throw new TaskOwnershipError(taskId, expectedAgentId, task.claimedBy);
}
}
type TaskWithoutClaimFields = Omit<Task, 'claimedBy' | 'claimedAt' | 'claimTTL'>;
type TaskWithoutCompletionAndFailureFields = Omit<
Task,
'completedAt' | 'failedAt' | 'failureReason' | 'completionSummary'
>;
function withoutClaimFields(task: Task): TaskWithoutClaimFields {
const {
claimedBy: _claimedBy,
claimedAt: _claimedAt,
claimTTL: _claimTTL,
...taskWithoutClaimFields
} = task;
return taskWithoutClaimFields;
}
function withoutCompletionAndFailureFields(
task: TaskWithoutClaimFields,
): TaskWithoutCompletionAndFailureFields {
const {
completedAt: _completedAt,
failedAt: _failedAt,
failureReason: _failureReason,
completionSummary: _completionSummary,
...taskWithoutCompletionAndFailureFields
} = task;
return taskWithoutCompletionAndFailureFields;
}
function deserializeTask(taskId: string, raw: string): Task {
let parsed: unknown;
try {
parsed = JSON.parse(raw);
} catch (error) {
throw new TaskSerializationError(
taskId,
error instanceof Error ? error.message : 'invalid JSON',
);
}
if (!isRecord(parsed)) {
throw new TaskSerializationError(taskId, 'task payload is not an object');
}
const requiredStringKeys = ['id', 'project', 'mission', 'taskId', 'title'] as const;
const requiredNumberKeys = ['retryCount', 'createdAt', 'updatedAt'] as const;
for (const key of requiredStringKeys) {
if (typeof parsed[key] !== 'string') {
throw new TaskSerializationError(taskId, `missing string field "${key}"`);
}
}
for (const key of requiredNumberKeys) {
if (typeof parsed[key] !== 'number') {
throw new TaskSerializationError(taskId, `missing numeric field "${key}"`);
}
}
if (!STATUS_SET.has(parsed.status as TaskStatus)) {
throw new TaskSerializationError(taskId, 'invalid status value');
}
if (!PRIORITY_SET.has(parsed.priority as TaskPriority)) {
throw new TaskSerializationError(taskId, 'invalid priority value');
}
if (!LANE_SET.has(parsed.lane as TaskLane)) {
throw new TaskSerializationError(taskId, 'invalid lane value');
}
if (!Array.isArray(parsed.dependencies)) {
throw new TaskSerializationError(taskId, 'dependencies must be an array');
}
if (!parsed.dependencies.every((dependency): dependency is string => typeof dependency === 'string')) {
throw new TaskSerializationError(taskId, 'dependencies must contain only strings');
}
return {
id: parsed.id as string,
project: parsed.project as string,
mission: parsed.mission as string,
taskId: parsed.taskId as string,
title: parsed.title as string,
status: parsed.status as TaskStatus,
priority: parsed.priority as TaskPriority,
dependencies: parsed.dependencies as string[],
lane: parsed.lane as TaskLane,
retryCount: parsed.retryCount as number,
createdAt: parsed.createdAt as number,
updatedAt: parsed.updatedAt as number,
...(typeof parsed.description === 'string'
? { description: parsed.description }
: {}),
...(typeof parsed.claimedBy === 'string' ? { claimedBy: parsed.claimedBy } : {}),
...(typeof parsed.claimedAt === 'number' ? { claimedAt: parsed.claimedAt } : {}),
...(typeof parsed.claimTTL === 'number' ? { claimTTL: parsed.claimTTL } : {}),
...(typeof parsed.completedAt === 'number'
? { completedAt: parsed.completedAt }
: {}),
...(typeof parsed.failedAt === 'number' ? { failedAt: parsed.failedAt } : {}),
...(typeof parsed.failureReason === 'string'
? { failureReason: parsed.failureReason }
: {}),
...(typeof parsed.completionSummary === 'string'
? { completionSummary: parsed.completionSummary }
: {}),
...(isRecord(parsed.metadata) ? { metadata: parsed.metadata } : {}),
};
}
function buildRepositoryKeys(keyPrefix: string): RepositoryKeys {
return {
taskIds: `${keyPrefix}:task-ids`,
task: (taskId: string) => `${keyPrefix}:task:${taskId}`,
};
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}

View File

@@ -0,0 +1,12 @@
export const TASK_STATUSES = [
'pending',
'claimed',
'in-progress',
'completed',
'failed',
'blocked',
] as const;
export const TASK_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const;
export const TASK_LANES = ['planning', 'coding', 'any'] as const;

View File

@@ -0,0 +1,219 @@
import { describe, expect, it, vi } from 'vitest';
import { runQueueCli, type QueueCliDependencies, type QueueRepository } from '../src/cli.js';
import type { Task } from '@mosaic/types';
function createRepositoryMock(): QueueRepository {
return {
create: vi.fn(() =>
Promise.resolve<Task>({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'pending',
priority: 'medium',
dependencies: [],
lane: 'any',
retryCount: 0,
createdAt: 1,
updatedAt: 1,
}),
),
list: vi.fn(() => Promise.resolve([])),
get: vi.fn(() => Promise.resolve(null)),
claim: vi.fn(() =>
Promise.resolve<Task>({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'claimed',
priority: 'medium',
dependencies: [],
lane: 'any',
claimedBy: 'agent-a',
claimedAt: 2,
claimTTL: 60,
retryCount: 0,
createdAt: 1,
updatedAt: 2,
}),
),
release: vi.fn(() =>
Promise.resolve<Task>({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'pending',
priority: 'medium',
dependencies: [],
lane: 'any',
retryCount: 0,
createdAt: 1,
updatedAt: 3,
}),
),
complete: vi.fn(() =>
Promise.resolve<Task>({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'completed',
priority: 'medium',
dependencies: [],
lane: 'any',
completionSummary: 'done',
retryCount: 0,
createdAt: 1,
updatedAt: 4,
completedAt: 4,
}),
),
};
}
function createDependencies(
repository: QueueRepository,
): QueueCliDependencies & { outputs: string[]; errors: string[] } {
const outputs: string[] = [];
const errors: string[] = [];
const close = vi.fn(() => Promise.resolve(undefined));
return {
openSession: () =>
Promise.resolve({
repository,
close,
}),
stdout: (line) => {
outputs.push(line);
},
stderr: (line) => {
errors.push(line);
},
outputs,
errors,
};
}
describe('runQueueCli', () => {
it('creates a task from command options', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const exitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'create',
'queue',
'phase1',
'MQ-005',
'--title',
'Build CLI',
'--priority',
'high',
'--lane',
'coding',
'--dependency',
'MQ-002',
'MQ-003',
],
dependencies,
);
expect(exitCode).toBe(0);
expect(repository.create).toHaveBeenCalledWith({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
description: undefined,
priority: 'high',
dependencies: ['MQ-002', 'MQ-003'],
lane: 'coding',
});
});
it('lists tasks with filters', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const exitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'list',
'--project',
'queue',
'--mission',
'phase1',
'--status',
'pending',
],
dependencies,
);
expect(exitCode).toBe(0);
expect(repository.list).toHaveBeenCalledWith({
project: 'queue',
mission: 'phase1',
status: 'pending',
});
});
it('claims and completes tasks with typed options', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const claimExitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'claim',
'MQ-005',
'--agent',
'agent-a',
'--ttl',
'60',
],
dependencies,
);
const completeExitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'complete',
'MQ-005',
'--agent',
'agent-a',
'--summary',
'done',
],
dependencies,
);
expect(claimExitCode).toBe(0);
expect(completeExitCode).toBe(0);
expect(repository.claim).toHaveBeenCalledWith('MQ-005', {
agentId: 'agent-a',
ttlSeconds: 60,
});
expect(repository.complete).toHaveBeenCalledWith('MQ-005', {
agentId: 'agent-a',
summary: 'done',
});
});
});

View File

@@ -0,0 +1,50 @@
import { describe, expect, it } from 'vitest';
import {
QUEUE_MCP_TOOL_DEFINITIONS,
buildQueueMcpServer,
} from '../src/mcp-server.js';
describe('queue MCP server', () => {
it('declares all required phase-1 tools', () => {
const toolNames = QUEUE_MCP_TOOL_DEFINITIONS.map((tool) => tool.name).sort();
expect(toolNames).toEqual([
'queue_claim',
'queue_complete',
'queue_fail',
'queue_get',
'queue_heartbeat',
'queue_list',
'queue_release',
'queue_status',
]);
});
it('builds an MCP server instance', () => {
const server = buildQueueMcpServer({
openSession: () =>
Promise.resolve({
repository: {
list: () => Promise.resolve([]),
get: () => Promise.resolve(null),
claim: () => Promise.reject(new Error('not implemented')),
heartbeat: () => Promise.reject(new Error('not implemented')),
release: () => Promise.reject(new Error('not implemented')),
complete: () => Promise.reject(new Error('not implemented')),
fail: () => Promise.reject(new Error('not implemented')),
},
checkHealth: () =>
Promise.resolve({
checkedAt: 1,
latencyMs: 0,
ok: true,
response: 'PONG',
}),
close: () => Promise.resolve(),
}),
});
expect(server).toBeDefined();
});
});

View File

@@ -0,0 +1,90 @@
import { describe, expect, it } from 'vitest';
import {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from '../src/mcp-tool-schemas.js';
describe('MCP tool schemas', () => {
it('validates queue_list filters', () => {
const parsed = queueListToolInputSchema.parse({
project: 'queue',
mission: 'phase1',
status: 'pending',
});
expect(parsed).toEqual({
project: 'queue',
mission: 'phase1',
status: 'pending',
});
});
it('requires a taskId for queue_get', () => {
expect(() => queueGetToolInputSchema.parse({})).toThrowError();
});
it('requires positive ttlSeconds for queue_claim', () => {
expect(() =>
queueClaimToolInputSchema.parse({
taskId: 'MQ-007',
agentId: 'agent-a',
ttlSeconds: 0,
}),
).toThrowError();
});
it('accepts optional fields for queue_heartbeat and queue_release', () => {
const heartbeat = queueHeartbeatToolInputSchema.parse({
taskId: 'MQ-007',
ttlSeconds: 30,
});
const release = queueReleaseToolInputSchema.parse({
taskId: 'MQ-007',
});
expect(heartbeat).toEqual({
taskId: 'MQ-007',
ttlSeconds: 30,
});
expect(release).toEqual({
taskId: 'MQ-007',
});
});
it('validates queue_complete and queue_fail payloads', () => {
const complete = queueCompleteToolInputSchema.parse({
taskId: 'MQ-007',
agentId: 'agent-a',
summary: 'done',
});
const fail = queueFailToolInputSchema.parse({
taskId: 'MQ-007',
reason: 'boom',
});
expect(complete).toEqual({
taskId: 'MQ-007',
agentId: 'agent-a',
summary: 'done',
});
expect(fail).toEqual({
taskId: 'MQ-007',
reason: 'boom',
});
});
it('accepts an empty payload for queue_status', () => {
const parsed = queueStatusToolInputSchema.parse({});
expect(parsed).toEqual({});
});
});

View File

@@ -0,0 +1,76 @@
import { describe, expect, it } from 'vitest';
import {
createRedisClient,
resolveRedisUrl,
runRedisHealthCheck,
} from '../src/redis-connection.js';
describe('resolveRedisUrl', () => {
it('prefers VALKEY_URL when both env vars are present', () => {
const url = resolveRedisUrl({
VALKEY_URL: 'redis://valkey.local:6379',
REDIS_URL: 'redis://redis.local:6379',
});
expect(url).toBe('redis://valkey.local:6379');
});
it('falls back to REDIS_URL when VALKEY_URL is missing', () => {
const url = resolveRedisUrl({
REDIS_URL: 'redis://redis.local:6379',
});
expect(url).toBe('redis://redis.local:6379');
});
it('throws loudly when no redis environment variable exists', () => {
expect(() => resolveRedisUrl({})).toThrowError(
/Missing required Valkey\/Redis connection URL/i,
);
});
});
describe('createRedisClient', () => {
it('uses env URL for client creation with no hardcoded defaults', () => {
class FakeRedis {
public readonly url: string;
public constructor(url: string) {
this.url = url;
}
}
const client = createRedisClient({
env: {
VALKEY_URL: 'redis://queue.local:6379',
},
redisConstructor: FakeRedis,
});
expect(client.url).toBe('redis://queue.local:6379');
});
});
describe('runRedisHealthCheck', () => {
it('returns healthy status when ping succeeds', async () => {
const health = await runRedisHealthCheck({
ping: () => Promise.resolve('PONG'),
});
expect(health.ok).toBe(true);
expect(health.response).toBe('PONG');
expect(health.latencyMs).toBeTypeOf('number');
expect(health.latencyMs).toBeGreaterThanOrEqual(0);
});
it('returns unhealthy status when ping fails', async () => {
const health = await runRedisHealthCheck({
ping: () => Promise.reject(new Error('connection refused')),
});
expect(health.ok).toBe(false);
expect(health.error).toMatch(/connection refused/i);
expect(health.latencyMs).toBeTypeOf('number');
});
});

View File

@@ -0,0 +1,9 @@
import { describe, expect, it } from 'vitest';
import { packageVersion } from '../src/index.js';
describe('package bootstrap', () => {
it('exposes package version constant', () => {
expect(packageVersion).toBe('0.1.0');
});
});

View File

@@ -0,0 +1,459 @@
import { describe, expect, it } from 'vitest';
import {
RedisTaskRepository,
TaskAlreadyExistsError,
TaskOwnershipError,
TaskTransitionError,
type RedisTaskClient,
type RedisTaskTransaction,
} from '../src/task-repository.js';
type QueuedOperation =
| {
readonly type: 'set';
readonly key: string;
readonly value: string;
readonly mode?: 'NX' | 'XX';
}
| {
readonly type: 'sadd';
readonly key: string;
readonly member: string;
};
class InMemoryRedisBackend {
public readonly kv = new Map<string, string>();
public readonly sets = new Map<string, Set<string>>();
public readonly revisions = new Map<string, number>();
public getRevision(key: string): number {
return this.revisions.get(key) ?? 0;
}
public bumpRevision(key: string): void {
this.revisions.set(key, this.getRevision(key) + 1);
}
}
class InMemoryRedisTransaction implements RedisTaskTransaction {
private readonly operations: QueuedOperation[] = [];
public constructor(
private readonly backend: InMemoryRedisBackend,
private readonly watchedRevisions: ReadonlyMap<string, number>,
) {}
public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction {
this.operations.push({
type: 'set',
key,
value,
mode,
});
return this;
}
public sadd(key: string, member: string): RedisTaskTransaction {
this.operations.push({
type: 'sadd',
key,
member,
});
return this;
}
public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> {
for (const [key, revision] of this.watchedRevisions.entries()) {
if (this.backend.getRevision(key) !== revision) {
return Promise.resolve(null);
}
}
const results: (readonly [Error | null, unknown])[] = [];
for (const operation of this.operations) {
if (operation.type === 'set') {
const exists = this.backend.kv.has(operation.key);
if (operation.mode === 'NX' && exists) {
results.push([null, null]);
continue;
}
if (operation.mode === 'XX' && !exists) {
results.push([null, null]);
continue;
}
this.backend.kv.set(operation.key, operation.value);
this.backend.bumpRevision(operation.key);
results.push([null, 'OK']);
continue;
}
const set = this.backend.sets.get(operation.key) ?? new Set<string>();
const before = set.size;
set.add(operation.member);
this.backend.sets.set(operation.key, set);
this.backend.bumpRevision(operation.key);
results.push([null, set.size === before ? 0 : 1]);
}
return Promise.resolve(results);
}
}
class InMemoryAtomicRedisClient implements RedisTaskClient {
private watchedRevisions = new Map<string, number>();
public constructor(private readonly backend: InMemoryRedisBackend) {}
public get(key: string): Promise<string | null> {
return Promise.resolve(this.backend.kv.get(key) ?? null);
}
public mget(...keys: string[]): Promise<(string | null)[]> {
return Promise.resolve(keys.map((key) => this.backend.kv.get(key) ?? null));
}
public set(
key: string,
value: string,
mode?: 'NX' | 'XX',
): Promise<'OK' | null> {
const exists = this.backend.kv.has(key);
if (mode === 'NX' && exists) {
return Promise.resolve(null);
}
if (mode === 'XX' && !exists) {
return Promise.resolve(null);
}
this.backend.kv.set(key, value);
this.backend.bumpRevision(key);
return Promise.resolve('OK');
}
public smembers(key: string): Promise<string[]> {
return Promise.resolve([...(this.backend.sets.get(key) ?? new Set<string>())]);
}
public sadd(key: string, member: string): Promise<number> {
const values = this.backend.sets.get(key) ?? new Set<string>();
const before = values.size;
values.add(member);
this.backend.sets.set(key, values);
this.backend.bumpRevision(key);
return Promise.resolve(values.size === before ? 0 : 1);
}
public watch(...keys: string[]): Promise<'OK'> {
this.watchedRevisions = new Map(
keys.map((key) => [key, this.backend.getRevision(key)]),
);
return Promise.resolve('OK');
}
public unwatch(): Promise<'OK'> {
this.watchedRevisions.clear();
return Promise.resolve('OK');
}
public multi(): RedisTaskTransaction {
const watchedSnapshot = new Map(this.watchedRevisions);
this.watchedRevisions.clear();
return new InMemoryRedisTransaction(this.backend, watchedSnapshot);
}
}
class StrictAtomicRedisClient extends InMemoryAtomicRedisClient {
public override set(
key: string,
value: string,
mode?: 'NX' | 'XX',
): Promise<'OK' | null> {
void key;
void value;
void mode;
throw new Error('Direct set() is not allowed in strict atomic tests.');
}
public override sadd(key: string, member: string): Promise<number> {
void key;
void member;
throw new Error('Direct sadd() is not allowed in strict atomic tests.');
}
}
function createRepositoryPair(now: () => number): [RedisTaskRepository, RedisTaskRepository] {
const backend = new InMemoryRedisBackend();
return [
new RedisTaskRepository({
client: new InMemoryAtomicRedisClient(backend),
now,
}),
new RedisTaskRepository({
client: new InMemoryAtomicRedisClient(backend),
now,
}),
];
}
function createStrictRepositoryPair(
now: () => number,
): [RedisTaskRepository, RedisTaskRepository] {
const backend = new InMemoryRedisBackend();
return [
new RedisTaskRepository({
client: new StrictAtomicRedisClient(backend),
now,
}),
new RedisTaskRepository({
client: new StrictAtomicRedisClient(backend),
now,
}),
];
}
describe('RedisTaskRepository atomic transitions', () => {
it('creates atomically under concurrent create race', async () => {
const [repositoryA, repositoryB] = createStrictRepositoryPair(
() => 1_700_000_000_000,
);
const [createA, createB] = await Promise.allSettled([
repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-CREATE',
title: 'create race',
}),
repositoryB.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-CREATE',
title: 'create race duplicate',
}),
]);
const fulfilled = [createA, createB].filter(
(result) => result.status === 'fulfilled',
);
const rejected = [createA, createB].filter(
(result) => result.status === 'rejected',
);
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect(rejected[0]?.reason).toBeInstanceOf(TaskAlreadyExistsError);
});
it('claims a pending task once and blocks concurrent double-claim', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repositoryA, repositoryB] = createRepositoryPair(now);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004',
title: 'Atomic claim',
});
const [claimA, claimB] = await Promise.allSettled([
repositoryA.claim('MQ-004', { agentId: 'agent-a', ttlSeconds: 60 }),
repositoryB.claim('MQ-004', { agentId: 'agent-b', ttlSeconds: 60 }),
]);
const fulfilled = [claimA, claimB].filter((result) => result.status === 'fulfilled');
const rejected = [claimA, claimB].filter((result) => result.status === 'rejected');
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
});
it('allows claim takeover after TTL expiry', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repositoryA, repositoryB] = createRepositoryPair(now);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-EXP',
title: 'TTL expiry',
});
await repositoryA.claim('MQ-004-EXP', {
agentId: 'agent-a',
ttlSeconds: 1,
});
timestamp += 2_000;
const takeover = await repositoryB.claim('MQ-004-EXP', {
agentId: 'agent-b',
ttlSeconds: 60,
});
expect(takeover.claimedBy).toBe('agent-b');
});
it('releases a claimed task back to pending', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-REL',
title: 'Release test',
});
await repository.claim('MQ-004-REL', {
agentId: 'agent-a',
ttlSeconds: 60,
});
const released = await repository.release('MQ-004-REL', {
agentId: 'agent-a',
});
expect(released.status).toBe('pending');
expect(released.claimedBy).toBeUndefined();
expect(released.claimedAt).toBeUndefined();
});
it('heartbeats, completes, and fails with valid transitions', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repository] = createRepositoryPair(now);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-HCF',
title: 'Transition test',
});
await repository.claim('MQ-004-HCF', {
agentId: 'agent-a',
ttlSeconds: 60,
});
timestamp += 1_000;
const heartbeat = await repository.heartbeat('MQ-004-HCF', {
agentId: 'agent-a',
ttlSeconds: 120,
});
expect(heartbeat.claimTTL).toBe(120);
expect(heartbeat.claimedAt).toBe(1_700_000_001_000);
const completed = await repository.complete('MQ-004-HCF', {
agentId: 'agent-a',
summary: 'done',
});
expect(completed.status).toBe('completed');
expect(completed.completionSummary).toBe('done');
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-FAIL',
title: 'Failure test',
});
await repository.claim('MQ-004-FAIL', {
agentId: 'agent-a',
ttlSeconds: 60,
});
const failed = await repository.fail('MQ-004-FAIL', {
agentId: 'agent-a',
reason: 'boom',
});
expect(failed.status).toBe('failed');
expect(failed.failureReason).toBe('boom');
expect(failed.retryCount).toBe(1);
});
it('rejects invalid transitions', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-INV',
title: 'Invalid transitions',
});
await expect(
repository.complete('MQ-004-INV', {
agentId: 'agent-a',
}),
).rejects.toBeInstanceOf(TaskTransitionError);
});
it('enforces claim ownership for release and complete', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-OWN',
title: 'Ownership checks',
});
await repository.claim('MQ-004-OWN', {
agentId: 'agent-a',
ttlSeconds: 60,
});
await expect(
repository.release('MQ-004-OWN', {
agentId: 'agent-b',
}),
).rejects.toBeInstanceOf(TaskOwnershipError);
await expect(
repository.complete('MQ-004-OWN', {
agentId: 'agent-b',
}),
).rejects.toBeInstanceOf(TaskOwnershipError);
});
it('merges concurrent non-conflicting update patches atomically', async () => {
const [repositoryA, repositoryB] = createRepositoryPair(() => 1_700_000_000_000);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-UPD',
title: 'Original title',
description: 'Original description',
});
await Promise.all([
repositoryA.update('MQ-004-UPD', {
title: 'Updated title',
}),
repositoryB.update('MQ-004-UPD', {
description: 'Updated description',
}),
]);
const latest = await repositoryA.get('MQ-004-UPD');
expect(latest).not.toBeNull();
expect(latest?.title).toBe('Updated title');
expect(latest?.description).toBe('Updated description');
});
});

View File

@@ -0,0 +1,332 @@
import { describe, expect, it } from 'vitest';
import {
RedisTaskRepository,
TaskAlreadyExistsError,
TaskTransitionError,
type RedisTaskClient,
type RedisTaskTransaction,
} from '../src/task-repository.js';
class NoopRedisTransaction implements RedisTaskTransaction {
private readonly operations: (
| {
readonly type: 'set';
readonly key: string;
readonly value: string;
readonly mode?: 'NX' | 'XX';
}
| {
readonly type: 'sadd';
readonly key: string;
readonly member: string;
}
)[] = [];
public constructor(
private readonly kv: Map<string, string>,
private readonly sets: Map<string, Set<string>>,
) {}
public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction {
this.operations.push({
type: 'set',
key,
value,
mode,
});
return this;
}
public sadd(key: string, member: string): RedisTaskTransaction {
this.operations.push({
type: 'sadd',
key,
member,
});
return this;
}
public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> {
const results: (readonly [Error | null, unknown])[] = [];
for (const operation of this.operations) {
if (operation.type === 'set') {
const exists = this.kv.has(operation.key);
if (operation.mode === 'NX' && exists) {
results.push([null, null]);
continue;
}
if (operation.mode === 'XX' && !exists) {
results.push([null, null]);
continue;
}
this.kv.set(operation.key, operation.value);
results.push([null, 'OK']);
continue;
}
const values = this.sets.get(operation.key) ?? new Set<string>();
const beforeSize = values.size;
values.add(operation.member);
this.sets.set(operation.key, values);
results.push([null, values.size === beforeSize ? 0 : 1]);
}
return Promise.resolve(results);
}
}
class InMemoryRedisClient implements RedisTaskClient {
private readonly kv = new Map<string, string>();
private readonly sets = new Map<string, Set<string>>();
public get(key: string): Promise<string | null> {
return Promise.resolve(this.kv.get(key) ?? null);
}
public mget(...keys: string[]): Promise<(string | null)[]> {
return Promise.resolve(keys.map((key) => this.kv.get(key) ?? null));
}
public set(
key: string,
value: string,
mode?: 'NX' | 'XX',
): Promise<'OK' | null> {
const exists = this.kv.has(key);
if (mode === 'NX' && exists) {
return Promise.resolve(null);
}
if (mode === 'XX' && !exists) {
return Promise.resolve(null);
}
this.kv.set(key, value);
return Promise.resolve('OK');
}
public smembers(key: string): Promise<string[]> {
return Promise.resolve([...(this.sets.get(key) ?? new Set<string>())]);
}
public sadd(key: string, member: string): Promise<number> {
const values = this.sets.get(key) ?? new Set<string>();
const beforeSize = values.size;
values.add(member);
this.sets.set(key, values);
return Promise.resolve(values.size === beforeSize ? 0 : 1);
}
public watch(): Promise<'OK'> {
return Promise.resolve('OK');
}
public unwatch(): Promise<'OK'> {
return Promise.resolve('OK');
}
public multi(): RedisTaskTransaction {
return new NoopRedisTransaction(this.kv, this.sets);
}
}
class MgetTrackingRedisClient extends InMemoryRedisClient {
public getCalls = 0;
public mgetCalls = 0;
public lastMgetKeys: string[] = [];
public override get(key: string): Promise<string | null> {
this.getCalls += 1;
return super.get(key);
}
public override mget(...keys: string[]): Promise<(string | null)[]> {
this.mgetCalls += 1;
this.lastMgetKeys = [...keys];
return super.mget(...keys);
}
}
describe('RedisTaskRepository CRUD', () => {
it('creates and fetches a task with defaults', async () => {
const repository = new RedisTaskRepository({
client: new InMemoryRedisClient(),
now: () => 1_700_000_000_000,
});
const created = await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003',
title: 'Implement task CRUD',
});
const fetched = await repository.get('MQ-003');
expect(created.id).toBe('MQ-003');
expect(created.status).toBe('pending');
expect(created.priority).toBe('medium');
expect(created.lane).toBe('any');
expect(created.dependencies).toEqual([]);
expect(created.createdAt).toBe(1_700_000_000_000);
expect(fetched).toEqual(created);
});
it('throws when creating a duplicate task id', async () => {
const repository = new RedisTaskRepository({
client: new InMemoryRedisClient(),
});
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003',
title: 'First task',
});
await expect(
repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003',
title: 'Duplicate',
}),
).rejects.toBeInstanceOf(TaskAlreadyExistsError);
});
it('lists tasks and filters by project, mission, and status', async () => {
const repository = new RedisTaskRepository({
client: new InMemoryRedisClient(),
});
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003A',
title: 'Pending task',
});
await repository.create({
project: 'queue',
mission: 'phase2',
taskId: 'MQ-003B',
title: 'Claimed task',
});
await repository.claim('MQ-003B', {
agentId: 'agent-a',
ttlSeconds: 60,
});
const byProject = await repository.list({
project: 'queue',
});
const byMission = await repository.list({
mission: 'phase2',
});
const byStatus = await repository.list({
status: 'claimed',
});
expect(byProject).toHaveLength(2);
expect(byMission.map((task) => task.taskId)).toEqual(['MQ-003B']);
expect(byStatus.map((task) => task.taskId)).toEqual(['MQ-003B']);
});
it('lists 3+ tasks with a single mget call', async () => {
const client = new MgetTrackingRedisClient();
const repository = new RedisTaskRepository({
client,
});
await repository.create({
project: 'queue',
mission: 'phase-list',
taskId: 'MQ-MGET-001',
title: 'Task one',
});
await repository.create({
project: 'queue',
mission: 'phase-list',
taskId: 'MQ-MGET-002',
title: 'Task two',
});
await repository.create({
project: 'queue',
mission: 'phase-list',
taskId: 'MQ-MGET-003',
title: 'Task three',
});
const tasks = await repository.list();
expect(tasks).toHaveLength(3);
expect(client.mgetCalls).toBe(1);
expect(client.getCalls).toBe(0);
expect(client.lastMgetKeys).toHaveLength(3);
});
it('updates mutable fields and preserves immutable fields', async () => {
const repository = new RedisTaskRepository({
client: new InMemoryRedisClient(),
now: () => 1_700_000_000_001,
});
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003',
title: 'Original title',
description: 'Original description',
});
const updated = await repository.update('MQ-003', {
title: 'Updated title',
description: 'Updated description',
priority: 'high',
lane: 'coding',
dependencies: ['MQ-002'],
metadata: {
source: 'unit-test',
},
});
expect(updated.title).toBe('Updated title');
expect(updated.description).toBe('Updated description');
expect(updated.priority).toBe('high');
expect(updated.lane).toBe('coding');
expect(updated.dependencies).toEqual(['MQ-002']);
expect(updated.metadata).toEqual({ source: 'unit-test' });
expect(updated.project).toBe('queue');
expect(updated.taskId).toBe('MQ-003');
expect(updated.updatedAt).toBe(1_700_000_000_001);
});
it('rejects status transitions through update()', async () => {
const repository = new RedisTaskRepository({
client: new InMemoryRedisClient(),
});
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-003-TRANSITION',
title: 'Transition guard',
});
await expect(
repository.update('MQ-003-TRANSITION', {
status: 'completed',
}),
).rejects.toBeInstanceOf(TaskTransitionError);
});
});

View File

@@ -0,0 +1,10 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"noEmit": false,
"rootDir": ".",
"outDir": "dist"
},
"include": ["src/**/*.ts", "bin/**/*.ts"],
"exclude": ["tests/**/*"]
}

View File

@@ -0,0 +1,8 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"noEmit": true,
"rootDir": "."
},
"include": ["src/**/*.ts", "tests/**/*.ts", "bin/**/*.ts", "vitest.config.ts"]
}

View File

@@ -0,0 +1,8 @@
import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
environment: 'node',
include: ['tests/**/*.test.ts'],
},
});

View File

@@ -1,18 +1,29 @@
{
"name": "@mosaic/types",
"version": "0.1.0",
"private": false,
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"files": [
"dist",
"README.md"
"dist"
],
"scripts": {
"build": "tsc -p tsconfig.json",
"test": "echo \"No tests for @mosaic/types\"",
"lint": "echo \"No lint configured for @mosaic/types\"",
"typecheck": "tsc -p tsconfig.json --noEmit"
"typecheck": "tsc --noEmit",
"lint": "echo 'ok'",
"test": "echo 'ok'"
},
"devDependencies": {
"typescript": "^5"
},
"publishConfig": {
"registry": "https://git.mosaicstack.dev/api/packages/mosaic/npm",
"access": "public"
}
}

View File

@@ -1,2 +1,342 @@
// @mosaic/types - shared type definitions
export type Placeholder = Record<string, never>;
// @mosaic/types
// Shared foundational types extracted from queue, bootstrap, and context packages.
// === Queue ===
export type TaskStatus =
| 'pending'
| 'claimed'
| 'in-progress'
| 'completed'
| 'failed'
| 'blocked';
export type TaskPriority = 'critical' | 'high' | 'medium' | 'low';
export type TaskLane = 'planning' | 'coding' | 'any';
export interface Task {
readonly id: string;
readonly project: string;
readonly mission: string;
readonly taskId: string;
readonly title: string;
readonly description?: string;
readonly status: TaskStatus;
readonly priority: TaskPriority;
readonly dependencies: readonly string[];
readonly lane: TaskLane;
readonly claimedBy?: string;
readonly claimedAt?: number;
readonly claimTTL?: number;
readonly completedAt?: number;
readonly failedAt?: number;
readonly failureReason?: string;
readonly completionSummary?: string;
readonly retryCount: number;
readonly metadata?: Readonly<Record<string, unknown>>;
readonly createdAt: number;
readonly updatedAt: number;
}
export interface CreateTaskInput {
readonly project: string;
readonly mission: string;
readonly taskId: string;
readonly title: string;
readonly description?: string;
readonly priority?: TaskPriority;
readonly dependencies?: readonly string[];
readonly lane?: TaskLane;
readonly metadata?: Readonly<Record<string, unknown>>;
}
export interface TaskListFilters {
readonly project?: string;
readonly mission?: string;
readonly status?: TaskStatus;
}
export interface TaskUpdateInput {
readonly title?: string;
readonly description?: string;
readonly status?: TaskStatus;
readonly priority?: TaskPriority;
readonly dependencies?: readonly string[];
readonly lane?: TaskLane;
readonly claimedBy?: string;
readonly claimedAt?: number;
readonly claimTTL?: number;
readonly completedAt?: number;
readonly failedAt?: number;
readonly failureReason?: string;
readonly completionSummary?: string;
readonly retryCount?: number;
readonly metadata?: Readonly<Record<string, unknown>>;
}
export interface ClaimTaskInput {
readonly agentId: string;
readonly ttlSeconds: number;
}
export interface ReleaseTaskInput {
readonly agentId?: string;
}
export interface HeartbeatTaskInput {
readonly agentId?: string;
readonly ttlSeconds?: number;
}
export interface CompleteTaskInput {
readonly agentId?: string;
readonly summary?: string;
}
export interface FailTaskInput {
readonly agentId?: string;
readonly reason: string;
}
// === Agent ===
export type AgentMessageRole =
| 'user'
| 'assistant'
| 'tool'
| 'system'
| (string & {});
export interface AgentMessage {
readonly role: AgentMessageRole;
readonly content: unknown;
readonly timestamp?: number;
readonly [key: string]: unknown;
}
export type OpenBrainThoughtMetadata = Readonly<
Record<string, unknown> & {
sessionId?: string;
turn?: number;
role?: string;
type?: string;
}
>;
export interface OpenBrainThought {
readonly id: string;
readonly content: string;
readonly source: string;
readonly metadata?: OpenBrainThoughtMetadata;
readonly createdAt?: string;
readonly updatedAt?: string;
readonly score?: number;
readonly [key: string]: unknown;
}
export interface OpenBrainThoughtInput {
readonly content: string;
readonly source: string;
readonly metadata?: OpenBrainThoughtMetadata;
}
export interface OpenBrainSearchInput {
readonly query: string;
readonly limit: number;
readonly source?: string;
}
// === Context Engine ===
export interface AssembleResult {
readonly messages: readonly AgentMessage[];
readonly estimatedTokens: number;
readonly systemPromptAddition?: string;
}
export interface CompactResultData {
readonly summary?: string;
readonly firstKeptEntryId?: string;
readonly tokensBefore: number;
readonly tokensAfter?: number;
readonly details?: unknown;
}
export interface CompactResult {
readonly ok: boolean;
readonly compacted: boolean;
readonly reason?: string;
readonly result?: CompactResultData;
}
export interface IngestResult {
readonly ingested: boolean;
}
export interface IngestBatchResult {
readonly ingestedCount: number;
}
export interface BootstrapResult {
readonly bootstrapped: boolean;
readonly importedMessages?: number;
readonly reason?: string;
}
export interface ContextEngineInfo {
readonly id: string;
readonly name: string;
readonly version?: string;
readonly ownsCompaction?: boolean;
}
export interface SubagentSpawnPreparation {
readonly rollback: () => void | Promise<void>;
}
export type SubagentEndReason = 'deleted' | 'completed' | 'swept' | 'released';
export interface ContextEngineBootstrapParams {
readonly sessionId: string;
readonly sessionFile: string;
}
export interface ContextEngineIngestParams {
readonly sessionId: string;
readonly message: AgentMessage;
readonly isHeartbeat?: boolean;
}
export interface ContextEngineIngestBatchParams {
readonly sessionId: string;
readonly messages: readonly AgentMessage[];
readonly isHeartbeat?: boolean;
}
export interface ContextEngineAfterTurnParams {
readonly sessionId: string;
readonly sessionFile: string;
readonly messages: readonly AgentMessage[];
readonly prePromptMessageCount: number;
readonly autoCompactionSummary?: string;
readonly isHeartbeat?: boolean;
readonly tokenBudget?: number;
readonly legacyCompactionParams?: Readonly<Record<string, unknown>>;
}
export interface ContextEngineAssembleParams {
readonly sessionId: string;
readonly messages: readonly AgentMessage[];
readonly tokenBudget?: number;
}
export interface ContextEngineCompactParams {
readonly sessionId: string;
readonly sessionFile: string;
readonly tokenBudget?: number;
readonly force?: boolean;
readonly currentTokenCount?: number;
readonly compactionTarget?: 'budget' | 'threshold';
readonly customInstructions?: string;
readonly legacyParams?: Readonly<Record<string, unknown>>;
}
export interface ContextEnginePrepareSubagentSpawnParams {
readonly parentSessionKey: string;
readonly childSessionKey: string;
readonly ttlMs?: number;
}
export interface ContextEngineSubagentEndedParams {
readonly childSessionKey: string;
readonly reason: SubagentEndReason;
}
export interface ContextEngine {
readonly info: ContextEngineInfo;
bootstrap?(params: ContextEngineBootstrapParams): Promise<BootstrapResult>;
ingest(params: ContextEngineIngestParams): Promise<IngestResult>;
ingestBatch?(params: ContextEngineIngestBatchParams): Promise<IngestBatchResult>;
afterTurn?(params: ContextEngineAfterTurnParams): Promise<void>;
assemble(params: ContextEngineAssembleParams): Promise<AssembleResult>;
compact(params: ContextEngineCompactParams): Promise<CompactResult>;
prepareSubagentSpawn?(
params: ContextEnginePrepareSubagentSpawnParams,
): Promise<SubagentSpawnPreparation | undefined>;
onSubagentEnded?(params: ContextEngineSubagentEndedParams): Promise<void>;
dispose?(): Promise<void>;
}
export type ContextEngineFactory = () => ContextEngine | Promise<ContextEngine>;
export interface PluginLogger {
readonly debug?: (...args: unknown[]) => void;
readonly info?: (...args: unknown[]) => void;
readonly warn?: (...args: unknown[]) => void;
readonly error?: (...args: unknown[]) => void;
}
export interface OpenClawPluginApi {
readonly pluginConfig?: Readonly<Record<string, unknown>>;
readonly logger?: PluginLogger;
readonly registerContextEngine: (id: string, factory: ContextEngineFactory) => void;
}
// === Wizard ===
// Wizard bootstrap/setup types are package-specific but intentionally exported
// for cross-package tooling and install orchestration.
export type WizardMode = 'quick' | 'advanced';
export type InstallAction = 'fresh' | 'keep' | 'reconfigure' | 'reset';
export type CommunicationStyle = 'direct' | 'friendly' | 'formal';
export type RuntimeName = 'claude' | 'codex' | 'opencode';
export interface SoulConfig {
readonly agentName?: string;
readonly roleDescription?: string;
readonly communicationStyle?: CommunicationStyle;
readonly accessibility?: string;
readonly customGuardrails?: string;
}
export interface UserConfig {
readonly userName?: string;
readonly pronouns?: string;
readonly timezone?: string;
readonly background?: string;
readonly accessibilitySection?: string;
readonly communicationPrefs?: string;
readonly personalBoundaries?: string;
readonly projectsTable?: string;
}
export interface GitProvider {
readonly name: string;
readonly url: string;
readonly cli: string;
readonly purpose: string;
}
export interface ToolsConfig {
readonly gitProviders?: readonly GitProvider[];
readonly credentialsLocation?: string;
readonly customToolsSection?: string;
}
export interface RuntimeState {
readonly detected: readonly RuntimeName[];
readonly mcpConfigured: boolean;
}
export interface WizardState {
readonly mosaicHome: string;
readonly sourceDir: string;
readonly mode: WizardMode;
readonly installAction: InstallAction;
readonly soul: SoulConfig;
readonly user: UserConfig;
readonly tools: ToolsConfig;
readonly runtimes: RuntimeState;
readonly selectedSkills: readonly string[];
}

View File

@@ -0,0 +1,97 @@
# @mosaic/openclaw-context
OpenBrain-backed `ContextEngine` plugin for OpenClaw.
This plugin stores session context in OpenBrain over REST so context can be reassembled from recent history plus semantic matches instead of relying only on in-session compaction state.
## Features
- Registers context engine id: `openbrain`
- Typed OpenBrain REST client with Bearer auth
- Session-aware ingest + batch ingest
- Context assembly from recent + semantic search under token budget
- Compaction summaries archived to OpenBrain
- Subagent seed/result handoff helpers
## Requirements
- OpenClaw with plugin/context-engine support (`openclaw >= 2026.3.2`)
- Reachable OpenBrain REST API
- OpenBrain API key
## Install (local workspace plugin)
```bash
pnpm install
pnpm build
```
Then reference this plugin in your OpenClaw config.
## OpenBrain Setup (self-host or hosted)
You must provide both of these in plugin config:
- `baseUrl`: your OpenBrain API root (example: `https://brain.your-domain.com`)
- `apiKey`: Bearer token for your OpenBrain instance
No host or key fallback is built in. Missing `baseUrl` or `apiKey` throws `OpenBrainConfigError` at `bootstrap()`.
## Configuration
Plugin entry id: `openclaw-openbrain-context`
Context engine slot id: `openbrain`
### Config fields
- `baseUrl` (required, string): OpenBrain API base URL
- `apiKey` (required, string): OpenBrain Bearer token
- `source` (optional, string, default `openclaw`): source prefix; engine stores thoughts under `<source>:<sessionId>`
- `recentMessages` (optional, integer, default `20`): recent thoughts to fetch for bootstrap/assemble
- `semanticSearchLimit` (optional, integer, default `10`): semantic matches fetched in assemble
- `subagentRecentMessages` (optional, integer, default `8`): context lines used for subagent seed/result exchange
## Environment Variable Pattern
Use OpenClaw variable interpolation in `openclaw.json`:
```json
{
"apiKey": "${OPENBRAIN_API_KEY}"
}
```
Then set it in your shell/runtime environment before starting OpenClaw.
## Example `openclaw.json`
```json
{
"plugins": {
"slots": {
"contextEngine": "openbrain"
},
"entries": {
"openclaw-openbrain-context": {
"enabled": true,
"config": {
"baseUrl": "https://brain.example.com",
"apiKey": "${OPENBRAIN_API_KEY}",
"source": "openclaw",
"recentMessages": 20,
"semanticSearchLimit": 10,
"subagentRecentMessages": 8
}
}
}
}
}
```
## Development
```bash
pnpm lint
pnpm build
pnpm test
```

View File

@@ -0,0 +1,58 @@
{
"id": "openclaw-openbrain-context",
"name": "OpenBrain Context Engine",
"description": "OpenBrain-backed ContextEngine plugin for OpenClaw",
"version": "0.0.1",
"kind": "context-engine",
"configSchema": {
"type": "object",
"additionalProperties": false,
"required": ["baseUrl", "apiKey"],
"properties": {
"baseUrl": {
"type": "string",
"minLength": 1,
"description": "Base URL of your OpenBrain REST API"
},
"apiKey": {
"type": "string",
"minLength": 1,
"description": "Bearer token used to authenticate against OpenBrain"
},
"source": {
"type": "string",
"minLength": 1,
"default": "openclaw",
"description": "Source prefix stored in OpenBrain (session id is appended)"
},
"recentMessages": {
"type": "integer",
"minimum": 1,
"default": 20,
"description": "How many recent thoughts to fetch during assemble/bootstrap"
},
"semanticSearchLimit": {
"type": "integer",
"minimum": 1,
"default": 10,
"description": "How many semantic matches to request during assemble"
},
"subagentRecentMessages": {
"type": "integer",
"minimum": 1,
"default": 8,
"description": "How many thoughts to use when seeding/summarizing subagents"
}
}
},
"uiHints": {
"baseUrl": {
"label": "OpenBrain Base URL",
"placeholder": "https://brain.example.com"
},
"apiKey": {
"label": "OpenBrain API Key",
"sensitive": true
}
}
}

View File

@@ -0,0 +1,42 @@
{
"name": "@mosaic/openclaw-context",
"version": "0.1.0",
"type": "module",
"description": "OpenClaw → OpenBrain context engine plugin",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"files": [
"dist",
"openclaw.plugin.json"
],
"scripts": {
"build": "tsc -p tsconfig.json",
"typecheck": "tsc --noEmit",
"lint": "eslint src/",
"test": "vitest run"
},
"dependencies": {
"@mosaic/types": "workspace:*"
},
"devDependencies": {
"typescript": "^5",
"vitest": "^2",
"@types/node": "^22"
},
"keywords": [
"openclaw",
"openbrain",
"context-engine",
"plugin"
],
"publishConfig": {
"registry": "https://git.mosaicstack.dev/api/packages/mosaic/npm",
"access": "public"
}
}

View File

@@ -0,0 +1,3 @@
export const OPENBRAIN_CONTEXT_ENGINE_ID = "openbrain";
export const OPENBRAIN_PLUGIN_ID = "openclaw-openbrain-context";
export const OPENBRAIN_PLUGIN_VERSION = "0.0.1";

View File

@@ -0,0 +1,774 @@
import { OPENBRAIN_CONTEXT_ENGINE_ID, OPENBRAIN_PLUGIN_VERSION } from "./constants.js";
import { OpenBrainConfigError } from "./errors.js";
import type {
AgentMessage,
AssembleResult,
BootstrapResult,
CompactResult,
ContextEngine,
ContextEngineInfo,
IngestBatchResult,
IngestResult,
PluginLogger,
SubagentEndReason,
SubagentSpawnPreparation,
} from "./openclaw-types.js";
import {
OpenBrainClient,
type OpenBrainClientLike,
type OpenBrainSearchInput,
type OpenBrainThought,
type OpenBrainThoughtMetadata,
} from "./openbrain-client.js";
export type OpenBrainContextEngineConfig = {
baseUrl?: string;
apiKey?: string;
recentMessages?: number;
semanticSearchLimit?: number;
source?: string;
subagentRecentMessages?: number;
};
type ResolvedOpenBrainContextEngineConfig = {
baseUrl: string;
apiKey: string;
recentMessages: number;
semanticSearchLimit: number;
source: string;
subagentRecentMessages: number;
};
export type OpenBrainContextEngineDeps = {
createClient?: (config: ResolvedOpenBrainContextEngineConfig) => OpenBrainClientLike;
now?: () => number;
logger?: PluginLogger;
};
type SubagentState = {
parentSessionKey: string;
seedThoughtId?: string;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function parsePositiveInteger(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const rounded = Math.floor(value);
return rounded > 0 ? rounded : fallback;
}
function normalizeRole(role: unknown): string {
if (typeof role !== "string" || role.length === 0) {
return "assistant";
}
if (role === "user" || role === "assistant" || role === "tool" || role === "system") {
return role;
}
return "assistant";
}
function serializeContent(value: unknown): string {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value
.map((part) => serializeContent(part))
.filter((part) => part.length > 0)
.join("\n")
.trim();
}
if (isRecord(value) && typeof value.text === "string") {
return value.text;
}
if (value === undefined || value === null) {
return "";
}
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
function estimateTextTokens(text: string): number {
const normalized = text.trim();
if (normalized.length === 0) {
return 1;
}
return Math.max(1, Math.ceil(normalized.length / 4) + 4);
}
function thoughtTimestamp(thought: OpenBrainThought, fallbackTimestamp: number): number {
const createdAt =
thought.createdAt ??
(typeof thought.created_at === "string" ? thought.created_at : undefined);
if (createdAt === undefined) {
return fallbackTimestamp;
}
const parsed = Date.parse(createdAt);
return Number.isFinite(parsed) ? parsed : fallbackTimestamp;
}
function thoughtFingerprint(thought: OpenBrainThought): string {
const role = typeof thought.metadata?.role === "string" ? thought.metadata.role : "assistant";
return `${role}\n${thought.content}`;
}
function truncateLine(value: string, maxLength: number): string {
if (value.length <= maxLength) {
return value;
}
return `${value.slice(0, maxLength - 3)}...`;
}
export class OpenBrainContextEngine implements ContextEngine {
readonly info: ContextEngineInfo = {
id: OPENBRAIN_CONTEXT_ENGINE_ID,
name: "OpenBrain Context Engine",
version: OPENBRAIN_PLUGIN_VERSION,
ownsCompaction: true,
};
private readonly rawConfig: unknown;
private readonly createClientFn:
| ((config: ResolvedOpenBrainContextEngineConfig) => OpenBrainClientLike)
| undefined;
private readonly now: () => number;
private readonly logger: PluginLogger | undefined;
private config: ResolvedOpenBrainContextEngineConfig | undefined;
private client: OpenBrainClientLike | undefined;
private readonly sessionTurns = new Map<string, number>();
private readonly subagentState = new Map<string, SubagentState>();
private disposed = false;
constructor(rawConfig: unknown, deps?: OpenBrainContextEngineDeps) {
this.rawConfig = rawConfig;
this.createClientFn = deps?.createClient;
this.now = deps?.now ?? (() => Date.now());
this.logger = deps?.logger;
}
async bootstrap(params: { sessionId: string; sessionFile: string }): Promise<BootstrapResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = await client.listRecent({
limit: config.recentMessages,
source,
});
const sessionThoughts = this.filterSessionThoughts(recentThoughts, params.sessionId);
let maxTurn = -1;
for (const thought of sessionThoughts) {
const turn = thought.metadata?.turn;
if (typeof turn === "number" && Number.isFinite(turn) && turn > maxTurn) {
maxTurn = turn;
}
}
this.sessionTurns.set(params.sessionId, maxTurn + 1);
return {
bootstrapped: true,
importedMessages: sessionThoughts.length,
};
}
async ingest(params: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
}): Promise<IngestResult> {
this.assertNotDisposed();
const client = this.getClient();
const content = serializeContent(params.message.content).trim();
if (content.length === 0) {
return { ingested: false };
}
const metadata: OpenBrainThoughtMetadata = {
sessionId: params.sessionId,
turn: this.nextTurn(params.sessionId),
role: normalizeRole(params.message.role),
type: "message",
};
if (params.isHeartbeat === true) {
metadata.isHeartbeat = true;
}
await client.createThought({
content,
source: this.sourceForSession(params.sessionId),
metadata,
});
return { ingested: true };
}
async ingestBatch(params: {
sessionId: string;
messages: AgentMessage[];
isHeartbeat?: boolean;
}): Promise<IngestBatchResult> {
this.assertNotDisposed();
const maxConcurrency = 5;
let ingestedCount = 0;
for (let i = 0; i < params.messages.length; i += maxConcurrency) {
const chunk = params.messages.slice(i, i + maxConcurrency);
const results = await Promise.all(
chunk.map((message) => {
const ingestParams: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
} = {
sessionId: params.sessionId,
message,
};
if (params.isHeartbeat !== undefined) {
ingestParams.isHeartbeat = params.isHeartbeat;
}
return this.ingest(ingestParams);
}),
);
for (const result of results) {
if (result.ingested) {
ingestedCount += 1;
}
}
}
return { ingestedCount };
}
async assemble(params: {
sessionId: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.recentMessages,
source,
}),
params.sessionId,
);
const semanticThoughts = await this.searchSemanticThoughts({
client,
source,
config,
sessionId: params.sessionId,
messages: params.messages,
});
const mergedThoughts = this.mergeThoughts(recentThoughts, semanticThoughts);
const mergedMessages =
mergedThoughts.length > 0
? mergedThoughts.map((thought, index) => this.toAgentMessage(thought, index))
: params.messages;
const tokenBudget = params.tokenBudget;
const budgetedMessages =
typeof tokenBudget === "number" && tokenBudget > 0
? this.trimToBudget(mergedMessages, tokenBudget)
: mergedMessages;
return {
messages: budgetedMessages,
estimatedTokens: this.estimateTokensForMessages(budgetedMessages),
};
}
async compact(params: {
sessionId: string;
sessionFile: string;
tokenBudget?: number;
force?: boolean;
currentTokenCount?: number;
compactionTarget?: "budget" | "threshold";
customInstructions?: string;
legacyParams?: Record<string, unknown>;
}): Promise<CompactResult> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const source = this.sourceForSession(params.sessionId);
const recentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: Math.max(config.recentMessages, config.subagentRecentMessages),
source,
}),
params.sessionId,
);
if (recentThoughts.length === 0) {
return {
ok: true,
compacted: false,
reason: "no-session-context",
result: {
tokensBefore: 0,
tokensAfter: 0,
},
};
}
const summarizedThoughts = this.selectSummaryThoughts(recentThoughts);
const summary = this.buildSummary(
params.customInstructions !== undefined
? {
sessionId: params.sessionId,
thoughts: summarizedThoughts,
customInstructions: params.customInstructions,
}
: {
sessionId: params.sessionId,
thoughts: summarizedThoughts,
},
);
const summaryTokens = estimateTextTokens(summary);
const tokensBefore = this.estimateTokensForThoughts(summarizedThoughts);
await client.createThought({
content: summary,
source,
metadata: {
sessionId: params.sessionId,
turn: this.nextTurn(params.sessionId),
role: "assistant",
type: "summary",
},
});
const summaryThoughtIds = Array.from(
new Set(
summarizedThoughts
.map((thought) => thought.id.trim())
.filter((id) => id.length > 0),
),
);
await Promise.all(summaryThoughtIds.map((thoughtId) => client.deleteThought(thoughtId)));
return {
ok: true,
compacted: true,
reason: "summary-archived",
result: {
summary,
tokensBefore,
tokensAfter: summaryTokens,
},
};
}
async prepareSubagentSpawn(params: {
parentSessionKey: string;
childSessionKey: string;
ttlMs?: number;
}): Promise<SubagentSpawnPreparation | undefined> {
this.assertNotDisposed();
const config = this.getConfig();
const client = this.getClient();
const parentThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.subagentRecentMessages,
source: this.sourceForSession(params.parentSessionKey),
}),
params.parentSessionKey,
);
const seedContent = this.buildSubagentSeedContent({
parentSessionKey: params.parentSessionKey,
childSessionKey: params.childSessionKey,
thoughts: parentThoughts,
});
const createdThought = await client.createThought({
content: seedContent,
source: this.sourceForSession(params.childSessionKey),
metadata: {
sessionId: params.childSessionKey,
role: "assistant",
type: "summary",
parentSessionId: params.parentSessionKey,
ttlMs: params.ttlMs,
},
});
this.subagentState.set(params.childSessionKey, {
parentSessionKey: params.parentSessionKey,
seedThoughtId: createdThought.id,
});
return {
rollback: async () => {
const state = this.subagentState.get(params.childSessionKey);
this.subagentState.delete(params.childSessionKey);
if (state?.seedThoughtId !== undefined && state.seedThoughtId.length > 0) {
await client.deleteThought(state.seedThoughtId);
}
},
};
}
async onSubagentEnded(params: {
childSessionKey: string;
reason: SubagentEndReason;
}): Promise<void> {
this.assertNotDisposed();
const state = this.subagentState.get(params.childSessionKey);
if (state === undefined) {
return;
}
const client = this.getClient();
const config = this.getConfig();
const childThoughts = this.filterSessionThoughts(
await client.listRecent({
limit: config.subagentRecentMessages,
source: this.sourceForSession(params.childSessionKey),
}),
params.childSessionKey,
);
const summary = this.buildSubagentResultSummary({
childSessionKey: params.childSessionKey,
reason: params.reason,
thoughts: childThoughts,
});
await client.createThought({
content: summary,
source: this.sourceForSession(state.parentSessionKey),
metadata: {
sessionId: state.parentSessionKey,
turn: this.nextTurn(state.parentSessionKey),
role: "tool",
type: "subagent-result",
childSessionId: params.childSessionKey,
reason: params.reason,
},
});
this.subagentState.delete(params.childSessionKey);
}
async dispose(): Promise<void> {
this.sessionTurns.clear();
this.subagentState.clear();
this.disposed = true;
}
private searchSemanticThoughts(params: {
client: OpenBrainClientLike;
source: string;
config: ResolvedOpenBrainContextEngineConfig;
sessionId: string;
messages: AgentMessage[];
}): Promise<OpenBrainThought[]> {
const query = this.pickSemanticQuery(params.messages);
if (query === undefined || query.length === 0 || params.config.semanticSearchLimit <= 0) {
return Promise.resolve([]);
}
const request: OpenBrainSearchInput = {
query,
limit: params.config.semanticSearchLimit,
source: params.source,
};
return params.client
.search(request)
.then((results) => this.filterSessionThoughts(results, params.sessionId))
.catch((error) => {
this.logger?.warn?.("OpenBrain semantic search failed", error);
return [];
});
}
private pickSemanticQuery(messages: AgentMessage[]): string | undefined {
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
if (normalizeRole(message.role) !== "user") {
continue;
}
const content = serializeContent(message.content).trim();
if (content.length > 0) {
return content;
}
}
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
const content = serializeContent(message.content).trim();
if (content.length > 0) {
return content;
}
}
return undefined;
}
private mergeThoughts(recentThoughts: OpenBrainThought[], semanticThoughts: OpenBrainThought[]): OpenBrainThought[] {
const merged: OpenBrainThought[] = [];
const seenIds = new Set<string>();
const seenFingerprints = new Set<string>();
for (const thought of [...recentThoughts, ...semanticThoughts]) {
const id = thought.id.trim();
const fingerprint = thoughtFingerprint(thought);
if (id.length > 0 && seenIds.has(id)) {
continue;
}
if (seenFingerprints.has(fingerprint)) {
continue;
}
if (id.length > 0) {
seenIds.add(id);
}
seenFingerprints.add(fingerprint);
merged.push(thought);
}
return merged;
}
private filterSessionThoughts(thoughts: OpenBrainThought[], sessionId: string): OpenBrainThought[] {
return thoughts.filter((thought) => {
const thoughtSessionId = thought.metadata?.sessionId;
if (typeof thoughtSessionId === "string" && thoughtSessionId.length > 0) {
return thoughtSessionId === sessionId;
}
return thought.source === this.sourceForSession(sessionId);
});
}
private toAgentMessage(thought: OpenBrainThought, index: number): AgentMessage {
return {
role: normalizeRole(thought.metadata?.role),
content: thought.content,
timestamp: thoughtTimestamp(thought, this.now() + index),
};
}
private trimToBudget(messages: AgentMessage[], tokenBudget: number): AgentMessage[] {
if (messages.length === 0 || tokenBudget <= 0) {
return [];
}
let total = 0;
const budgeted: AgentMessage[] = [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const message = messages[i];
if (message === undefined) {
continue;
}
const tokens = estimateTextTokens(serializeContent(message.content));
if (total + tokens > tokenBudget) {
break;
}
total += tokens;
budgeted.unshift(message);
}
if (budgeted.length === 0) {
const lastMessage = messages[messages.length - 1];
return lastMessage === undefined ? [] : [lastMessage];
}
return budgeted;
}
private estimateTokensForMessages(messages: AgentMessage[]): number {
return messages.reduce((total, message) => {
return total + estimateTextTokens(serializeContent(message.content));
}, 0);
}
private estimateTokensForThoughts(thoughts: OpenBrainThought[]): number {
return thoughts.reduce((total, thought) => total + estimateTextTokens(thought.content), 0);
}
private buildSummary(params: {
sessionId: string;
thoughts: OpenBrainThought[];
customInstructions?: string;
}): string {
const lines = params.thoughts.map((thought) => {
const role = normalizeRole(thought.metadata?.role);
const content = truncateLine(thought.content.replace(/\s+/g, " ").trim(), 180);
return `- ${role}: ${content}`;
});
const header = `Context summary for session ${params.sessionId}`;
const instruction =
params.customInstructions !== undefined && params.customInstructions.trim().length > 0
? `Custom instructions: ${params.customInstructions.trim()}\n`
: "";
return `${header}\n${instruction}${lines.join("\n")}`;
}
private selectSummaryThoughts(thoughts: OpenBrainThought[]): OpenBrainThought[] {
const ordered = [...thoughts].sort((a, b) => {
return thoughtTimestamp(a, 0) - thoughtTimestamp(b, 0);
});
const maxLines = Math.min(ordered.length, 10);
return ordered.slice(Math.max(ordered.length - maxLines, 0));
}
private buildSubagentSeedContent(params: {
parentSessionKey: string;
childSessionKey: string;
thoughts: OpenBrainThought[];
}): string {
const lines = params.thoughts.slice(-5).map((thought) => {
const role = normalizeRole(thought.metadata?.role);
return `- ${role}: ${truncateLine(thought.content.replace(/\s+/g, " ").trim(), 160)}`;
});
const contextBlock = lines.length > 0 ? lines.join("\n") : "- (no prior context found)";
return [
`Subagent context seed`,
`Parent session: ${params.parentSessionKey}`,
`Child session: ${params.childSessionKey}`,
contextBlock,
].join("\n");
}
private buildSubagentResultSummary(params: {
childSessionKey: string;
reason: SubagentEndReason;
thoughts: OpenBrainThought[];
}): string {
const lines = params.thoughts.slice(-5).map((thought) => {
const role = normalizeRole(thought.metadata?.role);
return `- ${role}: ${truncateLine(thought.content.replace(/\s+/g, " ").trim(), 160)}`;
});
const contextBlock = lines.length > 0 ? lines.join("\n") : "- (no child messages found)";
return [
`Subagent ended (${params.reason})`,
`Child session: ${params.childSessionKey}`,
contextBlock,
].join("\n");
}
private sourceForSession(sessionId: string): string {
return `${this.getConfig().source}:${sessionId}`;
}
private nextTurn(sessionId: string): number {
const next = this.sessionTurns.get(sessionId) ?? 0;
this.sessionTurns.set(sessionId, next + 1);
return next;
}
private getClient(): OpenBrainClientLike {
if (this.client !== undefined) {
return this.client;
}
const config = this.getConfig();
this.client =
this.createClientFn?.(config) ??
new OpenBrainClient({
baseUrl: config.baseUrl,
apiKey: config.apiKey,
});
return this.client;
}
private getConfig(): ResolvedOpenBrainContextEngineConfig {
if (this.config !== undefined) {
return this.config;
}
const raw = isRecord(this.rawConfig) ? this.rawConfig : {};
const baseUrl = typeof raw.baseUrl === "string" ? raw.baseUrl.trim() : "";
if (baseUrl.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: baseUrl");
}
const apiKey = typeof raw.apiKey === "string" ? raw.apiKey.trim() : "";
if (apiKey.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: apiKey");
}
this.config = {
baseUrl,
apiKey,
recentMessages: parsePositiveInteger(raw.recentMessages, 20),
semanticSearchLimit: parsePositiveInteger(raw.semanticSearchLimit, 10),
source: typeof raw.source === "string" && raw.source.trim().length > 0 ? raw.source.trim() : "openclaw",
subagentRecentMessages: parsePositiveInteger(raw.subagentRecentMessages, 8),
};
return this.config;
}
private assertNotDisposed(): void {
if (this.disposed) {
throw new Error("OpenBrainContextEngine has already been disposed");
}
}
}

View File

@@ -0,0 +1,40 @@
export class OpenBrainError extends Error {
constructor(message: string, cause?: unknown) {
super(message);
this.name = "OpenBrainError";
if (cause !== undefined) {
(this as Error & { cause?: unknown }).cause = cause;
}
}
}
export class OpenBrainConfigError extends OpenBrainError {
constructor(message: string) {
super(message);
this.name = "OpenBrainConfigError";
}
}
export class OpenBrainHttpError extends OpenBrainError {
readonly status: number;
readonly endpoint: string;
readonly responseBody: string | undefined;
constructor(params: { endpoint: string; status: number; responseBody: string | undefined }) {
super(`OpenBrain request failed (${params.status}) for ${params.endpoint}`);
this.name = "OpenBrainHttpError";
this.status = params.status;
this.endpoint = params.endpoint;
this.responseBody = params.responseBody;
}
}
export class OpenBrainRequestError extends OpenBrainError {
readonly endpoint: string;
constructor(params: { endpoint: string; cause: unknown }) {
super(`OpenBrain request failed for ${params.endpoint}`, params.cause);
this.name = "OpenBrainRequestError";
this.endpoint = params.endpoint;
}
}

View File

@@ -0,0 +1,31 @@
import {
OPENBRAIN_CONTEXT_ENGINE_ID,
OPENBRAIN_PLUGIN_ID,
OPENBRAIN_PLUGIN_VERSION,
} from "./constants.js";
import { OpenBrainContextEngine } from "./engine.js";
import type { OpenClawPluginApi } from "./openclaw-types.js";
export { OPENBRAIN_CONTEXT_ENGINE_ID } from "./constants.js";
export { OpenBrainContextEngine } from "./engine.js";
export { OpenBrainConfigError, OpenBrainHttpError, OpenBrainRequestError } from "./errors.js";
export { OpenBrainClient } from "./openbrain-client.js";
export type { OpenBrainContextEngineConfig } from "./engine.js";
export type { OpenClawPluginApi } from "./openclaw-types.js";
export function register(api: OpenClawPluginApi): void {
api.registerContextEngine(OPENBRAIN_CONTEXT_ENGINE_ID, () => {
const deps = api.logger !== undefined ? { logger: api.logger } : undefined;
return new OpenBrainContextEngine(api.pluginConfig, deps);
});
}
const plugin = {
id: OPENBRAIN_PLUGIN_ID,
name: "OpenBrain Context Engine",
version: OPENBRAIN_PLUGIN_VERSION,
kind: "context-engine",
register,
};
export default plugin;

View File

@@ -0,0 +1,333 @@
import { OpenBrainConfigError, OpenBrainHttpError, OpenBrainRequestError } from "./errors.js";
export type OpenBrainThoughtMetadata = Record<string, unknown> & {
sessionId?: string;
turn?: number;
role?: string;
type?: string;
};
export type OpenBrainThought = {
id: string;
content: string;
source: string;
metadata: OpenBrainThoughtMetadata | undefined;
createdAt: string | undefined;
updatedAt: string | undefined;
score: number | undefined;
[key: string]: unknown;
};
export type OpenBrainThoughtInput = {
content: string;
source: string;
metadata?: OpenBrainThoughtMetadata;
};
export type OpenBrainSearchInput = {
query: string;
limit: number;
source?: string;
};
export type OpenBrainClientOptions = {
baseUrl: string;
apiKey: string;
fetchImpl?: typeof fetch;
};
export interface OpenBrainClientLike {
createThought(input: OpenBrainThoughtInput): Promise<OpenBrainThought>;
search(input: OpenBrainSearchInput): Promise<OpenBrainThought[]>;
listRecent(input: { limit: number; source?: string }): Promise<OpenBrainThought[]>;
updateThought(
id: string,
payload: { content?: string; metadata?: OpenBrainThoughtMetadata },
): Promise<OpenBrainThought>;
deleteThought(id: string): Promise<void>;
deleteThoughts(params: { source?: string; metadataId?: string }): Promise<void>;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function readString(record: Record<string, unknown>, key: string): string | undefined {
const value = record[key];
return typeof value === "string" ? value : undefined;
}
function readNumber(record: Record<string, unknown>, key: string): number | undefined {
const value = record[key];
return typeof value === "number" ? value : undefined;
}
function normalizeBaseUrl(baseUrl: string): string {
const normalized = baseUrl.trim().replace(/\/+$/, "");
if (normalized.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: baseUrl");
}
return normalized;
}
function normalizeApiKey(apiKey: string): string {
const normalized = apiKey.trim();
if (normalized.length === 0) {
throw new OpenBrainConfigError("Missing required OpenBrain config: apiKey");
}
return normalized;
}
function normalizeHeaders(headers: unknown): Record<string, string> {
if (headers === undefined) {
return {};
}
if (Array.isArray(headers)) {
const normalized: Record<string, string> = {};
for (const pair of headers) {
if (!Array.isArray(pair) || pair.length < 2) {
continue;
}
const key = pair[0];
const value = pair[1];
if (typeof key !== "string" || typeof value !== "string") {
continue;
}
normalized[key] = value;
}
return normalized;
}
if (headers instanceof Headers) {
const normalized: Record<string, string> = {};
for (const [key, value] of headers.entries()) {
normalized[key] = value;
}
return normalized;
}
if (!isRecord(headers)) {
return {};
}
const normalized: Record<string, string> = {};
for (const [key, value] of Object.entries(headers)) {
if (typeof value === "string") {
normalized[key] = value;
continue;
}
if (Array.isArray(value)) {
normalized[key] = value.join(", ");
}
}
return normalized;
}
async function readResponseBody(response: Response): Promise<string | undefined> {
try {
const body = await response.text();
return body.length > 0 ? body : undefined;
} catch {
return undefined;
}
}
export class OpenBrainClient implements OpenBrainClientLike {
private readonly baseUrl: string;
private readonly apiKey: string;
private readonly fetchImpl: typeof fetch;
constructor(options: OpenBrainClientOptions) {
this.baseUrl = normalizeBaseUrl(options.baseUrl);
this.apiKey = normalizeApiKey(options.apiKey);
this.fetchImpl = options.fetchImpl ?? fetch;
}
async createThought(input: OpenBrainThoughtInput): Promise<OpenBrainThought> {
const payload = await this.request<unknown>("/v1/thoughts", {
method: "POST",
body: JSON.stringify(input),
});
return this.extractThought(payload);
}
async search(input: OpenBrainSearchInput): Promise<OpenBrainThought[]> {
const payload = await this.request<unknown>("/v1/search", {
method: "POST",
body: JSON.stringify(input),
});
return this.extractThoughtArray(payload);
}
async listRecent(input: { limit: number; source?: string }): Promise<OpenBrainThought[]> {
const params = new URLSearchParams({
limit: String(input.limit),
});
if (input.source !== undefined && input.source.length > 0) {
params.set("source", input.source);
}
const payload = await this.request<unknown>(`/v1/thoughts/recent?${params.toString()}`, {
method: "GET",
});
return this.extractThoughtArray(payload);
}
async updateThought(
id: string,
payload: { content?: string; metadata?: OpenBrainThoughtMetadata },
): Promise<OpenBrainThought> {
const responsePayload = await this.request<unknown>(`/v1/thoughts/${encodeURIComponent(id)}`, {
method: "PATCH",
body: JSON.stringify(payload),
});
return this.extractThought(responsePayload);
}
async deleteThought(id: string): Promise<void> {
await this.request<unknown>(`/v1/thoughts/${encodeURIComponent(id)}`, {
method: "DELETE",
});
}
async deleteThoughts(params: { source?: string; metadataId?: string }): Promise<void> {
const query = new URLSearchParams();
if (params.source !== undefined && params.source.length > 0) {
query.set("source", params.source);
}
if (params.metadataId !== undefined && params.metadataId.length > 0) {
query.set("metadata_id", params.metadataId);
}
const suffix = query.size > 0 ? `?${query.toString()}` : "";
await this.request<unknown>(`/v1/thoughts${suffix}`, {
method: "DELETE",
});
}
private async request<T>(endpoint: string, init: RequestInit): Promise<T> {
const headers = normalizeHeaders(init.headers);
headers.Authorization = `Bearer ${this.apiKey}`;
if (init.body !== undefined && headers["Content-Type"] === undefined) {
headers["Content-Type"] = "application/json";
}
const url = `${this.baseUrl}${endpoint}`;
let response: Response;
try {
response = await this.fetchImpl(url, {
...init,
headers,
});
} catch (error) {
throw new OpenBrainRequestError({ endpoint, cause: error });
}
if (!response.ok) {
throw new OpenBrainHttpError({
endpoint,
status: response.status,
responseBody: await readResponseBody(response),
});
}
if (response.status === 204) {
return undefined as T;
}
const contentType = response.headers.get("content-type") ?? "";
if (!contentType.toLowerCase().includes("application/json")) {
return undefined as T;
}
return (await response.json()) as T;
}
private extractThoughtArray(payload: unknown): OpenBrainThought[] {
if (Array.isArray(payload)) {
return payload.map((item) => this.normalizeThought(item));
}
if (!isRecord(payload)) {
return [];
}
const candidates = [payload.thoughts, payload.data, payload.results, payload.items];
for (const candidate of candidates) {
if (Array.isArray(candidate)) {
return candidate.map((item) => this.normalizeThought(item));
}
}
return [];
}
private extractThought(payload: unknown): OpenBrainThought {
if (isRecord(payload)) {
const nested = payload.thought;
if (nested !== undefined) {
return this.normalizeThought(nested);
}
const data = payload.data;
if (data !== undefined && !Array.isArray(data)) {
return this.normalizeThought(data);
}
}
return this.normalizeThought(payload);
}
private normalizeThought(value: unknown): OpenBrainThought {
if (!isRecord(value)) {
return {
id: "",
content: "",
source: "",
metadata: undefined,
createdAt: undefined,
updatedAt: undefined,
score: undefined,
};
}
const metadataValue = value.metadata;
const metadata = isRecord(metadataValue)
? ({ ...metadataValue } as OpenBrainThoughtMetadata)
: undefined;
const id = readString(value, "id") ?? readString(value, "thought_id") ?? "";
const content =
readString(value, "content") ??
readString(value, "text") ??
(value.content === undefined ? "" : String(value.content));
const source = readString(value, "source") ?? "";
const createdAt = readString(value, "createdAt") ?? readString(value, "created_at");
const updatedAt = readString(value, "updatedAt") ?? readString(value, "updated_at");
const score = readNumber(value, "score");
return {
...value,
id,
content,
source,
metadata,
createdAt,
updatedAt,
score,
};
}
}
export { normalizeApiKey, normalizeBaseUrl };

View File

@@ -0,0 +1,128 @@
export type AgentMessageRole = "user" | "assistant" | "tool" | "system" | string;
export type AgentMessage = {
role: AgentMessageRole;
content: unknown;
timestamp?: number;
[key: string]: unknown;
};
export type AssembleResult = {
messages: AgentMessage[];
estimatedTokens: number;
systemPromptAddition?: string;
};
export type CompactResult = {
ok: boolean;
compacted: boolean;
reason?: string;
result?: {
summary?: string;
firstKeptEntryId?: string;
tokensBefore: number;
tokensAfter?: number;
details?: unknown;
};
};
export type IngestResult = {
ingested: boolean;
};
export type IngestBatchResult = {
ingestedCount: number;
};
export type BootstrapResult = {
bootstrapped: boolean;
importedMessages?: number;
reason?: string;
};
export type ContextEngineInfo = {
id: string;
name: string;
version?: string;
ownsCompaction?: boolean;
};
export type SubagentSpawnPreparation = {
rollback: () => void | Promise<void>;
};
export type SubagentEndReason = "deleted" | "completed" | "swept" | "released";
export interface ContextEngine {
readonly info: ContextEngineInfo;
bootstrap?(params: { sessionId: string; sessionFile: string }): Promise<BootstrapResult>;
ingest(params: {
sessionId: string;
message: AgentMessage;
isHeartbeat?: boolean;
}): Promise<IngestResult>;
ingestBatch?(params: {
sessionId: string;
messages: AgentMessage[];
isHeartbeat?: boolean;
}): Promise<IngestBatchResult>;
afterTurn?(params: {
sessionId: string;
sessionFile: string;
messages: AgentMessage[];
prePromptMessageCount: number;
autoCompactionSummary?: string;
isHeartbeat?: boolean;
tokenBudget?: number;
legacyCompactionParams?: Record<string, unknown>;
}): Promise<void>;
assemble(params: {
sessionId: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult>;
compact(params: {
sessionId: string;
sessionFile: string;
tokenBudget?: number;
force?: boolean;
currentTokenCount?: number;
compactionTarget?: "budget" | "threshold";
customInstructions?: string;
legacyParams?: Record<string, unknown>;
}): Promise<CompactResult>;
prepareSubagentSpawn?(params: {
parentSessionKey: string;
childSessionKey: string;
ttlMs?: number;
}): Promise<SubagentSpawnPreparation | undefined>;
onSubagentEnded?(params: {
childSessionKey: string;
reason: SubagentEndReason;
}): Promise<void>;
dispose?(): Promise<void>;
}
export type ContextEngineFactory = () => ContextEngine | Promise<ContextEngine>;
export type PluginLogger = {
debug?: (...args: unknown[]) => void;
info?: (...args: unknown[]) => void;
warn?: (...args: unknown[]) => void;
error?: (...args: unknown[]) => void;
};
export type OpenClawPluginApi = {
pluginConfig?: Record<string, unknown>;
logger?: PluginLogger;
registerContextEngine: (id: string, factory: ContextEngineFactory) => void;
};

View File

@@ -0,0 +1,414 @@
import { describe, expect, it, vi } from "vitest";
import { OpenBrainConfigError } from "../src/errors.js";
import { OpenBrainContextEngine } from "../src/engine.js";
import type { AgentMessage } from "../src/openclaw-types.js";
import type {
OpenBrainClientLike,
OpenBrainThought,
OpenBrainThoughtInput,
} from "../src/openbrain-client.js";
function makeThought(
id: string,
content: string,
sessionId: string,
role: string,
createdAt: string,
): OpenBrainThought {
return {
id,
content,
source: `openclaw:${sessionId}`,
metadata: {
sessionId,
role,
type: "message",
},
createdAt,
updatedAt: undefined,
score: undefined,
};
}
function makeMockClient(): OpenBrainClientLike {
return {
createThought: vi.fn(async (input: OpenBrainThoughtInput) => ({
id: `thought-${Math.random().toString(36).slice(2)}`,
content: input.content,
source: input.source,
metadata: input.metadata,
createdAt: new Date().toISOString(),
updatedAt: undefined,
score: undefined,
})),
search: vi.fn(async () => []),
listRecent: vi.fn(async () => []),
updateThought: vi.fn(async (id, payload) => ({
id,
content: payload.content ?? "",
source: "openclaw:session",
metadata: payload.metadata,
createdAt: new Date().toISOString(),
updatedAt: undefined,
score: undefined,
})),
deleteThought: vi.fn(async () => undefined),
deleteThoughts: vi.fn(async () => undefined),
};
}
const sessionId = "session-main";
const userMessage: AgentMessage = {
role: "user",
content: "What did we decide yesterday?",
timestamp: Date.now(),
};
describe("OpenBrainContextEngine", () => {
it("throws OpenBrainConfigError at bootstrap when baseUrl/apiKey are missing", async () => {
const engine = new OpenBrainContextEngine({});
await expect(
engine.bootstrap({
sessionId,
sessionFile: "/tmp/session.json",
}),
).rejects.toBeInstanceOf(OpenBrainConfigError);
});
it("ingests messages with session metadata", async () => {
const mockClient = makeMockClient();
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
source: "openclaw",
},
{
createClient: () => mockClient,
},
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.ingest({ sessionId, message: userMessage });
expect(result.ingested).toBe(true);
expect(mockClient.createThought).toHaveBeenCalledWith(
expect.objectContaining({
source: "openclaw:session-main",
metadata: expect.objectContaining({
sessionId,
role: "user",
type: "message",
turn: 0,
}),
}),
);
});
it("ingests batches and returns ingested count", async () => {
const mockClient = makeMockClient();
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.ingestBatch({
sessionId,
messages: [
{ role: "user", content: "one", timestamp: 1 },
{ role: "assistant", content: "two", timestamp: 2 },
],
});
expect(result.ingestedCount).toBe(2);
expect(mockClient.createThought).toHaveBeenCalledTimes(2);
});
it("ingests batches in parallel chunks of five", async () => {
const mockClient = makeMockClient();
let inFlight = 0;
let maxInFlight = 0;
let createdCount = 0;
vi.mocked(mockClient.createThought).mockImplementation(async (input: OpenBrainThoughtInput) => {
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await new Promise((resolve) => {
setTimeout(resolve, 20);
});
inFlight -= 1;
createdCount += 1;
return {
id: `thought-${createdCount}`,
content: input.content,
source: input.source,
metadata: input.metadata,
createdAt: new Date().toISOString(),
updatedAt: undefined,
score: undefined,
};
});
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.ingestBatch({
sessionId,
messages: Array.from({ length: 10 }, (_, index) => ({
role: index % 2 === 0 ? "user" : "assistant",
content: `message-${index + 1}`,
timestamp: index + 1,
})),
});
expect(result.ingestedCount).toBe(10);
expect(maxInFlight).toBe(5);
expect(mockClient.createThought).toHaveBeenCalledTimes(10);
});
it("assembles context from recent + semantic search, deduped and budget-aware", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([
makeThought("t1", "recent user context", sessionId, "user", "2026-03-06T12:00:00.000Z"),
makeThought(
"t2",
"recent assistant context",
sessionId,
"assistant",
"2026-03-06T12:01:00.000Z",
),
]);
vi.mocked(mockClient.search).mockResolvedValue([
makeThought(
"t2",
"recent assistant context",
sessionId,
"assistant",
"2026-03-06T12:01:00.000Z",
),
makeThought("t3", "semantic match", sessionId, "assistant", "2026-03-06T12:02:00.000Z"),
]);
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
recentMessages: 10,
semanticSearchLimit: 10,
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.assemble({
sessionId,
messages: [
{
role: "user",
content: "Find the semantic context",
timestamp: Date.now(),
},
],
tokenBudget: 40,
});
expect(mockClient.search).toHaveBeenCalledWith(
expect.objectContaining({
query: "Find the semantic context",
limit: 10,
}),
);
expect(result.estimatedTokens).toBeLessThanOrEqual(40);
expect(result.messages.map((message) => String(message.content))).toEqual([
"recent user context",
"recent assistant context",
"semantic match",
]);
});
it("compact archives a summary thought and deletes summarized inputs", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue(
Array.from({ length: 12 }, (_, index) => {
return makeThought(
`t${index + 1}`,
`message ${index + 1}`,
sessionId,
index % 2 === 0 ? "user" : "assistant",
`2026-03-06T12:${String(index).padStart(2, "0")}:00.000Z`,
);
}),
);
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.compact({
sessionId,
sessionFile: "/tmp/session.json",
tokenBudget: 128,
});
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(mockClient.createThought).toHaveBeenCalledWith(
expect.objectContaining({
source: "openclaw:session-main",
metadata: expect.objectContaining({
sessionId,
type: "summary",
}),
}),
);
const deletedIds = vi
.mocked(mockClient.deleteThought)
.mock.calls.map(([id]) => id)
.sort((left, right) => left.localeCompare(right));
expect(deletedIds).toEqual([
"t10",
"t11",
"t12",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
]);
});
it("stops trimming once the newest message exceeds budget", async () => {
const mockClient = makeMockClient();
const oversizedNewest = "z".repeat(400);
vi.mocked(mockClient.listRecent).mockResolvedValue([
makeThought("t1", "small older message", sessionId, "assistant", "2026-03-06T12:00:00.000Z"),
makeThought("t2", oversizedNewest, sessionId, "assistant", "2026-03-06T12:01:00.000Z"),
]);
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const result = await engine.assemble({
sessionId,
messages: [
{
role: "user",
content: "query",
timestamp: Date.now(),
},
],
tokenBudget: 12,
});
expect(result.messages.map((message) => String(message.content))).toEqual([oversizedNewest]);
});
it("prepares subagent spawn and rollback deletes seeded context", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent).mockResolvedValue([
makeThought("t1", "parent context", sessionId, "assistant", "2026-03-06T12:00:00.000Z"),
]);
vi.mocked(mockClient.createThought).mockResolvedValue({
id: "seed-thought",
content: "seed",
source: "openclaw:child",
metadata: undefined,
createdAt: "2026-03-06T12:01:00.000Z",
updatedAt: undefined,
score: undefined,
});
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
const prep = await engine.prepareSubagentSpawn({
parentSessionKey: sessionId,
childSessionKey: "child-session",
});
expect(prep).toBeDefined();
expect(mockClient.createThought).toHaveBeenCalledWith(
expect.objectContaining({
source: "openclaw:child-session",
}),
);
await prep?.rollback();
expect(mockClient.deleteThought).toHaveBeenCalledWith("seed-thought");
});
it("stores child outcome back into parent on subagent end", async () => {
const mockClient = makeMockClient();
vi.mocked(mockClient.listRecent)
.mockResolvedValueOnce([
makeThought("p1", "parent context", sessionId, "assistant", "2026-03-06T12:00:00.000Z"),
])
.mockResolvedValueOnce([
makeThought("c1", "child result detail", "child-session", "assistant", "2026-03-06T12:05:00.000Z"),
]);
const engine = new OpenBrainContextEngine(
{
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
{ createClient: () => mockClient },
);
await engine.bootstrap({ sessionId, sessionFile: "/tmp/session.json" });
await engine.prepareSubagentSpawn({
parentSessionKey: sessionId,
childSessionKey: "child-session",
});
await engine.onSubagentEnded({
childSessionKey: "child-session",
reason: "completed",
});
expect(mockClient.createThought).toHaveBeenLastCalledWith(
expect.objectContaining({
source: "openclaw:session-main",
metadata: expect.objectContaining({
type: "subagent-result",
sessionId,
}),
}),
);
});
});

View File

@@ -0,0 +1,81 @@
import { describe, expect, it, vi } from "vitest";
import { OpenBrainConfigError, OpenBrainHttpError } from "../src/errors.js";
import { OpenBrainClient } from "../src/openbrain-client.js";
function jsonResponse(body: unknown, init?: ResponseInit): Response {
return new Response(JSON.stringify(body), {
status: init?.status ?? 200,
headers: {
"content-type": "application/json",
...(init?.headers ?? {}),
},
});
}
describe("OpenBrainClient", () => {
it("sends bearer auth and normalized URL for createThought", async () => {
const fetchMock = vi.fn(async () =>
jsonResponse({
id: "thought-1",
content: "hello",
source: "openclaw:main",
}),
);
const client = new OpenBrainClient({
baseUrl: "https://brain.example.com/",
apiKey: "secret",
fetchImpl: fetchMock as unknown as typeof fetch,
});
await client.createThought({
content: "hello",
source: "openclaw:main",
metadata: { sessionId: "session-1" },
});
expect(fetchMock).toHaveBeenCalledTimes(1);
const firstCall = fetchMock.mock.calls[0];
expect(firstCall).toBeDefined();
if (firstCall === undefined) {
throw new Error("Expected fetch call arguments");
}
const [url, init] = firstCall as unknown as [string, RequestInit];
expect(url).toBe("https://brain.example.com/v1/thoughts");
expect(init.method).toBe("POST");
expect(init.headers).toMatchObject({
Authorization: "Bearer secret",
"Content-Type": "application/json",
});
});
it("throws OpenBrainHttpError on non-2xx responses", async () => {
const fetchMock = vi.fn(async () =>
jsonResponse({ error: "unauthorized" }, { status: 401 }),
);
const client = new OpenBrainClient({
baseUrl: "https://brain.example.com",
apiKey: "secret",
fetchImpl: fetchMock as unknown as typeof fetch,
});
await expect(client.listRecent({ limit: 5, source: "openclaw:main" })).rejects.toBeInstanceOf(
OpenBrainHttpError,
);
await expect(client.listRecent({ limit: 5, source: "openclaw:main" })).rejects.toMatchObject({
status: 401,
});
});
it("throws OpenBrainConfigError when initialized without baseUrl or apiKey", () => {
expect(
() => new OpenBrainClient({ baseUrl: "", apiKey: "secret", fetchImpl: fetch }),
).toThrow(OpenBrainConfigError);
expect(
() => new OpenBrainClient({ baseUrl: "https://brain.example.com", apiKey: "", fetchImpl: fetch }),
).toThrow(OpenBrainConfigError);
});
});

View File

@@ -0,0 +1,30 @@
import { describe, expect, it, vi } from "vitest";
import { OPENBRAIN_CONTEXT_ENGINE_ID, register } from "../src/index.js";
describe("plugin register()", () => {
it("registers the openbrain context engine factory", async () => {
const registerContextEngine = vi.fn();
register({
registerContextEngine,
pluginConfig: {
baseUrl: "https://brain.example.com",
apiKey: "secret",
},
logger: {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
});
expect(registerContextEngine).toHaveBeenCalledTimes(1);
const [id, factory] = registerContextEngine.mock.calls[0] as [string, () => Promise<unknown> | unknown];
expect(id).toBe(OPENBRAIN_CONTEXT_ENGINE_ID);
const engine = await factory();
expect(engine).toHaveProperty("info.id", OPENBRAIN_CONTEXT_ENGINE_ID);
});
});

View File

@@ -0,0 +1,9 @@
import { describe, expect, it } from "vitest";
import { OPENBRAIN_CONTEXT_ENGINE_ID } from "../src/index.js";
describe("project scaffold", () => {
it("exports openbrain context engine id", () => {
expect(OPENBRAIN_CONTEXT_ENGINE_ID).toBe("openbrain");
});
});

View File

@@ -0,0 +1,24 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"lib": ["ES2022"],
"strict": true,
"noUncheckedIndexedAccess": true,
"exactOptionalPropertyTypes": true,
"isolatedModules": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "dist",
"rootDir": ".",
"types": ["node", "vitest/globals"],
"skipLibCheck": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*.ts", "tests/**/*.ts"],
"exclude": ["dist", "node_modules"]
}

2229
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff