feat(fleet): native Mosaic backlog on @mosaicstack/db (atomic claim + TTL) (#657)
Some checks failed
ci/woodpecker/push/ci-image Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was canceled

This commit was merged in pull request #657.
This commit is contained in:
2026-06-24 14:55:10 +00:00
parent 61b1bdac2a
commit f852250419
14 changed files with 4906 additions and 5 deletions

View File

@@ -30,10 +30,12 @@ INSTALL_MODE="${MOSAIC_INSTALL_MODE:-prompt}"
# own fleet files MUST
# survive `mosaic update` (which runs this sync automatically): the active
# roster (`fleet/roster.yaml` + any other `fleet/*.yaml`), per-agent env
# (`fleet/agents/`), and heartbeat run dir (`fleet/run/`). Without these, an
# update wipes the operator's fleet. Glob entries are honored by both the rsync
# path (`--exclude`) and the glob-aware cp fallback below.
PRESERVE_PATHS=("CONSTITUTION.md" "AGENTS.md" "SOUL.md" "USER.md" "TOOLS.md" "STANDARDS.md" "memory" "sources" "credentials" "fleet/*.yaml" "fleet/agents" "fleet/run")
# (`fleet/agents/`), heartbeat run dir (`fleet/run/`), and the Mosaic-native
# backlog-of-record store (`fleet/backlog/` — embedded PGlite data dir; see
# packages/mosaic/src/commands/fleet-backlog.ts). Without these, an update
# wipes the operator's fleet AND their backlog. Glob entries are honored by
# both the rsync path (`--exclude`) and the glob-aware cp fallback below.
PRESERVE_PATHS=("CONSTITUTION.md" "AGENTS.md" "SOUL.md" "USER.md" "TOOLS.md" "STANDARDS.md" "memory" "sources" "credentials" "fleet/*.yaml" "fleet/agents" "fleet/run" "fleet/backlog")
# Framework-owned contract files: re-copied from defaults/ on every upgrade (the
# user must not edit them; a divergent copy is backed up once before overwrite).

View File

@@ -29,6 +29,7 @@
"dependencies": {
"@mosaicstack/brain": "workspace:*",
"@mosaicstack/config": "workspace:*",
"@mosaicstack/db": "workspace:*",
"@mosaicstack/forge": "workspace:*",
"@mosaicstack/log": "workspace:*",
"@mosaicstack/macp": "workspace:*",

View File

@@ -0,0 +1,285 @@
/**
* `mosaic fleet backlog <sub> --json` — Mosaic-native backlog of record.
*
* Mosaic OWNS this backlog end-to-end on its existing Postgres storage layer
* (`@mosaicstack/db`). It REPLACES the former Hermes adapter — there is NO
* runtime dependency on Hermes.
*
* Storage tier (the existing storage-layer convention, no new engine):
* - default: embedded PGlite at <mosaicHome>/fleet/backlog (real Postgres
* semantics, persisted on disk so the operator's backlog survives reboots
* and `mosaic update` — see install.sh PRESERVE_PATHS).
* - DATABASE_URL set: full server Postgres — same code, no change.
*
* Migrations run on first use so the `backlog` table always exists.
*/
import { mkdir } from 'node:fs/promises';
import { homedir } from 'node:os';
import { join } from 'node:path';
import type { Command } from 'commander';
import {
BacklogService,
DEFAULT_CLAIM_TTL_SECONDS,
type BacklogCard,
type DbHandle,
} from '@mosaicstack/db';
function defaultMosaicHome(): string {
return process.env['MOSAIC_HOME'] ?? join(homedir(), '.config', 'mosaic');
}
/** Resolve where the embedded PGlite backlog store lives (default tier). */
export function defaultBacklogDataDir(mosaicHome = defaultMosaicHome()): string {
return join(mosaicHome, 'fleet', 'backlog');
}
/**
* Open a db handle for the backlog and ensure the schema exists.
*
* Tier detection mirrors the storage layer: DATABASE_URL => server Postgres
* (migrations applied via runMigrations); otherwise embedded PGlite at the
* fleet/backlog data dir (migrations applied via runPgliteMigrations).
*/
async function openBacklogDb(mosaicHome: string): Promise<DbHandle> {
const { createDb, createPgliteDb, runMigrations, runPgliteMigrations } =
await import('@mosaicstack/db');
const url = process.env['DATABASE_URL'];
if (url) {
await runMigrations(url);
return createDb(url);
}
const dataDir = process.env['PGLITE_DATA_DIR'] ?? defaultBacklogDataDir(mosaicHome);
// PGlite writes a file-backed store to dataDir but does not create missing
// parent directories (e.g. <mosaicHome>/fleet). Create them first. Skip for
// the in-memory pseudo-paths so a memory:// store never touches the fs.
if (!dataDir.startsWith('memory://') && dataDir !== ':memory:') {
await mkdir(dataDir, { recursive: true });
}
const handle = createPgliteDb(dataDir);
await runPgliteMigrations(handle);
return handle;
}
function parseDependsOn(value?: string): string[] {
if (!value) return [];
return value
.split(',')
.map((s) => s.trim())
.filter((s) => s.length > 0);
}
function parseAcceptance(value?: string): unknown {
if (!value) return null;
try {
return JSON.parse(value);
} catch {
// Fall back to a list of newline/semicolon-separated criteria.
return value
.split(/[\n;]/)
.map((s) => s.trim())
.filter((s) => s.length > 0);
}
}
function printCard(card: BacklogCard | null, json?: boolean): void {
if (json) {
console.log(JSON.stringify(card));
return;
}
if (!card) {
console.log('(none)');
return;
}
const deps = card.dependsOn.length ? card.dependsOn.join(',') : '-';
console.log(
`${card.id}\t[${card.status}]\tp=${card.priority}\tphase=${card.phase ?? '-'}\tdeps=${deps}\t${card.title}`,
);
}
function printCards(cards: BacklogCard[], json?: boolean): void {
if (json) {
console.log(JSON.stringify(cards));
return;
}
if (cards.length === 0) {
console.log('(no cards)');
return;
}
for (const card of cards) printCard(card, false);
}
/**
* Register `backlog` under an existing `fleet` command.
* `mosaicHomeFor` resolves the active --mosaic-home (parent flag) at call time.
*/
export function registerFleetBacklogCommand(
fleetCmd: Command,
mosaicHomeFor: () => string,
): Command {
const backlogCmd = fleetCmd
.command('backlog')
.description('Mosaic-native backlog of record (atomic claim + TTL, deps DAG)');
const withSvc = async <T>(fn: (svc: BacklogService) => Promise<T>): Promise<T> => {
const handle = await openBacklogDb(mosaicHomeFor());
try {
return await fn(new BacklogService(handle.db));
} finally {
await handle.close();
}
};
backlogCmd
.command('create')
.description('Create a backlog card (idempotency_key dedups)')
.requiredOption('--id <id>', 'Stable card id')
.requiredOption('--title <title>', 'Card title')
.option('--body <body>', 'Card body / description')
.option('--phase <phase>', 'Board/phase grouping')
.option('--priority <n>', 'Priority (higher = sooner)', (v) => parseInt(v, 10), 0)
.option('--depends-on <ids>', 'Comma-separated dependency card ids')
.option('--acceptance <json>', 'Acceptance criteria (JSON or ;/newline list)')
.option('--idempotency-key <key>', 'Dedup key; repeat returns the existing card')
.option('--json', 'Print JSON')
.action(
async (opts: {
id: string;
title: string;
body?: string;
phase?: string;
priority: number;
dependsOn?: string;
acceptance?: string;
idempotencyKey?: string;
json?: boolean;
}) => {
const card = await withSvc((svc) =>
svc.create({
id: opts.id,
title: opts.title,
body: opts.body ?? null,
phase: opts.phase ?? null,
priority: opts.priority,
dependsOn: parseDependsOn(opts.dependsOn),
acceptance: parseAcceptance(opts.acceptance),
idempotencyKey: opts.idempotencyKey ?? null,
}),
);
printCard(card, opts.json);
},
);
backlogCmd
.command('list')
.description('List cards (filters: --status, --phase, --ready-only)')
.option('--status <status>', 'Filter by status: ready|claimed|blocked|done')
.option('--phase <phase>', 'Filter by phase')
.option('--ready-only', 'Only cards that are ready AND have all deps done')
.option('--json', 'Print JSON')
.action(
async (opts: {
status?: BacklogCard['status'];
phase?: string;
readyOnly?: boolean;
json?: boolean;
}) => {
const cards = await withSvc((svc) =>
svc.list({
...(opts.status ? { status: opts.status } : {}),
...(opts.phase ? { phase: opts.phase } : {}),
...(opts.readyOnly ? { readyOnly: true } : {}),
}),
);
printCards(cards, opts.json);
},
);
backlogCmd
.command('claim')
.description('Atomically claim the highest-priority ready card (FOR UPDATE SKIP LOCKED)')
.requiredOption('--owner <owner>', 'Claim owner (worker/agent id)')
.option(
'--ttl <sec>',
'Claim TTL in seconds',
(v) => parseInt(v, 10),
DEFAULT_CLAIM_TTL_SECONDS,
)
.option('--id <id>', 'Claim a specific card by id')
.option('--json', 'Print JSON')
.action(async (opts: { owner: string; ttl: number; id?: string; json?: boolean }) => {
const card = await withSvc((svc) =>
svc.claim({ owner: opts.owner, ttlSeconds: opts.ttl, ...(opts.id ? { id: opts.id } : {}) }),
);
printCard(card, opts.json);
if (!card && !opts.json) process.exitCode = 0;
});
backlogCmd
.command('reclaim')
.description('Release expired claims back to ready (or a specific --id)')
.option('--id <id>', 'Release a specific card regardless of expiry')
.option('--json', 'Print JSON')
.action(async (opts: { id?: string; json?: boolean }) => {
const result = await withSvc((svc) => svc.reclaim(opts.id ? { id: opts.id } : {}));
if (opts.json) {
console.log(JSON.stringify(result));
} else if (result.reclaimed.length === 0) {
console.log('(nothing to reclaim)');
} else {
console.log(`reclaimed: ${result.reclaimed.join(', ')}`);
}
});
backlogCmd
.command('link')
.description('Add a depends_on edge (--from depends on --to)')
.requiredOption('--from <id>', 'Card that gains the dependency')
.requiredOption('--to <id>', 'Card it now depends on')
.option('--json', 'Print JSON')
.action(async (opts: { from: string; to: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.link(opts.from, opts.to));
printCard(card, opts.json);
});
backlogCmd
.command('stats')
.description('Counts by status, oldest-ready age, expired-claim count')
.option('--json', 'Print JSON')
.action(async (opts: { json?: boolean }) => {
const stats = await withSvc((svc) => svc.stats());
if (opts.json) {
console.log(JSON.stringify(stats));
return;
}
console.log(`total: ${stats.total}`);
console.log(
`ready=${stats.counts.ready} claimed=${stats.counts.claimed} ` +
`blocked=${stats.counts.blocked} done=${stats.counts.done}`,
);
console.log(`oldest-ready-age: ${stats.oldestReadyAgeSeconds ?? '-'}s`);
console.log(`expired-claims: ${stats.expiredClaimCount}`);
});
backlogCmd
.command('block')
.description('Mark a card blocked')
.requiredOption('--id <id>', 'Card id')
.option('--json', 'Print JSON')
.action(async (opts: { id: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.block(opts.id));
printCard(card, opts.json);
});
backlogCmd
.command('complete')
.description('Mark a card done')
.requiredOption('--id <id>', 'Card id')
.option('--json', 'Print JSON')
.action(async (opts: { id: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.complete(opts.id));
printCard(card, opts.json);
});
return backlogCmd;
}

View File

@@ -78,6 +78,7 @@ describe('registerFleetCommand', () => {
expect(fleet).toBeDefined();
expect(fleet!.commands.map((command) => command.name()).sort()).toEqual([
'add',
'backlog',
'init',
'install',
'install-systemd',
@@ -91,6 +92,24 @@ describe('registerFleetCommand', () => {
]);
});
it('registers the backlog subcommand with its operations', () => {
const program = buildProgram();
const fleet = program.commands.find((command) => command.name() === 'fleet');
const backlog = fleet!.commands.find((command) => command.name() === 'backlog');
expect(backlog).toBeDefined();
expect(backlog!.commands.map((command) => command.name()).sort()).toEqual([
'block',
'claim',
'complete',
'create',
'link',
'list',
'reclaim',
'stats',
]);
});
it('adds fleet-backed agent subcommands without removing existing options', () => {
const program = buildProgram();
const agent = program.commands.find((command) => command.name() === 'agent');

View File

@@ -8,6 +8,7 @@ import * as readline from 'node:readline';
import type { Command } from 'commander';
import YAML from 'yaml';
import { resolveCommsBlock } from '../fleet/comms-onboarding.js';
import { registerFleetBacklogCommand } from './fleet-backlog.js';
/**
* A function that spawns a command with inherited stdio (TTY passthrough).
@@ -1700,6 +1701,11 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
console.log(`Removed ${name} from the fleet.`);
});
// Mosaic-native backlog of record (card A4). Resolves the active --mosaic-home
// (parent flag) at call time so the embedded PGlite store lands under the same
// fleet/ directory as the roster and heartbeats.
registerFleetBacklogCommand(cmd, () => cmd.opts<{ mosaicHome: string }>().mosaicHome);
return cmd;
}