Files
mosaic/packages/coord/src/tasks-file.ts
Jason Woltje 2c9359a570 feat(wave3): add @mosaic/coord — TypeScript orchestrator package
- Mission CRUD: createMission, loadMission
- TASKS.md parser/writer: parseTasksFile, writeTasksFile, updateTaskStatus
- Task runner: runTask, resumeTask (spawns agent via child_process)
- Status aggregation: getMissionStatus, getTaskStatus
- CLI: coord init | run | resume | status
- Depends on @mosaic/types and @mosaic/queue workspace:*
2026-03-06 19:31:53 -06:00

379 lines
9.7 KiB
TypeScript

import { promises as fs } from 'node:fs';
import path from 'node:path';
import type { Mission, MissionTask, TaskStatus } from './types.js';
import { normalizeTaskStatus } from './types.js';
/** Name of the lock file created next to the TASKS file while it is being rewritten. */
const TASKS_LOCK_FILE = '.TASKS.md.lock';
/** A lock file whose mtime is older than this (5 minutes, in ms) is removed by acquireLock. */
const TASKS_LOCK_STALE_MS = 5 * 60 * 1000;
/** How long (5 seconds, in ms) acquireLock keeps retrying before it throws. */
const TASKS_LOCK_WAIT_MS = 5 * 1000;
/** Pause (in ms) between two lock acquisition attempts. */
const TASKS_LOCK_RETRY_MS = 100;
/** Header row and separator row of the table produced by writeTasksFile. */
const DEFAULT_TABLE_HEADER = [
'| id | status | milestone | description | pr | notes |',
'|----|--------|-----------|-------------|----|-------|',
] as const;
/** Everything writeTasksFile emits before the first task row: title, ownership note, table header. */
const DEFAULT_TASKS_PREAMBLE = [
'# Tasks',
'',
'> Single-writer: orchestrator only. Workers read but never modify.',
'',
...DEFAULT_TABLE_HEADER,
] as const;
/** One data row of the tasks table as produced by parseTable. */
interface ParsedTableRow {
/** Zero-based index of the row within the content split on line breaks. */
readonly lineIndex: number;
/** Trimmed cell values; parseTable pads short rows with '' up to the header count. */
readonly cells: string[];
}
/** Result of locating and reading the first table that has both an `id` and a `status` column. */
interface ParsedTable {
/** Zero-based line index of the header row. */
readonly headerLineIndex: number;
/** Zero-based line index of the separator row (always the line after the header). */
readonly separatorLineIndex: number;
/** Header names, trimmed and lowercased. */
readonly headers: string[];
/** Data rows found after the separator row. */
readonly rows: ParsedTableRow[];
/** Position of the first `id` entry in `headers`. */
readonly idColumn: number;
/** Position of the first `status` entry in `headers`. */
readonly statusColumn: number;
}
/**
 * Produces the canonical form of a header cell so that header lookups are
 * insensitive to padding and letter case.
 *
 * @param input - Raw header cell text.
 * @returns The text with surrounding whitespace removed, in lowercase.
 */
function normalizeHeaderName(input: string): string {
  const withoutPadding = input.trim();
  return withoutPadding.toLowerCase();
}
/**
 * Splits one Markdown table line into its cell values.
 *
 * Pipes escaped as `\|` do not separate cells and are returned as plain `|`.
 * The closing pipe is optional: a line such as `| a | b` yields the same cells
 * as `| a | b |`, so the final cell is never silently discarded.
 *
 * @param line - A single line of the file, with or without surrounding whitespace.
 * @returns The trimmed cell values, or an empty array when the line does not
 *   start with `|` or contains fewer than two unescaped pipes.
 */
function splitMarkdownRow(line: string): string[] {
  const trimmed = line.trim();
  if (!trimmed.startsWith('|')) {
    return [];
  }
  // Split on pipes that are not preceded by a backslash.
  const parts = trimmed.split(/(?<!\\)\|/);
  if (parts.length < 3) {
    return [];
  }
  // parts[0] is the (empty) text before the leading pipe. The last part is the
  // text after the final pipe: empty when the row is closed with `|`, otherwise
  // it is a real cell and must be kept.
  const lastPart = parts[parts.length - 1] ?? '';
  const hasClosingPipe = lastPart.trim().length === 0;
  const cellParts = hasClosingPipe ? parts.slice(1, -1) : parts.slice(1);
  return cellParts.map((part) => part.trim().replace(/\\\|/g, '|'));
}
/**
 * Tells whether a list of cells forms a table separator row.
 *
 * Every cell must consist of at least three dashes, optionally with a leading
 * and/or trailing colon; surrounding whitespace is ignored.
 *
 * @param cells - Cell values of the candidate row.
 * @returns `true` when there is at least one cell and all cells match.
 */
function isSeparatorRow(cells: readonly string[]): boolean {
  if (cells.length === 0) {
    return false;
  }
  const dashCell = /^:?-{3,}:?$/;
  for (const cell of cells) {
    if (!dashCell.test(cell.trim())) {
      return false;
    }
  }
  return true;
}
/**
 * Locates the tasks table in `content` and reads its rows.
 *
 * The table is the first line whose cells include both `id` and `status`
 * (case-insensitive) and that is immediately followed by a separator row.
 * Lines between the separator and the first data row that are not table rows
 * are skipped; once a data row has been read, the first non-row line ends the
 * table.
 *
 * @param content - Full text of the TASKS file.
 * @returns The parsed table, or `undefined` when no matching header is found.
 */
function parseTable(content: string): ParsedTable | undefined {
  const lines = content.split(/\r?\n/);

  // A header needs a following line, so the last line can never be one.
  let headerLineIndex = -1;
  let headers: string[] = [];
  for (let i = 0; i + 1 < lines.length; i += 1) {
    const candidate = splitMarkdownRow(lines[i]).map(normalizeHeaderName);
    const hasRequiredColumns = candidate.includes('id') && candidate.includes('status');
    if (hasRequiredColumns && isSeparatorRow(splitMarkdownRow(lines[i + 1]))) {
      headerLineIndex = i;
      headers = candidate;
      break;
    }
  }
  if (headerLineIndex < 0) {
    return undefined;
  }

  const separatorLineIndex = headerLineIndex + 1;
  const idColumn = headers.indexOf('id');
  const statusColumn = headers.indexOf('status');
  if (idColumn < 0 || statusColumn < 0) {
    return undefined;
  }

  const rows: ParsedTableRow[] = [];
  for (let i = separatorLineIndex + 1; i < lines.length; i += 1) {
    const line = lines[i];
    const cells = line.trim().startsWith('|') ? splitMarkdownRow(line) : [];
    if (cells.length === 0) {
      // Not a table row: ends the table once data has been seen, otherwise skipped.
      if (rows.length > 0) {
        break;
      }
      continue;
    }
    // Short rows are padded so every header has a cell; long rows are kept as-is.
    const missing = headers.length - cells.length;
    const padded = missing > 0 ? cells.concat(new Array<string>(missing).fill('')) : [...cells];
    rows.push({ lineIndex: i, cells: padded });
  }

  return {
    headerLineIndex,
    separatorLineIndex,
    headers,
    rows,
    idColumn,
    statusColumn,
  };
}
/**
 * Makes a value safe to place inside a single Markdown table cell.
 *
 * @param value - Raw cell text.
 * @returns The text with every `|` written as `\|`, each line break replaced
 *   by one space, and surrounding whitespace removed.
 */
function escapeTableCell(value: string): string {
  const pipesEscaped = value.replace(/\|/g, '\\|');
  const onOneLine = pipesEscaped.replace(/\r?\n/g, ' ');
  return onOneLine.trim();
}
/**
 * Renders cell values as one Markdown table line.
 *
 * @param cells - Raw cell values; each one is escaped with escapeTableCell.
 * @returns The cells joined with ` | ` and wrapped in a leading `| ` and a trailing ` |`.
 */
function formatTableRow(cells: readonly string[]): string {
  const body = cells.map((cell) => escapeTableCell(cell)).join(' | ');
  return '| ' + body + ' |';
}
/**
 * Reads a comma-separated dependency cell.
 *
 * @param raw - Cell text, or `undefined` when the table has no such cell.
 * @returns The trimmed, non-empty entries in their original order; an empty
 *   array for `undefined` or blank input.
 */
function parseDependencies(raw: string | undefined): string[] {
  const dependencies: string[] = [];
  if (raw === undefined) {
    return dependencies;
  }
  for (const piece of raw.split(',')) {
    const entry = piece.trim();
    if (entry.length > 0) {
      dependencies.push(entry);
    }
  }
  return dependencies;
}
/**
 * Determines where the mission's TASKS file lives on disk.
 *
 * @param mission - Mission whose `tasksFile` and `projectPath` are read.
 * @returns `mission.tasksFile` unchanged when it is absolute, otherwise that
 *   path joined onto `mission.projectPath`.
 */
function resolveTasksFilePath(mission: Mission): string {
  const { tasksFile, projectPath } = mission;
  return path.isAbsolute(tasksFile) ? tasksFile : path.join(projectPath, tasksFile);
}
/**
 * Checks whether a caught value carries a specific `code` property, as Node.js
 * system errors do (for example `ENOENT` or `EEXIST`).
 *
 * @param error - Any caught value.
 * @param code - The error code to compare against.
 * @returns `true` only for non-null objects whose `code` strictly equals `code`.
 */
function isNodeErrorWithCode(error: unknown, code: string): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  if (!('code' in error)) {
    return false;
  }
  return (error as { code?: string }).code === code;
}
/**
 * Waits for the given time using a timer.
 *
 * @param ms - Time to wait, in milliseconds.
 * @returns A promise that resolves once the timer fires.
 */
async function delay(ms: number): Promise<void> {
  await new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Exclusively creates the lock file and returns its handle.
 *
 * @param lockPath - Path of the lock file.
 * @returns The open handle, or `undefined` when the file already exists.
 * @throws Any error from opening the file other than `EEXIST`.
 */
async function tryOpenLockFile(lockPath: string) {
  try {
    // 'wx' fails with EEXIST when the path exists, which is what makes the lock exclusive.
    return await fs.open(lockPath, 'wx');
  } catch (error) {
    if (isNodeErrorWithCode(error, 'EEXIST')) {
      return undefined;
    }
    throw error;
  }
}

/**
 * Acquires the TASKS lock by creating `lockPath`, retrying every
 * TASKS_LOCK_RETRY_MS until TASKS_LOCK_WAIT_MS has elapsed.
 *
 * The lock file records the owning pid and the acquisition time. An existing
 * lock whose mtime is older than TASKS_LOCK_STALE_MS is removed and the
 * attempt is repeated immediately.
 *
 * The file handle is always closed, and a lock file whose metadata could not
 * be written is removed again, so a failed acquisition does not leave a lock
 * behind that nobody will release.
 *
 * @param lockPath - Path of the lock file to create.
 * @throws Error when the lock could not be acquired within the wait time.
 * @throws Any file system error other than `EEXIST` (create) or `ENOENT` (stat).
 */
async function acquireLock(lockPath: string): Promise<void> {
  const startedAt = Date.now();
  while (Date.now() - startedAt < TASKS_LOCK_WAIT_MS) {
    const handle = await tryOpenLockFile(lockPath);
    if (handle !== undefined) {
      try {
        try {
          await handle.writeFile(
            JSON.stringify(
              {
                pid: process.pid,
                acquiredAt: new Date().toISOString(),
              },
              null,
              2,
            ),
          );
        } finally {
          // Close before any cleanup so the descriptor never leaks.
          await handle.close();
        }
      } catch (error) {
        // The caller never learns it held the lock, so it cannot release it.
        try {
          await fs.rm(lockPath, { force: true });
        } catch {
          // Best effort: report the original failure, not the cleanup failure.
        }
        throw error;
      }
      return;
    }

    // Somebody else holds the lock: steal it if it is stale, otherwise wait.
    try {
      const stats = await fs.stat(lockPath);
      if (Date.now() - stats.mtimeMs > TASKS_LOCK_STALE_MS) {
        await fs.rm(lockPath, { force: true });
        continue;
      }
    } catch (statError) {
      // ENOENT means the holder released the lock between open and stat.
      if (!isNodeErrorWithCode(statError, 'ENOENT')) {
        throw statError;
      }
    }
    await delay(TASKS_LOCK_RETRY_MS);
  }
  throw new Error(`Timed out acquiring TASKS lock: ${lockPath}`);
}
/**
 * Releases the TASKS lock by deleting the lock file.
 *
 * @param lockPath - Path of the lock file to remove.
 */
async function releaseLock(lockPath: string): Promise<void> {
  // `force` keeps the call from failing when the lock file is already gone.
  const removalOptions = { force: true };
  await fs.rm(lockPath, removalOptions);
}
/**
 * Replaces `filePath` with `content` by writing a temporary file in the same
 * directory and renaming it over the target.
 *
 * When the write or the rename fails, the temporary file is removed before the
 * error is rethrown, so failed updates do not accumulate `.TASKS.md.tmp-*`
 * files in the directory.
 *
 * @param filePath - Destination file.
 * @param content - Text to store, encoded as UTF-8.
 * @throws The original write or rename error.
 */
async function writeAtomic(filePath: string, content: string): Promise<void> {
  const directory = path.dirname(filePath);
  // pid + timestamp + random suffix keeps concurrent writers from sharing a temp name.
  const tempPath = path.join(
    directory,
    `.TASKS.md.tmp-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`,
  );
  try {
    await fs.writeFile(tempPath, content, 'utf8');
    await fs.rename(tempPath, filePath);
  } catch (error) {
    try {
      await fs.rm(tempPath, { force: true });
    } catch {
      // Best effort: report the original failure, not the cleanup failure.
    }
    throw error;
  }
}
/**
 * Reads the tasks listed in a TASKS file.
 *
 * Columns are matched by header name. `description` (or, failing that,
 * `title`) supplies the task title; `milestone`, `pr`, `notes`, `assignee`
 * and `dependencies` are optional. Rows with an empty id are skipped.
 *
 * @param content - Full text of the TASKS file.
 * @returns One task per row with a non-empty id, in file order; an empty array
 *   when no tasks table is found. `line` is the 1-based line number of the row.
 */
export function parseTasksFile(content: string): MissionTask[] {
  const table = parseTable(content);
  if (table === undefined) {
    return [];
  }

  // When a header name occurs twice, the later column wins.
  const columnByHeader = new Map<string, number>(
    table.headers.map((header, index): [string, number] => [header, index]),
  );
  const columnOf = (...names: string[]): number => {
    for (const name of names) {
      const column = columnByHeader.get(name);
      if (column !== undefined) {
        return column;
      }
    }
    return -1;
  };
  const cellAt = (row: ParsedTableRow, column: number): string =>
    column >= 0 ? row.cells[column] ?? '' : '';
  const optional = (value: string): string | undefined =>
    value.length > 0 ? value : undefined;

  const titleColumn = columnOf('description', 'title');
  const milestoneColumn = columnOf('milestone');
  const prColumn = columnOf('pr');
  const notesColumn = columnOf('notes');
  const assigneeColumn = columnOf('assignee');
  const dependenciesColumn = columnOf('dependencies');

  const tasks: MissionTask[] = [];
  for (const row of table.rows) {
    const id = row.cells[table.idColumn]?.trim();
    if (id === undefined || id.length === 0) {
      continue;
    }
    const { status, rawStatus } = normalizeTaskStatus(row.cells[table.statusColumn] ?? '');
    tasks.push({
      id,
      title: cellAt(row, titleColumn),
      status,
      dependencies: parseDependencies(cellAt(row, dependenciesColumn)),
      milestone: optional(cellAt(row, milestoneColumn)),
      pr: optional(cellAt(row, prColumn)),
      notes: optional(cellAt(row, notesColumn)),
      assignee: optional(cellAt(row, assigneeColumn)),
      rawStatus,
      // lineIndex is zero-based; reported line numbers start at 1.
      line: row.lineIndex + 1,
    });
  }
  return tasks;
}
/**
 * Renders tasks as the text of a TASKS file.
 *
 * The output is the default preamble followed by one row per task with the
 * columns id, status, milestone, description (the task title), pr and notes.
 * `assignee` and `dependencies` are not written.
 *
 * @param tasks - Tasks to render, in output order.
 * @returns The file text, terminated by a single newline.
 */
export function writeTasksFile(tasks: MissionTask[]): string {
  const rows = tasks.map((task) =>
    formatTableRow([
      task.id,
      task.status,
      task.milestone ?? '',
      task.title,
      task.pr ?? '',
      task.notes ?? '',
    ]),
  );
  return [...DEFAULT_TASKS_PREAMBLE, ...rows].join('\n') + '\n';
}
/**
 * Sets the status cell of one task row in the mission's TASKS file.
 *
 * The update runs under the TASKS lock and is written through a temporary
 * file. Only the target row is re-rendered; all other lines are kept, and the
 * file's line-ending style (CRLF or LF) is preserved instead of being
 * rewritten to LF. Trailing blank lines are collapsed to one final line break.
 *
 * @param mission - Mission whose TASKS file is updated.
 * @param taskId - Id of the row to change; compared against the trimmed id cell.
 * @param status - New value for the status cell.
 * @throws Error when the file is missing, contains no tasks table, does not
 *   contain `taskId`, or contains it more than once.
 * @throws Error when the lock cannot be acquired in time.
 */
export async function updateTaskStatus(
  mission: Mission,
  taskId: string,
  status: TaskStatus,
): Promise<void> {
  const tasksFilePath = resolveTasksFilePath(mission);
  const tasksDirectory = path.dirname(tasksFilePath);
  const lockPath = path.join(tasksDirectory, TASKS_LOCK_FILE);
  // The lock file lives beside the TASKS file, so its directory must exist first.
  await fs.mkdir(tasksDirectory, { recursive: true });
  await acquireLock(lockPath);
  try {
    let content: string;
    try {
      content = await fs.readFile(tasksFilePath, 'utf8');
    } catch (error) {
      if (isNodeErrorWithCode(error, 'ENOENT')) {
        throw new Error(`TASKS file not found: ${tasksFilePath}`);
      }
      throw error;
    }

    const table = parseTable(content);
    if (table === undefined) {
      throw new Error(`Could not parse TASKS table in ${tasksFilePath}`);
    }

    const matchingRows = table.rows.filter((row) => {
      const rowTaskId = row.cells[table.idColumn]?.trim();
      return rowTaskId === taskId;
    });
    if (matchingRows.length === 0) {
      throw new Error(`Task not found in TASKS.md: ${taskId}`);
    }
    if (matchingRows.length > 1) {
      throw new Error(`Duplicate task IDs found in TASKS.md: ${taskId}`);
    }

    const targetRow = matchingRows[0];
    const updatedCells = [...targetRow.cells];
    updatedCells[table.statusColumn] = status;

    // Re-join with the line ending the file already uses so that a status
    // change does not touch every line of a CRLF file.
    const eol = content.includes('\r\n') ? '\r\n' : '\n';
    const lines = content.split(/\r?\n/);
    lines[targetRow.lineIndex] = formatTableRow(updatedCells);
    const updatedContent = `${lines.join(eol).replace(/(?:\r?\n)+$/, '')}${eol}`;
    await writeAtomic(tasksFilePath, updatedContent);
  } finally {
    await releaseLock(lockPath);
  }
}