# Assignment-Based Trigger Architecture **Status:** Proposed (Phase 0 - Foundation) **Related Issues:** #142 (Epic), Phase 0 issues **Priority:** Critical (P0) - Required before orchestration can function **Milestone:** M4.1-Coordinator (0.0.4) --- ## Executive Summary The coordinator uses **issue assignment** as the trigger mechanism. When a user assigns an issue to the special `@mosaic` user in Gitea, a webhook fires, the coordinator parses the issue metadata, adds it to the queue, and begins work. **Key insight:** Assignment is the perfect trigger because it's: - ✅ Natural Gitea workflow (users already assign issues) - ✅ Visual feedback (can see what coordinator is working on) - ✅ Webhook-friendly (Gitea sends events on assignment) - ✅ Granular control (assign what you want, when you want) - ✅ No CLI needed (just use Gitea UI) --- ## Architecture Overview ``` ┌─────────────────────────────────────────────────────────────┐ │ GITEA REPOSITORY │ │ │ │ User assigns issue to @mosaic │ │ │ │ │ └──> Webhook: issue.assigned │ └─────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ COORDINATOR WEBHOOK RECEIVER │ │ │ │ 1. Verify webhook signature │ │ 2. Check assignee == "mosaic" │ │ 3. Launch AI agent to parse issue │ │ 4. Add to queue │ │ 5. Comment on issue: "Added to queue..." │ └─────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ ISSUE PARSER AGENT │ │ (AI - Sonnet/GLM) │ │ │ │ Input: Issue markdown body │ │ Output: Structured metadata │ │ - estimated_context (tokens) │ │ - difficulty (low/medium/high) │ │ - assigned_agent (opus/sonnet/glm) │ │ - dependencies (blocked_by) │ └─────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ COORDINATOR QUEUE │ │ │ │ [Issue #154] COORD-001 (46.8K, medium, glm) → Ready │ │ [Issue #155] COORD-002 (49.4K, medium, glm) → Blocked │ │ [Issue #143] COORD-003 (40.3K, low, glm) → Blocked │ └─────────────────────┬───────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ COORDINATOR ORCHESTRATION LOOP │ │ │ │ 1. Get next ready issue from queue │ │ 2. Spawn agent (type from metadata) │ │ 3. Monitor context usage │ │ 4. Run quality gates on completion │ │ 5. Comment progress to Gitea │ │ 6. Mark complete, move to next │ └─────────────────────────────────────────────────────────────┘ ``` --- ## User Workflow ### Initial Setup (One Time) **1. Create coordinator user in Gitea:** ``` Gitea Admin → Users → Create User Username: mosaic Email: mosaic@mosaicstack.dev Type: Bot account ``` **2. Register webhook in repo:** ``` Gitea Repo → Settings → Webhooks → Add Webhook URL: https://coordinator.example.com/webhook/gitea Secret: [shared secret] Events: Issues (assigned, unassigned, closed, reopened) ``` **3. Start coordinator service:** ```bash docker compose up -d coordinator # or python coordinator/main.py --config coordinator.yml ``` ### Daily Usage **Start work on issues:** ``` Gitea UI → Issue #154 → Assign to @mosaic ``` **Start entire milestone:** ``` Gitea UI → Milestone M4.1-Coordinator → Bulk assign to @mosaic ``` **Pause work:** ``` Gitea UI → Issue #154 → Unassign from @mosaic ``` **Check status:** ``` Gitea UI → Issue #154 → View comments from @mosaic ``` --- ## Component Details ### 1. Webhook Receiver **Technology:** FastAPI (Python) **Endpoints:** ```python @app.post('/webhook/gitea') async def handle_gitea_webhook( request: Request, x_gitea_signature: str = Header(...) ): """Handle Gitea webhook events.""" # 1. Verify signature payload = await request.json() if not verify_signature(payload, x_gitea_signature): raise HTTPException(401, "Invalid signature") # 2. Route by action if payload['action'] == 'assigned': await handle_issue_assigned(payload) elif payload['action'] == 'unassigned': await handle_issue_unassigned(payload) elif payload['action'] == 'closed': await handle_issue_closed(payload) return {"status": "ok"} async def handle_issue_assigned(payload: dict): """User assigned issue to coordinator.""" # Check if assigned to coordinator user if payload['assignee']['username'] != COORDINATOR_USERNAME: return # Not for us issue = payload['issue'] # Launch parser agent metadata = await parse_issue_metadata(issue) # Add to queue queue_manager.enqueue( issue_number=issue['number'], title=issue['title'], metadata=metadata ) # Comment on issue await gitea_client.comment_on_issue( issue['number'], f"🤖 Added to coordinator queue.\n\n" f"**Metadata:**\n" f"- Estimated context: {metadata['estimated_context']:,} tokens\n" f"- Difficulty: {metadata['difficulty']}\n" f"- Assigned agent: {metadata['assigned_agent']}\n\n" f"Starting work..." ) # Trigger orchestration await coordinator.process_queue() ``` **Security:** ```python def verify_signature(payload: dict, signature: str) -> bool: """Verify Gitea webhook signature.""" import hmac import hashlib secret = os.getenv('GITEA_WEBHOOK_SECRET').encode() computed = hmac.new( secret, json.dumps(payload).encode(), hashlib.sha256 ).hexdigest() return hmac.compare_digest(computed, signature) ``` ### 2. Issue Parser Agent **Purpose:** Extract structured metadata from issue markdown body **Agent type:** Sonnet (cheap, good at parsing) **Input:** Issue markdown body ```markdown ## Objective Implement context estimation formula... ## Context Estimate - Files to modify: 3 - Implementation complexity: medium (20000 tokens) - Test requirements: medium (10000 tokens) - Documentation: light (2000 tokens) - **Total estimated: 46800 tokens** - **Recommended agent: glm** ## Difficulty medium ## Dependencies - Blocked by: None - Blocks: #155 (COORD-002) ``` **Output:** Structured JSON ```json { "estimated_context": 46800, "difficulty": "medium", "assigned_agent": "glm", "blocks": ["155"], "blocked_by": [] } ``` **Implementation:** ```python async def parse_issue_metadata(issue: dict) -> dict: """Parse issue body with AI agent to extract metadata.""" client = anthropic.Anthropic() response = await client.messages.create( model="claude-sonnet-4-5", max_tokens=1000, messages=[{ "role": "user", "content": f"""Parse this GitHub/Gitea issue and extract metadata. Issue Title: {issue['title']} Issue Body: {issue['body']} Extract and return ONLY a JSON object with: {{ "estimated_context": , "difficulty": "low" | "medium" | "high", "assigned_agent": "opus" | "sonnet" | "glm" | "haiku" | "minimax", "blocks": [], "blocked_by": [] }} If any field is missing, use reasonable defaults: - estimated_context: 50000 - difficulty: "medium" - assigned_agent: "sonnet" - blocks: [] - blocked_by: [] """ }] ) # Parse JSON from response metadata = json.loads(response.content[0].text) return metadata ``` ### 3. Queue Manager **Purpose:** Track work queue, dependencies, status **Data structure:** ```python @dataclass class QueueItem: issue_number: int title: str estimated_context: int difficulty: str assigned_agent: str blocks: List[int] blocked_by: List[int] status: str # 'queued', 'ready', 'in_progress', 'completed', 'failed' started_at: Optional[datetime] = None completed_at: Optional[datetime] = None agent_id: Optional[str] = None class QueueManager: def __init__(self): self.queue: List[QueueItem] = [] def enqueue(self, issue_number: int, title: str, metadata: dict): """Add issue to queue.""" item = QueueItem( issue_number=issue_number, title=title, estimated_context=metadata['estimated_context'], difficulty=metadata['difficulty'], assigned_agent=metadata['assigned_agent'], blocks=metadata['blocks'], blocked_by=metadata['blocked_by'], status='queued' ) self.queue.append(item) self._update_ready_status() def dequeue(self, issue_number: int): """Remove issue from queue (user unassigned).""" self.queue = [q for q in self.queue if q.issue_number != issue_number] def get_next_ready(self) -> Optional[QueueItem]: """Get next issue with no blockers.""" for item in self.queue: if item.status == 'ready': return item return None def mark_complete(self, issue_number: int): """Mark issue complete, unblock dependents.""" for item in self.queue: if item.issue_number == issue_number: item.status = 'completed' item.completed_at = datetime.now() self._update_ready_status() def _update_ready_status(self): """Update which issues are ready (no blockers).""" for item in self.queue: if item.status != 'queued': continue # Check if all blockers are complete blockers_complete = all( self._is_complete(blocker) for blocker in item.blocked_by ) if blockers_complete: item.status = 'ready' def _is_complete(self, issue_number: int) -> bool: """Check if issue is complete.""" for item in self.queue: if item.issue_number == issue_number: return item.status == 'completed' return True # Not in queue = already complete ``` ### 4. Orchestration Loop **Purpose:** Process queue, spawn agents, monitor progress ```python class Coordinator: def __init__(self, queue_manager: QueueManager): self.queue = queue_manager self.current_agent: Optional[Agent] = None async def process_queue(self): """Main orchestration loop.""" while True: # Get next ready issue item = self.queue.get_next_ready() if not item: logger.info("No ready issues in queue") break # Mark as in progress item.status = 'in_progress' item.started_at = datetime.now() # Comment on issue await gitea_client.comment_on_issue( item.issue_number, f"🔄 Starting work with {item.assigned_agent} agent..." ) try: # Spawn agent agent = await self.spawn_agent(item) self.current_agent = agent # Monitor until complete await self.monitor_agent(agent, item) # Mark complete self.queue.mark_complete(item.issue_number) # Comment success await gitea_client.comment_on_issue( item.issue_number, f"✅ Work complete! All quality gates passed.\n\n" f"Closing issue." ) # Close issue await gitea_client.close_issue(item.issue_number) except Exception as e: logger.error(f"Agent failed: {e}") item.status = 'failed' await gitea_client.comment_on_issue( item.issue_number, f"❌ Work failed: {e}\n\n" f"Unassigning from coordinator. Please review." ) # Unassign from coordinator await gitea_client.unassign_issue( item.issue_number, COORDINATOR_USERNAME ) async def spawn_agent(self, item: QueueItem) -> Agent: """Spawn AI agent for issue.""" # Build agent instructions instructions = f""" Complete issue #{item.issue_number}: {item.title} Follow quality-rails: - All code must pass: build, lint, test, coverage (85% min) - Use TDD workflow (test first) - Follow project CLAUDE.md guidelines When complete, claim "done" and the coordinator will verify. """ # Spawn agent via API agent = Agent( type=item.assigned_agent, instructions=instructions, issue_number=item.issue_number ) await agent.start() return agent async def monitor_agent(self, agent: Agent, item: QueueItem): """Monitor agent progress, context usage, quality gates.""" while not agent.is_complete(): # Check context usage context = await agent.get_context_usage() if context > 0.95: # Rotate session logger.info(f"Agent {agent.id} at 95% context, rotating") new_agent = await self.rotate_session(agent, item) agent = new_agent elif context > 0.80: # Compact session logger.info(f"Agent {agent.id} at 80% context, compacting") await agent.compact() # Comment progress periodically if time.time() % 300 == 0: # Every 5 minutes await gitea_client.comment_on_issue( item.issue_number, f"🔄 Progress update:\n" f"- Context usage: {context:.0%}\n" f"- Status: Working..." ) await asyncio.sleep(10) # Check every 10 seconds # Agent claimed completion - verify with quality gates if not await self.verify_quality_gates(item): # Gates failed - force continuation continuation = self.generate_continuation_prompt(item) await agent.send_message(continuation) # Continue monitoring await self.monitor_agent(agent, item) ``` --- ## User Control Mechanisms ### Start Work **Assign to `@mosaic`** - Single issue: Gitea UI → Issue → Assign to @mosaic - Bulk: Gitea UI → Milestone → Bulk assign to @mosaic - Result: Coordinator adds to queue, starts work ### Pause Work **Unassign from `@mosaic`** - Gitea UI → Issue → Unassign from @mosaic - Result: Coordinator removes from queue, stops agent ### Cancel Work **Close issue** - Gitea UI → Issue → Close - Result: Coordinator stops agent, marks complete ### Prioritize **Assignment order** - Assign high-priority issues first - Coordinator processes in assignment order (respecting dependencies) ### Check Status **View comments** - Gitea UI → Issue → Comments from @mosaic - Shows: Queue add, work start, progress updates, completion --- ## Visual Feedback **User can see in Gitea:** ``` M4.1-Coordinator (0.0.4) - 13 issues ✅ #154 [COORD-001] Implement context estimator Assigned: @mosaic Status: ✅ Complete (closed 2 hours ago) 🔄 #155 [COORD-002] Build basic context monitor Assigned: @mosaic Status: 🔄 In Progress (context: 45%) Comments: 🤖 mosaic: Added to queue... (3 hours ago) 🤖 mosaic: Starting work with glm agent... (2 hours ago) 🤖 mosaic: Progress update: 45% context... (5 min ago) ⏸️ #143 [COORD-003] Validate 50% rule Assigned: @mosaic Status: ⏸️ Queued (blocked by #155) Comments: 🤖 mosaic: Added to queue. Waiting for dependencies... ⚪ #144 [COORD-004] Implement agent profiles Unassigned Status: ⚪ Not started ``` --- ## Configuration ```yaml # coordinator.yml gitea: url: https://git.mosaicstack.dev api_token: ${GITEA_API_TOKEN} webhook_secret: ${GITEA_WEBHOOK_SECRET} coordinator: username: mosaic email: mosaic@mosaicstack.dev server: host: 0.0.0.0 port: 8080 public_url: https://coordinator.example.com agents: opus: context_limit: 200000 cost_per_mtok: 15.00 sonnet: context_limit: 200000 cost_per_mtok: 3.00 glm: context_limit: 128000 cost_per_mtok: 0.00 # Self-hosted haiku: context_limit: 200000 cost_per_mtok: 0.80 minimax: context_limit: 128000 cost_per_mtok: 0.00 # Self-hosted behavior: comment_on_queue_add: true comment_on_start: true comment_frequency: 300 # seconds close_issue_on_complete: true unassign_on_failure: true quality_gates: enabled: true gates: [build, lint, test, coverage] coverage_minimum: 85 ``` --- ## Deployment ### Docker Compose ```yaml # docker-compose.yml (add to existing) services: coordinator: build: ./coordinator ports: - "8080:8080" environment: - GITEA_URL=https://git.mosaicstack.dev - GITEA_API_TOKEN=${GITEA_API_TOKEN} - GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} volumes: - ./coordinator.yml:/app/coordinator.yml - coordinator-data:/app/data restart: unless-stopped volumes: coordinator-data: ``` ### Systemd Service ```ini [Unit] Description=Mosaic Stack Coordinator After=network.target [Service] Type=simple User=mosaic WorkingDirectory=/opt/coordinator ExecStart=/opt/coordinator/venv/bin/python main.py Restart=always RestartSec=10 Environment="GITEA_URL=https://git.mosaicstack.dev" Environment="GITEA_API_TOKEN=..." Environment="GITEA_WEBHOOK_SECRET=..." Environment="ANTHROPIC_API_KEY=..." [Install] WantedBy=multi-user.target ``` --- ## Error Handling ### Webhook Signature Invalid ```python # Return 401, log security alert logger.security(f"Invalid webhook signature from {request.client.host}") return Response(status_code=401) ``` ### Issue Parse Failure ```python # Comment on issue, use defaults await gitea_client.comment_on_issue( issue_number, "⚠️ Could not parse issue metadata. Using defaults:\n" "- estimated_context: 50000\n" "- difficulty: medium\n" "- assigned_agent: sonnet" ) ``` ### Agent Failure ```python # Comment on issue, unassign from coordinator await gitea_client.comment_on_issue( issue_number, f"❌ Agent failed: {error}\n\n" f"Unassigning from coordinator. Please review and reassign." ) await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME) ``` ### Quality Gates Failure (Repeated) ```python # After 3 attempts, give up if attempt_count > 3: await gitea_client.comment_on_issue( issue_number, "❌ Quality gates failed after 3 attempts.\n\n" "Unassigning from coordinator. Manual intervention needed." ) await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME) ``` --- ## Success Criteria ### Phase 0 Complete When: - ✅ Coordinator user created in Gitea - ✅ Webhook registered and verified - ✅ Webhook receiver handles assignment events - ✅ Issue parser extracts metadata correctly - ✅ Queue manager tracks dependencies - ✅ Test: Assign issue → Coordinator adds to queue → Comments on issue ### Integration Test: ```bash # 1. Create test issue with metadata # 2. Assign to @mosaic # 3. Verify webhook fires # 4. Verify comment added: "Added to queue..." # 5. Verify queue contains issue # 6. Success! ``` --- ## Future Enhancements ### Bidirectional Status Updates - Coordinator comments progress every 5 minutes - Coordinator updates issue labels (in-progress, blocked, etc.) - Coordinator creates checkboxes for acceptance criteria, checks them off ### Multi-Repo Support - Register multiple repos - Single coordinator handles all - Cross-repo dependencies ### Priority Queuing - Respect issue priority labels (p0 > p1 > p2 > p3) - Allow user to bump priority via label change ### Cost Tracking - Track token usage per issue - Comment cost estimate before starting - Comment actual cost after completion --- ## Conclusion Assignment-based triggering provides: - ✅ **Natural workflow** - Users assign issues like normal - ✅ **Visual feedback** - See coordinator status in Gitea UI - ✅ **Granular control** - Start/stop/pause per issue - ✅ **Event-driven** - No polling, immediate response - ✅ **Standard integration** - Uses Gitea's native webhooks This is the foundation that enables autonomous orchestration. --- **Document Version:** 1.0 **Created:** 2026-01-31 **Authors:** Jason Woltje + Claude Opus 4.5 **Status:** Proposed - Ready for Phase 0 implementation