/** * Mosaic-native backlog-of-record service (card A4). * * This is the backlog Mosaic owns end-to-end on its OWN Postgres storage layer. * It REPLACES the former Hermes adapter — there is NO runtime dependency on * Hermes here or anywhere downstream. * * The service takes a `Db` handle, so it works identically against: * - `createDb()` — server Postgres (DATABASE_URL / config), and * - `createPgliteDb()` — embedded Postgres (file or in-memory). * Same code, same semantics — PGlite gives real Postgres behaviour (including * row locks), so the atomic-claim path is exercised by the in-memory tests. * * Atomic claim: `claim()` selects the highest-priority, deps-satisfied, ready * card with `SELECT ... FOR UPDATE SKIP LOCKED` and flips it to `claimed` inside * one transaction. Two concurrent claimers can therefore NEVER both win the same * card — the loser's locked row is skipped and it picks the next candidate (or * gets null). */ import { and, asc, desc, eq, sql } from 'drizzle-orm'; import type { Db } from './client.js'; import { backlog } from './schema.js'; export type BacklogStatus = 'ready' | 'claimed' | 'blocked' | 'done'; export interface BacklogCard { id: string; title: string; body: string | null; phase: string | null; priority: number; status: BacklogStatus; dependsOn: string[]; claimOwner: string | null; claimTtlSeconds: number | null; claimedAt: Date | null; attempts: number; idempotencyKey: string | null; acceptance: unknown; createdAt: Date; updatedAt: Date; } export interface CreateCardInput { id: string; title: string; body?: string | null; phase?: string | null; priority?: number; dependsOn?: string[]; acceptance?: unknown; idempotencyKey?: string | null; status?: BacklogStatus; } export interface ListFilter { status?: BacklogStatus; phase?: string; /** When true, return only cards that are `ready` AND have all deps `done`. */ readyOnly?: boolean; } export interface ClaimOptions { owner: string; /** Claim time-to-live in seconds (default 900). */ ttlSeconds?: number; /** Claim a specific card by id instead of the highest-priority ready one. */ id?: string; } export interface ReclaimResult { reclaimed: string[]; } export interface BacklogStats { counts: Record; total: number; oldestReadyAgeSeconds: number | null; expiredClaimCount: number; } export const DEFAULT_CLAIM_TTL_SECONDS = 900; type Row = typeof backlog.$inferSelect; /** * Row shape as returned by the raw `SELECT * ... FOR UPDATE SKIP LOCKED` path. * That path bypasses drizzle's column-name mapping, so JSON columns arrive as * the snake_case `depends_on` (and may be a JSON string under some drivers). */ interface RawRow extends Row { depends_on?: unknown; } function toCard(row: Row): BacklogCard { return { id: row.id, title: row.title, body: row.body, phase: row.phase, priority: row.priority, status: row.status, dependsOn: row.dependsOn ?? [], claimOwner: row.claimOwner, claimTtlSeconds: row.claimTtlSeconds, claimedAt: row.claimedAt, attempts: row.attempts, idempotencyKey: row.idempotencyKey, acceptance: row.acceptance, createdAt: row.createdAt, updatedAt: row.updatedAt, }; } /** * The backlog repository/service. Construct with any `Db` handle. */ export class BacklogService { constructor(private readonly db: Db) {} /** * Create a card. If `idempotencyKey` is provided and a card already exists * with that key, the existing card is returned unchanged (no duplicate). */ async create(input: CreateCardInput): Promise { if (input.idempotencyKey) { const existing = await this.db .select() .from(backlog) .where(eq(backlog.idempotencyKey, input.idempotencyKey)) .limit(1); if (existing[0]) return toCard(existing[0]); } const inserted = await this.db .insert(backlog) .values({ id: input.id, title: input.title, body: input.body ?? null, phase: input.phase ?? null, priority: input.priority ?? 0, status: input.status ?? 'ready', dependsOn: input.dependsOn ?? [], acceptance: input.acceptance ?? null, idempotencyKey: input.idempotencyKey ?? null, }) .returning(); return toCard(inserted[0]!); } /** Fetch a single card by id, or null. */ async get(id: string): Promise { const rows = await this.db.select().from(backlog).where(eq(backlog.id, id)).limit(1); return rows[0] ? toCard(rows[0]) : null; } /** * List cards with optional filters. `readyOnly` enforces the DAG gate: * a card is "ready" only when its own status is `ready` AND every card in * `depends_on` exists and is `done`. */ async list(filter: ListFilter = {}): Promise { const conditions = []; if (filter.status) conditions.push(eq(backlog.status, filter.status)); if (filter.phase) conditions.push(eq(backlog.phase, filter.phase)); const rows = await this.db .select() .from(backlog) .where(conditions.length ? and(...conditions) : undefined) .orderBy(desc(backlog.priority), asc(backlog.createdAt)); const cards = rows.map(toCard); if (!filter.readyOnly) return cards; const doneIds = await this.doneIdSet(); return cards.filter( (c) => c.status === 'ready' && c.dependsOn.every((dep) => doneIds.has(dep)), ); } private async doneIdSet(): Promise> { const done = await this.db .select({ id: backlog.id }) .from(backlog) .where(eq(backlog.status, 'done')); return new Set(done.map((d) => d.id)); } /** * Atomically claim a card. * * Strategy: inside ONE transaction we lock the candidate row with * `FOR UPDATE SKIP LOCKED LIMIT 1`. A concurrent claimer that already holds * the lock on a row has that row skipped for us, so two claimers can never * both win the same card — and, crucially, each claimer locks exactly ONE * row, so concurrent claimers fan out across distinct ready cards instead of * one claimer locking the whole ready set and starving the rest. * * Candidate selection (when no explicit `id`): * - status = 'ready' * - all deps satisfied (every id in depends_on is currently 'done') * - ordered by priority DESC, created_at ASC * * Returns the claimed card, or null if nothing is claimable. */ async claim(opts: ClaimOptions): Promise { const ttl = opts.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS; return this.db.transaction(async (tx) => { // Specific-id path: lock that one ready row (if free) and apply the // deps-satisfied gate in JS, exactly as before. if (opts.id) { const doneRows = await tx .select({ id: backlog.id }) .from(backlog) .where(eq(backlog.status, 'done')); const doneIds = new Set(doneRows.map((r) => r.id)); const result = await tx.execute( sql`SELECT * FROM ${backlog} WHERE ${backlog.id} = ${opts.id} AND ${backlog.status} = 'ready' FOR UPDATE SKIP LOCKED`, ); const candidate = rowsOf(result).find((row) => normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)), ); if (!candidate) return null; const updated = await tx .update(backlog) .set({ status: 'claimed', claimOwner: opts.owner, claimTtlSeconds: ttl, claimedAt: new Date(), attempts: sql`${backlog.attempts} + 1`, updatedAt: new Date(), }) .where(eq(backlog.id, candidate.id)) .returning(); return toCard(updated[0]!); } // No-id path: claim the single highest-priority, deps-satisfied ready // card. We lock exactly ONE row in the inner SELECT (`FOR UPDATE SKIP // LOCKED LIMIT 1`) so concurrent claimers grab distinct cards rather than // one claimer locking every ready row and forcing the others to null. // // The deps-satisfied gate is pushed into SQL so `LIMIT 1` lands on the // next genuinely-eligible card: a card is eligible iff none of its // depends_on ids is absent from the set of 'done' card ids. const updated = await tx.execute( sql`UPDATE ${backlog} SET status = 'claimed', claim_owner = ${opts.owner}, claim_ttl_seconds = ${ttl}, claimed_at = now(), attempts = ${backlog.attempts} + 1, updated_at = now() WHERE ${backlog.id} = ( SELECT b.id FROM ${backlog} AS b WHERE b.status = 'ready' AND NOT EXISTS ( SELECT 1 FROM jsonb_array_elements_text(b.depends_on) AS dep WHERE dep NOT IN ( SELECT d.id FROM ${backlog} AS d WHERE d.status = 'done' ) ) ORDER BY b.priority DESC, b.created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING *`, ); const row = rowsOf(updated)[0]; return row ? toCard(rawToRow(row)) : null; }); } /** * Release expired claims (claimed_at + ttl < now) back to `ready`, OR release * a specific card by id regardless of expiry. Cleared claim fields. * Returns the ids that were released. */ async reclaim(opts: { id?: string } = {}): Promise { if (opts.id) { const released = await this.db .update(backlog) .set({ status: 'ready', claimOwner: null, claimTtlSeconds: null, claimedAt: null, updatedAt: new Date(), }) .where(and(eq(backlog.id, opts.id), eq(backlog.status, 'claimed'))) .returning({ id: backlog.id }); return { reclaimed: released.map((r) => r.id) }; } // Expired = status claimed AND claimed_at + (ttl seconds) < now(). const released = await this.db .update(backlog) .set({ status: 'ready', claimOwner: null, claimTtlSeconds: null, claimedAt: null, updatedAt: new Date(), }) .where( and( eq(backlog.status, 'claimed'), sql`${backlog.claimedAt} + make_interval(secs => ${backlog.claimTtlSeconds}) < now()`, ), ) .returning({ id: backlog.id }); return { reclaimed: released.map((r) => r.id) }; } /** Add a `depends_on` edge (from → depends on → to). Idempotent. */ async link(from: string, to: string): Promise { const card = await this.get(from); if (!card) throw new Error(`backlog card not found: ${from}`); const target = await this.get(to); if (!target) throw new Error(`backlog dependency not found: ${to}`); if (from === to) throw new Error('a card cannot depend on itself'); if (card.dependsOn.includes(to)) return card; const nextDeps = [...card.dependsOn, to]; const updated = await this.db .update(backlog) .set({ dependsOn: nextDeps, updatedAt: new Date() }) .where(eq(backlog.id, from)) .returning(); return toCard(updated[0]!); } /** Mark a card blocked. */ async block(id: string): Promise { return this.setStatus(id, 'blocked'); } /** Mark a card done (releasing any claim). */ async complete(id: string): Promise { const updated = await this.db .update(backlog) .set({ status: 'done', claimOwner: null, claimTtlSeconds: null, claimedAt: null, updatedAt: new Date(), }) .where(eq(backlog.id, id)) .returning(); return updated[0] ? toCard(updated[0]) : null; } private async setStatus(id: string, status: BacklogStatus): Promise { const updated = await this.db .update(backlog) .set({ status, updatedAt: new Date() }) .where(eq(backlog.id, id)) .returning(); return updated[0] ? toCard(updated[0]) : null; } /** Counts by status, oldest-ready age (seconds), and expired-claim count. */ async stats(): Promise { const all = await this.db.select().from(backlog); const counts: Record = { ready: 0, claimed: 0, blocked: 0, done: 0, }; let oldestReady: Date | null = null; let expiredClaimCount = 0; const now = Date.now(); for (const row of all) { counts[row.status] += 1; if (row.status === 'ready') { if (oldestReady === null || row.createdAt < oldestReady) oldestReady = row.createdAt; } if (row.status === 'claimed' && row.claimedAt && row.claimTtlSeconds != null) { const expiry = row.claimedAt.getTime() + row.claimTtlSeconds * 1000; if (expiry < now) expiredClaimCount += 1; } } return { counts, total: all.length, oldestReadyAgeSeconds: oldestReady === null ? null : Math.max(0, Math.floor((now - oldestReady.getTime()) / 1000)), expiredClaimCount, }; } } /** Extract rows from a drizzle `.execute()` result across drivers (pg / pglite). */ function rowsOf(result: unknown): RawRow[] { if (Array.isArray(result)) return result as RawRow[]; const maybe = result as { rows?: unknown }; if (maybe && Array.isArray(maybe.rows)) return maybe.rows as RawRow[]; return []; } /** * Map a raw `RETURNING *` row (snake_case columns, possibly string-encoded * timestamps/JSON depending on the driver) onto the drizzle `Row` shape that * `toCard` consumes. Mirrors the column ↔ property mapping in `schema.ts`. */ function rawToRow(raw: RawRow): Row { const r = raw as unknown as Record; const toDate = (v: unknown): Date => (v instanceof Date ? v : new Date(v as string)); return { id: r.id as string, title: r.title as string, body: (r.body ?? null) as string | null, phase: (r.phase ?? null) as string | null, priority: Number(r.priority), status: r.status as BacklogStatus, dependsOn: normalizeDeps(r.depends_on), claimOwner: (r.claim_owner ?? null) as string | null, claimTtlSeconds: r.claim_ttl_seconds == null ? null : Number(r.claim_ttl_seconds), claimedAt: r.claimed_at == null ? null : toDate(r.claimed_at), attempts: Number(r.attempts), idempotencyKey: (r.idempotency_key ?? null) as string | null, acceptance: r.acceptance ?? null, createdAt: toDate(r.created_at), updatedAt: toDate(r.updated_at), }; } /** A raw SQL row returns snake_case `depends_on`; normalize to string[]. */ function normalizeDeps(value: unknown): string[] { if (Array.isArray(value)) return value as string[]; if (typeof value === 'string') { try { const parsed = JSON.parse(value); return Array.isArray(parsed) ? (parsed as string[]) : []; } catch { return []; } } return []; }