Compare commits

..

1 Commits

Author SHA1 Message Date
Jason Woltje
96a9892717 chore(release): mosaic CLI 0.0.43
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
2026-06-24 00:56:40 -05:00
16 changed files with 94 additions and 5039 deletions

View File

@@ -1,138 +0,0 @@
# Fleet Backlog Conventions
The **backlog** is Mosaic's native backlog-of-record for fleet work. It is built
end-to-end on Mosaic's own storage layer (`@mosaicstack/db`, drizzle/Postgres)
and surfaced as `mosaic fleet backlog <sub> --json`.
> **Mosaic-native, no Hermes.** This backlog REPLACES the former Hermes adapter.
> There is **no** runtime dependency on Hermes, `hermes kanban`, or `~/.hermes`
> anywhere in this feature. Anything previously delegated to Hermes is recreated
> here on Mosaic's own Postgres storage layer.
## Storage tier — PGlite by default, Postgres by config
The backlog uses the existing Mosaic storage layer; there is **no** new database
engine (no sqlite, no raw client).
| Condition | Tier | Data location |
| ------------------------------ | -------------------- | -------------------------------- |
| `DATABASE_URL` set | Full server Postgres | the configured database |
| `PGLITE_DATA_DIR` set (no URL) | Embedded PGlite | that directory |
| neither (default) | Embedded PGlite | `~/.config/mosaic/fleet/backlog` |
PGlite is real Postgres semantics in-process — including the row locks the atomic
claim relies on — so the **same code** runs on a laptop (embedded, single-host
default) and on a full Postgres deployment. Switching tiers is config-only.
The schema (`backlog` table) is created automatically on first CLI use:
`runMigrations()` for Postgres, `runPgliteMigrations()` for embedded PGlite.
### Update safety
The embedded PGlite store lives under `~/.config/mosaic/fleet/backlog`, which is
listed in `PRESERVE_PATHS` in `packages/mosaic/framework/install.sh`. This means
`mosaic update` (which runs the framework sync with `rsync --delete`) will **not**
wipe the operator's backlog — same protection as the roster, per-agent env, and
heartbeat run dir.
## Card schema
A card is one row in the `backlog` table:
| Column | Type | Notes |
| ------------------- | ------------------- | ------------------------------------------------------------- |
| `id` | text (PK) | Stable, caller-supplied id (e.g. `A4`, `fleet-001`). |
| `title` | text | Required. |
| `body` | text (nullable) | Free-form description. |
| `phase` | text (nullable) | Board/phase grouping (see below). |
| `priority` | int (default 0) | **Higher = sooner.** Claim picks the max-priority ready card. |
| `status` | enum | `ready` \| `claimed` \| `blocked` \| `done`. |
| `depends_on` | jsonb `string[]` | DAG edges — ids of cards this one depends on. |
| `claim_owner` | text (nullable) | Owner token of the active claim. |
| `claim_ttl_seconds` | int (nullable) | TTL of the active claim. |
| `claimed_at` | timestamptz (null) | When the claim was taken. `claimed_at + ttl` = expiry. |
| `attempts` | int (default 0) | Incremented each time the card is claimed. |
| `idempotency_key` | text (unique, null) | Dedups `create`; NULLs are distinct in Postgres. |
| `acceptance` | jsonb (nullable) | Acceptance criteria (array of strings or object). |
| `created_at` | timestamptz | |
| `updated_at` | timestamptz | |
`depends_on` is modeled as a `jsonb` array column rather than a separate edge
table. Justification: it matches the repo's existing style (e.g. `tasks.tags`,
`agents.skills`, `routing_rules.conditions` are all jsonb arrays), keeps a card
self-contained, and the DAG is small (per-card dependency lists), so a join table
would add ceremony without benefit.
### Board / phase convention
`phase` is a free-form grouping string used as the board column / milestone label
(e.g. `M1`, `fleet`, `infra`). `list --phase <phase>` filters to one board lane.
`priority` orders cards **within** the ready pool regardless of phase.
## Status lifecycle
```
create
┌──────► ready ───── claim ─────► claimed ───── complete ─────► done
│ │ │
│ block reclaim (TTL expiry or --id)
│ ▼ │
│ blocked └──────────────────────────┘ (back to ready)
└──────────┘ (reclaim / re-create can return a card to ready)
```
- **ready** — eligible to be claimed once every `depends_on` card is `done`.
- **claimed** — a worker holds it; `claim_owner` + `claimed_at` set.
- **blocked** — explicitly parked; never auto-claimed.
- **done** — completed; satisfies dependents.
## Atomic claim (`FOR UPDATE SKIP LOCKED`) + TTL
`claim` is atomic. Inside a single transaction it locks candidate `ready` rows
with `SELECT ... FOR UPDATE SKIP LOCKED` (via the drizzle `sql` operator), picks
the highest-priority deps-satisfied card, and flips it to `claimed`. Because a row
already locked by a concurrent claimer is **skipped**, two claimers can **never**
both win the same card — the loser falls through to the next candidate or gets
`null`. (Proven by the concurrency tests in `packages/db/src/backlog.spec.ts`.)
- **Deps gate:** a card is only claimable when every id in `depends_on` is `done`.
- **TTL:** `claim --ttl <sec>` (default **900s**) records `claim_ttl_seconds`.
- **reclaim:** releases claims whose `claimed_at + ttl` is in the past (expired)
back to `ready`, clearing the claim fields. `reclaim --id <id>` force-releases a
specific card regardless of expiry. This is how a crashed worker's card returns
to the pool.
## CLI — `mosaic fleet backlog <sub> --json`
All subcommands support `--json`.
| Subcommand | Purpose |
| --------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------- |
| `create --id --title [--body --phase --priority --depends-on --acceptance --idempotency-key]` | Create a card; `idempotency_key` dedups (repeat returns the existing card). |
| `list [--status --phase --ready-only]` | List cards. `--ready-only` = status `ready` AND all deps `done`. |
| `claim --owner [--ttl <sec> --id <id>]` | Atomically claim the highest-priority ready card (or `--id`). Returns the card or `null`. |
| `reclaim [--id <id>]` | Release expired claims (or a specific card) back to `ready`. |
| `link --from --to` | Add a `depends_on` edge (`--from` depends on `--to`). |
| `stats` | Counts by status, oldest-ready age, expired-claim count. |
| `block --id` | Set a card to `blocked`. |
| `complete --id` | Set a card to `done` (releases any claim). |
### Example
```sh
# Seed two cards, the second depends on the first.
mosaic fleet backlog create --id A1 --title "schema" --priority 5
mosaic fleet backlog create --id A2 --title "service" --depends-on A1 --priority 9
# A2 is gated on A1, so claim returns A1 first.
mosaic fleet backlog claim --owner worker-1 --ttl 600 --json
# Finish A1; now A2 is ready.
mosaic fleet backlog complete --id A1
mosaic fleet backlog list --ready-only --json
# Recover stalled work.
mosaic fleet backlog reclaim --json
```

View File

@@ -1,53 +0,0 @@
# H1b — tmux pane idle signal wiring
## Objective
Feed `classifyReadiness()` a real idle signal on tmux 3.4 by deriving `idleSeconds` from the first available tmux timestamp source: pane activity, then window activity, then session activity.
## Scope
- `packages/mosaic/src/commands/fleet.ts`
- Extend `buildTmuxListPanesCommand()` format to include `#{window_activity}` and `#{session_activity}` after the existing fields.
- Update `parseTmuxListPanes()` to choose the first non-empty finite positive timestamp and clamp future idle values to 0.
- `packages/mosaic/src/commands/fleet.spec.ts`
- Cover pane/window/session activity parsing behavior, empty-field index alignment, null idle, future clamping, math correctness, and exact tmux format.
## Out of Scope
- No changes to `classifyReadiness()`, thresholds, `AgentPsRow`, or `fleet ps` rendering.
- No merge by worker; orchestrator routes review/merge.
- Workers do not modify `docs/TASKS.md`.
## PRD Alignment
Aligned with `docs/fleet/PRD.md` FR-1 and acceptance criteria for truthful `mosaic fleet ps` pane/pid/idle observability.
## Plan
1. Sync branch from latest `origin/main` and install dependencies with required pnpm env.
2. Add/confirm reproducer tests for tmux 3.4 empty `pane_activity` and new fallback behavior.
3. Implement the focused parser/format change only.
4. Run required build, baseline gates, fleet vitest, and independent review.
5. Run pre-push queue guard, push branch, and open PR to `main` with Mosaic wrapper.
## Progress
- 2026-06-24: Branch `fix/fleet-pane-idle-activity` created from `origin/main` @ `ec8dd7c` after fetching.
- 2026-06-24: Session-start generated local `.mosaic/orchestrator/*` changes on the previous release branch; stashed as `coder1 session-start state before H1b` to keep this branch clean.
- 2026-06-24: Added TDD coverage for the tmux 3.4 production case (`pane_activity` empty, `window_activity` populated), exact new list-panes format, null/future/multiple-source behavior.
- 2026-06-24: Implemented parser fallback without changing readiness classifier thresholds or render shape.
## Verification Evidence
- `pnpm install --store-dir "$HOME/.pnpm-store"` — pass.
- Reproducer before implementation: `pnpm --filter @mosaicstack/mosaic exec vitest run src/commands/fleet.spec.ts` — failed as expected (old format, no fallback, negative future idle).
- `npx turbo build --filter=@mosaicstack/mosaic^...` — pass, 12/12 tasks successful.
- `pnpm typecheck` — pass, 41/41 tasks successful.
- `pnpm lint` — pass, 23/23 tasks successful.
- `pnpm format:check` — pass, all matched files use Prettier style.
- `pnpm --filter @mosaicstack/mosaic exec vitest run src/commands/fleet.spec.ts` — pass, 176 tests.
- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` — approve, 0 findings (reviewed supplied diff; sandbox file-inspection limitation noted by tool).
## Risks / Blockers
- No current blocker.

View File

@@ -1,70 +0,0 @@
# H2 — readiness semantics: available, not stuck
## Objective
Correct fleet readiness semantics so a healthy long-idle agent is reported as `available` (good/assignable) instead of `stuck` (fault). Reserve `stuck` in the type/JSON value space for future positive block evidence.
## Scope
- `packages/mosaic/src/commands/fleet.ts`
- replace `idle` readiness state with `available`
- keep `stuck` in the union but stop emitting it from idle-only heuristics
- remove stuck threshold helper/env handling
- remove IDLE/STUCK alarm flags from table rendering
- `packages/mosaic/src/commands/fleet.spec.ts`
- update classifier branch/boundary tests
- assert very long idle maps to `available`, not `stuck`
- update table/JSON assertions for available with no alarm flags
- remove stuck threshold helper tests
## Acceptance Criteria
- `classifyReadiness()` remains pure/total/never-throw and maps:
- dead/stale/unknown unchanged
- busy/null/undefined/non-finite idle to `working`
- idle >= activity threshold to `available`
- idle < activity threshold to `working`
- No idle-derived path emits `stuck`.
- `MOSAIC_HEARTBEAT_IDLE_THRESHOLD` remains backward compatible as the working→available activity threshold.
- `MOSAIC_HEARTBEAT_STUCK_THRESHOLD` and helper/default are removed.
- `fleet ps` keeps the idle-seconds column header `IDLE`, renders `available` in HB label, and does not add IDLE/STUCK warning flags.
- Local gates green: build precheck, typecheck, lint, format:check, fleet vitest.
- PR opened against `main`; no merge by worker.
## Constraints / Assumptions
- Source branch: `origin/main` @ `1020cfa`.
- `docs/TASKS.md` is orchestrator-owned; worker will not modify it.
- Documentation impact is captured in this scratchpad and PR description; no user/admin guide behavior beyond CLI readiness label semantics.
## Plan
1. Install dependencies with requested PNPM environment.
2. Inspect current H1/H1b readiness implementation and tests.
3. Update classifier types/helpers/rendering.
4. Update focused tests.
5. Run build precheck + required gates.
6. Run automated code review, remediate any findings.
7. Queue guard, push, open PR.
## Progress
- 2026-06-24: Branch created from `origin/main` @ `1020cfa`.
- 2026-06-24: Replaced idle-derived `idle`/`stuck` outputs with `available`; retained `stuck` in type union for future positive block evidence.
- 2026-06-24: Removed stuck threshold env/helper plumbing and IDLE/STUCK alarm flags.
- 2026-06-24: Updated classifier and table-render tests for available semantics.
## Verification Evidence
- `pnpm install --store-dir "$HOME/.pnpm-store"` — pass.
- `npx turbo build --filter=@mosaicstack/mosaic^...` — pass, 12/12 tasks successful.
- `pnpm typecheck` — pass, 41/41 tasks successful.
- `pnpm lint` — pass, 23/23 tasks successful.
- `pnpm format:check` — pass, all matched files use Prettier style.
- `pnpm --filter @mosaicstack/mosaic exec vitest run src/commands/fleet.spec.ts` — pass, 177 tests.
- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` — approve, 0 findings (reviewed supplied diff; sandbox file-inspection limitation noted by tool).
## Risks / Blockers
- No current blocker.
- Review tool could not inspect repo files directly due sandbox wrapper limitation, but it reviewed the supplied diff and approved with no findings.

View File

@@ -1,22 +0,0 @@
CREATE TYPE "public"."backlog_status" AS ENUM('ready', 'claimed', 'blocked', 'done');--> statement-breakpoint
CREATE TABLE "backlog" (
"id" text PRIMARY KEY NOT NULL,
"title" text NOT NULL,
"body" text,
"phase" text,
"priority" integer DEFAULT 0 NOT NULL,
"status" "backlog_status" DEFAULT 'ready' NOT NULL,
"depends_on" jsonb DEFAULT '[]'::jsonb NOT NULL,
"claim_owner" text,
"claim_ttl_seconds" integer,
"claimed_at" timestamp with time zone,
"attempts" integer DEFAULT 0 NOT NULL,
"idempotency_key" text,
"acceptance" jsonb,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"updated_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE INDEX "backlog_status_priority_idx" ON "backlog" USING btree ("status","priority");--> statement-breakpoint
CREATE INDEX "backlog_status_claimed_at_idx" ON "backlog" USING btree ("status","claimed_at");--> statement-breakpoint
CREATE UNIQUE INDEX "backlog_idempotency_key_idx" ON "backlog" USING btree ("idempotency_key");

File diff suppressed because it is too large Load Diff

View File

@@ -78,13 +78,6 @@
"when": 1745366400000, "when": 1745366400000,
"tag": "0010_federation_enrollment_tokens", "tag": "0010_federation_enrollment_tokens",
"breakpoints": true "breakpoints": true
},
{
"idx": 11,
"version": "7",
"when": 1782310438919,
"tag": "0011_bitter_gateway",
"breakpoints": true
} }
] ]
} }

View File

@@ -1,217 +0,0 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { sql } from 'drizzle-orm';
import { createPgliteDb } from './client-pglite.js';
import { runPgliteMigrations } from './migrate.js';
import type { DbHandle } from './client.js';
import { BacklogService } from './backlog.js';
import { backlog } from './schema.js';
// Helper: backdate a claim's claimed_at by 1 hour so it is past any short TTL.
function sqlBackdate(id: string) {
return sql`UPDATE ${backlog} SET claimed_at = now() - interval '1 hour' WHERE ${backlog.id} = ${id}`;
}
/**
* Real Postgres semantics, no external server: embedded in-memory PGlite.
* The migration path creates the `backlog` table (and every other table) so the
* service runs against the actual generated schema, including the row locks the
* atomic-claim path depends on.
*/
async function freshService(): Promise<{ handle: DbHandle; svc: BacklogService }> {
const handle = createPgliteDb('memory://');
await runPgliteMigrations(handle);
return { handle, svc: new BacklogService(handle.db) };
}
describe('BacklogService', () => {
let handle: DbHandle;
let svc: BacklogService;
beforeEach(async () => {
({ handle, svc } = await freshService());
});
afterEach(async () => {
await handle.close();
});
it('create then list returns the card', async () => {
await svc.create({ id: 'c1', title: 'First card', phase: 'M1', priority: 5 });
const all = await svc.list();
expect(all).toHaveLength(1);
expect(all[0]).toMatchObject({ id: 'c1', title: 'First card', phase: 'M1', status: 'ready' });
});
it('idempotency_key dedups create', async () => {
const a = await svc.create({ id: 'c1', title: 'one', idempotencyKey: 'k-1' });
const b = await svc.create({ id: 'c2', title: 'two', idempotencyKey: 'k-1' });
expect(b.id).toBe(a.id);
const all = await svc.list();
expect(all).toHaveLength(1);
});
it('list filters by status and phase', async () => {
await svc.create({ id: 'c1', title: 'a', phase: 'M1' });
await svc.create({ id: 'c2', title: 'b', phase: 'M2' });
await svc.block('c2');
expect(await svc.list({ phase: 'M1' })).toHaveLength(1);
expect(await svc.list({ status: 'blocked' })).toHaveLength(1);
expect((await svc.list({ status: 'blocked' }))[0]!.id).toBe('c2');
});
describe('atomic claim', () => {
it('two concurrent claimers on one card => exactly one wins', async () => {
await svc.create({ id: 'only', title: 'the one', priority: 10 });
// Two independent claimers race for the single ready card on the same db.
// The atomic claim path (`FOR UPDATE SKIP LOCKED` inside a transaction)
// guarantees the loser's locked row is skipped, so it can never also flip
// the card to claimed — it gets the next candidate (none) and returns null.
const svcA = new BacklogService(handle.db);
const svcB = new BacklogService(handle.db);
const [a, b] = await Promise.all([
svcA.claim({ owner: 'worker-A' }),
svcB.claim({ owner: 'worker-B' }),
]);
const winners = [a, b].filter((c) => c !== null);
expect(winners).toHaveLength(1);
expect(winners[0]!.id).toBe('only');
expect(winners[0]!.status).toBe('claimed');
expect(['worker-A', 'worker-B']).toContain(winners[0]!.claimOwner);
const card = await svc.get('only');
expect(card!.status).toBe('claimed');
expect(card!.attempts).toBe(1);
});
it('many concurrent claimers on N cards => no card is double-claimed', async () => {
// 5 ready cards, 8 concurrent claimers. Exactly 5 win, all distinct.
for (let i = 0; i < 5; i++) {
await svc.create({ id: `card-${i}`, title: `card ${i}`, priority: i });
}
const claimers = Array.from({ length: 8 }, (_, i) =>
new BacklogService(handle.db).claim({ owner: `w-${i}` }),
);
const results = await Promise.all(claimers);
const won = results.filter((c): c is NonNullable<typeof c> => c !== null);
const wonIds = won.map((c) => c.id);
expect(won).toHaveLength(5);
expect(new Set(wonIds).size).toBe(5); // all distinct — no double-claim
});
it('claim picks the highest-priority ready card', async () => {
await svc.create({ id: 'low', title: 'low', priority: 1 });
await svc.create({ id: 'high', title: 'high', priority: 9 });
const claimed = await svc.claim({ owner: 'w' });
expect(claimed!.id).toBe('high');
});
it('claim of a specific --id', async () => {
await svc.create({ id: 'a', title: 'a', priority: 9 });
await svc.create({ id: 'b', title: 'b', priority: 1 });
const claimed = await svc.claim({ owner: 'w', id: 'b' });
expect(claimed!.id).toBe('b');
});
it('claim returns null when nothing is ready', async () => {
const claimed = await svc.claim({ owner: 'w' });
expect(claimed).toBeNull();
});
});
describe('deps DAG gate', () => {
it('card with an unfinished dep is not claimable and not ready', async () => {
await svc.create({ id: 'dep', title: 'dependency' });
await svc.create({ id: 'main', title: 'depends on dep', dependsOn: ['dep'] });
// `main` should NOT be claimable while `dep` is not done — `dep` wins.
const first = await svc.claim({ owner: 'w' });
expect(first!.id).toBe('dep');
// With dep claimed (not done), main still cannot be claimed.
const second = await svc.claim({ owner: 'w' });
expect(second).toBeNull();
// ready-only list excludes main while its dep is unfinished.
const ready = await svc.list({ readyOnly: true });
expect(ready.map((c) => c.id)).not.toContain('main');
// Once dep is done, main becomes ready and claimable.
await svc.complete('dep');
const readyAfter = await svc.list({ readyOnly: true });
expect(readyAfter.map((c) => c.id)).toContain('main');
const third = await svc.claim({ owner: 'w' });
expect(third!.id).toBe('main');
});
it('link adds a depends_on edge', async () => {
await svc.create({ id: 'a', title: 'a' });
await svc.create({ id: 'b', title: 'b' });
const linked = await svc.link('a', 'b');
expect(linked.dependsOn).toEqual(['b']);
// a is now gated on b
const claimed = await svc.claim({ owner: 'w' });
expect(claimed!.id).toBe('b');
});
});
describe('reclaim TTL', () => {
it('reclaim returns expired claims to ready', async () => {
await svc.create({ id: 'c1', title: 'c1' });
const claimed = await svc.claim({ owner: 'w', ttlSeconds: 60 });
expect(claimed!.status).toBe('claimed');
// Backdate the claim so it is well past its TTL.
await handle.db.execute(sqlBackdate('c1'));
const result = await svc.reclaim();
expect(result.reclaimed).toEqual(['c1']);
const card = await svc.get('c1');
expect(card!.status).toBe('ready');
expect(card!.claimOwner).toBeNull();
expect(card!.claimedAt).toBeNull();
});
it('reclaim does not touch a fresh (unexpired) claim', async () => {
await svc.create({ id: 'c1', title: 'c1' });
await svc.claim({ owner: 'w', ttlSeconds: 3600 });
const result = await svc.reclaim();
expect(result.reclaimed).toEqual([]);
expect((await svc.get('c1'))!.status).toBe('claimed');
});
it('reclaim --id releases a specific claim regardless of expiry', async () => {
await svc.create({ id: 'c1', title: 'c1' });
await svc.claim({ owner: 'w', ttlSeconds: 3600 });
const result = await svc.reclaim({ id: 'c1' });
expect(result.reclaimed).toEqual(['c1']);
expect((await svc.get('c1'))!.status).toBe('ready');
});
});
describe('stats', () => {
it('computes counts, oldest-ready age, and expired-claim count', async () => {
await svc.create({ id: 'r1', title: 'r1' });
await svc.create({ id: 'r2', title: 'r2' });
await svc.create({ id: 'b1', title: 'b1' });
await svc.block('b1');
await svc.create({ id: 'd1', title: 'd1' });
await svc.complete('d1');
await svc.create({ id: 'cl1', title: 'cl1' });
await svc.claim({ owner: 'w', id: 'cl1', ttlSeconds: 60 });
await handle.db.execute(sqlBackdate('cl1'));
const stats = await svc.stats();
expect(stats.counts.ready).toBe(2);
expect(stats.counts.blocked).toBe(1);
expect(stats.counts.done).toBe(1);
expect(stats.counts.claimed).toBe(1);
expect(stats.total).toBe(5);
expect(stats.expiredClaimCount).toBe(1);
expect(stats.oldestReadyAgeSeconds).not.toBeNull();
expect(stats.oldestReadyAgeSeconds!).toBeGreaterThanOrEqual(0);
});
});
});

View File

@@ -1,406 +0,0 @@
/**
* Mosaic-native backlog-of-record service (card A4).
*
* This is the backlog Mosaic owns end-to-end on its OWN Postgres storage layer.
* It REPLACES the former Hermes adapter — there is NO runtime dependency on
* Hermes here or anywhere downstream.
*
* The service takes a `Db` handle, so it works identically against:
* - `createDb()` — server Postgres (DATABASE_URL / config), and
* - `createPgliteDb()` — embedded Postgres (file or in-memory).
* Same code, same semantics — PGlite gives real Postgres behaviour (including
* row locks), so the atomic-claim path is exercised by the in-memory tests.
*
* Atomic claim: `claim()` selects the highest-priority, deps-satisfied, ready
* card with `SELECT ... FOR UPDATE SKIP LOCKED` and flips it to `claimed` inside
* one transaction. Two concurrent claimers can therefore NEVER both win the same
* card — the loser's locked row is skipped and it picks the next candidate (or
* gets null).
*/
import { and, asc, desc, eq, sql } from 'drizzle-orm';
import type { Db } from './client.js';
import { backlog } from './schema.js';
export type BacklogStatus = 'ready' | 'claimed' | 'blocked' | 'done';
export interface BacklogCard {
id: string;
title: string;
body: string | null;
phase: string | null;
priority: number;
status: BacklogStatus;
dependsOn: string[];
claimOwner: string | null;
claimTtlSeconds: number | null;
claimedAt: Date | null;
attempts: number;
idempotencyKey: string | null;
acceptance: unknown;
createdAt: Date;
updatedAt: Date;
}
export interface CreateCardInput {
id: string;
title: string;
body?: string | null;
phase?: string | null;
priority?: number;
dependsOn?: string[];
acceptance?: unknown;
idempotencyKey?: string | null;
status?: BacklogStatus;
}
export interface ListFilter {
status?: BacklogStatus;
phase?: string;
/** When true, return only cards that are `ready` AND have all deps `done`. */
readyOnly?: boolean;
}
export interface ClaimOptions {
owner: string;
/** Claim time-to-live in seconds (default 900). */
ttlSeconds?: number;
/** Claim a specific card by id instead of the highest-priority ready one. */
id?: string;
}
export interface ReclaimResult {
reclaimed: string[];
}
export interface BacklogStats {
counts: Record<BacklogStatus, number>;
total: number;
oldestReadyAgeSeconds: number | null;
expiredClaimCount: number;
}
export const DEFAULT_CLAIM_TTL_SECONDS = 900;
type Row = typeof backlog.$inferSelect;
/**
* Row shape as returned by the raw `SELECT * ... FOR UPDATE SKIP LOCKED` path.
* That path bypasses drizzle's column-name mapping, so JSON columns arrive as
* the snake_case `depends_on` (and may be a JSON string under some drivers).
*/
interface RawRow extends Row {
depends_on?: unknown;
}
function toCard(row: Row): BacklogCard {
return {
id: row.id,
title: row.title,
body: row.body,
phase: row.phase,
priority: row.priority,
status: row.status,
dependsOn: row.dependsOn ?? [],
claimOwner: row.claimOwner,
claimTtlSeconds: row.claimTtlSeconds,
claimedAt: row.claimedAt,
attempts: row.attempts,
idempotencyKey: row.idempotencyKey,
acceptance: row.acceptance,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
}
/**
* The backlog repository/service. Construct with any `Db` handle.
*/
export class BacklogService {
constructor(private readonly db: Db) {}
/**
* Create a card. If `idempotencyKey` is provided and a card already exists
* with that key, the existing card is returned unchanged (no duplicate).
*/
async create(input: CreateCardInput): Promise<BacklogCard> {
if (input.idempotencyKey) {
const existing = await this.db
.select()
.from(backlog)
.where(eq(backlog.idempotencyKey, input.idempotencyKey))
.limit(1);
if (existing[0]) return toCard(existing[0]);
}
const inserted = await this.db
.insert(backlog)
.values({
id: input.id,
title: input.title,
body: input.body ?? null,
phase: input.phase ?? null,
priority: input.priority ?? 0,
status: input.status ?? 'ready',
dependsOn: input.dependsOn ?? [],
acceptance: input.acceptance ?? null,
idempotencyKey: input.idempotencyKey ?? null,
})
.returning();
return toCard(inserted[0]!);
}
/** Fetch a single card by id, or null. */
async get(id: string): Promise<BacklogCard | null> {
const rows = await this.db.select().from(backlog).where(eq(backlog.id, id)).limit(1);
return rows[0] ? toCard(rows[0]) : null;
}
/**
* List cards with optional filters. `readyOnly` enforces the DAG gate:
* a card is "ready" only when its own status is `ready` AND every card in
* `depends_on` exists and is `done`.
*/
async list(filter: ListFilter = {}): Promise<BacklogCard[]> {
const conditions = [];
if (filter.status) conditions.push(eq(backlog.status, filter.status));
if (filter.phase) conditions.push(eq(backlog.phase, filter.phase));
const rows = await this.db
.select()
.from(backlog)
.where(conditions.length ? and(...conditions) : undefined)
.orderBy(desc(backlog.priority), asc(backlog.createdAt));
const cards = rows.map(toCard);
if (!filter.readyOnly) return cards;
const doneIds = await this.doneIdSet();
return cards.filter(
(c) => c.status === 'ready' && c.dependsOn.every((dep) => doneIds.has(dep)),
);
}
private async doneIdSet(): Promise<Set<string>> {
const done = await this.db
.select({ id: backlog.id })
.from(backlog)
.where(eq(backlog.status, 'done'));
return new Set(done.map((d) => d.id));
}
/**
* Atomically claim a card.
*
* Strategy: inside ONE transaction we lock the candidate row(s) with
* `FOR UPDATE SKIP LOCKED`. A concurrent claimer that already holds the lock
* on a row has that row skipped for us, so two claimers can never both win the
* same card. The locked row is then flipped to `claimed` and returned.
*
* Candidate selection (when no explicit `id`):
* - status = 'ready'
* - all deps satisfied (deps ⊆ done set)
* - ordered by priority DESC, created_at ASC
*
* Returns the claimed card, or null if nothing is claimable.
*/
async claim(opts: ClaimOptions): Promise<BacklogCard | null> {
const ttl = opts.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS;
return this.db.transaction(async (tx) => {
// Compute the set of satisfied dependencies up front. A card is claimable
// only if every id in depends_on is currently 'done'.
const doneRows = await tx
.select({ id: backlog.id })
.from(backlog)
.where(eq(backlog.status, 'done'));
const doneIds = new Set(doneRows.map((r) => r.id));
// Lock candidate ready rows, skipping any already locked by a concurrent
// claimer. We over-fetch (no LIMIT 1) so the deps filter below can fall
// through to the next-best card rather than returning null spuriously.
let lockedRows: RawRow[];
if (opts.id) {
const result = await tx.execute(
sql`SELECT * FROM ${backlog}
WHERE ${backlog.id} = ${opts.id} AND ${backlog.status} = 'ready'
FOR UPDATE SKIP LOCKED`,
);
lockedRows = rowsOf(result);
} else {
const result = await tx.execute(
sql`SELECT * FROM ${backlog}
WHERE ${backlog.status} = 'ready'
ORDER BY ${backlog.priority} DESC, ${backlog.createdAt} ASC
FOR UPDATE SKIP LOCKED`,
);
lockedRows = rowsOf(result);
}
const candidate = lockedRows.find((row) =>
normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)),
);
if (!candidate) return null;
const updated = await tx
.update(backlog)
.set({
status: 'claimed',
claimOwner: opts.owner,
claimTtlSeconds: ttl,
claimedAt: new Date(),
attempts: sql`${backlog.attempts} + 1`,
updatedAt: new Date(),
})
.where(eq(backlog.id, candidate.id))
.returning();
return toCard(updated[0]!);
});
}
/**
* Release expired claims (claimed_at + ttl < now) back to `ready`, OR release
* a specific card by id regardless of expiry. Cleared claim fields.
* Returns the ids that were released.
*/
async reclaim(opts: { id?: string } = {}): Promise<ReclaimResult> {
if (opts.id) {
const released = await this.db
.update(backlog)
.set({
status: 'ready',
claimOwner: null,
claimTtlSeconds: null,
claimedAt: null,
updatedAt: new Date(),
})
.where(and(eq(backlog.id, opts.id), eq(backlog.status, 'claimed')))
.returning({ id: backlog.id });
return { reclaimed: released.map((r) => r.id) };
}
// Expired = status claimed AND claimed_at + (ttl seconds) < now().
const released = await this.db
.update(backlog)
.set({
status: 'ready',
claimOwner: null,
claimTtlSeconds: null,
claimedAt: null,
updatedAt: new Date(),
})
.where(
and(
eq(backlog.status, 'claimed'),
sql`${backlog.claimedAt} + make_interval(secs => ${backlog.claimTtlSeconds}) < now()`,
),
)
.returning({ id: backlog.id });
return { reclaimed: released.map((r) => r.id) };
}
/** Add a `depends_on` edge (from → depends on → to). Idempotent. */
async link(from: string, to: string): Promise<BacklogCard> {
const card = await this.get(from);
if (!card) throw new Error(`backlog card not found: ${from}`);
const target = await this.get(to);
if (!target) throw new Error(`backlog dependency not found: ${to}`);
if (from === to) throw new Error('a card cannot depend on itself');
if (card.dependsOn.includes(to)) return card;
const nextDeps = [...card.dependsOn, to];
const updated = await this.db
.update(backlog)
.set({ dependsOn: nextDeps, updatedAt: new Date() })
.where(eq(backlog.id, from))
.returning();
return toCard(updated[0]!);
}
/** Mark a card blocked. */
async block(id: string): Promise<BacklogCard | null> {
return this.setStatus(id, 'blocked');
}
/** Mark a card done (releasing any claim). */
async complete(id: string): Promise<BacklogCard | null> {
const updated = await this.db
.update(backlog)
.set({
status: 'done',
claimOwner: null,
claimTtlSeconds: null,
claimedAt: null,
updatedAt: new Date(),
})
.where(eq(backlog.id, id))
.returning();
return updated[0] ? toCard(updated[0]) : null;
}
private async setStatus(id: string, status: BacklogStatus): Promise<BacklogCard | null> {
const updated = await this.db
.update(backlog)
.set({ status, updatedAt: new Date() })
.where(eq(backlog.id, id))
.returning();
return updated[0] ? toCard(updated[0]) : null;
}
/** Counts by status, oldest-ready age (seconds), and expired-claim count. */
async stats(): Promise<BacklogStats> {
const all = await this.db.select().from(backlog);
const counts: Record<BacklogStatus, number> = {
ready: 0,
claimed: 0,
blocked: 0,
done: 0,
};
let oldestReady: Date | null = null;
let expiredClaimCount = 0;
const now = Date.now();
for (const row of all) {
counts[row.status] += 1;
if (row.status === 'ready') {
if (oldestReady === null || row.createdAt < oldestReady) oldestReady = row.createdAt;
}
if (row.status === 'claimed' && row.claimedAt && row.claimTtlSeconds != null) {
const expiry = row.claimedAt.getTime() + row.claimTtlSeconds * 1000;
if (expiry < now) expiredClaimCount += 1;
}
}
return {
counts,
total: all.length,
oldestReadyAgeSeconds:
oldestReady === null ? null : Math.max(0, Math.floor((now - oldestReady.getTime()) / 1000)),
expiredClaimCount,
};
}
}
/** Extract rows from a drizzle `.execute()` result across drivers (pg / pglite). */
function rowsOf(result: unknown): RawRow[] {
if (Array.isArray(result)) return result as RawRow[];
const maybe = result as { rows?: unknown };
if (maybe && Array.isArray(maybe.rows)) return maybe.rows as RawRow[];
return [];
}
/** A raw SQL row returns snake_case `depends_on`; normalize to string[]. */
function normalizeDeps(value: unknown): string[] {
if (Array.isArray(value)) return value as string[];
if (typeof value === 'string') {
try {
const parsed = JSON.parse(value);
return Array.isArray(parsed) ? (parsed as string[]) : [];
} catch {
return [];
}
}
return [];
}

View File

@@ -3,17 +3,6 @@ export { createPgliteDb } from './client-pglite.js';
export { runMigrations, runPgliteMigrations } from './migrate.js'; export { runMigrations, runPgliteMigrations } from './migrate.js';
export * from './schema.js'; export * from './schema.js';
export * from './federation.js'; export * from './federation.js';
export {
BacklogService,
DEFAULT_CLAIM_TTL_SECONDS,
type BacklogCard,
type BacklogStatus,
type BacklogStats,
type ClaimOptions,
type CreateCardInput,
type ListFilter,
type ReclaimResult,
} from './backlog.js';
export { export {
eq, eq,
and, and,

View File

@@ -587,62 +587,6 @@ export const summarizationJobs = pgTable(
(t) => [index('summarization_jobs_status_idx').on(t.status)], (t) => [index('summarization_jobs_status_idx').on(t.status)],
); );
// ─── Fleet Backlog ────────────────────────────────────────────────────────────
// Mosaic-native backlog-of-record (card A4). This REPLACES the former Hermes
// adapter — there is NO runtime dependency on Hermes. Cards form a dependency
// DAG (`depends_on`), are claimed atomically by fleet workers via
// `SELECT ... FOR UPDATE SKIP LOCKED`, and auto-expire via a TTL so a crashed
// claimer's card returns to the pool.
/**
* Lifecycle status of a backlog card.
* - ready: eligible to be claimed (once its deps are all `done`).
* - claimed: a worker holds it (claim_owner + claimed_at set); may expire via TTL.
* - blocked: explicitly parked; never auto-claimed.
* - done: completed; satisfies dependents.
*/
export const backlogStatusEnum = pgEnum('backlog_status', ['ready', 'claimed', 'blocked', 'done']);
export const backlog = pgTable(
'backlog',
{
/** Stable, caller-supplied card id (e.g. "A4", "fleet-001"). PK. */
id: text('id').primaryKey(),
title: text('title').notNull(),
body: text('body'),
/** Board/phase grouping (e.g. "M1", "fleet"). Free-form. */
phase: text('phase'),
/** Higher number = higher priority; claim picks the max-priority ready card. */
priority: integer('priority').notNull().default(0),
status: backlogStatusEnum('status').notNull().default('ready'),
/** DAG edges: ids of cards this one depends on. "ready" requires all done. */
dependsOn: jsonb('depends_on').notNull().$type<string[]>().default([]),
/** Owner token of the current claim (worker/agent id). NULL when unclaimed. */
claimOwner: text('claim_owner'),
/** TTL of the active claim in seconds. NULL when unclaimed. */
claimTtlSeconds: integer('claim_ttl_seconds'),
/** When the active claim was taken. NULL when unclaimed. claimed_at + ttl = expiry. */
claimedAt: timestamp('claimed_at', { withTimezone: true }),
/** Count of times this card has been claimed (incremented on each claim). */
attempts: integer('attempts').notNull().default(0),
/** Optional dedup key for `create`; a repeat key returns the existing card. */
idempotencyKey: text('idempotency_key'),
/** Acceptance criteria — free-form JSON (array of strings or object). */
acceptance: jsonb('acceptance'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
},
(t) => [
// Hot path: claim scans ready cards ordered by priority then age.
index('backlog_status_priority_idx').on(t.status, t.priority),
// reclaim sweeps claimed cards by claimed_at to find expired ones.
index('backlog_status_claimed_at_idx').on(t.status, t.claimedAt),
// Idempotent create dedups on this key (NULLs are distinct in Postgres, so
// many unkeyed cards coexist; a repeated non-null key collides).
uniqueIndex('backlog_idempotency_key_idx').on(t.idempotencyKey),
],
);
// ─── Federation ────────────────────────────────────────────────────────────── // ─── Federation ──────────────────────────────────────────────────────────────
// Enums declared before tables that reference them. // Enums declared before tables that reference them.
// All federation definitions live in this file (avoids CJS/ESM cross-import // All federation definitions live in this file (avoids CJS/ESM cross-import

View File

@@ -28,12 +28,10 @@ INSTALL_MODE="${MOSAIC_INSTALL_MODE:-prompt}"
# fleet/roster.schema.json (synced normally). The user's own fleet files MUST # fleet/roster.schema.json (synced normally). The user's own fleet files MUST
# survive `mosaic update` (which runs this sync automatically): the active # survive `mosaic update` (which runs this sync automatically): the active
# roster (`fleet/roster.yaml` + any other `fleet/*.yaml`), per-agent env # roster (`fleet/roster.yaml` + any other `fleet/*.yaml`), per-agent env
# (`fleet/agents/`), heartbeat run dir (`fleet/run/`), and the Mosaic-native # (`fleet/agents/`), and heartbeat run dir (`fleet/run/`). Without these, an
# backlog-of-record store (`fleet/backlog/` — embedded PGlite data dir; see # update wipes the operator's fleet. Glob entries are honored by both the rsync
# packages/mosaic/src/commands/fleet-backlog.ts). Without these, an update # path (`--exclude`) and the glob-aware cp fallback below.
# wipes the operator's fleet AND their backlog. Glob entries are honored by PRESERVE_PATHS=("CONSTITUTION.md" "AGENTS.md" "SOUL.md" "USER.md" "TOOLS.md" "STANDARDS.md" "memory" "sources" "credentials" "fleet/*.yaml" "fleet/agents" "fleet/run")
# both the rsync path (`--exclude`) and the glob-aware cp fallback below.
PRESERVE_PATHS=("CONSTITUTION.md" "AGENTS.md" "SOUL.md" "USER.md" "TOOLS.md" "STANDARDS.md" "memory" "sources" "credentials" "fleet/*.yaml" "fleet/agents" "fleet/run" "fleet/backlog")
# Framework-owned contract files: re-copied from defaults/ on every upgrade (the # Framework-owned contract files: re-copied from defaults/ on every upgrade (the
# user must not edit them; a divergent copy is backed up once before overwrite). # user must not edit them; a divergent copy is backed up once before overwrite).

View File

@@ -1,6 +1,6 @@
{ {
"name": "@mosaicstack/mosaic", "name": "@mosaicstack/mosaic",
"version": "0.0.45", "version": "0.0.43",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://git.mosaicstack.dev/mosaicstack/stack.git", "url": "https://git.mosaicstack.dev/mosaicstack/stack.git",
@@ -29,7 +29,6 @@
"dependencies": { "dependencies": {
"@mosaicstack/brain": "workspace:*", "@mosaicstack/brain": "workspace:*",
"@mosaicstack/config": "workspace:*", "@mosaicstack/config": "workspace:*",
"@mosaicstack/db": "workspace:*",
"@mosaicstack/forge": "workspace:*", "@mosaicstack/forge": "workspace:*",
"@mosaicstack/log": "workspace:*", "@mosaicstack/log": "workspace:*",
"@mosaicstack/macp": "workspace:*", "@mosaicstack/macp": "workspace:*",

View File

@@ -1,285 +0,0 @@
/**
* `mosaic fleet backlog <sub> --json` — Mosaic-native backlog of record.
*
* Mosaic OWNS this backlog end-to-end on its existing Postgres storage layer
* (`@mosaicstack/db`). It REPLACES the former Hermes adapter — there is NO
* runtime dependency on Hermes.
*
* Storage tier (the existing storage-layer convention, no new engine):
* - default: embedded PGlite at <mosaicHome>/fleet/backlog (real Postgres
* semantics, persisted on disk so the operator's backlog survives reboots
* and `mosaic update` — see install.sh PRESERVE_PATHS).
* - DATABASE_URL set: full server Postgres — same code, no change.
*
* Migrations run on first use so the `backlog` table always exists.
*/
import { mkdir } from 'node:fs/promises';
import { homedir } from 'node:os';
import { join } from 'node:path';
import type { Command } from 'commander';
import {
BacklogService,
DEFAULT_CLAIM_TTL_SECONDS,
type BacklogCard,
type DbHandle,
} from '@mosaicstack/db';
function defaultMosaicHome(): string {
return process.env['MOSAIC_HOME'] ?? join(homedir(), '.config', 'mosaic');
}
/** Resolve where the embedded PGlite backlog store lives (default tier). */
export function defaultBacklogDataDir(mosaicHome = defaultMosaicHome()): string {
return join(mosaicHome, 'fleet', 'backlog');
}
/**
* Open a db handle for the backlog and ensure the schema exists.
*
* Tier detection mirrors the storage layer: DATABASE_URL => server Postgres
* (migrations applied via runMigrations); otherwise embedded PGlite at the
* fleet/backlog data dir (migrations applied via runPgliteMigrations).
*/
async function openBacklogDb(mosaicHome: string): Promise<DbHandle> {
const { createDb, createPgliteDb, runMigrations, runPgliteMigrations } =
await import('@mosaicstack/db');
const url = process.env['DATABASE_URL'];
if (url) {
await runMigrations(url);
return createDb(url);
}
const dataDir = process.env['PGLITE_DATA_DIR'] ?? defaultBacklogDataDir(mosaicHome);
// PGlite writes a file-backed store to dataDir but does not create missing
// parent directories (e.g. <mosaicHome>/fleet). Create them first. Skip for
// the in-memory pseudo-paths so a memory:// store never touches the fs.
if (!dataDir.startsWith('memory://') && dataDir !== ':memory:') {
await mkdir(dataDir, { recursive: true });
}
const handle = createPgliteDb(dataDir);
await runPgliteMigrations(handle);
return handle;
}
function parseDependsOn(value?: string): string[] {
if (!value) return [];
return value
.split(',')
.map((s) => s.trim())
.filter((s) => s.length > 0);
}
function parseAcceptance(value?: string): unknown {
if (!value) return null;
try {
return JSON.parse(value);
} catch {
// Fall back to a list of newline/semicolon-separated criteria.
return value
.split(/[\n;]/)
.map((s) => s.trim())
.filter((s) => s.length > 0);
}
}
function printCard(card: BacklogCard | null, json?: boolean): void {
if (json) {
console.log(JSON.stringify(card));
return;
}
if (!card) {
console.log('(none)');
return;
}
const deps = card.dependsOn.length ? card.dependsOn.join(',') : '-';
console.log(
`${card.id}\t[${card.status}]\tp=${card.priority}\tphase=${card.phase ?? '-'}\tdeps=${deps}\t${card.title}`,
);
}
function printCards(cards: BacklogCard[], json?: boolean): void {
if (json) {
console.log(JSON.stringify(cards));
return;
}
if (cards.length === 0) {
console.log('(no cards)');
return;
}
for (const card of cards) printCard(card, false);
}
/**
* Register `backlog` under an existing `fleet` command.
* `mosaicHomeFor` resolves the active --mosaic-home (parent flag) at call time.
*/
export function registerFleetBacklogCommand(
fleetCmd: Command,
mosaicHomeFor: () => string,
): Command {
const backlogCmd = fleetCmd
.command('backlog')
.description('Mosaic-native backlog of record (atomic claim + TTL, deps DAG)');
const withSvc = async <T>(fn: (svc: BacklogService) => Promise<T>): Promise<T> => {
const handle = await openBacklogDb(mosaicHomeFor());
try {
return await fn(new BacklogService(handle.db));
} finally {
await handle.close();
}
};
backlogCmd
.command('create')
.description('Create a backlog card (idempotency_key dedups)')
.requiredOption('--id <id>', 'Stable card id')
.requiredOption('--title <title>', 'Card title')
.option('--body <body>', 'Card body / description')
.option('--phase <phase>', 'Board/phase grouping')
.option('--priority <n>', 'Priority (higher = sooner)', (v) => parseInt(v, 10), 0)
.option('--depends-on <ids>', 'Comma-separated dependency card ids')
.option('--acceptance <json>', 'Acceptance criteria (JSON or ;/newline list)')
.option('--idempotency-key <key>', 'Dedup key; repeat returns the existing card')
.option('--json', 'Print JSON')
.action(
async (opts: {
id: string;
title: string;
body?: string;
phase?: string;
priority: number;
dependsOn?: string;
acceptance?: string;
idempotencyKey?: string;
json?: boolean;
}) => {
const card = await withSvc((svc) =>
svc.create({
id: opts.id,
title: opts.title,
body: opts.body ?? null,
phase: opts.phase ?? null,
priority: opts.priority,
dependsOn: parseDependsOn(opts.dependsOn),
acceptance: parseAcceptance(opts.acceptance),
idempotencyKey: opts.idempotencyKey ?? null,
}),
);
printCard(card, opts.json);
},
);
backlogCmd
.command('list')
.description('List cards (filters: --status, --phase, --ready-only)')
.option('--status <status>', 'Filter by status: ready|claimed|blocked|done')
.option('--phase <phase>', 'Filter by phase')
.option('--ready-only', 'Only cards that are ready AND have all deps done')
.option('--json', 'Print JSON')
.action(
async (opts: {
status?: BacklogCard['status'];
phase?: string;
readyOnly?: boolean;
json?: boolean;
}) => {
const cards = await withSvc((svc) =>
svc.list({
...(opts.status ? { status: opts.status } : {}),
...(opts.phase ? { phase: opts.phase } : {}),
...(opts.readyOnly ? { readyOnly: true } : {}),
}),
);
printCards(cards, opts.json);
},
);
backlogCmd
.command('claim')
.description('Atomically claim the highest-priority ready card (FOR UPDATE SKIP LOCKED)')
.requiredOption('--owner <owner>', 'Claim owner (worker/agent id)')
.option(
'--ttl <sec>',
'Claim TTL in seconds',
(v) => parseInt(v, 10),
DEFAULT_CLAIM_TTL_SECONDS,
)
.option('--id <id>', 'Claim a specific card by id')
.option('--json', 'Print JSON')
.action(async (opts: { owner: string; ttl: number; id?: string; json?: boolean }) => {
const card = await withSvc((svc) =>
svc.claim({ owner: opts.owner, ttlSeconds: opts.ttl, ...(opts.id ? { id: opts.id } : {}) }),
);
printCard(card, opts.json);
if (!card && !opts.json) process.exitCode = 0;
});
backlogCmd
.command('reclaim')
.description('Release expired claims back to ready (or a specific --id)')
.option('--id <id>', 'Release a specific card regardless of expiry')
.option('--json', 'Print JSON')
.action(async (opts: { id?: string; json?: boolean }) => {
const result = await withSvc((svc) => svc.reclaim(opts.id ? { id: opts.id } : {}));
if (opts.json) {
console.log(JSON.stringify(result));
} else if (result.reclaimed.length === 0) {
console.log('(nothing to reclaim)');
} else {
console.log(`reclaimed: ${result.reclaimed.join(', ')}`);
}
});
backlogCmd
.command('link')
.description('Add a depends_on edge (--from depends on --to)')
.requiredOption('--from <id>', 'Card that gains the dependency')
.requiredOption('--to <id>', 'Card it now depends on')
.option('--json', 'Print JSON')
.action(async (opts: { from: string; to: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.link(opts.from, opts.to));
printCard(card, opts.json);
});
backlogCmd
.command('stats')
.description('Counts by status, oldest-ready age, expired-claim count')
.option('--json', 'Print JSON')
.action(async (opts: { json?: boolean }) => {
const stats = await withSvc((svc) => svc.stats());
if (opts.json) {
console.log(JSON.stringify(stats));
return;
}
console.log(`total: ${stats.total}`);
console.log(
`ready=${stats.counts.ready} claimed=${stats.counts.claimed} ` +
`blocked=${stats.counts.blocked} done=${stats.counts.done}`,
);
console.log(`oldest-ready-age: ${stats.oldestReadyAgeSeconds ?? '-'}s`);
console.log(`expired-claims: ${stats.expiredClaimCount}`);
});
backlogCmd
.command('block')
.description('Mark a card blocked')
.requiredOption('--id <id>', 'Card id')
.option('--json', 'Print JSON')
.action(async (opts: { id: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.block(opts.id));
printCard(card, opts.json);
});
backlogCmd
.command('complete')
.description('Mark a card done')
.requiredOption('--id <id>', 'Card id')
.option('--json', 'Print JSON')
.action(async (opts: { id: string; json?: boolean }) => {
const card = await withSvc((svc) => svc.complete(opts.id));
printCard(card, opts.json);
});
return backlogCmd;
}

View File

@@ -27,6 +27,7 @@ import {
enableFleetUnits, enableFleetUnits,
FLEET_PROFILES, FLEET_PROFILES,
HEARTBEAT_IDLE_THRESHOLD_SECONDS, HEARTBEAT_IDLE_THRESHOLD_SECONDS,
HEARTBEAT_STUCK_THRESHOLD_SECONDS,
generateAgentEnv, generateAgentEnv,
getDefaultOperatorSourceLabel, getDefaultOperatorSourceLabel,
getDefaultTenantAndHost, getDefaultTenantAndHost,
@@ -47,6 +48,7 @@ import {
resolvePresetFilename, resolvePresetFilename,
RUNTIME_ACCEPTABLE_COMMANDS, RUNTIME_ACCEPTABLE_COMMANDS,
serializeRosterToYaml, serializeRosterToYaml,
stuckThresholdSeconds,
VERIFY_DEFAULT_TIMEOUT_MS, VERIFY_DEFAULT_TIMEOUT_MS,
VERIFY_POLL_INTERVAL_MS, VERIFY_POLL_INTERVAL_MS,
type AgentPsRow, type AgentPsRow,
@@ -78,7 +80,6 @@ describe('registerFleetCommand', () => {
expect(fleet).toBeDefined(); expect(fleet).toBeDefined();
expect(fleet!.commands.map((command) => command.name()).sort()).toEqual([ expect(fleet!.commands.map((command) => command.name()).sort()).toEqual([
'add', 'add',
'backlog',
'init', 'init',
'install', 'install',
'install-systemd', 'install-systemd',
@@ -92,24 +93,6 @@ describe('registerFleetCommand', () => {
]); ]);
}); });
it('registers the backlog subcommand with its operations', () => {
const program = buildProgram();
const fleet = program.commands.find((command) => command.name() === 'fleet');
const backlog = fleet!.commands.find((command) => command.name() === 'backlog');
expect(backlog).toBeDefined();
expect(backlog!.commands.map((command) => command.name()).sort()).toEqual([
'block',
'claim',
'complete',
'create',
'link',
'list',
'reclaim',
'stats',
]);
});
it('adds fleet-backed agent subcommands without removing existing options', () => { it('adds fleet-backed agent subcommands without removing existing options', () => {
const program = buildProgram(); const program = buildProgram();
const agent = program.commands.find((command) => command.name() === 'agent'); const agent = program.commands.find((command) => command.name() === 'agent');
@@ -872,7 +855,7 @@ describe('fleet ps — command construction', () => {
'-t', '-t',
'=canary-pi:0.0', '=canary-pi:0.0',
'-F', '-F',
'#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity} #{window_activity} #{session_activity}', '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}',
]); ]);
}); });
@@ -957,33 +940,42 @@ describe('fleet ps — heartbeat parsing', () => {
describe('fleet ps — readiness thresholds', () => { describe('fleet ps — readiness thresholds', () => {
const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
afterEach(() => { afterEach(() => {
if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle; else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle;
if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck;
}); });
it('uses the default activity threshold when env is unset', () => { it('uses default readiness thresholds when env is unset', () => {
delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS); expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS);
expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS);
}); });
it('honors a positive integer activity threshold from env', () => { it('honors positive integer readiness thresholds from env', () => {
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '120'; process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '120';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '480';
expect(idleThresholdSeconds()).toBe(120); expect(idleThresholdSeconds()).toBe(120);
expect(stuckThresholdSeconds()).toBe(480);
}); });
it('falls back to the default for invalid activity thresholds', () => { it('falls back to defaults for invalid readiness thresholds', () => {
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '0'; process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '0';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = 'not-a-number';
expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS); expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS);
expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS);
}); });
}); });
describe('fleet ps — readiness classification', () => { describe('fleet ps — readiness classification', () => {
const thresholds = { idleThresholdSeconds: 300 }; const thresholds = { idleThresholdSeconds: 300, stuckThresholdSeconds: 900 };
it('reports dead when the pane is not alive', () => { it('reports dead when the pane is not alive', () => {
expect( expect(
@@ -1012,7 +1004,7 @@ describe('fleet ps — readiness classification', () => {
).toBe('stale'); ).toBe('stale');
}); });
it('reports working when heartbeat status is busy, even after the activity threshold', () => { it('reports working when heartbeat status is busy, even past stuck threshold', () => {
expect( expect(
classifyReadiness( classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'busy', idleSeconds: 2_000 }, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'busy', idleSeconds: 2_000 },
@@ -1021,7 +1013,7 @@ describe('fleet ps — readiness classification', () => {
).toBe('working'); ).toBe('working');
}); });
it('reports working when pane idle seconds are null', () => { it('reports working when pane idle seconds are unavailable', () => {
expect( expect(
classifyReadiness( classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: null }, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: null },
@@ -1030,31 +1022,25 @@ describe('fleet ps — readiness classification', () => {
).toBe('working'); ).toBe('working');
}); });
it('reports working when pane idle seconds are undefined', () => { it('reports stuck at the stuck threshold boundary', () => {
expect(
classifyReadiness({ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok' }, thresholds),
).toBe('working');
});
it('reports working when pane idle seconds are non-finite', () => {
expect( expect(
classifyReadiness( classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: Number.NaN }, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 900 },
thresholds, thresholds,
), ),
).toBe('working'); ).toBe('stuck');
}); });
it('reports available at the activity threshold boundary', () => { it('reports idle at the idle threshold boundary', () => {
expect( expect(
classifyReadiness( classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 300 }, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 300 },
thresholds, thresholds,
), ),
).toBe('available'); ).toBe('idle');
}); });
it('reports working below the activity threshold', () => { it('reports working below the idle threshold', () => {
expect( expect(
classifyReadiness( classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 299 }, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 299 },
@@ -1063,14 +1049,13 @@ describe('fleet ps — readiness classification', () => {
).toBe('working'); ).toBe('working');
}); });
it('reports very long idle as available, not stuck', () => { it('checks stuck before idle when thresholds are inverted', () => {
const readiness = classifyReadiness( expect(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 100_000 }, classifyReadiness(
thresholds, { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 350 },
); { idleThresholdSeconds: 900, stuckThresholdSeconds: 300 },
),
expect(readiness).toBe('available'); ).toBe('stuck');
expect(readiness).not.toBe('stuck');
}); });
}); });
@@ -1094,11 +1079,9 @@ describe('fleet ps — systemd show parsing', () => {
describe('fleet ps — tmux list-panes parsing', () => { describe('fleet ps — tmux list-panes parsing', () => {
const NOW_MS = 1_700_000_000_000; const NOW_MS = 1_700_000_000_000;
it('uses pane_activity when present', () => { it('parses alive pane with pid, command, and idle time', () => {
const paneActivityEpoch = Math.floor((NOW_MS - 30_000) / 1000); // 30s ago const activityEpoch = Math.floor((NOW_MS - 30_000) / 1000); // 30s ago
const windowActivityEpoch = Math.floor((NOW_MS - 60_000) / 1000); // 60s ago const output = `12345 claude 0 ${activityEpoch}\n`;
const sessionActivityEpoch = Math.floor((NOW_MS - 90_000) / 1000); // 90s ago
const output = `12345 claude 0 ${paneActivityEpoch} ${windowActivityEpoch} ${sessionActivityEpoch}\n`;
const result = parseTmuxListPanes(output, NOW_MS); const result = parseTmuxListPanes(output, NOW_MS);
expect(result.pid).toBe(12345); expect(result.pid).toBe(12345);
expect(result.command).toBe('claude'); expect(result.command).toBe('claude');
@@ -1106,45 +1089,8 @@ describe('fleet ps — tmux list-panes parsing', () => {
expect(result.idleSeconds).toBe(30); expect(result.idleSeconds).toBe(30);
}); });
it('uses window_activity when pane_activity is empty', () => {
const windowActivityEpoch = Math.floor((NOW_MS - 45_000) / 1000); // 45s ago
const sessionActivityEpoch = Math.floor((NOW_MS - 90_000) / 1000); // 90s ago
const output = `12345 node 0 ${windowActivityEpoch} ${sessionActivityEpoch}\n`;
expect(output).toContain('0 '); // empty pane_activity preserves index alignment
const result = parseTmuxListPanes(output, NOW_MS);
expect(result.pid).toBe(12345);
expect(result.command).toBe('node');
expect(result.dead).toBe(false);
expect(result.idleSeconds).toBe(45);
});
it('uses session_activity when pane_activity and window_activity are empty', () => {
const sessionActivityEpoch = Math.floor((NOW_MS - 75_000) / 1000); // 75s ago
const output = `12345 node 0 ${sessionActivityEpoch}\n`;
const result = parseTmuxListPanes(output, NOW_MS);
expect(result.idleSeconds).toBe(75);
});
it('reports null idleSeconds when all activity sources are empty', () => {
const output = '12345 node 0 \n';
const result = parseTmuxListPanes(output, NOW_MS);
expect(result.idleSeconds).toBeNull();
});
it('computes exact idle seconds from now minus epoch seconds', () => {
const activityEpoch = 1_699_999_877;
const result = parseTmuxListPanes(`12345 claude 0 ${activityEpoch} 0 0\n`, NOW_MS);
expect(result.idleSeconds).toBe(123);
});
it('clamps future activity epochs to 0 idle seconds', () => {
const futureActivityEpoch = Math.floor((NOW_MS + 30_000) / 1000);
const result = parseTmuxListPanes(`12345 claude 0 ${futureActivityEpoch} 0 0\n`, NOW_MS);
expect(result.idleSeconds).toBe(0);
});
it('reports dead pane when pane_dead=1', () => { it('reports dead pane when pane_dead=1', () => {
const output = `0 bash 1 0 0 0\n`; const output = `0 bash 1 0\n`;
const result = parseTmuxListPanes(output, NOW_MS); const result = parseTmuxListPanes(output, NOW_MS);
expect(result.dead).toBe(true); expect(result.dead).toBe(true);
}); });
@@ -1569,7 +1515,7 @@ describe('fleet ps — command sequences issued', () => {
}); });
describe('fleet ps — readiness table output', () => { describe('fleet ps — readiness table output', () => {
it('renders available in HB column without idle/stuck alarm flags', async () => { it('renders readiness in HB column and flags idle/stuck rows', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
const rosterPath = join(home, 'fleet', 'roster.yaml'); const rosterPath = join(home, 'fleet', 'roster.yaml');
const runDir = join(home, 'fleet', 'run'); const runDir = join(home, 'fleet', 'run');
@@ -1580,34 +1526,36 @@ describe('fleet ps — readiness table output', () => {
'version: 1', 'version: 1',
'transport: tmux', 'transport: tmux',
'agents:', 'agents:',
' - name: working-agent', ' - name: idle-agent',
' runtime: pi', ' runtime: pi',
' - name: available-agent', ' - name: stuck-agent',
' runtime: pi', ' runtime: pi',
].join('\n'), ].join('\n'),
); );
const nowMs = 1_700_000_000_000; const nowMs = 1_700_000_000_000;
const workingActivityEpoch = Math.floor((nowMs - 2_000) / 1000); const idleActivityEpoch = Math.floor((nowMs - 10_000) / 1000);
const availableActivityEpoch = Math.floor((nowMs - 40_000) / 1000); const stuckActivityEpoch = Math.floor((nowMs - 40_000) / 1000);
const hbTs = new Date(nowMs - 1_000).toISOString(); const hbTs = new Date(nowMs - 1_000).toISOString();
await writeFile(join(runDir, 'working-agent.hb'), `ts=${hbTs}\npid=111\nstatus=ok\n`); await writeFile(join(runDir, 'idle-agent.hb'), `ts=${hbTs}\npid=111\nstatus=ok\n`);
await writeFile(join(runDir, 'available-agent.hb'), `ts=${hbTs}\npid=222\nstatus=ok\n`); await writeFile(join(runDir, 'stuck-agent.hb'), `ts=${hbTs}\npid=222\nstatus=ok\n`);
const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '5'; process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '5';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '30';
const dateNow = vi.spyOn(Date, 'now').mockReturnValue(nowMs); const dateNow = vi.spyOn(Date, 'now').mockReturnValue(nowMs);
const runner: CommandRunner = async (command, args) => { const runner: CommandRunner = async (command, args) => {
const full = [command, ...args].join(' '); const full = [command, ...args].join(' ');
if (full.includes('list-sessions')) { if (full.includes('list-sessions')) {
return { stdout: 'working-agent\navailable-agent\n', stderr: '', exitCode: 0 }; return { stdout: 'idle-agent\nstuck-agent\n', stderr: '', exitCode: 0 };
} }
if (full.includes('=working-agent:0.0')) { if (full.includes('=idle-agent:0.0')) {
return { stdout: `111 pi 0 ${workingActivityEpoch}\n`, stderr: '', exitCode: 0 }; return { stdout: `111 pi 0 ${idleActivityEpoch}\n`, stderr: '', exitCode: 0 };
} }
if (full.includes('=available-agent:0.0')) { if (full.includes('=stuck-agent:0.0')) {
return { stdout: `222 pi 0 ${availableActivityEpoch}\n`, stderr: '', exitCode: 0 }; return { stdout: `222 pi 0 ${stuckActivityEpoch}\n`, stderr: '', exitCode: 0 };
} }
if (full.includes('systemctl') && full.includes('show')) { if (full.includes('systemctl') && full.includes('show')) {
return { return {
@@ -1636,17 +1584,19 @@ describe('fleet ps — readiness table output', () => {
dateNow.mockRestore(); dateNow.mockRestore();
if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle; else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle;
if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck;
await rm(home, { recursive: true, force: true }); await rm(home, { recursive: true, force: true });
} }
const workingLine = lines.find((line) => line.includes('working-agent')); const idleLine = lines.find((line) => line.includes('idle-agent'));
const availableLine = lines.find((line) => line.includes('available-agent')); const stuckLine = lines.find((line) => line.includes('stuck-agent'));
expect(workingLine).toBeDefined(); expect(idleLine).toBeDefined();
expect(workingLine).toContain('1s/working'); expect(idleLine).toContain('1s/idle');
expect(availableLine).toBeDefined(); expect(idleLine).toMatch(/\bIDLE\b/);
expect(availableLine).toContain('1s/available'); expect(stuckLine).toBeDefined();
expect(availableLine).not.toMatch(/\bIDLE\b/); expect(stuckLine).toContain('1s/stuck');
expect(availableLine).not.toMatch(/\bSTUCK\b/); expect(stuckLine).toMatch(/\bSTUCK\b/);
}); });
}); });

View File

@@ -8,7 +8,6 @@ import * as readline from 'node:readline';
import type { Command } from 'commander'; import type { Command } from 'commander';
import YAML from 'yaml'; import YAML from 'yaml';
import { resolveCommsBlock } from '../fleet/comms-onboarding.js'; import { resolveCommsBlock } from '../fleet/comms-onboarding.js';
import { registerFleetBacklogCommand } from './fleet-backlog.js';
/** /**
* A function that spawns a command with inherited stdio (TTY passthrough). * A function that spawns a command with inherited stdio (TTY passthrough).
@@ -396,6 +395,7 @@ export function buildAgentTailCommand(agentName: string, lines: number, socketNa
export const HEARTBEAT_INTERVAL_MS = 15_000; export const HEARTBEAT_INTERVAL_MS = 15_000;
export const HEARTBEAT_IDLE_THRESHOLD_SECONDS = 300; export const HEARTBEAT_IDLE_THRESHOLD_SECONDS = 300;
export const HEARTBEAT_STUCK_THRESHOLD_SECONDS = 900;
/** /**
* Heartbeat interval in ms, honoring MOSAIC_HEARTBEAT_INTERVAL (seconds) so the * Heartbeat interval in ms, honoring MOSAIC_HEARTBEAT_INTERVAL (seconds) so the
@@ -407,14 +407,20 @@ export function heartbeatIntervalMs(): number {
return Number.isFinite(sec) && sec > 0 ? sec * 1000 : HEARTBEAT_INTERVAL_MS; return Number.isFinite(sec) && sec > 0 ? sec * 1000 : HEARTBEAT_INTERVAL_MS;
} }
/** Activity threshold in seconds, honoring MOSAIC_HEARTBEAT_IDLE_THRESHOLD. */ /** Idle threshold in seconds, honoring MOSAIC_HEARTBEAT_IDLE_THRESHOLD. */
export function idleThresholdSeconds(): number { export function idleThresholdSeconds(): number {
const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD ?? '', 10); const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD ?? '', 10);
return Number.isFinite(sec) && sec > 0 ? sec : HEARTBEAT_IDLE_THRESHOLD_SECONDS; return Number.isFinite(sec) && sec > 0 ? sec : HEARTBEAT_IDLE_THRESHOLD_SECONDS;
} }
/** Stuck threshold in seconds, honoring MOSAIC_HEARTBEAT_STUCK_THRESHOLD. */
export function stuckThresholdSeconds(): number {
const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD ?? '', 10);
return Number.isFinite(sec) && sec > 0 ? sec : HEARTBEAT_STUCK_THRESHOLD_SECONDS;
}
export const HEARTBEAT_HEALTHY_MULTIPLIER = 3; export const HEARTBEAT_HEALTHY_MULTIPLIER = 3;
export type ReadinessState = 'working' | 'available' | 'stuck' | 'stale' | 'dead' | 'unknown'; export type ReadinessState = 'working' | 'idle' | 'stuck' | 'stale' | 'dead' | 'unknown';
export interface ReadinessSignals { export interface ReadinessSignals {
paneAlive: boolean; paneAlive: boolean;
@@ -425,6 +431,7 @@ export interface ReadinessSignals {
export interface ReadinessThresholds { export interface ReadinessThresholds {
idleThresholdSeconds: number; idleThresholdSeconds: number;
stuckThresholdSeconds: number;
} }
/** /**
@@ -449,8 +456,12 @@ export function classifyReadiness(
const idleThreshold = Number.isFinite(thresholds?.idleThresholdSeconds) const idleThreshold = Number.isFinite(thresholds?.idleThresholdSeconds)
? Number(thresholds?.idleThresholdSeconds) ? Number(thresholds?.idleThresholdSeconds)
: idleThresholdSeconds(); : idleThresholdSeconds();
// Follow-up: stuck pending per-agent assignment awareness: assigned task + idle past threshold => stuck. const stuckThreshold = Number.isFinite(thresholds?.stuckThresholdSeconds)
if (idleSeconds >= idleThreshold) return 'available'; ? Number(thresholds?.stuckThresholdSeconds)
: stuckThresholdSeconds();
if (idleSeconds >= stuckThreshold) return 'stuck';
if (idleSeconds >= idleThreshold) return 'idle';
return 'working'; return 'working';
} catch { } catch {
return 'unknown'; return 'unknown';
@@ -513,7 +524,7 @@ export function buildSystemdShowCommand(agentName: string): string[] {
/** /**
* Returns the tmux list-panes command for an agent pane. * Returns the tmux list-panes command for an agent pane.
* Format: `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity} #{window_activity} #{session_activity}` * Format: `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}`
*/ */
export function buildTmuxListPanesCommand(agentName: string, socketName = ''): string[] { export function buildTmuxListPanesCommand(agentName: string, socketName = ''): string[] {
return [ return [
@@ -523,7 +534,7 @@ export function buildTmuxListPanesCommand(agentName: string, socketName = ''): s
'-t', '-t',
`=${agentName}:0.0`, `=${agentName}:0.0`,
'-F', '-F',
'#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity} #{window_activity} #{session_activity}', '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}',
]; ];
} }
@@ -623,8 +634,8 @@ export function parseSystemdShow(output: string): {
} }
/** /**
* Parse the output of `tmux list-panes -F '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity} #{window_activity} #{session_activity}'` * Parse the output of `tmux list-panes -F '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}'`
* Activity fields are Unix epoch timestamps (seconds), ordered most precise to coarsest. * pane_activity is a Unix epoch timestamp (seconds).
*/ */
export function parseTmuxListPanes( export function parseTmuxListPanes(
output: string, output: string,
@@ -634,17 +645,15 @@ export function parseTmuxListPanes(
if (!line) { if (!line) {
return { pid: null, command: null, dead: true, idleSeconds: null }; return { pid: null, command: null, dead: true, idleSeconds: null };
} }
// format: <pid> <command> <dead(0|1)> <pane_activity> <window_activity> <session_activity> // format: <pid> <command> <dead(0|1)> <activity_epoch>
const parts = line.split(' '); const parts = line.split(' ');
const pid = parts[0] ? (Number.isFinite(Number(parts[0])) ? Number(parts[0]) : null) : null; const pid = parts[0] ? (Number.isFinite(Number(parts[0])) ? Number(parts[0]) : null) : null;
const command = parts[1] ?? null; const command = parts[1] ?? null;
const dead = parts[2] === '1'; const dead = parts[2] === '1';
const activityEpoch = parts const activityEpoch = parts[3] ? Number(parts[3]) : NaN;
.slice(3, 6) const idleSeconds =
.map((part) => (part ? Number(part) : NaN)) Number.isFinite(activityEpoch) && activityEpoch > 0
.find((epoch) => Number.isFinite(epoch) && epoch > 0); ? Math.floor((nowMs - activityEpoch * 1000) / 1000)
const idleSeconds = activityEpoch
? Math.max(0, Math.floor((nowMs - activityEpoch * 1000) / 1000))
: null; : null;
return { pid, command, dead, idleSeconds }; return { pid, command, dead, idleSeconds };
} }
@@ -1078,6 +1087,7 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
const rows: AgentPsRow[] = []; const rows: AgentPsRow[] = [];
const readinessThresholds = { const readinessThresholds = {
idleThresholdSeconds: idleThresholdSeconds(), idleThresholdSeconds: idleThresholdSeconds(),
stuckThresholdSeconds: stuckThresholdSeconds(),
}; };
// Build the set of roster agent names for quick lookup when filtering socket sessions. // Build the set of roster agent names for quick lookup when filtering socket sessions.
@@ -1252,6 +1262,8 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
if (!row.managed) flags.push('UNMANAGED'); if (!row.managed) flags.push('UNMANAGED');
if (row.driftFlag) flags.push('DRIFT'); if (row.driftFlag) flags.push('DRIFT');
if (row.bootEnableWarning) flags.push('BOOT-ENABLE'); if (row.bootEnableWarning) flags.push('BOOT-ENABLE');
if (row.readiness === 'idle') flags.push('IDLE');
if (row.readiness === 'stuck') flags.push('STUCK');
console.log( console.log(
[ [
@@ -1415,11 +1427,6 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
console.log(`Removed ${name} from the fleet.`); console.log(`Removed ${name} from the fleet.`);
}); });
// Mosaic-native backlog of record (card A4). Resolves the active --mosaic-home
// (parent flag) at call time so the embedded PGlite store lands under the same
// fleet/ directory as the roster and heartbeats.
registerFleetBacklogCommand(cmd, () => cmd.opts<{ mosaicHome: string }>().mosaicHome);
return cmd; return cmd;
} }

3
pnpm-lock.yaml generated
View File

@@ -540,9 +540,6 @@ importers:
'@mosaicstack/config': '@mosaicstack/config':
specifier: workspace:* specifier: workspace:*
version: link:../config version: link:../config
'@mosaicstack/db':
specifier: workspace:*
version: link:../db
'@mosaicstack/forge': '@mosaicstack/forge':
specifier: workspace:* specifier: workspace:*
version: link:../forge version: link:../forge