Compare commits
2 Commits
43ad813e0d
...
df905b7a34
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df905b7a34 | ||
|
|
906620dd54 |
@@ -92,7 +92,7 @@ Goal: Two federated gateways exchange real data over mTLS. Inbound requests pass
|
|||||||
> **Tracking issue:** #462.
|
> **Tracking issue:** #462.
|
||||||
|
|
||||||
| id | status | description | issue | agent | branch | depends_on | estimate | notes |
|
| id | status | description | issue | agent | branch | depends_on | estimate | notes |
|
||||||
| --------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----- | ------ | ------------------------------------ | ---------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
| --------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----- | ------ | ------------------------------------ | --------------------------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| FED-M3-01 | done | `packages/types/src/federation/` — request/response DTOs for `list`, `get`, `capabilities` verbs. Wire-format zod schemas + inferred TS types. Includes `FederationRequest`, `FederationListResponse<T>`, `FederationGetResponse<T>`, `FederationCapabilitiesResponse`, error envelope, `_source` tag. | #462 | sonnet | feat/federation-m3-types | — | 4K | Reusable from gateway server + client + harness. Pure types — no I/O, no NestJS. |
|
| FED-M3-01 | done | `packages/types/src/federation/` — request/response DTOs for `list`, `get`, `capabilities` verbs. Wire-format zod schemas + inferred TS types. Includes `FederationRequest`, `FederationListResponse<T>`, `FederationGetResponse<T>`, `FederationCapabilitiesResponse`, error envelope, `_source` tag. | #462 | sonnet | feat/federation-m3-types | — | 4K | Reusable from gateway server + client + harness. Pure types — no I/O, no NestJS. |
|
||||||
| FED-M3-02 | done | `tools/federation-harness/` scaffold: `docker-compose.two-gateways.yml` (Server A + Server B + step-CA), `seed.ts` (provisions grants, peers, sample tasks/notes/credentials per scope variant), `harness.ts` helper (boots stack, returns typed clients). README documents harness use. | #462 | sonnet | feat/federation-m3-harness | DEPLOY-04 (soft) | 8K | Falls back to local docker-compose if `mos-test-1/-2` not yet redeployed (DEPLOY chain blocked on IMG-FIX). Permanent test infra used by M3+. |
|
| FED-M3-02 | done | `tools/federation-harness/` scaffold: `docker-compose.two-gateways.yml` (Server A + Server B + step-CA), `seed.ts` (provisions grants, peers, sample tasks/notes/credentials per scope variant), `harness.ts` helper (boots stack, returns typed clients). README documents harness use. | #462 | sonnet | feat/federation-m3-harness | DEPLOY-04 (soft) | 8K | Falls back to local docker-compose if `mos-test-1/-2` not yet redeployed (DEPLOY chain blocked on IMG-FIX). Permanent test infra used by M3+. |
|
||||||
| FED-M3-03 | done | `apps/gateway/src/federation/server/federation-auth.guard.ts` (NestJS guard). Validates inbound client cert from Fastify TLS context, extracts `grantId` + `subjectUserId` from custom OIDs, loads grant from DB, asserts `status='active'`, attaches `FederationContext` to request. | #462 | sonnet | feat/federation-m3-auth-guard | M3-01 | 8K | Reuses OID parsing logic mirrored from `ca.service.ts` post-issuance verification. 401 on malformed/missing OIDs; 403 on revoked/expired/missing grant. |
|
| FED-M3-03 | done | `apps/gateway/src/federation/server/federation-auth.guard.ts` (NestJS guard). Validates inbound client cert from Fastify TLS context, extracts `grantId` + `subjectUserId` from custom OIDs, loads grant from DB, asserts `status='active'`, attaches `FederationContext` to request. | #462 | sonnet | feat/federation-m3-auth-guard | M3-01 | 8K | Reuses OID parsing logic mirrored from `ca.service.ts` post-issuance verification. 401 on malformed/missing OIDs; 403 on revoked/expired/missing grant. |
|
||||||
|
|||||||
@@ -45,6 +45,8 @@ import {
|
|||||||
removeAgentFromRoster,
|
removeAgentFromRoster,
|
||||||
resolveFleetPaths,
|
resolveFleetPaths,
|
||||||
resolvePresetFilename,
|
resolvePresetFilename,
|
||||||
|
restartLockPath,
|
||||||
|
RESTART_LOCK_STALE_MS,
|
||||||
RUNTIME_ACCEPTABLE_COMMANDS,
|
RUNTIME_ACCEPTABLE_COMMANDS,
|
||||||
serializeRosterToYaml,
|
serializeRosterToYaml,
|
||||||
VERIFY_DEFAULT_TIMEOUT_MS,
|
VERIFY_DEFAULT_TIMEOUT_MS,
|
||||||
@@ -678,6 +680,144 @@ describe('fleet command construction', () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('waits for an in-flight restart to clear before relaunching (re-entry guard)', async () => {
|
||||||
|
const home = await tempDir();
|
||||||
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
rosterPath,
|
||||||
|
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||||
|
'\n',
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Simulate another `mosaic fleet restart` process mid-teardown: a fresh lock
|
||||||
|
// (recent timestamp, so it is NOT treated as stale) already held.
|
||||||
|
const lockPath = restartLockPath(home);
|
||||||
|
await mkdir(dirname(lockPath), { recursive: true });
|
||||||
|
await writeFile(lockPath, `4242\n${Date.now()}\n`);
|
||||||
|
|
||||||
|
const events: string[] = [];
|
||||||
|
const runner: CommandRunner = async (command, args) => {
|
||||||
|
events.push(`run:${args[args.length - 1]}`);
|
||||||
|
return { stdout: '', stderr: '', exitCode: 0 };
|
||||||
|
};
|
||||||
|
// The injected sleep stands in for time passing while we wait; the in-flight
|
||||||
|
// restart "finishes" (releases its lock) after the first poll.
|
||||||
|
let sleeps = 0;
|
||||||
|
const sleepFn: SleepFn = async () => {
|
||||||
|
sleeps += 1;
|
||||||
|
events.push(`sleep:${sleeps}`);
|
||||||
|
await rm(lockPath, { force: true });
|
||||||
|
};
|
||||||
|
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||||
|
|
||||||
|
try {
|
||||||
|
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||||
|
|
||||||
|
// It must have waited at least once before issuing any systemctl restart.
|
||||||
|
expect(sleeps).toBeGreaterThan(0);
|
||||||
|
const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
|
||||||
|
const firstRun = events.findIndex((e) => e.startsWith('run:'));
|
||||||
|
expect(firstSleep).toBeGreaterThanOrEqual(0);
|
||||||
|
expect(firstRun).toBeGreaterThan(firstSleep);
|
||||||
|
|
||||||
|
// And it still performs the full restart once the lock clears.
|
||||||
|
expect(events).toContain('run:mosaic-tmux-holder.service');
|
||||||
|
expect(events).toContain('run:mosaic-agent@coder0.service');
|
||||||
|
|
||||||
|
// The lock is released after the restart completes.
|
||||||
|
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||||
|
} finally {
|
||||||
|
await rm(home, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('breaks a stale restart lock and proceeds without waiting', async () => {
|
||||||
|
const home = await tempDir();
|
||||||
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
rosterPath,
|
||||||
|
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||||
|
'\n',
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
// A lock left behind by a crashed owner: timestamp older than the stale window.
|
||||||
|
const lockPath = restartLockPath(home);
|
||||||
|
await mkdir(dirname(lockPath), { recursive: true });
|
||||||
|
await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n`);
|
||||||
|
|
||||||
|
const calls: string[][] = [];
|
||||||
|
const runner: CommandRunner = async (command, args) => {
|
||||||
|
calls.push([command, ...args]);
|
||||||
|
return { stdout: '', stderr: '', exitCode: 0 };
|
||||||
|
};
|
||||||
|
const sleepFn = vi.fn<SleepFn>(async () => {});
|
||||||
|
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||||
|
|
||||||
|
try {
|
||||||
|
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||||
|
|
||||||
|
// Stale lock is broken immediately — no waiting.
|
||||||
|
expect(sleepFn).not.toHaveBeenCalled();
|
||||||
|
expect(calls).toEqual([
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||||
|
]);
|
||||||
|
// The stale lock is gone once the restart completes.
|
||||||
|
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||||
|
} finally {
|
||||||
|
await rm(home, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('releases the restart lock so a subsequent restart is not blocked', async () => {
|
||||||
|
const home = await tempDir();
|
||||||
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
rosterPath,
|
||||||
|
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||||
|
'\n',
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
const calls: string[][] = [];
|
||||||
|
const runner: CommandRunner = async (command, args) => {
|
||||||
|
calls.push([command, ...args]);
|
||||||
|
return { stdout: '', stderr: '', exitCode: 0 };
|
||||||
|
};
|
||||||
|
const sleepFn = vi.fn<SleepFn>(async () => {});
|
||||||
|
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||||
|
|
||||||
|
try {
|
||||||
|
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||||
|
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||||
|
|
||||||
|
// Two sequential restarts both run fully and neither has to wait.
|
||||||
|
expect(sleepFn).not.toHaveBeenCalled();
|
||||||
|
expect(calls).toEqual([
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||||
|
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||||
|
]);
|
||||||
|
} finally {
|
||||||
|
await rm(home, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
|
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
|
||||||
const home = await tempDir();
|
const home = await tempDir();
|
||||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
|||||||
@@ -1,5 +1,14 @@
|
|||||||
import { constants } from 'node:fs';
|
import { constants } from 'node:fs';
|
||||||
import { access, chmod, copyFile, mkdir, readFile, unlink, writeFile } from 'node:fs/promises';
|
import {
|
||||||
|
access,
|
||||||
|
chmod,
|
||||||
|
copyFile,
|
||||||
|
mkdir,
|
||||||
|
open,
|
||||||
|
readFile,
|
||||||
|
unlink,
|
||||||
|
writeFile,
|
||||||
|
} from 'node:fs/promises';
|
||||||
import { homedir, hostname, userInfo } from 'node:os';
|
import { homedir, hostname, userInfo } from 'node:os';
|
||||||
import { dirname, join, resolve } from 'node:path';
|
import { dirname, join, resolve } from 'node:path';
|
||||||
import { fileURLToPath } from 'node:url';
|
import { fileURLToPath } from 'node:url';
|
||||||
@@ -533,6 +542,108 @@ export function buildFleetServiceCommand(action: FleetServiceAction, agentName?:
|
|||||||
return ['systemctl', '--user', action, service];
|
return ['systemctl', '--user', action, service];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Poll interval (ms) while waiting for an in-flight restart's lock to clear. */
|
||||||
|
export const RESTART_LOCK_POLL_INTERVAL_MS = 250;
|
||||||
|
/**
|
||||||
|
* Maximum time (ms) a re-entrant restart waits for the in-flight restart to
|
||||||
|
* finish before it breaks the lock and proceeds anyway. A bound is required so
|
||||||
|
* a crashed holder of the lock can never deadlock the fleet permanently.
|
||||||
|
*/
|
||||||
|
export const RESTART_LOCK_MAX_WAIT_MS = 30_000;
|
||||||
|
/**
|
||||||
|
* Age (ms) past which a restart lock is treated as stale (its owner died
|
||||||
|
* without releasing it) and is broken immediately rather than waited on.
|
||||||
|
*/
|
||||||
|
export const RESTART_LOCK_STALE_MS = 60_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolves the path of the cross-process restart lock for a given Mosaic home.
|
||||||
|
* Kept strictly under `<mosaicHome>/fleet/run` (not the heartbeat env override)
|
||||||
|
* so the lock is scoped to the same fleet the restart acts on.
|
||||||
|
*/
|
||||||
|
export function restartLockPath(mosaicHome: string): string {
|
||||||
|
return join(mosaicHome, 'fleet', 'run', 'restart.lock');
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A held restart lock; `release()` removes the lock file (idempotent). */
|
||||||
|
interface RestartGuard {
|
||||||
|
release(): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true when an existing lock file is stale: older than
|
||||||
|
* RESTART_LOCK_STALE_MS, or unreadable/unparseable (a corrupt or partially
|
||||||
|
* written lock left by a crashed owner). A vanished lock (ENOENT) is not stale —
|
||||||
|
* the next acquire attempt will simply succeed.
|
||||||
|
*/
|
||||||
|
async function isRestartLockStale(lockPath: string, now: number): Promise<boolean> {
|
||||||
|
let raw: string;
|
||||||
|
try {
|
||||||
|
raw = await readFile(lockPath, 'utf8');
|
||||||
|
} catch (err) {
|
||||||
|
if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const stampLine = raw.split('\n')[1] ?? '';
|
||||||
|
const stamp = Number.parseInt(stampLine.trim(), 10);
|
||||||
|
if (!Number.isFinite(stamp)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return now - stamp >= RESTART_LOCK_STALE_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
|
||||||
|
* invocations across processes. Each restart tears the tmux holder (and the
|
||||||
|
* agent sessions inside it) down and back up; without this guard a re-entrant
|
||||||
|
* restart relaunches agents against a half-torn-down holder, which fails and
|
||||||
|
* tight-loops. A re-entrant caller waits for the in-flight restart to release
|
||||||
|
* the lock (clean shutdown settled) before proceeding, breaks a stale lock left
|
||||||
|
* by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
|
||||||
|
* avoid a permanent deadlock.
|
||||||
|
*/
|
||||||
|
async function acquireRestartLock(mosaicHome: string, sleepFn: SleepFn): Promise<RestartGuard> {
|
||||||
|
const lockPath = restartLockPath(mosaicHome);
|
||||||
|
await mkdir(dirname(lockPath), { recursive: true });
|
||||||
|
const release = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
await unlink(lockPath);
|
||||||
|
} catch {
|
||||||
|
// Already gone (broken as stale by another waiter, or never written) — fine.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
|
||||||
|
for (;;) {
|
||||||
|
try {
|
||||||
|
const handle = await open(lockPath, 'wx');
|
||||||
|
await handle.writeFile(`${process.pid}\n${Date.now()}\n`);
|
||||||
|
await handle.close();
|
||||||
|
return { release };
|
||||||
|
} catch (err) {
|
||||||
|
if ((err as NodeJS.ErrnoException).code !== 'EEXIST') {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
// A restart is already in flight (or its lock was left behind).
|
||||||
|
if (await isRestartLockStale(lockPath, Date.now())) {
|
||||||
|
process.stderr.write('Breaking stale fleet restart lock and proceeding.\n');
|
||||||
|
await release();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (Date.now() >= deadline) {
|
||||||
|
process.stderr.write(
|
||||||
|
`Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
|
||||||
|
'restart; breaking the lock and proceeding.\n',
|
||||||
|
);
|
||||||
|
await release();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the systemctl --user enable command for a given unit.
|
* Returns the systemctl --user enable command for a given unit.
|
||||||
* Used by the install auto-enable step to persist units across reboots.
|
* Used by the install auto-enable step to persist units across reboots.
|
||||||
@@ -1172,6 +1283,7 @@ export function isSendAccepted(capturedOutput: string): SendVerifyResult {
|
|||||||
|
|
||||||
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
|
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
|
||||||
const runner = deps.runner ?? runCommand;
|
const runner = deps.runner ?? runCommand;
|
||||||
|
const sleepFn = deps.sleepFn ?? defaultSleep;
|
||||||
const paths = resolveFleetPaths(deps.mosaicHome);
|
const paths = resolveFleetPaths(deps.mosaicHome);
|
||||||
const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
|
const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
|
||||||
|
|
||||||
@@ -1285,9 +1397,22 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
|
|||||||
.command(`${action} [agent]`)
|
.command(`${action} [agent]`)
|
||||||
.description(`${action} the fleet holder or one agent`)
|
.description(`${action} the fleet holder or one agent`)
|
||||||
.action(async (agent?: string) => {
|
.action(async (agent?: string) => {
|
||||||
|
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
|
||||||
|
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
|
||||||
const roster = await loadRosterForCommand(cmd);
|
const roster = await loadRosterForCommand(cmd);
|
||||||
if (agent) {
|
if (agent) {
|
||||||
getRosterAgent(roster, agent);
|
getRosterAgent(roster, agent);
|
||||||
|
// Single-agent restart is guarded too: it can race a full restart that
|
||||||
|
// is tearing the shared holder down.
|
||||||
|
if (action === 'restart') {
|
||||||
|
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
|
||||||
|
try {
|
||||||
|
await runChecked(runner, buildFleetServiceCommand(action, agent));
|
||||||
|
} finally {
|
||||||
|
await guard.release();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
await runChecked(runner, buildFleetServiceCommand(action, agent));
|
await runChecked(runner, buildFleetServiceCommand(action, agent));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -1298,6 +1423,21 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
|
|||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (action === 'restart') {
|
||||||
|
// Serialize the holder+agents teardown/relaunch behind the restart lock
|
||||||
|
// so a re-entrant restart waits for clean shutdown before relaunching,
|
||||||
|
// instead of racing a half-torn-down holder into a tight loop.
|
||||||
|
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
|
||||||
|
try {
|
||||||
|
await runChecked(runner, buildFleetServiceCommand(action));
|
||||||
|
for (const rosterAgent of roster.agents) {
|
||||||
|
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
await guard.release();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
await runChecked(runner, buildFleetServiceCommand(action));
|
await runChecked(runner, buildFleetServiceCommand(action));
|
||||||
for (const rosterAgent of roster.agents) {
|
for (const rosterAgent of roster.agents) {
|
||||||
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
|
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
|
||||||
|
|||||||
Reference in New Issue
Block a user