Merge pull request 'feat: finalize orchestrator observability and mosaic rails integration' (#422) from feature/mosaic-stack-finalization into develop
Some checks failed
ci/woodpecker/push/web Pipeline failed
ci/woodpecker/push/orchestrator Pipeline was successful

Reviewed-on: #422
This commit was merged in pull request #422.
This commit is contained in:
2026-02-17 22:24:01 +00:00
49 changed files with 1804 additions and 98 deletions

View File

@@ -381,6 +381,17 @@ ELEMENT_IMAGE_TAG=latest
# Health endpoints (/health/*) remain unauthenticated # Health endpoints (/health/*) remain unauthenticated
ORCHESTRATOR_API_KEY=REPLACE_WITH_RANDOM_API_KEY_MINIMUM_32_CHARS ORCHESTRATOR_API_KEY=REPLACE_WITH_RANDOM_API_KEY_MINIMUM_32_CHARS
# Runtime safety defaults (recommended for low-memory hosts)
MAX_CONCURRENT_AGENTS=2
SESSION_CLEANUP_DELAY_MS=30000
ORCHESTRATOR_QUEUE_NAME=orchestrator-tasks
ORCHESTRATOR_QUEUE_CONCURRENCY=1
ORCHESTRATOR_QUEUE_MAX_RETRIES=3
ORCHESTRATOR_QUEUE_BASE_DELAY_MS=1000
ORCHESTRATOR_QUEUE_MAX_DELAY_MS=60000
SANDBOX_DEFAULT_MEMORY_MB=256
SANDBOX_DEFAULT_CPU_LIMIT=1.0
# ====================== # ======================
# AI Provider Configuration # AI Provider Configuration
# ====================== # ======================

10
.gitignore vendored
View File

@@ -59,3 +59,13 @@ yarn-error.log*
# Orchestrator reports (generated by QA automation, cleaned up after processing) # Orchestrator reports (generated by QA automation, cleaned up after processing)
docs/reports/qa-automation/ docs/reports/qa-automation/
# Repo-local orchestrator runtime artifacts
.mosaic/orchestrator/orchestrator.pid
.mosaic/orchestrator/state.json
.mosaic/orchestrator/tasks.json
.mosaic/orchestrator/matrix_state.json
.mosaic/orchestrator/logs/*.log
.mosaic/orchestrator/results/*
!.mosaic/orchestrator/logs/.gitkeep
!.mosaic/orchestrator/results/.gitkeep

View File

@@ -4,12 +4,12 @@ This repository is attached to the machine-wide Mosaic framework.
## Load Order for Agents ## Load Order for Agents
1. `~/.mosaic/STANDARDS.md` 1. `~/.config/mosaic/STANDARDS.md`
2. `AGENTS.md` (this repository) 2. `AGENTS.md` (this repository)
3. `.mosaic/repo-hooks.sh` (repo-specific automation hooks) 3. `.mosaic/repo-hooks.sh` (repo-specific automation hooks)
## Purpose ## Purpose
- Keep universal standards in `~/.mosaic` - Keep universal standards in `~/.config/mosaic`
- Keep repo-specific behavior in this repo - Keep repo-specific behavior in this repo
- Avoid copying large runtime configs into each project - Avoid copying large runtime configs into each project

View File

@@ -0,0 +1,18 @@
{
"enabled": true,
"transport": "matrix",
"matrix": {
"control_room_id": "",
"workspace_id": "",
"homeserver_url": "",
"access_token": "",
"bot_user_id": ""
},
"worker": {
"runtime": "codex",
"command_template": "bash scripts/agent/orchestrator-worker.sh {task_file}",
"timeout_seconds": 7200,
"max_attempts": 1
},
"quality_gates": ["pnpm lint", "pnpm typecheck", "pnpm test"]
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

10
.mosaic/quality-rails.yml Normal file
View File

@@ -0,0 +1,10 @@
enabled: false
template: ""
# Set enabled: true and choose one template:
# - typescript-node
# - typescript-nextjs
# - monorepo
#
# Apply manually:
# ~/.config/mosaic/bin/mosaic-quality-apply --template <template> --target <repo>

View File

@@ -25,6 +25,8 @@ Optional:
```bash ```bash
bash scripts/agent/log-limitation.sh "Short Name" bash scripts/agent/log-limitation.sh "Short Name"
bash scripts/agent/orchestrator-daemon.sh status
bash scripts/agent/orchestrator-events.sh recent --limit 50
``` ```
## Repo Context ## Repo Context

View File

@@ -7,7 +7,7 @@ Authoritative repo guidance is in `AGENTS.md`.
Load order for Claude sessions: Load order for Claude sessions:
1. `SOUL.md` 1. `SOUL.md`
2. `~/.mosaic/STANDARDS.md` 2. `~/.config/mosaic/STANDARDS.md`
3. `AGENTS.md` 3. `AGENTS.md`
4. `.mosaic/repo-hooks.sh` 4. `.mosaic/repo-hooks.sh`

View File

@@ -10,7 +10,7 @@ You are Jarvis for the Mosaic Stack repository, running on the current agent run
- Be calm and clear: keep responses concise, chunked, and PDA-friendly. - Be calm and clear: keep responses concise, chunked, and PDA-friendly.
- Respect canonical sources: - Respect canonical sources:
- Repo operations and conventions: `AGENTS.md` - Repo operations and conventions: `AGENTS.md`
- Machine-wide rails: `~/.mosaic/STANDARDS.md` - Machine-wide rails: `~/.config/mosaic/STANDARDS.md`
- Repo lifecycle hooks: `.mosaic/repo-hooks.sh` - Repo lifecycle hooks: `.mosaic/repo-hooks.sh`
## Guardrails ## Guardrails

View File

@@ -45,12 +45,22 @@ Monitored via `apps/web/` (Agent Dashboard).
### Agents ### Agents
| Method | Path | Description | | Method | Path | Description |
| ------ | ------------------------- | ---------------------- | | ------ | ------------------------- | ------------------------- |
| POST | `/agents/spawn` | Spawn a new agent | | POST | `/agents/spawn` | Spawn a new agent |
| GET | `/agents/:agentId/status` | Get agent status | | GET | `/agents/:agentId/status` | Get agent status |
| POST | `/agents/:agentId/kill` | Kill a single agent | | POST | `/agents/:agentId/kill` | Kill a single agent |
| POST | `/agents/kill-all` | Kill all active agents | | POST | `/agents/kill-all` | Kill all active agents |
| GET | `/agents/events` | SSE lifecycle/task events |
| GET | `/agents/events/recent` | Recent events (polling) |
### Queue
| Method | Path | Description |
| ------ | --------------- | ---------------------------- |
| GET | `/queue/stats` | Queue depth and worker stats |
| POST | `/queue/pause` | Pause queue processing |
| POST | `/queue/resume` | Resume queue processing |
#### POST /agents/spawn #### POST /agents/spawn
@@ -176,14 +186,17 @@ pnpm --filter @mosaic/orchestrator lint
Environment variables loaded via `@nestjs/config`. Key variables: Environment variables loaded via `@nestjs/config`. Key variables:
| Variable | Description | | Variable | Description |
| ------------------- | -------------------------------------- | | -------------------------------- | -------------------------------------------------- |
| `ORCHESTRATOR_PORT` | HTTP port (default: 3001) | | `ORCHESTRATOR_PORT` | HTTP port (default: 3001) |
| `CLAUDE_API_KEY` | Claude API key for agents | | `CLAUDE_API_KEY` | Claude API key for agents |
| `VALKEY_HOST` | Valkey/Redis host (default: localhost) | | `VALKEY_HOST` | Valkey/Redis host (default: localhost) |
| `VALKEY_PORT` | Valkey/Redis port (default: 6379) | | `VALKEY_PORT` | Valkey/Redis port (default: 6379) |
| `COORDINATOR_URL` | Quality Coordinator base URL | | `COORDINATOR_URL` | Quality Coordinator base URL |
| `SANDBOX_ENABLED` | Enable Docker sandbox (true/false) | | `SANDBOX_ENABLED` | Enable Docker sandbox (true/false) |
| `MAX_CONCURRENT_AGENTS` | Maximum concurrent in-memory sessions (default: 2) |
| `ORCHESTRATOR_QUEUE_CONCURRENCY` | BullMQ worker concurrency (default: 1) |
| `SANDBOX_DEFAULT_MEMORY_MB` | Sandbox memory limit in MB (default: 256) |
## Related Documentation ## Related Documentation

View File

@@ -0,0 +1,89 @@
import { Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { randomUUID } from "crypto";
import { ValkeyService } from "../../valkey/valkey.service";
import type { EventHandler, OrchestratorEvent } from "../../valkey/types";
type UnsubscribeFn = () => void;
const MAX_RECENT_EVENTS = 500;
@Injectable()
export class AgentEventsService implements OnModuleInit {
private readonly logger = new Logger(AgentEventsService.name);
private readonly subscribers = new Map<string, EventHandler>();
private readonly recentEvents: OrchestratorEvent[] = [];
private connected = false;
constructor(private readonly valkeyService: ValkeyService) {}
async onModuleInit(): Promise<void> {
if (this.connected) return;
await this.valkeyService.subscribeToEvents(
(event) => {
this.appendRecentEvent(event);
this.subscribers.forEach((handler) => {
void handler(event);
});
},
(error, _raw, channel) => {
this.logger.warn(`Event stream parse/validation warning on ${channel}: ${error.message}`);
}
);
this.connected = true;
this.logger.log("Agent event stream subscription active");
}
subscribe(handler: EventHandler): UnsubscribeFn {
const id = randomUUID();
this.subscribers.set(id, handler);
return () => {
this.subscribers.delete(id);
};
}
async getInitialSnapshot(): Promise<{
type: "stream.snapshot";
timestamp: string;
agents: number;
tasks: number;
}> {
const [agents, tasks] = await Promise.all([
this.valkeyService.listAgents(),
this.valkeyService.listTasks(),
]);
return {
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: agents.length,
tasks: tasks.length,
};
}
createHeartbeat(): OrchestratorEvent {
return {
type: "task.processing",
timestamp: new Date().toISOString(),
data: {
heartbeat: true,
},
};
}
getRecentEvents(limit = 100): OrchestratorEvent[] {
const safeLimit = Math.min(Math.max(Math.floor(limit), 1), MAX_RECENT_EVENTS);
if (safeLimit >= this.recentEvents.length) {
return [...this.recentEvents];
}
return this.recentEvents.slice(-safeLimit);
}
private appendRecentEvent(event: OrchestratorEvent): void {
this.recentEvents.push(event);
if (this.recentEvents.length > MAX_RECENT_EVENTS) {
this.recentEvents.shift();
}
}
}

View File

@@ -4,6 +4,7 @@ import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service"; import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import type { KillAllResult } from "../../killswitch/killswitch.service"; import type { KillAllResult } from "../../killswitch/killswitch.service";
describe("AgentsController - Killswitch Endpoints", () => { describe("AgentsController - Killswitch Endpoints", () => {
@@ -20,6 +21,12 @@ describe("AgentsController - Killswitch Endpoints", () => {
}; };
let mockLifecycleService: { let mockLifecycleService: {
getAgentLifecycleState: ReturnType<typeof vi.fn>; getAgentLifecycleState: ReturnType<typeof vi.fn>;
registerSpawnedAgent: ReturnType<typeof vi.fn>;
};
let mockEventsService: {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
}; };
beforeEach(() => { beforeEach(() => {
@@ -38,13 +45,30 @@ describe("AgentsController - Killswitch Endpoints", () => {
mockLifecycleService = { mockLifecycleService = {
getAgentLifecycleState: vi.fn(), getAgentLifecycleState: vi.fn(),
registerSpawnedAgent: vi.fn(),
};
mockEventsService = {
subscribe: vi.fn().mockReturnValue(() => {}),
getInitialSnapshot: vi.fn().mockResolvedValue({
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: 0,
tasks: 0,
}),
createHeartbeat: vi.fn().mockReturnValue({
type: "task.processing",
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
}; };
controller = new AgentsController( controller = new AgentsController(
mockQueueService as unknown as QueueService, mockQueueService as unknown as QueueService,
mockSpawnerService as unknown as AgentSpawnerService, mockSpawnerService as unknown as AgentSpawnerService,
mockLifecycleService as unknown as AgentLifecycleService, mockLifecycleService as unknown as AgentLifecycleService,
mockKillswitchService as unknown as KillswitchService mockKillswitchService as unknown as KillswitchService,
mockEventsService as unknown as AgentEventsService
); );
}); });

View File

@@ -3,6 +3,7 @@ import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service"; import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
import { KillswitchService } from "../../killswitch/killswitch.service"; import { KillswitchService } from "../../killswitch/killswitch.service";
import { AgentEventsService } from "./agent-events.service";
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
describe("AgentsController", () => { describe("AgentsController", () => {
@@ -17,11 +18,18 @@ describe("AgentsController", () => {
}; };
let lifecycleService: { let lifecycleService: {
getAgentLifecycleState: ReturnType<typeof vi.fn>; getAgentLifecycleState: ReturnType<typeof vi.fn>;
registerSpawnedAgent: ReturnType<typeof vi.fn>;
}; };
let killswitchService: { let killswitchService: {
killAgent: ReturnType<typeof vi.fn>; killAgent: ReturnType<typeof vi.fn>;
killAllAgents: ReturnType<typeof vi.fn>; killAllAgents: ReturnType<typeof vi.fn>;
}; };
let eventsService: {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
beforeEach(() => { beforeEach(() => {
// Create mock services // Create mock services
@@ -37,6 +45,7 @@ describe("AgentsController", () => {
lifecycleService = { lifecycleService = {
getAgentLifecycleState: vi.fn(), getAgentLifecycleState: vi.fn(),
registerSpawnedAgent: vi.fn().mockResolvedValue(undefined),
}; };
killswitchService = { killswitchService = {
@@ -44,12 +53,29 @@ describe("AgentsController", () => {
killAllAgents: vi.fn(), killAllAgents: vi.fn(),
}; };
eventsService = {
subscribe: vi.fn().mockReturnValue(() => {}),
getInitialSnapshot: vi.fn().mockResolvedValue({
type: "stream.snapshot",
timestamp: new Date().toISOString(),
agents: 0,
tasks: 0,
}),
createHeartbeat: vi.fn().mockReturnValue({
type: "task.processing",
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
// Create controller with mocked services // Create controller with mocked services
controller = new AgentsController( controller = new AgentsController(
queueService as unknown as QueueService, queueService as unknown as QueueService,
spawnerService as unknown as AgentSpawnerService, spawnerService as unknown as AgentSpawnerService,
lifecycleService as unknown as AgentLifecycleService, lifecycleService as unknown as AgentLifecycleService,
killswitchService as unknown as KillswitchService killswitchService as unknown as KillswitchService,
eventsService as unknown as AgentEventsService
); );
}); });
@@ -195,6 +221,10 @@ describe("AgentsController", () => {
expect(queueService.addTask).toHaveBeenCalledWith(validRequest.taskId, validRequest.context, { expect(queueService.addTask).toHaveBeenCalledWith(validRequest.taskId, validRequest.context, {
priority: 5, priority: 5,
}); });
expect(lifecycleService.registerSpawnedAgent).toHaveBeenCalledWith(
agentId,
validRequest.taskId
);
expect(result).toEqual({ expect(result).toEqual({
agentId, agentId,
status: "spawning", status: "spawning",
@@ -334,4 +364,39 @@ describe("AgentsController", () => {
}); });
}); });
}); });
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([
{
type: "task.completed",
timestamp: "2026-02-17T15:00:00.000Z",
taskId: "task-123",
},
]);
const result = controller.getRecentEvents();
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100);
expect(result).toEqual({
events: [
{
type: "task.completed",
timestamp: "2026-02-17T15:00:00.000Z",
taskId: "task-123",
},
],
});
});
it("should parse and pass custom limit", () => {
controller.getRecentEvents("25");
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(25);
});
it("should fallback to default when limit is invalid", () => {
controller.getRecentEvents("invalid");
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100);
});
});
}); });

View File

@@ -11,8 +11,12 @@ import {
HttpCode, HttpCode,
UseGuards, UseGuards,
ParseUUIDPipe, ParseUUIDPipe,
Sse,
MessageEvent,
Query,
} from "@nestjs/common"; } from "@nestjs/common";
import { Throttle } from "@nestjs/throttler"; import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
import { QueueService } from "../../queue/queue.service"; import { QueueService } from "../../queue/queue.service";
import { AgentSpawnerService } from "../../spawner/agent-spawner.service"; import { AgentSpawnerService } from "../../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service"; import { AgentLifecycleService } from "../../spawner/agent-lifecycle.service";
@@ -20,6 +24,7 @@ import { KillswitchService } from "../../killswitch/killswitch.service";
import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto"; import { SpawnAgentDto, SpawnAgentResponseDto } from "./dto/spawn-agent.dto";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard"; import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
import { AgentEventsService } from "./agent-events.service";
/** /**
* Controller for agent management endpoints * Controller for agent management endpoints
@@ -41,7 +46,8 @@ export class AgentsController {
private readonly queueService: QueueService, private readonly queueService: QueueService,
private readonly spawnerService: AgentSpawnerService, private readonly spawnerService: AgentSpawnerService,
private readonly lifecycleService: AgentLifecycleService, private readonly lifecycleService: AgentLifecycleService,
private readonly killswitchService: KillswitchService private readonly killswitchService: KillswitchService,
private readonly eventsService: AgentEventsService
) {} ) {}
/** /**
@@ -67,6 +73,9 @@ export class AgentsController {
context: dto.context, context: dto.context,
}); });
// Persist initial lifecycle state in Valkey.
await this.lifecycleService.registerSpawnedAgent(spawnResponse.agentId, dto.taskId);
// Queue task in Valkey // Queue task in Valkey
await this.queueService.addTask(dto.taskId, dto.context, { await this.queueService.addTask(dto.taskId, dto.context, {
priority: 5, // Default priority priority: 5, // Default priority
@@ -85,6 +94,55 @@ export class AgentsController {
} }
} }
/**
* Stream orchestrator events as server-sent events (SSE)
*/
@Sse("events")
@Throttle({ status: { limit: 200, ttl: 60000 } })
streamEvents(): Observable<MessageEvent> {
return new Observable<MessageEvent>((subscriber) => {
let isClosed = false;
const unsubscribe = this.eventsService.subscribe((event) => {
if (!isClosed) {
subscriber.next({ data: event });
}
});
void this.eventsService.getInitialSnapshot().then((snapshot) => {
if (!isClosed) {
subscriber.next({ data: snapshot });
}
});
const heartbeat = setInterval(() => {
if (!isClosed) {
subscriber.next({ data: this.eventsService.createHeartbeat() });
}
}, 15000);
return () => {
isClosed = true;
clearInterval(heartbeat);
unsubscribe();
};
});
}
/**
* Return recent orchestrator events for non-streaming consumers.
*/
@Get("events/recent")
@Throttle({ status: { limit: 200, ttl: 60000 } })
getRecentEvents(@Query("limit") limit?: string): {
events: ReturnType<AgentEventsService["getRecentEvents"]>;
} {
const parsedLimit = Number.parseInt(limit ?? "100", 10);
return {
events: this.eventsService.getRecentEvents(Number.isNaN(parsedLimit) ? 100 : parsedLimit),
};
}
/** /**
* List all agents * List all agents
* @returns Array of all agent sessions with their status * @returns Array of all agent sessions with their status

View File

@@ -5,10 +5,11 @@ import { SpawnerModule } from "../../spawner/spawner.module";
import { KillswitchModule } from "../../killswitch/killswitch.module"; import { KillswitchModule } from "../../killswitch/killswitch.module";
import { ValkeyModule } from "../../valkey/valkey.module"; import { ValkeyModule } from "../../valkey/valkey.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard"; import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { AgentEventsService } from "./agent-events.service";
@Module({ @Module({
imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule], imports: [QueueModule, SpawnerModule, KillswitchModule, ValkeyModule],
controllers: [AgentsController], controllers: [AgentsController],
providers: [OrchestratorApiKeyGuard], providers: [OrchestratorApiKeyGuard, AgentEventsService],
}) })
export class AgentsModule {} export class AgentsModule {}

View File

@@ -0,0 +1,11 @@
import { Module } from "@nestjs/common";
import { QueueController } from "./queue.controller";
import { QueueModule } from "../../queue/queue.module";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
@Module({
imports: [QueueModule],
controllers: [QueueController],
providers: [OrchestratorApiKeyGuard],
})
export class QueueApiModule {}

View File

@@ -0,0 +1,65 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { QueueController } from "./queue.controller";
import { QueueService } from "../../queue/queue.service";
describe("QueueController", () => {
let controller: QueueController;
let queueService: {
getStats: ReturnType<typeof vi.fn>;
pause: ReturnType<typeof vi.fn>;
resume: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
queueService = {
getStats: vi.fn(),
pause: vi.fn(),
resume: vi.fn(),
};
controller = new QueueController(queueService as unknown as QueueService);
});
afterEach(() => {
vi.clearAllMocks();
});
it("should return queue stats", async () => {
queueService.getStats.mockResolvedValue({
pending: 5,
active: 1,
completed: 10,
failed: 2,
delayed: 0,
});
const result = await controller.getStats();
expect(queueService.getStats).toHaveBeenCalledOnce();
expect(result).toEqual({
pending: 5,
active: 1,
completed: 10,
failed: 2,
delayed: 0,
});
});
it("should pause queue processing", async () => {
queueService.pause.mockResolvedValue(undefined);
const result = await controller.pause();
expect(queueService.pause).toHaveBeenCalledOnce();
expect(result).toEqual({ message: "Queue processing paused" });
});
it("should resume queue processing", async () => {
queueService.resume.mockResolvedValue(undefined);
const result = await controller.resume();
expect(queueService.resume).toHaveBeenCalledOnce();
expect(result).toEqual({ message: "Queue processing resumed" });
});
});

View File

@@ -0,0 +1,39 @@
import { Controller, Get, HttpCode, Post, UseGuards } from "@nestjs/common";
import { Throttle } from "@nestjs/throttler";
import { QueueService } from "../../queue/queue.service";
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
import { OrchestratorThrottlerGuard } from "../../common/guards/throttler.guard";
@Controller("queue")
@UseGuards(OrchestratorApiKeyGuard, OrchestratorThrottlerGuard)
export class QueueController {
constructor(private readonly queueService: QueueService) {}
@Get("stats")
@Throttle({ status: { limit: 200, ttl: 60000 } })
async getStats(): Promise<{
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}> {
return this.queueService.getStats();
}
@Post("pause")
@Throttle({ strict: { limit: 10, ttl: 60000 } })
@HttpCode(200)
async pause(): Promise<{ message: string }> {
await this.queueService.pause();
return { message: "Queue processing paused" };
}
@Post("resume")
@Throttle({ strict: { limit: 10, ttl: 60000 } })
@HttpCode(200)
async resume(): Promise<{ message: string }> {
await this.queueService.resume();
return { message: "Queue processing resumed" };
}
}

View File

@@ -4,6 +4,7 @@ import { BullModule } from "@nestjs/bullmq";
import { ThrottlerModule } from "@nestjs/throttler"; import { ThrottlerModule } from "@nestjs/throttler";
import { HealthModule } from "./api/health/health.module"; import { HealthModule } from "./api/health/health.module";
import { AgentsModule } from "./api/agents/agents.module"; import { AgentsModule } from "./api/agents/agents.module";
import { QueueApiModule } from "./api/queue/queue-api.module";
import { CoordinatorModule } from "./coordinator/coordinator.module"; import { CoordinatorModule } from "./coordinator/coordinator.module";
import { BudgetModule } from "./budget/budget.module"; import { BudgetModule } from "./budget/budget.module";
import { CIModule } from "./ci"; import { CIModule } from "./ci";
@@ -46,6 +47,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
]), ]),
HealthModule, HealthModule,
AgentsModule, AgentsModule,
QueueApiModule,
CoordinatorModule, CoordinatorModule,
BudgetModule, BudgetModule,
CIModule, CIModule,

View File

@@ -157,12 +157,12 @@ describe("orchestratorConfig", () => {
}); });
describe("spawner config", () => { describe("spawner config", () => {
it("should use default maxConcurrentAgents of 20 when not set", () => { it("should use default maxConcurrentAgents of 2 when not set", () => {
delete process.env.MAX_CONCURRENT_AGENTS; delete process.env.MAX_CONCURRENT_AGENTS;
const config = orchestratorConfig(); const config = orchestratorConfig();
expect(config.spawner.maxConcurrentAgents).toBe(20); expect(config.spawner.maxConcurrentAgents).toBe(2);
}); });
it("should use provided maxConcurrentAgents when MAX_CONCURRENT_AGENTS is set", () => { it("should use provided maxConcurrentAgents when MAX_CONCURRENT_AGENTS is set", () => {

View File

@@ -27,7 +27,7 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({
sandbox: { sandbox: {
enabled: process.env.SANDBOX_ENABLED !== "false", enabled: process.env.SANDBOX_ENABLED !== "false",
defaultImage: process.env.SANDBOX_DEFAULT_IMAGE ?? "node:20-alpine", defaultImage: process.env.SANDBOX_DEFAULT_IMAGE ?? "node:20-alpine",
defaultMemoryMB: parseInt(process.env.SANDBOX_DEFAULT_MEMORY_MB ?? "512", 10), defaultMemoryMB: parseInt(process.env.SANDBOX_DEFAULT_MEMORY_MB ?? "256", 10),
defaultCpuLimit: parseFloat(process.env.SANDBOX_DEFAULT_CPU_LIMIT ?? "1.0"), defaultCpuLimit: parseFloat(process.env.SANDBOX_DEFAULT_CPU_LIMIT ?? "1.0"),
networkMode: process.env.SANDBOX_NETWORK_MODE ?? "none", networkMode: process.env.SANDBOX_NETWORK_MODE ?? "none",
}, },
@@ -41,9 +41,15 @@ export const orchestratorConfig = registerAs("orchestrator", () => ({
enabled: process.env.YOLO_MODE === "true", enabled: process.env.YOLO_MODE === "true",
}, },
spawner: { spawner: {
maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "20", 10), maxConcurrentAgents: parseInt(process.env.MAX_CONCURRENT_AGENTS ?? "2", 10),
sessionCleanupDelayMs: parseInt(process.env.SESSION_CLEANUP_DELAY_MS ?? "30000", 10),
}, },
queue: { queue: {
name: process.env.ORCHESTRATOR_QUEUE_NAME ?? "orchestrator-tasks",
maxRetries: parseInt(process.env.ORCHESTRATOR_QUEUE_MAX_RETRIES ?? "3", 10),
baseDelay: parseInt(process.env.ORCHESTRATOR_QUEUE_BASE_DELAY_MS ?? "1000", 10),
maxDelay: parseInt(process.env.ORCHESTRATOR_QUEUE_MAX_DELAY_MS ?? "60000", 10),
concurrency: parseInt(process.env.ORCHESTRATOR_QUEUE_CONCURRENCY ?? "1", 10),
completedRetentionCount: parseInt(process.env.QUEUE_COMPLETED_RETENTION_COUNT ?? "100", 10), completedRetentionCount: parseInt(process.env.QUEUE_COMPLETED_RETENTION_COUNT ?? "100", 10),
completedRetentionAgeSeconds: parseInt( completedRetentionAgeSeconds: parseInt(
process.env.QUEUE_COMPLETED_RETENTION_AGE_S ?? "3600", process.env.QUEUE_COMPLETED_RETENTION_AGE_S ?? "3600",

View File

@@ -2,9 +2,10 @@ import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config"; import { ConfigModule } from "@nestjs/config";
import { QueueService } from "./queue.service"; import { QueueService } from "./queue.service";
import { ValkeyModule } from "../valkey/valkey.module"; import { ValkeyModule } from "../valkey/valkey.module";
import { SpawnerModule } from "../spawner/spawner.module";
@Module({ @Module({
imports: [ConfigModule, ValkeyModule], imports: [ConfigModule, ValkeyModule, SpawnerModule],
providers: [QueueService], providers: [QueueService],
exports: [QueueService], exports: [QueueService],
}) })

View File

@@ -991,12 +991,17 @@ describe("QueueService", () => {
success: true, success: true,
metadata: { attempt: 1 }, metadata: { attempt: 1 },
}); });
expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith("task-123", "executing"); expect(mockValkeyService.updateTaskStatus).toHaveBeenCalledWith(
"task-123",
"executing",
undefined
);
expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({ expect(mockValkeyService.publishEvent).toHaveBeenCalledWith({
type: "task.processing", type: "task.executing",
timestamp: expect.any(String), timestamp: expect.any(String),
taskId: "task-123", taskId: "task-123",
data: { attempt: 1 }, agentId: undefined,
data: { attempt: 1, dispatchedByQueue: true },
}); });
}); });

View File

@@ -1,7 +1,9 @@
import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; import { Injectable, OnModuleDestroy, OnModuleInit, Optional, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import { Queue, Worker, Job } from "bullmq"; import { Queue, Worker, Job } from "bullmq";
import { ValkeyService } from "../valkey/valkey.service"; import { ValkeyService } from "../valkey/valkey.service";
import { AgentSpawnerService } from "../spawner/agent-spawner.service";
import { AgentLifecycleService } from "../spawner/agent-lifecycle.service";
import type { TaskContext } from "../valkey/types"; import type { TaskContext } from "../valkey/types";
import type { import type {
QueuedTask, QueuedTask,
@@ -16,6 +18,7 @@ import type {
*/ */
@Injectable() @Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy { export class QueueService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(QueueService.name);
private queue!: Queue<QueuedTask>; private queue!: Queue<QueuedTask>;
private worker!: Worker<QueuedTask, TaskProcessingResult>; private worker!: Worker<QueuedTask, TaskProcessingResult>;
private readonly queueName: string; private readonly queueName: string;
@@ -23,7 +26,9 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
constructor( constructor(
private readonly valkeyService: ValkeyService, private readonly valkeyService: ValkeyService,
private readonly configService: ConfigService private readonly configService: ConfigService,
@Optional() private readonly spawnerService?: AgentSpawnerService,
@Optional() private readonly lifecycleService?: AgentLifecycleService
) { ) {
this.queueName = this.configService.get<string>( this.queueName = this.configService.get<string>(
"orchestrator.queue.name", "orchestrator.queue.name",
@@ -132,6 +137,16 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
context, context,
}; };
// Ensure task state exists before queue lifecycle updates.
const getTaskState = (this.valkeyService as Partial<ValkeyService>).getTaskState;
const createTask = (this.valkeyService as Partial<ValkeyService>).createTask;
if (typeof getTaskState === "function" && typeof createTask === "function") {
const existingTask = await getTaskState.call(this.valkeyService, taskId);
if (!existingTask) {
await createTask.call(this.valkeyService, taskId, context);
}
}
// Add to BullMQ queue // Add to BullMQ queue
await this.queue.add(taskId, queuedTask, { await this.queue.add(taskId, queuedTask, {
priority: 10 - priority + 1, // BullMQ: lower number = higher priority, so invert priority: 10 - priority + 1, // BullMQ: lower number = higher priority, so invert
@@ -214,23 +229,35 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
const { taskId } = job.data; const { taskId } = job.data;
try { try {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
const agentId = session?.agentId;
if (agentId) {
if (this.lifecycleService) {
await this.lifecycleService.transitionToRunning(agentId);
}
this.spawnerService?.setSessionState(agentId, "running");
}
// Update task state to executing // Update task state to executing
await this.valkeyService.updateTaskStatus(taskId, "executing"); await this.valkeyService.updateTaskStatus(taskId, "executing", agentId);
// Publish event // Publish event
await this.valkeyService.publishEvent({ await this.valkeyService.publishEvent({
type: "task.processing", type: "task.executing",
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
taskId, taskId,
data: { attempt: job.attemptsMade + 1 }, agentId,
data: {
attempt: job.attemptsMade + 1,
dispatchedByQueue: true,
},
}); });
// Task processing will be handled by agent spawner
// For now, just mark as processing
return { return {
success: true, success: true,
metadata: { metadata: {
attempt: job.attemptsMade + 1, attempt: job.attemptsMade + 1,
...(agentId && { agentId }),
}, },
}; };
} catch (error) { } catch (error) {
@@ -270,6 +297,14 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
* Handle task failure * Handle task failure
*/ */
private async handleTaskFailure(taskId: string, error: Error): Promise<void> { private async handleTaskFailure(taskId: string, error: Error): Promise<void> {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
if (session) {
this.spawnerService?.setSessionState(session.agentId, "failed", error.message, new Date());
if (this.lifecycleService) {
await this.lifecycleService.transitionToFailed(session.agentId, error.message);
}
}
await this.valkeyService.updateTaskStatus(taskId, "failed", undefined, error.message); await this.valkeyService.updateTaskStatus(taskId, "failed", undefined, error.message);
await this.valkeyService.publishEvent({ await this.valkeyService.publishEvent({
@@ -284,12 +319,25 @@ export class QueueService implements OnModuleInit, OnModuleDestroy {
* Handle task completion * Handle task completion
*/ */
private async handleTaskCompletion(taskId: string): Promise<void> { private async handleTaskCompletion(taskId: string): Promise<void> {
const session = this.spawnerService?.findAgentSessionByTaskId(taskId);
if (session) {
this.spawnerService?.setSessionState(session.agentId, "completed", undefined, new Date());
if (this.lifecycleService) {
await this.lifecycleService.transitionToCompleted(session.agentId);
}
} else {
this.logger.warn(
`Queue completed task ${taskId} but no session was found; using queue-only completion state`
);
}
await this.valkeyService.updateTaskStatus(taskId, "completed"); await this.valkeyService.updateTaskStatus(taskId, "completed");
await this.valkeyService.publishEvent({ await this.valkeyService.publishEvent({
type: "task.completed", type: "task.completed",
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
taskId, taskId,
...(session && { agentId: session.agentId }),
}); });
} }
} }

View File

@@ -37,6 +37,24 @@ export class AgentLifecycleService {
this.logger.log("AgentLifecycleService initialized"); this.logger.log("AgentLifecycleService initialized");
} }
/**
* Register a newly spawned agent in persistent state and emit spawned event.
*/
async registerSpawnedAgent(agentId: string, taskId: string): Promise<AgentState> {
await this.valkeyService.createAgent(agentId, taskId);
const createdState = await this.getAgentState(agentId);
const event: AgentEvent = {
type: "agent.spawned",
agentId,
taskId,
timestamp: new Date().toISOString(),
};
await this.valkeyService.publishEvent(event);
return createdState;
}
/** /**
* Acquire a per-agent mutex to serialize state transitions. * Acquire a per-agent mutex to serialize state transitions.
* Uses promise chaining: each caller chains onto the previous lock, * Uses promise chaining: each caller chains onto the previous lock,

View File

@@ -116,6 +116,33 @@ export class AgentSpawnerService implements OnModuleDestroy {
return this.sessions.get(agentId); return this.sessions.get(agentId);
} }
/**
* Find an active session by task ID.
*/
findAgentSessionByTaskId(taskId: string): AgentSession | undefined {
return Array.from(this.sessions.values()).find((session) => session.taskId === taskId);
}
/**
* Update in-memory session state for visibility in list/status endpoints.
*/
setSessionState(
agentId: string,
state: AgentSession["state"],
error?: string,
completedAt?: Date
): void {
const session = this.sessions.get(agentId);
if (!session) return;
session.state = state;
session.error = error;
if (completedAt) {
session.completedAt = completedAt;
}
this.sessions.set(agentId, session);
}
/** /**
* List all agent sessions * List all agent sessions
* @returns Array of all agent sessions * @returns Array of all agent sessions

View File

@@ -0,0 +1,47 @@
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(request: NextRequest): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
const limit = request.nextUrl.searchParams.get("limit");
const query = limit ? `?limit=${encodeURIComponent(limit)}` : "";
try {
const response = await fetch(`${getOrchestratorUrl()}/agents/events/recent${query}`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,50 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(): Promise<Response> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const upstream = await fetch(`${getOrchestratorUrl()}/agents/events`, {
method: "GET",
headers: {
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
if (!upstream.ok || !upstream.body) {
const text = await upstream.text();
return new NextResponse(text || "Failed to connect to orchestrator events stream", {
status: upstream.status || 502,
});
}
return new Response(upstream.body, {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/health/ready`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function POST(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/pause`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function POST(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/resume`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const response = await fetch(`${getOrchestratorUrl()}/queue/stats`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -55,6 +55,15 @@ const WIDGET_REGISTRY = {
minWidth: 1, minWidth: 1,
minHeight: 1, minHeight: 1,
}, },
OrchestratorEventsWidget: {
name: "orchestrator-events",
displayName: "Orchestrator Events",
description: "Recent events and stream health for orchestration",
defaultWidth: 2,
defaultHeight: 2,
minWidth: 1,
minHeight: 1,
},
} as const; } as const;
type WidgetRegistryKey = keyof typeof WIDGET_REGISTRY; type WidgetRegistryKey = keyof typeof WIDGET_REGISTRY;
@@ -73,7 +82,7 @@ export function HUD({ className = "" }: HUDProps): React.JSX.Element {
const handleAddWidget = (widgetType: WidgetRegistryKey): void => { const handleAddWidget = (widgetType: WidgetRegistryKey): void => {
const widgetConfig = WIDGET_REGISTRY[widgetType]; const widgetConfig = WIDGET_REGISTRY[widgetType];
const widgetId = `${widgetType.toLowerCase()}-${String(Date.now())}`; const widgetId = `${widgetConfig.name}-${String(Date.now())}`;
// Find the next available position // Find the next available position
const maxY = currentLayout?.layout.reduce((max, w): number => Math.max(max, w.y + w.h), 0) ?? 0; const maxY = currentLayout?.layout.reduce((max, w): number => Math.max(max, w.y + w.h), 0) ?? 0;

View File

@@ -0,0 +1,47 @@
import { render, screen } from "@testing-library/react";
import { describe, it, expect, vi } from "vitest";
import { WidgetRenderer } from "./WidgetRenderer";
import type { WidgetPlacement } from "@mosaic/shared";
vi.mock("@/components/widgets", () => ({
TasksWidget: ({ id }: { id: string }): React.JSX.Element => <div>Tasks Widget {id}</div>,
CalendarWidget: ({ id }: { id: string }): React.JSX.Element => <div>Calendar Widget {id}</div>,
QuickCaptureWidget: ({ id }: { id: string }): React.JSX.Element => (
<div>Quick Capture Widget {id}</div>
),
AgentStatusWidget: ({ id }: { id: string }): React.JSX.Element => (
<div>Agent Status Widget {id}</div>
),
OrchestratorEventsWidget: ({ id }: { id: string }): React.JSX.Element => (
<div>Orchestrator Events Widget {id}</div>
),
}));
function createWidgetPlacement(id: string): WidgetPlacement {
return {
i: id,
x: 0,
y: 0,
w: 2,
h: 2,
};
}
describe("WidgetRenderer", () => {
it("renders hyphenated quick-capture widget IDs correctly", () => {
render(<WidgetRenderer widget={createWidgetPlacement("quick-capture-123")} />);
expect(screen.getByText("Quick Capture Widget quick-capture-123")).toBeInTheDocument();
});
it("renders hyphenated agent-status widget IDs correctly", () => {
render(<WidgetRenderer widget={createWidgetPlacement("agent-status-123")} />);
expect(screen.getByText("Agent Status Widget agent-status-123")).toBeInTheDocument();
});
it("renders hyphenated orchestrator-events widget IDs correctly", () => {
render(<WidgetRenderer widget={createWidgetPlacement("orchestrator-events-123")} />);
expect(
screen.getByText("Orchestrator Events Widget orchestrator-events-123")
).toBeInTheDocument();
});
});

View File

@@ -10,6 +10,7 @@ import {
CalendarWidget, CalendarWidget,
QuickCaptureWidget, QuickCaptureWidget,
AgentStatusWidget, AgentStatusWidget,
OrchestratorEventsWidget,
} from "@/components/widgets"; } from "@/components/widgets";
import type { WidgetPlacement } from "@mosaic/shared"; import type { WidgetPlacement } from "@mosaic/shared";
@@ -24,6 +25,7 @@ const WIDGET_COMPONENTS = {
calendar: CalendarWidget, calendar: CalendarWidget,
"quick-capture": QuickCaptureWidget, "quick-capture": QuickCaptureWidget,
"agent-status": AgentStatusWidget, "agent-status": AgentStatusWidget,
"orchestrator-events": OrchestratorEventsWidget,
}; };
const WIDGET_CONFIG = { const WIDGET_CONFIG = {
@@ -43,6 +45,10 @@ const WIDGET_CONFIG = {
displayName: "Agent Status", displayName: "Agent Status",
description: "View running agent sessions", description: "View running agent sessions",
}, },
"orchestrator-events": {
displayName: "Orchestrator Events",
description: "Recent orchestration events and stream health",
},
}; };
export function WidgetRenderer({ export function WidgetRenderer({
@@ -50,8 +56,12 @@ export function WidgetRenderer({
isEditing = false, isEditing = false,
onRemove, onRemove,
}: WidgetRendererProps): React.JSX.Element { }: WidgetRendererProps): React.JSX.Element {
// Extract widget type from ID (e.g., "tasks-123" -> "tasks") // Extract widget type from ID by removing the trailing unique suffix
const widgetType = widget.i.split("-")[0] as keyof typeof WIDGET_COMPONENTS; // (e.g., "agent-status-123" -> "agent-status").
const separatorIndex = widget.i.lastIndexOf("-");
const widgetType = (
separatorIndex > 0 ? widget.i.substring(0, separatorIndex) : widget.i
) as keyof typeof WIDGET_COMPONENTS;
const WidgetComponent = WIDGET_COMPONENTS[widgetType]; const WidgetComponent = WIDGET_COMPONENTS[widgetType];
const config = WIDGET_CONFIG[widgetType] || { displayName: "Widget", description: "" }; const config = WIDGET_CONFIG[widgetType] || { displayName: "Widget", description: "" };

View File

@@ -2,7 +2,7 @@
* Agent Status Widget - shows running agents * Agent Status Widget - shows running agents
*/ */
import { useState, useEffect } from "react"; import { useState, useEffect, useCallback } from "react";
import { Bot, Activity, AlertCircle, CheckCircle, Clock } from "lucide-react"; import { Bot, Activity, AlertCircle, CheckCircle, Clock } from "lucide-react";
import type { WidgetProps } from "@mosaic/shared"; import type { WidgetProps } from "@mosaic/shared";
@@ -21,46 +21,57 @@ export function AgentStatusWidget({ id: _id, config: _config }: WidgetProps): Re
const [isLoading, setIsLoading] = useState(true); const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const fetchAgents = useCallback(async (): Promise<void> => {
setIsLoading(true);
setError(null);
try {
const response = await fetch("/api/orchestrator/agents", {
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
throw new Error(`Failed to fetch agents: ${response.statusText}`);
}
const data = (await response.json()) as Agent[];
setAgents(data);
} catch (err: unknown) {
const errorMessage = err instanceof Error ? err.message : "Unknown error";
console.error("Failed to fetch agents:", errorMessage);
setError(errorMessage);
setAgents([]); // Clear agents on error
} finally {
setIsLoading(false);
}
}, []);
// Fetch agents from orchestrator API // Fetch agents from orchestrator API
useEffect(() => { useEffect(() => {
const fetchAgents = async (): Promise<void> => {
setIsLoading(true);
setError(null);
try {
const response = await fetch("/api/orchestrator/agents", {
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
throw new Error(`Failed to fetch agents: ${response.statusText}`);
}
const data = (await response.json()) as Agent[];
setAgents(data);
} catch (err: unknown) {
const errorMessage = err instanceof Error ? err.message : "Unknown error";
console.error("Failed to fetch agents:", errorMessage);
setError(errorMessage);
setAgents([]); // Clear agents on error
} finally {
setIsLoading(false);
}
};
void fetchAgents(); void fetchAgents();
// Refresh every 30 seconds // Refresh every 30 seconds
const interval = setInterval(() => { const interval = setInterval(() => {
void fetchAgents(); void fetchAgents();
}, 30000); }, 20000);
const eventSource =
typeof EventSource !== "undefined" ? new EventSource("/api/orchestrator/events") : null;
if (eventSource) {
eventSource.onmessage = (): void => {
void fetchAgents();
};
eventSource.onerror = (): void => {
// polling remains fallback
};
}
return (): void => { return (): void => {
clearInterval(interval); clearInterval(interval);
eventSource?.close();
}; };
}, []); }, [fetchAgents]);
const getStatusIcon = (status: string): React.JSX.Element => { const getStatusIcon = (status: string): React.JSX.Element => {
const statusLower = status.toLowerCase(); const statusLower = status.toLowerCase();

View File

@@ -0,0 +1,190 @@
import { useCallback, useEffect, useMemo, useState } from "react";
import { Activity, DatabaseZap, Loader2, Wifi, WifiOff } from "lucide-react";
import type { WidgetProps } from "@mosaic/shared";
interface OrchestratorEvent {
type: string;
timestamp: string;
agentId?: string;
taskId?: string;
data?: Record<string, unknown>;
}
interface RecentEventsResponse {
events: OrchestratorEvent[];
}
function isMatrixSignal(event: OrchestratorEvent): boolean {
const text = JSON.stringify(event).toLowerCase();
return (
text.includes("matrix") ||
text.includes("room") ||
text.includes("channel") ||
text.includes("thread")
);
}
export function OrchestratorEventsWidget({
id: _id,
config: _config,
}: WidgetProps): React.JSX.Element {
const [events, setEvents] = useState<OrchestratorEvent[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [streamConnected, setStreamConnected] = useState(false);
const [backendReady, setBackendReady] = useState<boolean | null>(null);
const loadRecentEvents = useCallback(async (): Promise<void> => {
try {
const response = await fetch("/api/orchestrator/events/recent?limit=25");
if (!response.ok) {
throw new Error(`Unable to load events: HTTP ${String(response.status)}`);
}
const payload = (await response.json()) as unknown;
const events =
payload &&
typeof payload === "object" &&
"events" in payload &&
Array.isArray(payload.events)
? (payload.events as RecentEventsResponse["events"])
: [];
setEvents(events);
setError(null);
} catch (err) {
setError(err instanceof Error ? err.message : "Unable to load events.");
} finally {
setIsLoading(false);
}
}, []);
const loadHealth = useCallback(async (): Promise<void> => {
try {
const response = await fetch("/api/orchestrator/health");
setBackendReady(response.ok);
} catch {
setBackendReady(false);
}
}, []);
useEffect(() => {
void loadRecentEvents();
void loadHealth();
const eventSource =
typeof EventSource !== "undefined" ? new EventSource("/api/orchestrator/events") : null;
if (eventSource) {
eventSource.onopen = (): void => {
setStreamConnected(true);
};
eventSource.onmessage = (): void => {
void loadRecentEvents();
void loadHealth();
};
eventSource.onerror = (): void => {
setStreamConnected(false);
};
}
const interval = setInterval(() => {
void loadRecentEvents();
void loadHealth();
}, 15000);
return (): void => {
clearInterval(interval);
eventSource?.close();
};
}, [loadHealth, loadRecentEvents]);
const matrixSignals = useMemo(
() => events.filter((event) => isMatrixSignal(event)).length,
[events]
);
if (isLoading) {
return (
<div className="flex items-center justify-center h-full">
<Loader2 className="w-5 h-5 text-gray-400 animate-spin" />
<span className="ml-2 text-gray-500 text-sm">Loading orchestrator events...</span>
</div>
);
}
if (error) {
return (
<div className="flex flex-col items-center justify-center h-full text-center">
<WifiOff className="w-5 h-5 text-amber-500 mb-2" />
<span className="text-sm text-amber-600">{error}</span>
</div>
);
}
return (
<div className="flex flex-col h-full space-y-3">
<div className="flex items-center justify-between text-xs">
<div className="flex items-center gap-2 text-gray-600 dark:text-gray-300">
{streamConnected ? (
<Wifi className="w-3 h-3 text-green-500" />
) : (
<WifiOff className="w-3 h-3 text-gray-400" />
)}
<span>{streamConnected ? "Live stream connected" : "Polling mode"}</span>
<span
className={`rounded px-1.5 py-0.5 ${
backendReady === true
? "bg-green-100 text-green-700 dark:bg-green-950 dark:text-green-300"
: backendReady === false
? "bg-amber-100 text-amber-700 dark:bg-amber-950 dark:text-amber-300"
: "bg-gray-100 text-gray-600 dark:bg-gray-800 dark:text-gray-300"
}`}
>
{backendReady === true ? "ready" : backendReady === false ? "degraded" : "unknown"}
</span>
</div>
<div className="flex items-center gap-1 rounded bg-blue-50 dark:bg-blue-950 px-2 py-1 text-blue-700 dark:text-blue-300">
<DatabaseZap className="w-3 h-3" />
<span>Matrix signals: {matrixSignals}</span>
</div>
</div>
<div className="flex-1 overflow-auto space-y-2">
{events.length === 0 ? (
<div className="text-center text-sm text-gray-500 py-4">
No recent orchestration events.
</div>
) : (
events
.slice()
.reverse()
.map((event, index) => (
<div
key={`${event.timestamp}-${event.type}-${String(index)}`}
className="rounded border border-gray-200 dark:border-gray-700 bg-gray-50 dark:bg-gray-900 px-2 py-2"
>
<div className="flex items-center justify-between gap-2">
<div className="flex items-center gap-2 min-w-0">
<Activity className="w-3 h-3 text-blue-500 shrink-0" />
<span className="text-xs font-medium text-gray-900 dark:text-gray-100 truncate">
{event.type}
</span>
{isMatrixSignal(event) && (
<span className="text-[10px] rounded bg-indigo-100 dark:bg-indigo-950 text-indigo-700 dark:text-indigo-300 px-1.5 py-0.5">
matrix
</span>
)}
</div>
<span className="text-[10px] text-gray-500">
{new Date(event.timestamp).toLocaleTimeString()}
</span>
</div>
<div className="mt-1 text-[11px] text-gray-600 dark:text-gray-300">
{event.taskId ? `Task ${event.taskId}` : "Task n/a"}
{event.agentId ? ` · Agent ${event.agentId.slice(0, 8)}` : ""}
</div>
</div>
))
)}
</div>
</div>
);
}

View File

@@ -5,8 +5,8 @@
* including status, elapsed time, and work item details. * including status, elapsed time, and work item details.
*/ */
import { useState, useEffect } from "react"; import { useState, useEffect, useCallback } from "react";
import { Activity, CheckCircle, XCircle, Clock, Loader2 } from "lucide-react"; import { Activity, CheckCircle, XCircle, Clock, Loader2, Pause, Play } from "lucide-react";
import type { WidgetProps } from "@mosaic/shared"; import type { WidgetProps } from "@mosaic/shared";
interface AgentTask { interface AgentTask {
@@ -19,6 +19,21 @@ interface AgentTask {
error?: string; error?: string;
} }
interface QueueStats {
pending: number;
active: number;
completed: number;
failed: number;
delayed: number;
}
interface RecentOrchestratorEvent {
type: string;
timestamp: string;
taskId?: string;
agentId?: string;
}
function getElapsedTime(spawnedAt: string, completedAt?: string): string { function getElapsedTime(spawnedAt: string, completedAt?: string): string {
const start = new Date(spawnedAt).getTime(); const start = new Date(spawnedAt).getTime();
const end = completedAt ? new Date(completedAt).getTime() : Date.now(); const end = completedAt ? new Date(completedAt).getTime() : Date.now();
@@ -94,34 +109,108 @@ function getAgentTypeLabel(agentType: string): string {
export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): React.JSX.Element { export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): React.JSX.Element {
const [tasks, setTasks] = useState<AgentTask[]>([]); const [tasks, setTasks] = useState<AgentTask[]>([]);
const [queueStats, setQueueStats] = useState<QueueStats | null>(null);
const [recentEvents, setRecentEvents] = useState<RecentOrchestratorEvent[]>([]);
const [isLoading, setIsLoading] = useState(true); const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const [isQueuePaused, setIsQueuePaused] = useState(false);
const [isActionPending, setIsActionPending] = useState(false);
const fetchTasks = useCallback(async (): Promise<void> => {
try {
const res = await fetch("/api/orchestrator/agents");
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
const data = (await res.json()) as AgentTask[];
setTasks(data);
setError(null);
setIsLoading(false);
} catch {
setError("Unable to reach orchestrator");
setIsLoading(false);
}
}, []);
const fetchQueueStats = useCallback(async (): Promise<void> => {
try {
const res = await fetch("/api/orchestrator/queue/stats");
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
const data = (await res.json()) as QueueStats;
setQueueStats(data);
// Heuristic: active=0 with pending>0 for sustained windows usually means paused.
setIsQueuePaused(data.active === 0 && data.pending > 0);
} catch {
// Keep widget functional even if queue controls are temporarily unavailable.
}
}, []);
const fetchRecentEvents = useCallback(async (): Promise<void> => {
try {
const res = await fetch("/api/orchestrator/events/recent?limit=5");
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
const payload = (await res.json()) as unknown;
const events =
payload &&
typeof payload === "object" &&
"events" in payload &&
Array.isArray(payload.events)
? (payload.events as RecentOrchestratorEvent[])
: [];
setRecentEvents(events);
} catch {
// Optional enhancement path; do not fail widget if recent-events endpoint is unavailable.
}
}, []);
const setQueueState = useCallback(
async (action: "pause" | "resume"): Promise<void> => {
setIsActionPending(true);
try {
const res = await fetch(`/api/orchestrator/queue/${action}`, {
method: "POST",
});
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
setIsQueuePaused(action === "pause");
await fetchQueueStats();
} catch {
setError("Unable to control queue state");
} finally {
setIsActionPending(false);
}
},
[fetchQueueStats]
);
useEffect(() => { useEffect(() => {
const fetchTasks = (): void => { void fetchTasks();
fetch("/api/orchestrator/agents") void fetchQueueStats();
.then((res) => { void fetchRecentEvents();
if (!res.ok) throw new Error(`HTTP ${String(res.status)}`);
return res.json() as Promise<AgentTask[]>;
})
.then((data) => {
setTasks(data);
setError(null);
setIsLoading(false);
})
.catch(() => {
setError("Unable to reach orchestrator");
setIsLoading(false);
});
};
fetchTasks(); const interval = setInterval(() => {
const interval = setInterval(fetchTasks, 15000); void fetchTasks();
void fetchQueueStats();
void fetchRecentEvents();
}, 15000);
const eventSource =
typeof EventSource !== "undefined" ? new EventSource("/api/orchestrator/events") : null;
if (eventSource) {
eventSource.onmessage = (): void => {
void fetchTasks();
void fetchQueueStats();
void fetchRecentEvents();
};
eventSource.onerror = (): void => {
// Polling remains the resilience path.
};
}
return (): void => { return (): void => {
clearInterval(interval); clearInterval(interval);
eventSource?.close();
}; };
}, []); }, [fetchTasks, fetchQueueStats, fetchRecentEvents]);
const latestEvent = recentEvents.length > 0 ? recentEvents[recentEvents.length - 1] : null;
const stats = { const stats = {
total: tasks.length, total: tasks.length,
@@ -151,6 +240,30 @@ export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): R
return ( return (
<div className="flex flex-col h-full space-y-3"> <div className="flex flex-col h-full space-y-3">
<div className="flex items-center justify-between">
<div className="text-xs text-gray-500 dark:text-gray-400">
Queue: {isQueuePaused ? "Paused" : "Running"}
</div>
<button
type="button"
onClick={(): void => {
void setQueueState(isQueuePaused ? "resume" : "pause");
}}
disabled={isActionPending}
className="inline-flex items-center gap-1 rounded border border-gray-300 dark:border-gray-700 px-2 py-1 text-xs hover:bg-gray-100 dark:hover:bg-gray-800 disabled:opacity-50"
>
{isQueuePaused ? <Play className="w-3 h-3" /> : <Pause className="w-3 h-3" />}
{isQueuePaused ? "Resume" : "Pause"}
</button>
</div>
{latestEvent && (
<div className="rounded bg-gray-50 dark:bg-gray-800 px-2 py-1 text-xs text-gray-600 dark:text-gray-300">
Latest: {latestEvent.type}
{latestEvent.taskId ? ` · ${latestEvent.taskId}` : ""}
</div>
)}
{/* Summary stats */} {/* Summary stats */}
<div className="grid grid-cols-4 gap-1 text-center text-xs"> <div className="grid grid-cols-4 gap-1 text-center text-xs">
<div className="bg-gray-50 dark:bg-gray-800 rounded p-2"> <div className="bg-gray-50 dark:bg-gray-800 rounded p-2">
@@ -173,6 +286,29 @@ export function TaskProgressWidget({ id: _id, config: _config }: WidgetProps): R
</div> </div>
</div> </div>
{queueStats && (
<div className="grid grid-cols-3 gap-1 text-center text-xs">
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.pending}
</div>
<div className="text-gray-500">Queued</div>
</div>
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.active}
</div>
<div className="text-gray-500">Workers</div>
</div>
<div className="bg-gray-50 dark:bg-gray-800 rounded p-1">
<div className="font-semibold text-gray-700 dark:text-gray-200">
{queueStats.failed}
</div>
<div className="text-gray-500">Failed</div>
</div>
</div>
)}
{/* Task list */} {/* Task list */}
<div className="flex-1 overflow-auto space-y-2"> <div className="flex-1 overflow-auto space-y-2">
{tasks.length === 0 ? ( {tasks.length === 0 ? (

View File

@@ -10,6 +10,7 @@ import { QuickCaptureWidget } from "./QuickCaptureWidget";
import { AgentStatusWidget } from "./AgentStatusWidget"; import { AgentStatusWidget } from "./AgentStatusWidget";
import { ActiveProjectsWidget } from "./ActiveProjectsWidget"; import { ActiveProjectsWidget } from "./ActiveProjectsWidget";
import { TaskProgressWidget } from "./TaskProgressWidget"; import { TaskProgressWidget } from "./TaskProgressWidget";
import { OrchestratorEventsWidget } from "./OrchestratorEventsWidget";
export interface WidgetDefinition { export interface WidgetDefinition {
name: string; name: string;
@@ -95,6 +96,17 @@ export const widgetRegistry: Record<string, WidgetDefinition> = {
minHeight: 2, minHeight: 2,
maxWidth: 3, maxWidth: 3,
}, },
OrchestratorEventsWidget: {
name: "OrchestratorEventsWidget",
displayName: "Orchestrator Events",
description: "Recent orchestration events with stream/Matrix visibility",
component: OrchestratorEventsWidget,
defaultWidth: 2,
defaultHeight: 2,
minWidth: 1,
minHeight: 2,
maxWidth: 4,
},
}; };
/** /**

View File

@@ -0,0 +1,82 @@
import { render, screen, waitFor } from "@testing-library/react";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { OrchestratorEventsWidget } from "../OrchestratorEventsWidget";
describe("OrchestratorEventsWidget", () => {
const mockFetch = vi.fn();
beforeEach(() => {
global.fetch = mockFetch as unknown as typeof fetch;
});
afterEach(() => {
vi.clearAllMocks();
});
it("renders loading state initially", () => {
mockFetch.mockImplementation(
// eslint-disable-next-line @typescript-eslint/no-empty-function
() => new Promise(() => {})
);
render(<OrchestratorEventsWidget id="orchestrator-events-1" config={{}} />);
expect(screen.getByText("Loading orchestrator events...")).toBeInTheDocument();
});
it("renders events and matrix signal count", async () => {
mockFetch.mockImplementation((input: RequestInfo | URL) => {
const url =
typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
if (url.includes("/api/orchestrator/health")) {
return Promise.resolve({
ok: true,
json: () => Promise.resolve({ status: "ok" }),
} as unknown as Response);
}
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
events: [
{
type: "task.completed",
timestamp: "2026-02-17T16:40:00.000Z",
taskId: "TASK-1",
data: { channelId: "room-123" },
},
{
type: "agent.running",
timestamp: "2026-02-17T16:41:00.000Z",
taskId: "TASK-2",
agentId: "agent-abc12345",
},
],
}),
} as unknown as Response);
});
render(<OrchestratorEventsWidget id="orchestrator-events-1" config={{}} />);
await waitFor(() => {
expect(screen.getByText("task.completed")).toBeInTheDocument();
expect(screen.getByText("agent.running")).toBeInTheDocument();
expect(screen.getByText(/Matrix signals: 1/)).toBeInTheDocument();
expect(screen.getByText("ready")).toBeInTheDocument();
});
});
it("renders error state when API fails", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 503,
});
render(<OrchestratorEventsWidget id="orchestrator-events-1" config={{}} />);
await waitFor(() => {
expect(screen.getByText(/Unable to load events: HTTP 503/)).toBeInTheDocument();
});
});
});

View File

@@ -242,4 +242,58 @@ describe("TaskProgressWidget", (): void => {
expect(taskElements[1]?.textContent).toBe("COMPLETED-TASK"); expect(taskElements[1]?.textContent).toBe("COMPLETED-TASK");
}); });
}); });
it("should display latest orchestrator event when available", async (): Promise<void> => {
mockFetch.mockImplementation((input: RequestInfo | URL) => {
let url = "";
if (typeof input === "string") {
url = input;
} else if (input instanceof URL) {
url = input.toString();
} else {
url = input.url;
}
if (url.includes("/api/orchestrator/agents")) {
return Promise.resolve({
ok: true,
json: () => Promise.resolve([]),
} as unknown as Response);
}
if (url.includes("/api/orchestrator/queue/stats")) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
pending: 0,
active: 0,
completed: 0,
failed: 0,
delayed: 0,
}),
} as unknown as Response);
}
if (url.includes("/api/orchestrator/events/recent")) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
events: [
{
type: "task.executing",
timestamp: new Date().toISOString(),
taskId: "TASK-123",
},
],
}),
} as unknown as Response);
}
return Promise.reject(new Error("Unknown endpoint"));
});
render(<TaskProgressWidget id="task-progress-1" />);
await waitFor(() => {
expect(screen.getByText(/Latest: task.executing · TASK-123/i)).toBeInTheDocument();
});
});
}); });

View File

@@ -10,6 +10,7 @@ import { widgetRegistry } from "../WidgetRegistry";
import { TasksWidget } from "../TasksWidget"; import { TasksWidget } from "../TasksWidget";
import { CalendarWidget } from "../CalendarWidget"; import { CalendarWidget } from "../CalendarWidget";
import { QuickCaptureWidget } from "../QuickCaptureWidget"; import { QuickCaptureWidget } from "../QuickCaptureWidget";
import { OrchestratorEventsWidget } from "../OrchestratorEventsWidget";
describe("WidgetRegistry", (): void => { describe("WidgetRegistry", (): void => {
it("should have a registry of widgets", (): void => { it("should have a registry of widgets", (): void => {
@@ -32,6 +33,11 @@ describe("WidgetRegistry", (): void => {
expect(widgetRegistry.QuickCaptureWidget!.component).toBe(QuickCaptureWidget); expect(widgetRegistry.QuickCaptureWidget!.component).toBe(QuickCaptureWidget);
}); });
it("should include OrchestratorEventsWidget in registry", (): void => {
expect(widgetRegistry.OrchestratorEventsWidget).toBeDefined();
expect(widgetRegistry.OrchestratorEventsWidget!.component).toBe(OrchestratorEventsWidget);
});
it("should have correct metadata for TasksWidget", (): void => { it("should have correct metadata for TasksWidget", (): void => {
const tasksWidget = widgetRegistry.TasksWidget!; const tasksWidget = widgetRegistry.TasksWidget!;
expect(tasksWidget.name).toBe("TasksWidget"); expect(tasksWidget.name).toBe("TasksWidget");

View File

@@ -6,3 +6,4 @@ export { TasksWidget } from "./TasksWidget";
export { CalendarWidget } from "./CalendarWidget"; export { CalendarWidget } from "./CalendarWidget";
export { QuickCaptureWidget } from "./QuickCaptureWidget"; export { QuickCaptureWidget } from "./QuickCaptureWidget";
export { AgentStatusWidget } from "./AgentStatusWidget"; export { AgentStatusWidget } from "./AgentStatusWidget";
export { OrchestratorEventsWidget } from "./OrchestratorEventsWidget";

View File

@@ -14,6 +14,70 @@ const DEFAULT_LAYOUT_NAME = "default";
*/ */
const WORKSPACE_KEY = "mosaic-workspace-id"; const WORKSPACE_KEY = "mosaic-workspace-id";
function createDefaultLayout(): LayoutConfig {
return {
id: DEFAULT_LAYOUT_NAME,
name: "Default Layout",
layout: [
{
i: "tasks-1",
x: 0,
y: 0,
w: 2,
h: 3,
minW: 1,
minH: 2,
isDraggable: true,
isResizable: true,
},
{
i: "calendar-1",
x: 2,
y: 0,
w: 2,
h: 2,
minW: 1,
minH: 2,
isDraggable: true,
isResizable: true,
},
{
i: "agent-status-1",
x: 2,
y: 2,
w: 2,
h: 2,
minW: 1,
minH: 1,
isDraggable: true,
isResizable: true,
},
{
i: "orchestrator-events-1",
x: 0,
y: 3,
w: 2,
h: 2,
minW: 1,
minH: 1,
isDraggable: true,
isResizable: true,
},
{
i: "quick-capture-1",
x: 2,
y: 4,
w: 2,
h: 1,
minW: 1,
minH: 1,
isDraggable: true,
isResizable: true,
},
],
};
}
interface UseLayoutReturn { interface UseLayoutReturn {
layouts: Record<string, LayoutConfig>; layouts: Record<string, LayoutConfig>;
currentLayout: LayoutConfig | undefined; currentLayout: LayoutConfig | undefined;
@@ -45,7 +109,18 @@ export function useLayout(): UseLayoutReturn {
if (stored) { if (stored) {
const emptyFallback: Record<string, LayoutConfig> = {}; const emptyFallback: Record<string, LayoutConfig> = {};
const parsed = safeJsonParse(stored, isLayoutConfigRecord, emptyFallback); const parsed = safeJsonParse(stored, isLayoutConfigRecord, emptyFallback);
setLayouts(parsed as Record<string, LayoutConfig>); const parsedLayouts = parsed as Record<string, LayoutConfig>;
if (Object.keys(parsedLayouts).length > 0) {
setLayouts(parsedLayouts);
} else {
setLayouts({
[DEFAULT_LAYOUT_NAME]: createDefaultLayout(),
});
}
} else {
setLayouts({
[DEFAULT_LAYOUT_NAME]: createDefaultLayout(),
});
} }
// Load current layout ID preference // Load current layout ID preference
@@ -195,11 +270,7 @@ export function useLayout(): UseLayoutReturn {
const resetLayout = useCallback(() => { const resetLayout = useCallback(() => {
setLayouts({ setLayouts({
[DEFAULT_LAYOUT_NAME]: { [DEFAULT_LAYOUT_NAME]: createDefaultLayout(),
id: DEFAULT_LAYOUT_NAME,
name: "Default Layout",
layout: [],
},
}); });
setCurrentLayoutId(DEFAULT_LAYOUT_NAME); setCurrentLayoutId(DEFAULT_LAYOUT_NAME);
}, []); }, []);

View File

@@ -342,3 +342,49 @@
| REV-2026-006 | done | medium | qa+architecture | `MosaicTelemetryModule` imports `AuthModule`, causing telemetry module tests to fail on unrelated `ENCRYPTION_KEY` auth config requirements. Decouple telemetry module dependencies or provide test-safe module overrides. | `apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts:36`, `apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts:1` | | REV-2026-006 | done | medium | qa+architecture | `MosaicTelemetryModule` imports `AuthModule`, causing telemetry module tests to fail on unrelated `ENCRYPTION_KEY` auth config requirements. Decouple telemetry module dependencies or provide test-safe module overrides. | `apps/api/src/mosaic-telemetry/mosaic-telemetry.module.ts:36`, `apps/api/src/mosaic-telemetry/mosaic-telemetry.module.spec.ts:1` |
| REV-2026-007 | done | medium | qa | Frontend skip cleanup completed for scoped findings: `TasksWidget`, `CalendarWidget`, and `LinkAutocomplete` coverage now runs with deterministic assertions and no stale `it.skip` markers in those suites. | `apps/web/src/components/widgets/__tests__/TasksWidget.test.tsx:1`, `apps/web/src/components/widgets/__tests__/CalendarWidget.test.tsx:1`, `apps/web/src/components/knowledge/__tests__/LinkAutocomplete.test.tsx:1` | | REV-2026-007 | done | medium | qa | Frontend skip cleanup completed for scoped findings: `TasksWidget`, `CalendarWidget`, and `LinkAutocomplete` coverage now runs with deterministic assertions and no stale `it.skip` markers in those suites. | `apps/web/src/components/widgets/__tests__/TasksWidget.test.tsx:1`, `apps/web/src/components/widgets/__tests__/CalendarWidget.test.tsx:1`, `apps/web/src/components/knowledge/__tests__/LinkAutocomplete.test.tsx:1` |
| REV-2026-008 | done | low | tooling | Repo session bootstrap reliability issue: `scripts/agent/session-start.sh` fails due stale branch tracking ref, which can silently block required lifecycle checks. Update script to tolerate missing remote branch or self-heal branch config. | `scripts/agent/session-start.sh:10`, `scripts/agent/session-start.sh:16`, `scripts/agent/session-start.sh:34` | | REV-2026-008 | done | low | tooling | Repo session bootstrap reliability issue: `scripts/agent/session-start.sh` fails due stale branch tracking ref, which can silently block required lifecycle checks. Update script to tolerate missing remote branch or self-heal branch config. | `scripts/agent/session-start.sh:10`, `scripts/agent/session-start.sh:16`, `scripts/agent/session-start.sh:34` |
---
## 2026-02-17 Orchestrator Streaming + Queue Control Follow-up
**Orchestrator:** Jarvis (Codex runtime)
**Branch:** `fix/auth-frontend-remediation`
### Tasks
| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used |
| ----------- | ------ | ------------------------------------------------------------------------------------------------------------------- | ----- | ---------------- | ----------------------------- | ------------ | ------ | ----- | ----------------- | ----------------- | -------- | ---- |
| ORCH-FU-001 | done | Add orchestrator SSE event stream endpoint and service fan-out (`/agents/events`) with initial snapshot + heartbeat | #411 | orchestrator,web | fix/auth-frontend-remediation | REV-2026-001 | | orch | 2026-02-17T15:00Z | 2026-02-17T15:18Z | 20K | 24K |
| ORCH-FU-002 | done | Add queue control API (`/queue/stats`, `/queue/pause`, `/queue/resume`) and web proxy routes | #411 | orchestrator,web | fix/auth-frontend-remediation | ORCH-FU-001 | | orch | 2026-02-17T15:18Z | 2026-02-17T15:24Z | 12K | 15K |
| ORCH-FU-003 | done | Wire `TaskProgressWidget` and `AgentStatusWidget` to live SSE updates with polling fallback | #411 | web | fix/auth-frontend-remediation | ORCH-FU-001 | | orch | 2026-02-17T15:24Z | 2026-02-17T15:33Z | 15K | 18K |
| ORCH-FU-004 | done | Persist spawned state in lifecycle + align queue state transitions/events for spawned/non-spawned paths | #411 | orchestrator | fix/auth-frontend-remediation | ORCH-FU-001 | | orch | 2026-02-17T15:33Z | 2026-02-17T15:40Z | 15K | 18K |
| ORCH-FU-005 | done | Harden repo-local Mosaic linkage paths (`~/.config/mosaic`) and ignore orchestrator runtime artifacts | #411 | docs,tooling | fix/auth-frontend-remediation | ORCH-FU-004 | | orch | 2026-02-17T15:40Z | 2026-02-17T15:45Z | 8K | 6K |
| ORCH-FU-V01 | done | Verification: orchestrator and web targeted test suites pass after follow-up changes | #411 | all | fix/auth-frontend-remediation | ORCH-FU-001 | | orch | 2026-02-17T15:45Z | 2026-02-17T15:48Z | 5K | 3K |
### Verification Snapshot
- `pnpm --filter @mosaic/orchestrator test -- src/api/queue/queue.controller.spec.ts src/api/agents/agents.controller.spec.ts src/api/agents/agents-killswitch.controller.spec.ts src/queue/queue.service.spec.ts src/config/orchestrator.config.spec.ts`: pass (`26` files, `737` tests)
- `pnpm --filter @mosaic/web test -- src/components/widgets/__tests__/TaskProgressWidget.test.tsx src/components/widgets/__tests__/AgentStatusWidget.test.tsx`: pass (`89` files, `1117` tests, `3` skipped)
---
## 2026-02-17 Orchestrator Observability Follow-up
**Orchestrator:** Jarvis (Codex runtime)
**Branch:** `feature/mosaic-stack-finalization`
### Tasks
| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used |
| ------------ | ------ | --------------------------------------------------------------------------------------------------- | ----- | ----------------- | --------------------------------- | ------------ | ------ | ----- | ----------------- | ----------------- | -------- | ---- |
| ORCH-OBS-001 | done | Add recent event buffer + endpoint (`GET /agents/events/recent?limit=`) for non-SSE polling clients | #411 | orchestrator | feature/mosaic-stack-finalization | ORCH-FU-001 | | orch | 2026-02-17T16:20Z | 2026-02-17T16:28Z | 10K | 8K |
| ORCH-OBS-002 | done | Add web proxy route for recent orchestrator events (`/api/orchestrator/events/recent`) | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:28Z | 2026-02-17T16:31Z | 5K | 4K |
| ORCH-OBS-003 | done | Add repo-level monitor script (`scripts/agent/orchestrator-events.sh`) for recent/watch modes | #411 | tooling | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:31Z | 2026-02-17T16:36Z | 8K | 5K |
| ORCH-OBS-004 | done | Add tests/docs updates for recent events and operator command usage | #411 | orchestrator,docs | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:36Z | 2026-02-17T16:40Z | 8K | 6K |
| ORCH-OBS-005 | done | Fix HUD widget ID generation/parsing for hyphenated widget types (`quick-capture`, `agent-status`) | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-004 | | orch | 2026-02-17T16:42Z | 2026-02-17T16:48Z | 8K | 6K |
| ORCH-OBS-006 | done | Add `WidgetRenderer` regression tests for hyphenated widget IDs | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-005 | | orch | 2026-02-17T16:48Z | 2026-02-17T16:50Z | 5K | 3K |
| ORCH-OBS-007 | done | Add `OrchestratorEventsWidget` for live/recent orchestration visibility with Matrix signal hints | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-002 | | orch | 2026-02-17T16:55Z | 2026-02-17T17:03Z | 12K | 9K |
| ORCH-OBS-008 | done | Integrate new widget into HUD/WidgetRegistry and extend widget regression coverage | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-007 | | orch | 2026-02-17T17:03Z | 2026-02-17T17:08Z | 10K | 7K |
| ORCH-OBS-009 | done | Seed default/reset local HUD layout with orchestration widgets so visibility works out-of-box | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-008 | | orch | 2026-02-17T17:10Z | 2026-02-17T17:14Z | 8K | 6K |
| ORCH-OBS-010 | done | Enrich `TaskProgressWidget` with latest recent-event context from `/api/orchestrator/events/recent` | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-009 | | orch | 2026-02-17T17:15Z | 2026-02-17T17:20Z | 8K | 6K |
| ORCH-OBS-011 | done | Add orchestrator health proxy and readiness badge (`ready/degraded`) in events widget | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-010 | | orch | 2026-02-17T17:22Z | 2026-02-17T17:27Z | 8K | 6K |

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=./common.sh
source "$SCRIPT_DIR/common.sh"
ensure_repo_root
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.config/mosaic}"
ORCH_DIR=".mosaic/orchestrator"
PID_FILE="$ORCH_DIR/orchestrator.pid"
LOG_FILE="$ORCH_DIR/logs/daemon.log"
usage() {
cat <<USAGE
Usage: $(basename "$0") <start|drain|stop|status> [--poll-sec N] [--no-sync]
Commands:
start Run orchestrator drain loop in background (detached)
drain Run orchestrator drain loop in foreground (until queue drained)
stop Stop background orchestrator if running
status Show background orchestrator status
Options:
--poll-sec N Poll interval (default: 15)
--no-sync Skip docs/tasks.md -> orchestrator queue sync before run
USAGE
}
cmd="${1:-status}"
if [[ $# -gt 0 ]]; then
shift
fi
poll_sec=15
sync_arg=""
while [[ $# -gt 0 ]]; do
case "$1" in
--poll-sec)
poll_sec="${2:-15}"
shift 2
;;
--no-sync)
sync_arg="--no-sync"
shift
;;
*)
echo "[agent-framework] unknown argument: $1" >&2
usage
exit 1
;;
esac
done
mkdir -p "$ORCH_DIR/logs" "$ORCH_DIR/results"
is_running() {
[[ -f "$PID_FILE" ]] || return 1
local pid
pid="$(cat "$PID_FILE" 2>/dev/null || true)"
[[ -n "$pid" ]] || return 1
kill -0 "$pid" 2>/dev/null
}
case "$cmd" in
start)
if is_running; then
echo "[agent-framework] orchestrator already running (pid=$(cat "$PID_FILE"))"
exit 0
fi
nohup "$MOSAIC_HOME/bin/mosaic-orchestrator-drain" --poll-sec "$poll_sec" $sync_arg >"$LOG_FILE" 2>&1 &
echo "$!" > "$PID_FILE"
echo "[agent-framework] orchestrator started (pid=$!, log=$LOG_FILE)"
;;
drain)
exec "$MOSAIC_HOME/bin/mosaic-orchestrator-drain" --poll-sec "$poll_sec" $sync_arg
;;
stop)
if ! is_running; then
echo "[agent-framework] orchestrator not running"
rm -f "$PID_FILE"
exit 0
fi
pid="$(cat "$PID_FILE")"
kill "$pid" || true
rm -f "$PID_FILE"
echo "[agent-framework] orchestrator stopped (pid=$pid)"
;;
status)
if is_running; then
echo "[agent-framework] orchestrator running (pid=$(cat "$PID_FILE"), log=$LOG_FILE)"
else
echo "[agent-framework] orchestrator not running"
rm -f "$PID_FILE"
fi
;;
*)
usage
exit 1
;;
esac

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=./common.sh
source "$SCRIPT_DIR/common.sh"
ensure_repo_root
ORCH_URL="${ORCHESTRATOR_URL:-http://localhost:3001}"
API_KEY="${ORCHESTRATOR_API_KEY:-}"
usage() {
cat <<USAGE
Usage: $(basename "$0") <recent|watch> [--limit N]
Commands:
recent Fetch recent orchestrator events (JSON)
watch Stream live orchestrator events (SSE)
Options:
--limit N Number of recent events to return (default: 50)
Environment:
ORCHESTRATOR_URL Orchestrator base URL (default: http://localhost:3001)
ORCHESTRATOR_API_KEY Required API key for orchestrator requests
USAGE
}
cmd="${1:-recent}"
if [[ $# -gt 0 ]]; then
shift
fi
limit=50
while [[ $# -gt 0 ]]; do
case "$1" in
--limit)
limit="${2:-50}"
shift 2
;;
*)
echo "[agent-framework] unknown argument: $1" >&2
usage
exit 1
;;
esac
done
if [[ -z "$API_KEY" ]]; then
echo "[agent-framework] ORCHESTRATOR_API_KEY is required" >&2
exit 1
fi
case "$cmd" in
recent)
curl -fsSL \
-H "X-API-Key: $API_KEY" \
-H "Content-Type: application/json" \
"$ORCH_URL/agents/events/recent?limit=$limit"
;;
watch)
curl -NfsSL \
-H "X-API-Key: $API_KEY" \
-H "Accept: text/event-stream" \
"$ORCH_URL/agents/events"
;;
*)
usage
exit 1
;;
esac

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env bash
set -euo pipefail
task_file="${1:-}"
if [[ -z "$task_file" || ! -f "$task_file" ]]; then
echo "[orchestrator-worker] missing task file argument" >&2
exit 1
fi
worker_exec="${MOSAIC_WORKER_EXEC:-}"
if [[ -z "$worker_exec" ]]; then
if command -v codex >/dev/null 2>&1; then
worker_exec="codex -p"
elif command -v opencode >/dev/null 2>&1; then
worker_exec="opencode -p"
else
echo "[orchestrator-worker] set MOSAIC_WORKER_EXEC to your worker command (example: 'codex -p' or 'opencode -p')" >&2
exit 1
fi
fi
prompt="$(python3 - "$task_file" <<'PY'
import json
import sys
from pathlib import Path
task = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8"))
task_id = str(task.get("id", "TASK"))
title = str(task.get("title", ""))
description = str(task.get("description", ""))
meta = task.get("metadata", {}) or {}
issue = str(meta.get("issue", ""))
repo = str(meta.get("repo", ""))
branch = str(meta.get("branch", ""))
depends = task.get("depends_on", [])
if isinstance(depends, list):
depends_str = ", ".join(str(x) for x in depends)
else:
depends_str = str(depends)
print(
f"""Read ~/.config/mosaic/STANDARDS.md, then AGENTS.md and SOUL.md (if present).
Complete this queued task fully.
Task ID: {task_id}
Title: {title}
Description: {description}
Issue: {issue}
Repo hint: {repo}
Branch hint: {branch}
Depends on: {depends_str}
Requirements:
- Implement and verify the task end-to-end.
- Keep changes scoped to this task.
- Run project checks and tests relevant to touched code.
- Return with a concise summary of what changed and verification results.
"""
)
PY
)"
PROMPT="$prompt" bash -lc "$worker_exec \"\$PROMPT\""