diff --git a/docs/3-architecture/assignment-based-trigger-architecture.md b/docs/3-architecture/assignment-based-trigger-architecture.md new file mode 100644 index 0000000..d9b1c0a --- /dev/null +++ b/docs/3-architecture/assignment-based-trigger-architecture.md @@ -0,0 +1,826 @@ +# Assignment-Based Trigger Architecture + +**Status:** Proposed (Phase 0 - Foundation) +**Related Issues:** #142 (Epic), Phase 0 issues +**Priority:** Critical (P0) - Required before orchestration can function +**Milestone:** M4.1-Coordinator (0.0.4) + +--- + +## Executive Summary + +The coordinator uses **issue assignment** as the trigger mechanism. When a user assigns an issue to the special `@mosaic` user in Gitea, a webhook fires, the coordinator parses the issue metadata, adds it to the queue, and begins work. + +**Key insight:** Assignment is the perfect trigger because it's: + +- ✅ Natural Gitea workflow (users already assign issues) +- ✅ Visual feedback (can see what coordinator is working on) +- ✅ Webhook-friendly (Gitea sends events on assignment) +- ✅ Granular control (assign what you want, when you want) +- ✅ No CLI needed (just use Gitea UI) + +--- + +## Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ GITEA REPOSITORY │ +│ │ +│ User assigns issue to @mosaic │ +│ │ │ +│ └──> Webhook: issue.assigned │ +└─────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ COORDINATOR WEBHOOK RECEIVER │ +│ │ +│ 1. Verify webhook signature │ +│ 2. Check assignee == "mosaic" │ +│ 3. Launch AI agent to parse issue │ +│ 4. Add to queue │ +│ 5. Comment on issue: "Added to queue..." │ +└─────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ ISSUE PARSER AGENT │ +│ (AI - Sonnet/GLM) │ +│ │ +│ Input: Issue markdown body │ +│ Output: Structured metadata │ +│ - estimated_context (tokens) │ +│ - difficulty (low/medium/high) │ +│ - assigned_agent (opus/sonnet/glm) │ +│ - dependencies (blocked_by) │ +└─────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ COORDINATOR QUEUE │ +│ │ +│ [Issue #154] COORD-001 (46.8K, medium, glm) → Ready │ +│ [Issue #155] COORD-002 (49.4K, medium, glm) → Blocked │ +│ [Issue #143] COORD-003 (40.3K, low, glm) → Blocked │ +└─────────────────────┬───────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ COORDINATOR ORCHESTRATION LOOP │ +│ │ +│ 1. Get next ready issue from queue │ +│ 2. Spawn agent (type from metadata) │ +│ 3. Monitor context usage │ +│ 4. Run quality gates on completion │ +│ 5. Comment progress to Gitea │ +│ 6. Mark complete, move to next │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## User Workflow + +### Initial Setup (One Time) + +**1. Create coordinator user in Gitea:** + +``` +Gitea Admin → Users → Create User + Username: mosaic + Email: mosaic@mosaicstack.dev + Type: Bot account +``` + +**2. Register webhook in repo:** + +``` +Gitea Repo → Settings → Webhooks → Add Webhook + URL: https://coordinator.example.com/webhook/gitea + Secret: [shared secret] + Events: Issues (assigned, unassigned, closed, reopened) +``` + +**3. Start coordinator service:** + +```bash +docker compose up -d coordinator +# or +python coordinator/main.py --config coordinator.yml +``` + +### Daily Usage + +**Start work on issues:** + +``` +Gitea UI → Issue #154 → Assign to @mosaic +``` + +**Start entire milestone:** + +``` +Gitea UI → Milestone M4.1-Coordinator → Bulk assign to @mosaic +``` + +**Pause work:** + +``` +Gitea UI → Issue #154 → Unassign from @mosaic +``` + +**Check status:** + +``` +Gitea UI → Issue #154 → View comments from @mosaic +``` + +--- + +## Component Details + +### 1. Webhook Receiver + +**Technology:** FastAPI (Python) + +**Endpoints:** + +```python +@app.post('/webhook/gitea') +async def handle_gitea_webhook( + request: Request, + x_gitea_signature: str = Header(...) +): + """Handle Gitea webhook events.""" + + # 1. Verify signature + payload = await request.json() + if not verify_signature(payload, x_gitea_signature): + raise HTTPException(401, "Invalid signature") + + # 2. Route by action + if payload['action'] == 'assigned': + await handle_issue_assigned(payload) + elif payload['action'] == 'unassigned': + await handle_issue_unassigned(payload) + elif payload['action'] == 'closed': + await handle_issue_closed(payload) + + return {"status": "ok"} + + +async def handle_issue_assigned(payload: dict): + """User assigned issue to coordinator.""" + + # Check if assigned to coordinator user + if payload['assignee']['username'] != COORDINATOR_USERNAME: + return # Not for us + + issue = payload['issue'] + + # Launch parser agent + metadata = await parse_issue_metadata(issue) + + # Add to queue + queue_manager.enqueue( + issue_number=issue['number'], + title=issue['title'], + metadata=metadata + ) + + # Comment on issue + await gitea_client.comment_on_issue( + issue['number'], + f"🤖 Added to coordinator queue.\n\n" + f"**Metadata:**\n" + f"- Estimated context: {metadata['estimated_context']:,} tokens\n" + f"- Difficulty: {metadata['difficulty']}\n" + f"- Assigned agent: {metadata['assigned_agent']}\n\n" + f"Starting work..." + ) + + # Trigger orchestration + await coordinator.process_queue() +``` + +**Security:** + +```python +def verify_signature(payload: dict, signature: str) -> bool: + """Verify Gitea webhook signature.""" + import hmac + import hashlib + + secret = os.getenv('GITEA_WEBHOOK_SECRET').encode() + computed = hmac.new( + secret, + json.dumps(payload).encode(), + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(computed, signature) +``` + +### 2. Issue Parser Agent + +**Purpose:** Extract structured metadata from issue markdown body + +**Agent type:** Sonnet (cheap, good at parsing) + +**Input:** Issue markdown body + +```markdown +## Objective + +Implement context estimation formula... + +## Context Estimate + +- Files to modify: 3 +- Implementation complexity: medium (20000 tokens) +- Test requirements: medium (10000 tokens) +- Documentation: light (2000 tokens) +- **Total estimated: 46800 tokens** +- **Recommended agent: glm** + +## Difficulty + +medium + +## Dependencies + +- Blocked by: None +- Blocks: #155 (COORD-002) +``` + +**Output:** Structured JSON + +```json +{ + "estimated_context": 46800, + "difficulty": "medium", + "assigned_agent": "glm", + "blocks": ["155"], + "blocked_by": [] +} +``` + +**Implementation:** + +```python +async def parse_issue_metadata(issue: dict) -> dict: + """Parse issue body with AI agent to extract metadata.""" + + client = anthropic.Anthropic() + + response = await client.messages.create( + model="claude-sonnet-4-5", + max_tokens=1000, + messages=[{ + "role": "user", + "content": f"""Parse this GitHub/Gitea issue and extract metadata. + +Issue Title: {issue['title']} +Issue Body: +{issue['body']} + +Extract and return ONLY a JSON object with: +{{ + "estimated_context": , + "difficulty": "low" | "medium" | "high", + "assigned_agent": "opus" | "sonnet" | "glm" | "haiku" | "minimax", + "blocks": [], + "blocked_by": [] +}} + +If any field is missing, use reasonable defaults: +- estimated_context: 50000 +- difficulty: "medium" +- assigned_agent: "sonnet" +- blocks: [] +- blocked_by: [] +""" + }] + ) + + # Parse JSON from response + metadata = json.loads(response.content[0].text) + + return metadata +``` + +### 3. Queue Manager + +**Purpose:** Track work queue, dependencies, status + +**Data structure:** + +```python +@dataclass +class QueueItem: + issue_number: int + title: str + estimated_context: int + difficulty: str + assigned_agent: str + blocks: List[int] + blocked_by: List[int] + status: str # 'queued', 'ready', 'in_progress', 'completed', 'failed' + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + agent_id: Optional[str] = None + + +class QueueManager: + def __init__(self): + self.queue: List[QueueItem] = [] + + def enqueue(self, issue_number: int, title: str, metadata: dict): + """Add issue to queue.""" + item = QueueItem( + issue_number=issue_number, + title=title, + estimated_context=metadata['estimated_context'], + difficulty=metadata['difficulty'], + assigned_agent=metadata['assigned_agent'], + blocks=metadata['blocks'], + blocked_by=metadata['blocked_by'], + status='queued' + ) + + self.queue.append(item) + self._update_ready_status() + + def dequeue(self, issue_number: int): + """Remove issue from queue (user unassigned).""" + self.queue = [q for q in self.queue if q.issue_number != issue_number] + + def get_next_ready(self) -> Optional[QueueItem]: + """Get next issue with no blockers.""" + for item in self.queue: + if item.status == 'ready': + return item + return None + + def mark_complete(self, issue_number: int): + """Mark issue complete, unblock dependents.""" + for item in self.queue: + if item.issue_number == issue_number: + item.status = 'completed' + item.completed_at = datetime.now() + + self._update_ready_status() + + def _update_ready_status(self): + """Update which issues are ready (no blockers).""" + for item in self.queue: + if item.status != 'queued': + continue + + # Check if all blockers are complete + blockers_complete = all( + self._is_complete(blocker) + for blocker in item.blocked_by + ) + + if blockers_complete: + item.status = 'ready' + + def _is_complete(self, issue_number: int) -> bool: + """Check if issue is complete.""" + for item in self.queue: + if item.issue_number == issue_number: + return item.status == 'completed' + return True # Not in queue = already complete +``` + +### 4. Orchestration Loop + +**Purpose:** Process queue, spawn agents, monitor progress + +```python +class Coordinator: + def __init__(self, queue_manager: QueueManager): + self.queue = queue_manager + self.current_agent: Optional[Agent] = None + + async def process_queue(self): + """Main orchestration loop.""" + + while True: + # Get next ready issue + item = self.queue.get_next_ready() + + if not item: + logger.info("No ready issues in queue") + break + + # Mark as in progress + item.status = 'in_progress' + item.started_at = datetime.now() + + # Comment on issue + await gitea_client.comment_on_issue( + item.issue_number, + f"🔄 Starting work with {item.assigned_agent} agent..." + ) + + try: + # Spawn agent + agent = await self.spawn_agent(item) + self.current_agent = agent + + # Monitor until complete + await self.monitor_agent(agent, item) + + # Mark complete + self.queue.mark_complete(item.issue_number) + + # Comment success + await gitea_client.comment_on_issue( + item.issue_number, + f"✅ Work complete! All quality gates passed.\n\n" + f"Closing issue." + ) + + # Close issue + await gitea_client.close_issue(item.issue_number) + + except Exception as e: + logger.error(f"Agent failed: {e}") + item.status = 'failed' + + await gitea_client.comment_on_issue( + item.issue_number, + f"❌ Work failed: {e}\n\n" + f"Unassigning from coordinator. Please review." + ) + + # Unassign from coordinator + await gitea_client.unassign_issue( + item.issue_number, + COORDINATOR_USERNAME + ) + + async def spawn_agent(self, item: QueueItem) -> Agent: + """Spawn AI agent for issue.""" + + # Build agent instructions + instructions = f""" +Complete issue #{item.issue_number}: {item.title} + +Follow quality-rails: +- All code must pass: build, lint, test, coverage (85% min) +- Use TDD workflow (test first) +- Follow project CLAUDE.md guidelines + +When complete, claim "done" and the coordinator will verify. +""" + + # Spawn agent via API + agent = Agent( + type=item.assigned_agent, + instructions=instructions, + issue_number=item.issue_number + ) + + await agent.start() + + return agent + + async def monitor_agent(self, agent: Agent, item: QueueItem): + """Monitor agent progress, context usage, quality gates.""" + + while not agent.is_complete(): + # Check context usage + context = await agent.get_context_usage() + + if context > 0.95: + # Rotate session + logger.info(f"Agent {agent.id} at 95% context, rotating") + new_agent = await self.rotate_session(agent, item) + agent = new_agent + + elif context > 0.80: + # Compact session + logger.info(f"Agent {agent.id} at 80% context, compacting") + await agent.compact() + + # Comment progress periodically + if time.time() % 300 == 0: # Every 5 minutes + await gitea_client.comment_on_issue( + item.issue_number, + f"🔄 Progress update:\n" + f"- Context usage: {context:.0%}\n" + f"- Status: Working..." + ) + + await asyncio.sleep(10) # Check every 10 seconds + + # Agent claimed completion - verify with quality gates + if not await self.verify_quality_gates(item): + # Gates failed - force continuation + continuation = self.generate_continuation_prompt(item) + await agent.send_message(continuation) + + # Continue monitoring + await self.monitor_agent(agent, item) +``` + +--- + +## User Control Mechanisms + +### Start Work + +**Assign to `@mosaic`** + +- Single issue: Gitea UI → Issue → Assign to @mosaic +- Bulk: Gitea UI → Milestone → Bulk assign to @mosaic +- Result: Coordinator adds to queue, starts work + +### Pause Work + +**Unassign from `@mosaic`** + +- Gitea UI → Issue → Unassign from @mosaic +- Result: Coordinator removes from queue, stops agent + +### Cancel Work + +**Close issue** + +- Gitea UI → Issue → Close +- Result: Coordinator stops agent, marks complete + +### Prioritize + +**Assignment order** + +- Assign high-priority issues first +- Coordinator processes in assignment order (respecting dependencies) + +### Check Status + +**View comments** + +- Gitea UI → Issue → Comments from @mosaic +- Shows: Queue add, work start, progress updates, completion + +--- + +## Visual Feedback + +**User can see in Gitea:** + +``` +M4.1-Coordinator (0.0.4) - 13 issues + +✅ #154 [COORD-001] Implement context estimator + Assigned: @mosaic + Status: ✅ Complete (closed 2 hours ago) + +🔄 #155 [COORD-002] Build basic context monitor + Assigned: @mosaic + Status: 🔄 In Progress (context: 45%) + Comments: + 🤖 mosaic: Added to queue... (3 hours ago) + 🤖 mosaic: Starting work with glm agent... (2 hours ago) + 🤖 mosaic: Progress update: 45% context... (5 min ago) + +⏸️ #143 [COORD-003] Validate 50% rule + Assigned: @mosaic + Status: ⏸️ Queued (blocked by #155) + Comments: + 🤖 mosaic: Added to queue. Waiting for dependencies... + +⚪ #144 [COORD-004] Implement agent profiles + Unassigned + Status: ⚪ Not started +``` + +--- + +## Configuration + +```yaml +# coordinator.yml + +gitea: + url: https://git.mosaicstack.dev + api_token: ${GITEA_API_TOKEN} + webhook_secret: ${GITEA_WEBHOOK_SECRET} + +coordinator: + username: mosaic + email: mosaic@mosaicstack.dev + +server: + host: 0.0.0.0 + port: 8080 + public_url: https://coordinator.example.com + +agents: + opus: + context_limit: 200000 + cost_per_mtok: 15.00 + sonnet: + context_limit: 200000 + cost_per_mtok: 3.00 + glm: + context_limit: 128000 + cost_per_mtok: 0.00 # Self-hosted + haiku: + context_limit: 200000 + cost_per_mtok: 0.80 + minimax: + context_limit: 128000 + cost_per_mtok: 0.00 # Self-hosted + +behavior: + comment_on_queue_add: true + comment_on_start: true + comment_frequency: 300 # seconds + close_issue_on_complete: true + unassign_on_failure: true + +quality_gates: + enabled: true + gates: [build, lint, test, coverage] + coverage_minimum: 85 +``` + +--- + +## Deployment + +### Docker Compose + +```yaml +# docker-compose.yml (add to existing) + +services: + coordinator: + build: ./coordinator + ports: + - "8080:8080" + environment: + - GITEA_URL=https://git.mosaicstack.dev + - GITEA_API_TOKEN=${GITEA_API_TOKEN} + - GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET} + - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} + volumes: + - ./coordinator.yml:/app/coordinator.yml + - coordinator-data:/app/data + restart: unless-stopped + +volumes: + coordinator-data: +``` + +### Systemd Service + +```ini +[Unit] +Description=Mosaic Stack Coordinator +After=network.target + +[Service] +Type=simple +User=mosaic +WorkingDirectory=/opt/coordinator +ExecStart=/opt/coordinator/venv/bin/python main.py +Restart=always +RestartSec=10 + +Environment="GITEA_URL=https://git.mosaicstack.dev" +Environment="GITEA_API_TOKEN=..." +Environment="GITEA_WEBHOOK_SECRET=..." +Environment="ANTHROPIC_API_KEY=..." + +[Install] +WantedBy=multi-user.target +``` + +--- + +## Error Handling + +### Webhook Signature Invalid + +```python +# Return 401, log security alert +logger.security(f"Invalid webhook signature from {request.client.host}") +return Response(status_code=401) +``` + +### Issue Parse Failure + +```python +# Comment on issue, use defaults +await gitea_client.comment_on_issue( + issue_number, + "⚠️ Could not parse issue metadata. Using defaults:\n" + "- estimated_context: 50000\n" + "- difficulty: medium\n" + "- assigned_agent: sonnet" +) +``` + +### Agent Failure + +```python +# Comment on issue, unassign from coordinator +await gitea_client.comment_on_issue( + issue_number, + f"❌ Agent failed: {error}\n\n" + f"Unassigning from coordinator. Please review and reassign." +) +await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME) +``` + +### Quality Gates Failure (Repeated) + +```python +# After 3 attempts, give up +if attempt_count > 3: + await gitea_client.comment_on_issue( + issue_number, + "❌ Quality gates failed after 3 attempts.\n\n" + "Unassigning from coordinator. Manual intervention needed." + ) + await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME) +``` + +--- + +## Success Criteria + +### Phase 0 Complete When: + +- ✅ Coordinator user created in Gitea +- ✅ Webhook registered and verified +- ✅ Webhook receiver handles assignment events +- ✅ Issue parser extracts metadata correctly +- ✅ Queue manager tracks dependencies +- ✅ Test: Assign issue → Coordinator adds to queue → Comments on issue + +### Integration Test: + +```bash +# 1. Create test issue with metadata +# 2. Assign to @mosaic +# 3. Verify webhook fires +# 4. Verify comment added: "Added to queue..." +# 5. Verify queue contains issue +# 6. Success! +``` + +--- + +## Future Enhancements + +### Bidirectional Status Updates + +- Coordinator comments progress every 5 minutes +- Coordinator updates issue labels (in-progress, blocked, etc.) +- Coordinator creates checkboxes for acceptance criteria, checks them off + +### Multi-Repo Support + +- Register multiple repos +- Single coordinator handles all +- Cross-repo dependencies + +### Priority Queuing + +- Respect issue priority labels (p0 > p1 > p2 > p3) +- Allow user to bump priority via label change + +### Cost Tracking + +- Track token usage per issue +- Comment cost estimate before starting +- Comment actual cost after completion + +--- + +## Conclusion + +Assignment-based triggering provides: + +- ✅ **Natural workflow** - Users assign issues like normal +- ✅ **Visual feedback** - See coordinator status in Gitea UI +- ✅ **Granular control** - Start/stop/pause per issue +- ✅ **Event-driven** - No polling, immediate response +- ✅ **Standard integration** - Uses Gitea's native webhooks + +This is the foundation that enables autonomous orchestration. + +--- + +**Document Version:** 1.0 +**Created:** 2026-01-31 +**Authors:** Jason Woltje + Claude Opus 4.5 +**Status:** Proposed - Ready for Phase 0 implementation