add drain-mode orchestration and docs/tasks sync for codex-opencode

This commit is contained in:
Jason Woltje
2026-02-17 14:41:47 -06:00
parent c7f363b2d2
commit 0ff39bcee4
15 changed files with 560 additions and 46 deletions

View File

@@ -139,6 +139,8 @@ Run from a bootstrapped repo:
```bash ```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle ~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle
~/.config/mosaic/bin/mosaic-orchestrator-run --once ~/.config/mosaic/bin/mosaic-orchestrator-run --once
~/.config/mosaic/bin/mosaic-orchestrator-sync-tasks --apply
~/.config/mosaic/bin/mosaic-orchestrator-drain
~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10
~/.config/mosaic/bin/mosaic-orchestrator-matrix-publish ~/.config/mosaic/bin/mosaic-orchestrator-matrix-publish
~/.config/mosaic/bin/mosaic-orchestrator-matrix-consume ~/.config/mosaic/bin/mosaic-orchestrator-matrix-consume
@@ -147,6 +149,14 @@ Run from a bootstrapped repo:
The controller reads/writes repo-local state in `.mosaic/orchestrator/` and emits The controller reads/writes repo-local state in `.mosaic/orchestrator/` and emits
structured events to `.mosaic/orchestrator/events.ndjson` for Matrix bridge consumption. structured events to `.mosaic/orchestrator/events.ndjson` for Matrix bridge consumption.
If your runtime command differs, set:
```bash
export MOSAIC_WORKER_EXEC="codex -p"
# or
export MOSAIC_WORKER_EXEC="opencode -p"
```
## Bootstrap Any Repo (Slave Linkage) ## Bootstrap Any Repo (Slave Linkage)
Attach any repository/workspace to the master layer: Attach any repository/workspace to the master layer:

View File

@@ -104,7 +104,8 @@ fi
echo "[mosaic] Repo bootstrap complete: $TARGET_DIR" echo "[mosaic] Repo bootstrap complete: $TARGET_DIR"
echo "[mosaic] Next: edit $TARGET_DIR/.mosaic/repo-hooks.sh with project workflows" echo "[mosaic] Next: edit $TARGET_DIR/.mosaic/repo-hooks.sh with project workflows"
echo "[mosaic] Optional: apply quality rails via ~/.config/mosaic/bin/mosaic-quality-apply --template <template> --target $TARGET_DIR" echo "[mosaic] Optional: apply quality rails via ~/.config/mosaic/bin/mosaic-quality-apply --template <template> --target $TARGET_DIR"
echo "[mosaic] Optional: run orchestrator rail via ~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle" echo "[mosaic] Optional: run orchestrator rail via ~/.config/mosaic/bin/mosaic-orchestrator-drain"
echo "[mosaic] Optional: run detached orchestrator via bash $TARGET_DIR/scripts/agent/orchestrator-daemon.sh start"
if [[ -n "$QUALITY_TEMPLATE" ]]; then if [[ -n "$QUALITY_TEMPLATE" ]]; then
if [[ -x "$MOSAIC_HOME/bin/mosaic-quality-apply" ]]; then if [[ -x "$MOSAIC_HOME/bin/mosaic-quality-apply" ]]; then

View File

@@ -126,10 +126,13 @@ expect_file "$MOSAIC_HOME/bin/mosaic-sync-skills"
expect_file "$MOSAIC_HOME/bin/mosaic-quality-apply" expect_file "$MOSAIC_HOME/bin/mosaic-quality-apply"
expect_file "$MOSAIC_HOME/bin/mosaic-quality-verify" expect_file "$MOSAIC_HOME/bin/mosaic-quality-verify"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-run" expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-run"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-sync-tasks"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-drain"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-publish" expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-publish"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-consume" expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-consume"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-cycle" expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-cycle"
expect_file "$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py" expect_file "$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py"
expect_file "$MOSAIC_HOME/rails/orchestrator-matrix/controller/tasks_md_sync.py"
# Claude runtime file checks (copied, non-symlink). # Claude runtime file checks (copied, non-symlink).
for rf in CLAUDE.md settings.json hooks-config.json context7-integration.md; do for rf in CLAUDE.md settings.json hooks-config.json context7-integration.md; do

33
bin/mosaic-orchestrator-drain Executable file
View File

@@ -0,0 +1,33 @@
#!/usr/bin/env bash
set -euo pipefail
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.config/mosaic}"
sync_cmd="$MOSAIC_HOME/bin/mosaic-orchestrator-sync-tasks"
run_cmd="$MOSAIC_HOME/bin/mosaic-orchestrator-run"
do_sync=1
poll_sec=15
extra_args=()
while [[ $# -gt 0 ]]; do
case "$1" in
--no-sync)
do_sync=0
shift
;;
--poll-sec)
poll_sec="${2:-15}"
shift 2
;;
*)
extra_args+=("$1")
shift
;;
esac
done
if [[ $do_sync -eq 1 ]]; then
"$sync_cmd" --apply
fi
exec "$run_cmd" --until-drained --poll-sec "$poll_sec" "${extra_args[@]}"

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.config/mosaic}"
SYNC="$MOSAIC_HOME/rails/orchestrator-matrix/controller/tasks_md_sync.py"
if [[ ! -f "$SYNC" ]]; then
echo "[mosaic-orchestrator-sync] missing sync script: $SYNC" >&2
exit 1
fi
exec python3 "$SYNC" --repo "$(pwd)" "$@"

View File

@@ -52,6 +52,8 @@ Matrix rail mode commands:
```bash ```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle ~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle
~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10
~/.config/mosaic/bin/mosaic-orchestrator-sync-tasks --apply
~/.config/mosaic/bin/mosaic-orchestrator-drain
``` ```
In Matrix rail mode, keep `docs/tasks.md` as canonical project tracking and use In Matrix rail mode, keep `docs/tasks.md` as canonical project tracking and use

View File

@@ -37,6 +37,7 @@ From a bootstrapped repo:
```bash ```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle ~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle
~/.config/mosaic/bin/mosaic-orchestrator-run --once ~/.config/mosaic/bin/mosaic-orchestrator-run --once
~/.config/mosaic/bin/mosaic-orchestrator-drain
``` ```
Continuous loop: Continuous loop:
@@ -45,6 +46,20 @@ Continuous loop:
~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10
``` ```
Sync from `docs/tasks.md` to queue:
```bash
~/.config/mosaic/bin/mosaic-orchestrator-sync-tasks --apply
```
Set worker command when needed:
```bash
export MOSAIC_WORKER_EXEC="codex -p"
# or
export MOSAIC_WORKER_EXEC="opencode -p"
```
Publish new orchestrator events to Matrix: Publish new orchestrator events to Matrix:
```bash ```bash

View File

@@ -65,7 +65,7 @@ def emit_event(
) )
def run_shell(command: str, cwd: pathlib.Path, log_path: pathlib.Path) -> tuple[int, str]: def run_shell(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]:
log_path.parent.mkdir(parents=True, exist_ok=True) log_path.parent.mkdir(parents=True, exist_ok=True)
with log_path.open("a", encoding="utf-8") as log: with log_path.open("a", encoding="utf-8") as log:
log.write(f"\n[{now_iso()}] COMMAND: {command}\n") log.write(f"\n[{now_iso()}] COMMAND: {command}\n")
@@ -78,14 +78,21 @@ def run_shell(command: str, cwd: pathlib.Path, log_path: pathlib.Path) -> tuple[
text=True, text=True,
encoding="utf-8", encoding="utf-8",
) )
output_chunks: list[str] = [] timed_out = False
assert proc.stdout is not None try:
for line in proc.stdout: output, _ = proc.communicate(timeout=max(1, timeout_sec))
output_chunks.append(line) code = proc.returncode
log.write(line) except subprocess.TimeoutExpired:
code = proc.wait() timed_out = True
proc.kill()
output, _ = proc.communicate()
code = 124
log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n")
if output:
log.write(output)
log.write(f"[{now_iso()}] EXIT: {code}\n") log.write(f"[{now_iso()}] EXIT: {code}\n")
return code, "".join(output_chunks) return code, output or "", timed_out
def render_command_template(template: str, task: dict[str, Any], task_file: pathlib.Path) -> str: def render_command_template(template: str, task: dict[str, Any], task_file: pathlib.Path) -> str:
@@ -96,9 +103,26 @@ def render_command_template(template: str, task: dict[str, Any], task_file: path
) )
def parse_dep_list(raw: Any) -> list[str]:
if isinstance(raw, list):
return [str(x).strip() for x in raw if str(x).strip()]
if isinstance(raw, str):
return [x.strip() for x in raw.split(",") if x.strip()]
return []
def is_completed_status(status: str) -> bool:
return status in {"completed", "done"}
def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None: def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None:
status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in tasks}
for task in tasks: for task in tasks:
if task.get("status", "pending") == "pending": if task.get("status", "pending") != "pending":
continue
deps = parse_dep_list(task.get("depends_on"))
deps_ready = all(is_completed_status(status_by_id.get(dep, "")) for dep in deps)
if deps_ready:
return task return task
return None return None
@@ -120,6 +144,10 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
return False return False
task_id = str(task.get("id", "unknown-task")) task_id = str(task.get("id", "unknown-task"))
max_attempts = int(task.get("max_attempts") or config.get("worker", {}).get("max_attempts") or 1)
attempt = int(task.get("attempts", 0)) + 1
task["attempts"] = attempt
task["max_attempts"] = max_attempts
task["status"] = "running" task["status"] = "running"
task["started_at"] = now_iso() task["started_at"] = now_iso()
save_json(tasks_path, {"tasks": task_items}) save_json(tasks_path, {"tasks": task_items})
@@ -153,17 +181,33 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
save_json(state_path, state) save_json(state_path, state)
return True return True
rc, _ = run_shell(cmd, repo_root, log_path) timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200)
rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
if rc != 0: if rc != 0:
task["status"] = "failed" task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
task["failed_at"] = now_iso() if attempt < max_attempts:
task["error"] = f"Worker command failed with exit code {rc}" task["status"] = "pending"
task["last_failed_at"] = now_iso()
emit_event(
events_path,
"task.retry.scheduled",
task_id,
"pending",
"worker",
f"{task['error']}; retry {attempt + 1}/{max_attempts}",
)
else:
task["status"] = "failed"
task["failed_at"] = now_iso()
emit_event(events_path, "task.failed", task_id, "failed", "worker", task["error"])
save_json(tasks_path, {"tasks": task_items}) save_json(tasks_path, {"tasks": task_items})
emit_event(events_path, "task.failed", task_id, "failed", "worker", task["error"])
state["running_task_id"] = None state["running_task_id"] = None
state["updated_at"] = now_iso() state["updated_at"] = now_iso()
save_json(state_path, state) save_json(state_path, state)
save_json(results_dir / f"{task_id}.json", {"task_id": task_id, "status": "failed", "exit_code": rc}) save_json(
results_dir / f"{task_id}.json",
{"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts},
)
return True return True
gates = task.get("quality_gates") or config.get("quality_gates") or [] gates = task.get("quality_gates") or config.get("quality_gates") or []
@@ -174,7 +218,7 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
if not gate_cmd: if not gate_cmd:
continue continue
emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}") emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}")
gate_rc, _ = run_shell(gate_cmd, repo_root, log_path) gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec)
if gate_rc == 0: if gate_rc == 0:
emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}") emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}")
else: else:
@@ -185,7 +229,7 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
task_id, task_id,
"failed", "failed",
"quality-gate", "quality-gate",
f"Gate failed ({gate_rc}): {gate_cmd}", f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}",
) )
gate_results.append({"command": gate_cmd, "exit_code": gate_rc}) gate_results.append({"command": gate_cmd, "exit_code": gate_rc})
@@ -194,10 +238,22 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
task["completed_at"] = now_iso() task["completed_at"] = now_iso()
emit_event(events_path, "task.completed", task_id, "completed", "controller", "Task completed") emit_event(events_path, "task.completed", task_id, "completed", "controller", "Task completed")
else: else:
task["status"] = "failed"
task["failed_at"] = now_iso()
task["error"] = "One or more quality gates failed" task["error"] = "One or more quality gates failed"
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) if attempt < max_attempts:
task["status"] = "pending"
task["last_failed_at"] = now_iso()
emit_event(
events_path,
"task.retry.scheduled",
task_id,
"pending",
"controller",
f"{task['error']}; retry {attempt + 1}/{max_attempts}",
)
else:
task["status"] = "failed"
task["failed_at"] = now_iso()
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
save_json(tasks_path, {"tasks": task_items}) save_json(tasks_path, {"tasks": task_items})
state["running_task_id"] = None state["running_task_id"] = None
@@ -216,10 +272,33 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
return True return True
def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
tasks = load_json(orch_dir / "tasks.json", {"tasks": []})
task_items = tasks.get("tasks", [])
if not isinstance(task_items, list):
return {"pending": 0, "running": 0, "runnable": 0}
pending = 0
running = 0
runnable = 0
status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items}
for task in task_items:
status = str(task.get("status", "pending"))
if status == "pending":
pending += 1
deps = parse_dep_list(task.get("depends_on"))
if all(is_completed_status(status_by_id.get(dep, "")) for dep in deps):
runnable += 1
if status == "running":
running += 1
return {"pending": pending, "running": running, "runnable": runnable}
def main() -> int: def main() -> int:
parser = argparse.ArgumentParser(description="Mosaic deterministic orchestrator controller") parser = argparse.ArgumentParser(description="Mosaic deterministic orchestrator controller")
parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)") parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
parser.add_argument("--once", action="store_true", help="Process at most one pending task and exit") parser.add_argument("--once", action="store_true", help="Process at most one pending task and exit")
parser.add_argument("--until-drained", action="store_true", help="Run until no pending tasks remain (or blocked)")
parser.add_argument("--poll-sec", type=int, default=10, help="Polling interval for continuous mode") parser.add_argument("--poll-sec", type=int, default=10, help="Polling interval for continuous mode")
args = parser.parse_args() args = parser.parse_args()
@@ -246,6 +325,14 @@ def main() -> int:
try: try:
processed = run_single_task(repo_root, orch_dir, config) processed = run_single_task(repo_root, orch_dir, config)
if not processed: if not processed:
qs = queue_state(orch_dir)
if args.until_drained:
if qs["pending"] == 0 and qs["running"] == 0:
print("[mosaic-orchestrator] drained: no pending tasks")
return 0
if qs["pending"] > 0 and qs["runnable"] == 0 and qs["running"] == 0:
print("[mosaic-orchestrator] blocked: pending tasks remain but dependencies are unmet", file=sys.stderr)
return 2
time.sleep(max(1, args.poll_sec)) time.sleep(max(1, args.poll_sec))
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n[mosaic-orchestrator] stopping") print("\n[mosaic-orchestrator] stopping")

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""Sync docs/tasks.md rows into .mosaic/orchestrator/tasks.json."""
from __future__ import annotations
import argparse
import json
import os
import pathlib
from typing import Any
def load_json(path: pathlib.Path, default: Any) -> Any:
if not path.exists():
return default
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: pathlib.Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
f.write("\n")
tmp.replace(path)
def split_pipe_row(line: str) -> list[str]:
row = line.strip()
if row.startswith("|"):
row = row[1:]
if row.endswith("|"):
row = row[:-1]
return [c.strip() for c in row.split("|")]
def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
if not path.exists():
return []
lines = path.read_text(encoding="utf-8").splitlines()
header_idx = -1
headers: list[str] = []
for i, line in enumerate(lines):
if "|" not in line:
continue
cells = [x.lower() for x in split_pipe_row(line)]
if "id" in cells and "status" in cells and "description" in cells:
header_idx = i
headers = cells
break
if header_idx < 0:
return []
rows: list[dict[str, str]] = []
for line in lines[header_idx + 2 :]:
if not line.strip().startswith("|"):
if rows:
break
continue
cells = split_pipe_row(line)
if len(cells) < len(headers):
cells += [""] * (len(headers) - len(cells))
row = {headers[i]: cells[i] for i in range(len(headers))}
task_id = row.get("id", "").strip()
if not task_id or task_id.lower() == "id":
continue
rows.append(row)
return rows
def map_status(raw: str) -> str:
value = raw.strip().lower()
mapping = {
"not-started": "pending",
"todo": "pending",
"pending": "pending",
"in-progress": "pending",
"needs-qa": "pending",
"done": "completed",
"completed": "completed",
"failed": "failed",
}
return mapping.get(value, "pending")
def parse_depends(raw: str) -> list[str]:
return [x.strip() for x in raw.split(",") if x.strip()]
def build_task(row: dict[str, str], existing: dict[str, Any], runtime_default: str) -> dict[str, Any]:
task_id = row.get("id", "").strip()
description = row.get("description", "").strip()
issue = row.get("issue", "").strip()
repo = row.get("repo", "").strip()
branch = row.get("branch", "").strip()
depends_on = parse_depends(row.get("depends_on", ""))
task = dict(existing)
task["id"] = task_id
task["title"] = description or task_id
task["description"] = description
task["status"] = map_status(row.get("status", "pending"))
task["depends_on"] = depends_on
task["runtime"] = str(task.get("runtime") or runtime_default or "codex")
task["command"] = str(task.get("command") or "")
task["quality_gates"] = task.get("quality_gates") or []
metadata = dict(task.get("metadata") or {})
metadata.update(
{
"source": "docs/tasks.md",
"issue": issue,
"repo": repo,
"branch": branch,
}
)
task["metadata"] = metadata
return task
def main() -> int:
parser = argparse.ArgumentParser(description="Sync docs/tasks.md into .mosaic/orchestrator/tasks.json")
parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
parser.add_argument("--docs", default="docs/tasks.md", help="Path to tasks markdown (repo-relative)")
parser.add_argument(
"--tasks-json",
default=".mosaic/orchestrator/tasks.json",
help="Path to orchestrator tasks JSON (repo-relative)",
)
parser.add_argument("--keep-unlisted", action="store_true", help="Retain tasks already in JSON but missing from docs/tasks.md")
parser.add_argument("--apply", action="store_true", help="Write changes (default is dry-run)")
args = parser.parse_args()
repo = pathlib.Path(args.repo).resolve()
docs_path = (repo / args.docs).resolve()
tasks_path = (repo / args.tasks_json).resolve()
config_path = repo / ".mosaic" / "orchestrator" / "config.json"
config = load_json(config_path, {})
runtime_default = str(config.get("worker", {}).get("runtime") or "codex")
rows = parse_tasks_markdown(docs_path)
existing_payload = load_json(tasks_path, {"tasks": []})
existing_tasks = existing_payload.get("tasks", [])
if not isinstance(existing_tasks, list):
existing_tasks = []
existing_by_id = {str(t.get("id", "")): t for t in existing_tasks}
out_tasks: list[dict[str, Any]] = []
seen: set[str] = set()
for row in rows:
task_id = row.get("id", "").strip()
if not task_id:
continue
seen.add(task_id)
out_tasks.append(build_task(row, existing_by_id.get(task_id, {}), runtime_default))
if args.keep_unlisted:
for task in existing_tasks:
task_id = str(task.get("id", ""))
if task_id and task_id not in seen:
out_tasks.append(task)
payload = {"tasks": out_tasks}
if args.apply:
save_json(tasks_path, payload)
print(f"[mosaic-orchestrator-sync] wrote {len(out_tasks)} tasks -> {tasks_path}")
else:
print(f"[mosaic-orchestrator-sync] dry-run: {len(out_tasks)} tasks would be written -> {tasks_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -24,5 +24,6 @@ This file is an OpenCode adapter layer. It does not replace project guidance.
3. Queue work items in `.mosaic/orchestrator/tasks.json` (or via Matrix command ingestion). 3. Queue work items in `.mosaic/orchestrator/tasks.json` (or via Matrix command ingestion).
4. Run deterministic ticks: 4. Run deterministic ticks:
- `~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle` - `~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle`
- or drain to completion: `~/.config/mosaic/bin/mosaic-orchestrator-drain`
This preserves rails enforcement (`quality_gates`) even without runtime-native subagent features. This preserves rails enforcement (`quality_gates`) even without runtime-native subagent features.

View File

@@ -4,13 +4,13 @@ This repository is attached to the machine-wide Mosaic framework.
## Load Order for Agents ## Load Order for Agents
1. `~/.mosaic/STANDARDS.md` 1. `~/.config/mosaic/STANDARDS.md`
2. `AGENTS.md` (this repository) 2. `AGENTS.md` (this repository)
3. `.mosaic/repo-hooks.sh` (repo-specific automation hooks) 3. `.mosaic/repo-hooks.sh` (repo-specific automation hooks)
## Purpose ## Purpose
- Keep universal standards in `~/.mosaic` - Keep universal standards in `~/.config/mosaic`
- Keep repo-specific behavior in this repo - Keep repo-specific behavior in this repo
- Avoid copying large runtime configs into each project - Avoid copying large runtime configs into each project
@@ -21,13 +21,13 @@ Use `.mosaic/quality-rails.yml` to track whether quality rails are enabled for t
Apply a template: Apply a template:
```bash ```bash
~/.mosaic/bin/mosaic-quality-apply --template <template> --target . ~/.config/mosaic/bin/mosaic-quality-apply --template <template> --target .
``` ```
Verify enforcement: Verify enforcement:
```bash ```bash
~/.mosaic/bin/mosaic-quality-verify --target . ~/.config/mosaic/bin/mosaic-quality-verify --target .
``` ```
## Optional Matrix Orchestrator Rail ## Optional Matrix Orchestrator Rail
@@ -37,19 +37,42 @@ Repo-local orchestrator state lives in `.mosaic/orchestrator/`.
Run one cycle: Run one cycle:
```bash ```bash
~/.mosaic/bin/mosaic-orchestrator-matrix-cycle ~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle
~/.mosaic/bin/mosaic-orchestrator-run --once ~/.config/mosaic/bin/mosaic-orchestrator-run --once
``` ```
Run continuously: Run continuously:
```bash ```bash
~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10
``` ```
Bridge events to Matrix: Bridge events to Matrix:
```bash ```bash
~/.mosaic/bin/mosaic-orchestrator-matrix-publish ~/.config/mosaic/bin/mosaic-orchestrator-matrix-publish
~/.mosaic/bin/mosaic-orchestrator-matrix-consume ~/.config/mosaic/bin/mosaic-orchestrator-matrix-consume
```
Run until queue is drained (syncs from `docs/tasks.md` first):
```bash
~/.config/mosaic/bin/mosaic-orchestrator-drain
```
Set worker command if auto-detect does not match your CLI:
```bash
export MOSAIC_WORKER_EXEC="codex -p"
# or
export MOSAIC_WORKER_EXEC="opencode -p"
```
Use repo helper (foreground or detached):
```bash
bash scripts/agent/orchestrator-daemon.sh drain
bash scripts/agent/orchestrator-daemon.sh start
bash scripts/agent/orchestrator-daemon.sh status
bash scripts/agent/orchestrator-daemon.sh stop
``` ```

View File

@@ -10,8 +10,9 @@
}, },
"worker": { "worker": {
"runtime": "codex", "runtime": "codex",
"command_template": "", "command_template": "bash scripts/agent/orchestrator-worker.sh {task_file}",
"timeout_seconds": 7200 "timeout_seconds": 7200,
"max_attempts": 1
}, },
"quality_gates": [ "quality_gates": [
"pnpm lint", "pnpm lint",

View File

@@ -1,16 +1,3 @@
{ {
"tasks": [ "tasks": []
{
"id": "EXAMPLE-001",
"title": "Example orchestrator task",
"description": "Replace this with a real task and command",
"status": "pending",
"runtime": "codex",
"command": "",
"quality_gates": [],
"metadata": {
"source": "bootstrap-template"
}
}
]
} }

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=./common.sh
source "$SCRIPT_DIR/common.sh"
ensure_repo_root
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.config/mosaic}"
ORCH_DIR=".mosaic/orchestrator"
PID_FILE="$ORCH_DIR/orchestrator.pid"
LOG_FILE="$ORCH_DIR/logs/daemon.log"
usage() {
cat <<USAGE
Usage: $(basename "$0") <start|drain|stop|status> [--poll-sec N] [--no-sync]
Commands:
start Run orchestrator drain loop in background (detached)
drain Run orchestrator drain loop in foreground (until queue drained)
stop Stop background orchestrator if running
status Show background orchestrator status
Options:
--poll-sec N Poll interval (default: 15)
--no-sync Skip docs/tasks.md -> orchestrator queue sync before run
USAGE
}
cmd="${1:-status}"
if [[ $# -gt 0 ]]; then
shift
fi
poll_sec=15
sync_arg=""
while [[ $# -gt 0 ]]; do
case "$1" in
--poll-sec)
poll_sec="${2:-15}"
shift 2
;;
--no-sync)
sync_arg="--no-sync"
shift
;;
*)
echo "[agent-framework] unknown argument: $1" >&2
usage
exit 1
;;
esac
done
mkdir -p "$ORCH_DIR/logs" "$ORCH_DIR/results"
is_running() {
[[ -f "$PID_FILE" ]] || return 1
local pid
pid="$(cat "$PID_FILE" 2>/dev/null || true)"
[[ -n "$pid" ]] || return 1
kill -0 "$pid" 2>/dev/null
}
case "$cmd" in
start)
if is_running; then
echo "[agent-framework] orchestrator already running (pid=$(cat "$PID_FILE"))"
exit 0
fi
nohup "$MOSAIC_HOME/bin/mosaic-orchestrator-drain" --poll-sec "$poll_sec" $sync_arg >"$LOG_FILE" 2>&1 &
echo "$!" > "$PID_FILE"
echo "[agent-framework] orchestrator started (pid=$!, log=$LOG_FILE)"
;;
drain)
exec "$MOSAIC_HOME/bin/mosaic-orchestrator-drain" --poll-sec "$poll_sec" $sync_arg
;;
stop)
if ! is_running; then
echo "[agent-framework] orchestrator not running"
rm -f "$PID_FILE"
exit 0
fi
pid="$(cat "$PID_FILE")"
kill "$pid" || true
rm -f "$PID_FILE"
echo "[agent-framework] orchestrator stopped (pid=$pid)"
;;
status)
if is_running; then
echo "[agent-framework] orchestrator running (pid=$(cat "$PID_FILE"), log=$LOG_FILE)"
else
echo "[agent-framework] orchestrator not running"
rm -f "$PID_FILE"
fi
;;
*)
usage
exit 1
;;
esac

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env bash
set -euo pipefail
task_file="${1:-}"
if [[ -z "$task_file" || ! -f "$task_file" ]]; then
echo "[orchestrator-worker] missing task file argument" >&2
exit 1
fi
worker_exec="${MOSAIC_WORKER_EXEC:-}"
if [[ -z "$worker_exec" ]]; then
if command -v codex >/dev/null 2>&1; then
worker_exec="codex -p"
elif command -v opencode >/dev/null 2>&1; then
worker_exec="opencode -p"
else
echo "[orchestrator-worker] set MOSAIC_WORKER_EXEC to your worker command (example: 'codex -p' or 'opencode -p')" >&2
exit 1
fi
fi
prompt="$(python3 - "$task_file" <<'PY'
import json
import sys
from pathlib import Path
task = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8"))
task_id = str(task.get("id", "TASK"))
title = str(task.get("title", ""))
description = str(task.get("description", ""))
meta = task.get("metadata", {}) or {}
issue = str(meta.get("issue", ""))
repo = str(meta.get("repo", ""))
branch = str(meta.get("branch", ""))
depends = task.get("depends_on", [])
if isinstance(depends, list):
depends_str = ", ".join(str(x) for x in depends)
else:
depends_str = str(depends)
print(
f"""Read ~/.config/mosaic/STANDARDS.md, then AGENTS.md and SOUL.md (if present).
Complete this queued task fully.
Task ID: {task_id}
Title: {title}
Description: {description}
Issue: {issue}
Repo hint: {repo}
Branch hint: {branch}
Depends on: {depends_str}
Requirements:
- Implement and verify the task end-to-end.
- Keep changes scoped to this task.
- Run project checks and tests relevant to touched code.
- Return with a concise summary of what changed and verification results.
"""
)
PY
)"
PROMPT="$prompt" bash -lc "$worker_exec \"\$PROMPT\""