Files
stack/docs/3-architecture/assignment-based-trigger-architecture.md
Jason Woltje 3be60ccd18 docs: Add assignment-based trigger architecture
Implements Phase 0 foundation for non-AI coordinator.

Key features:
- User assigns issue to @mosaic bot user → triggers webhook
- Webhook receiver processes assignment events
- AI agent parses issue metadata (context, difficulty, agent)
- Queue manager tracks dependencies and status
- Orchestration loop spawns agents and monitors progress

Benefits:
- Natural Gitea workflow (just assign issues)
- Visual feedback in Gitea UI
- Granular control (assign what you want)
- Event-driven (webhooks, not polling)
- No CLI needed

Phase 0 issues: #156-161 (6 issues, 290.6K tokens)

Refs #142

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 15:30:21 -06:00

827 lines
23 KiB
Markdown

# Assignment-Based Trigger Architecture
**Status:** Proposed (Phase 0 - Foundation)
**Related Issues:** #142 (Epic), Phase 0 issues
**Priority:** Critical (P0) - Required before orchestration can function
**Milestone:** M4.1-Coordinator (0.0.4)
---
## Executive Summary
The coordinator uses **issue assignment** as the trigger mechanism. When a user assigns an issue to the special `@mosaic` user in Gitea, a webhook fires, the coordinator parses the issue metadata, adds it to the queue, and begins work.
**Key insight:** Assignment is the perfect trigger because it's:
- ✅ Natural Gitea workflow (users already assign issues)
- ✅ Visual feedback (can see what coordinator is working on)
- ✅ Webhook-friendly (Gitea sends events on assignment)
- ✅ Granular control (assign what you want, when you want)
- ✅ No CLI needed (just use Gitea UI)
---
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────┐
│ GITEA REPOSITORY │
│ │
│ User assigns issue to @mosaic │
│ │ │
│ └──> Webhook: issue.assigned │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR WEBHOOK RECEIVER │
│ │
│ 1. Verify webhook signature │
│ 2. Check assignee == "mosaic" │
│ 3. Launch AI agent to parse issue │
│ 4. Add to queue │
│ 5. Comment on issue: "Added to queue..." │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ISSUE PARSER AGENT │
│ (AI - Sonnet/GLM) │
│ │
│ Input: Issue markdown body │
│ Output: Structured metadata │
│ - estimated_context (tokens) │
│ - difficulty (low/medium/high) │
│ - assigned_agent (opus/sonnet/glm) │
│ - dependencies (blocked_by) │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR QUEUE │
│ │
│ [Issue #154] COORD-001 (46.8K, medium, glm) → Ready │
│ [Issue #155] COORD-002 (49.4K, medium, glm) → Blocked │
│ [Issue #143] COORD-003 (40.3K, low, glm) → Blocked │
└─────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR ORCHESTRATION LOOP │
│ │
│ 1. Get next ready issue from queue │
│ 2. Spawn agent (type from metadata) │
│ 3. Monitor context usage │
│ 4. Run quality gates on completion │
│ 5. Comment progress to Gitea │
│ 6. Mark complete, move to next │
└─────────────────────────────────────────────────────────────┘
```
---
## User Workflow
### Initial Setup (One Time)
**1. Create coordinator user in Gitea:**
```
Gitea Admin → Users → Create User
Username: mosaic
Email: mosaic@mosaicstack.dev
Type: Bot account
```
**2. Register webhook in repo:**
```
Gitea Repo → Settings → Webhooks → Add Webhook
URL: https://coordinator.example.com/webhook/gitea
Secret: [shared secret]
Events: Issues (assigned, unassigned, closed, reopened)
```
**3. Start coordinator service:**
```bash
docker compose up -d coordinator
# or
python coordinator/main.py --config coordinator.yml
```
### Daily Usage
**Start work on issues:**
```
Gitea UI → Issue #154 → Assign to @mosaic
```
**Start entire milestone:**
```
Gitea UI → Milestone M4.1-Coordinator → Bulk assign to @mosaic
```
**Pause work:**
```
Gitea UI → Issue #154 → Unassign from @mosaic
```
**Check status:**
```
Gitea UI → Issue #154 → View comments from @mosaic
```
---
## Component Details
### 1. Webhook Receiver
**Technology:** FastAPI (Python)
**Endpoints:**
```python
@app.post('/webhook/gitea')
async def handle_gitea_webhook(
request: Request,
x_gitea_signature: str = Header(...)
):
"""Handle Gitea webhook events."""
# 1. Verify signature
payload = await request.json()
if not verify_signature(payload, x_gitea_signature):
raise HTTPException(401, "Invalid signature")
# 2. Route by action
if payload['action'] == 'assigned':
await handle_issue_assigned(payload)
elif payload['action'] == 'unassigned':
await handle_issue_unassigned(payload)
elif payload['action'] == 'closed':
await handle_issue_closed(payload)
return {"status": "ok"}
async def handle_issue_assigned(payload: dict):
"""User assigned issue to coordinator."""
# Check if assigned to coordinator user
if payload['assignee']['username'] != COORDINATOR_USERNAME:
return # Not for us
issue = payload['issue']
# Launch parser agent
metadata = await parse_issue_metadata(issue)
# Add to queue
queue_manager.enqueue(
issue_number=issue['number'],
title=issue['title'],
metadata=metadata
)
# Comment on issue
await gitea_client.comment_on_issue(
issue['number'],
f"🤖 Added to coordinator queue.\n\n"
f"**Metadata:**\n"
f"- Estimated context: {metadata['estimated_context']:,} tokens\n"
f"- Difficulty: {metadata['difficulty']}\n"
f"- Assigned agent: {metadata['assigned_agent']}\n\n"
f"Starting work..."
)
# Trigger orchestration
await coordinator.process_queue()
```
**Security:**
```python
def verify_signature(payload: dict, signature: str) -> bool:
"""Verify Gitea webhook signature."""
import hmac
import hashlib
secret = os.getenv('GITEA_WEBHOOK_SECRET').encode()
computed = hmac.new(
secret,
json.dumps(payload).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed, signature)
```
### 2. Issue Parser Agent
**Purpose:** Extract structured metadata from issue markdown body
**Agent type:** Sonnet (cheap, good at parsing)
**Input:** Issue markdown body
```markdown
## Objective
Implement context estimation formula...
## Context Estimate
- Files to modify: 3
- Implementation complexity: medium (20000 tokens)
- Test requirements: medium (10000 tokens)
- Documentation: light (2000 tokens)
- **Total estimated: 46800 tokens**
- **Recommended agent: glm**
## Difficulty
medium
## Dependencies
- Blocked by: None
- Blocks: #155 (COORD-002)
```
**Output:** Structured JSON
```json
{
"estimated_context": 46800,
"difficulty": "medium",
"assigned_agent": "glm",
"blocks": ["155"],
"blocked_by": []
}
```
**Implementation:**
```python
async def parse_issue_metadata(issue: dict) -> dict:
"""Parse issue body with AI agent to extract metadata."""
client = anthropic.Anthropic()
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1000,
messages=[{
"role": "user",
"content": f"""Parse this GitHub/Gitea issue and extract metadata.
Issue Title: {issue['title']}
Issue Body:
{issue['body']}
Extract and return ONLY a JSON object with:
{{
"estimated_context": <integer tokens>,
"difficulty": "low" | "medium" | "high",
"assigned_agent": "opus" | "sonnet" | "glm" | "haiku" | "minimax",
"blocks": [<array of issue numbers>],
"blocked_by": [<array of issue numbers>]
}}
If any field is missing, use reasonable defaults:
- estimated_context: 50000
- difficulty: "medium"
- assigned_agent: "sonnet"
- blocks: []
- blocked_by: []
"""
}]
)
# Parse JSON from response
metadata = json.loads(response.content[0].text)
return metadata
```
### 3. Queue Manager
**Purpose:** Track work queue, dependencies, status
**Data structure:**
```python
@dataclass
class QueueItem:
issue_number: int
title: str
estimated_context: int
difficulty: str
assigned_agent: str
blocks: List[int]
blocked_by: List[int]
status: str # 'queued', 'ready', 'in_progress', 'completed', 'failed'
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
agent_id: Optional[str] = None
class QueueManager:
def __init__(self):
self.queue: List[QueueItem] = []
def enqueue(self, issue_number: int, title: str, metadata: dict):
"""Add issue to queue."""
item = QueueItem(
issue_number=issue_number,
title=title,
estimated_context=metadata['estimated_context'],
difficulty=metadata['difficulty'],
assigned_agent=metadata['assigned_agent'],
blocks=metadata['blocks'],
blocked_by=metadata['blocked_by'],
status='queued'
)
self.queue.append(item)
self._update_ready_status()
def dequeue(self, issue_number: int):
"""Remove issue from queue (user unassigned)."""
self.queue = [q for q in self.queue if q.issue_number != issue_number]
def get_next_ready(self) -> Optional[QueueItem]:
"""Get next issue with no blockers."""
for item in self.queue:
if item.status == 'ready':
return item
return None
def mark_complete(self, issue_number: int):
"""Mark issue complete, unblock dependents."""
for item in self.queue:
if item.issue_number == issue_number:
item.status = 'completed'
item.completed_at = datetime.now()
self._update_ready_status()
def _update_ready_status(self):
"""Update which issues are ready (no blockers)."""
for item in self.queue:
if item.status != 'queued':
continue
# Check if all blockers are complete
blockers_complete = all(
self._is_complete(blocker)
for blocker in item.blocked_by
)
if blockers_complete:
item.status = 'ready'
def _is_complete(self, issue_number: int) -> bool:
"""Check if issue is complete."""
for item in self.queue:
if item.issue_number == issue_number:
return item.status == 'completed'
return True # Not in queue = already complete
```
### 4. Orchestration Loop
**Purpose:** Process queue, spawn agents, monitor progress
```python
class Coordinator:
def __init__(self, queue_manager: QueueManager):
self.queue = queue_manager
self.current_agent: Optional[Agent] = None
async def process_queue(self):
"""Main orchestration loop."""
while True:
# Get next ready issue
item = self.queue.get_next_ready()
if not item:
logger.info("No ready issues in queue")
break
# Mark as in progress
item.status = 'in_progress'
item.started_at = datetime.now()
# Comment on issue
await gitea_client.comment_on_issue(
item.issue_number,
f"🔄 Starting work with {item.assigned_agent} agent..."
)
try:
# Spawn agent
agent = await self.spawn_agent(item)
self.current_agent = agent
# Monitor until complete
await self.monitor_agent(agent, item)
# Mark complete
self.queue.mark_complete(item.issue_number)
# Comment success
await gitea_client.comment_on_issue(
item.issue_number,
f"✅ Work complete! All quality gates passed.\n\n"
f"Closing issue."
)
# Close issue
await gitea_client.close_issue(item.issue_number)
except Exception as e:
logger.error(f"Agent failed: {e}")
item.status = 'failed'
await gitea_client.comment_on_issue(
item.issue_number,
f"❌ Work failed: {e}\n\n"
f"Unassigning from coordinator. Please review."
)
# Unassign from coordinator
await gitea_client.unassign_issue(
item.issue_number,
COORDINATOR_USERNAME
)
async def spawn_agent(self, item: QueueItem) -> Agent:
"""Spawn AI agent for issue."""
# Build agent instructions
instructions = f"""
Complete issue #{item.issue_number}: {item.title}
Follow quality-rails:
- All code must pass: build, lint, test, coverage (85% min)
- Use TDD workflow (test first)
- Follow project CLAUDE.md guidelines
When complete, claim "done" and the coordinator will verify.
"""
# Spawn agent via API
agent = Agent(
type=item.assigned_agent,
instructions=instructions,
issue_number=item.issue_number
)
await agent.start()
return agent
async def monitor_agent(self, agent: Agent, item: QueueItem):
"""Monitor agent progress, context usage, quality gates."""
while not agent.is_complete():
# Check context usage
context = await agent.get_context_usage()
if context > 0.95:
# Rotate session
logger.info(f"Agent {agent.id} at 95% context, rotating")
new_agent = await self.rotate_session(agent, item)
agent = new_agent
elif context > 0.80:
# Compact session
logger.info(f"Agent {agent.id} at 80% context, compacting")
await agent.compact()
# Comment progress periodically
if time.time() % 300 == 0: # Every 5 minutes
await gitea_client.comment_on_issue(
item.issue_number,
f"🔄 Progress update:\n"
f"- Context usage: {context:.0%}\n"
f"- Status: Working..."
)
await asyncio.sleep(10) # Check every 10 seconds
# Agent claimed completion - verify with quality gates
if not await self.verify_quality_gates(item):
# Gates failed - force continuation
continuation = self.generate_continuation_prompt(item)
await agent.send_message(continuation)
# Continue monitoring
await self.monitor_agent(agent, item)
```
---
## User Control Mechanisms
### Start Work
**Assign to `@mosaic`**
- Single issue: Gitea UI → Issue → Assign to @mosaic
- Bulk: Gitea UI → Milestone → Bulk assign to @mosaic
- Result: Coordinator adds to queue, starts work
### Pause Work
**Unassign from `@mosaic`**
- Gitea UI → Issue → Unassign from @mosaic
- Result: Coordinator removes from queue, stops agent
### Cancel Work
**Close issue**
- Gitea UI → Issue → Close
- Result: Coordinator stops agent, marks complete
### Prioritize
**Assignment order**
- Assign high-priority issues first
- Coordinator processes in assignment order (respecting dependencies)
### Check Status
**View comments**
- Gitea UI → Issue → Comments from @mosaic
- Shows: Queue add, work start, progress updates, completion
---
## Visual Feedback
**User can see in Gitea:**
```
M4.1-Coordinator (0.0.4) - 13 issues
✅ #154 [COORD-001] Implement context estimator
Assigned: @mosaic
Status: ✅ Complete (closed 2 hours ago)
🔄 #155 [COORD-002] Build basic context monitor
Assigned: @mosaic
Status: 🔄 In Progress (context: 45%)
Comments:
🤖 mosaic: Added to queue... (3 hours ago)
🤖 mosaic: Starting work with glm agent... (2 hours ago)
🤖 mosaic: Progress update: 45% context... (5 min ago)
⏸️ #143 [COORD-003] Validate 50% rule
Assigned: @mosaic
Status: ⏸️ Queued (blocked by #155)
Comments:
🤖 mosaic: Added to queue. Waiting for dependencies...
⚪ #144 [COORD-004] Implement agent profiles
Unassigned
Status: ⚪ Not started
```
---
## Configuration
```yaml
# coordinator.yml
gitea:
url: https://git.mosaicstack.dev
api_token: ${GITEA_API_TOKEN}
webhook_secret: ${GITEA_WEBHOOK_SECRET}
coordinator:
username: mosaic
email: mosaic@mosaicstack.dev
server:
host: 0.0.0.0
port: 8080
public_url: https://coordinator.example.com
agents:
opus:
context_limit: 200000
cost_per_mtok: 15.00
sonnet:
context_limit: 200000
cost_per_mtok: 3.00
glm:
context_limit: 128000
cost_per_mtok: 0.00 # Self-hosted
haiku:
context_limit: 200000
cost_per_mtok: 0.80
minimax:
context_limit: 128000
cost_per_mtok: 0.00 # Self-hosted
behavior:
comment_on_queue_add: true
comment_on_start: true
comment_frequency: 300 # seconds
close_issue_on_complete: true
unassign_on_failure: true
quality_gates:
enabled: true
gates: [build, lint, test, coverage]
coverage_minimum: 85
```
---
## Deployment
### Docker Compose
```yaml
# docker-compose.yml (add to existing)
services:
coordinator:
build: ./coordinator
ports:
- "8080:8080"
environment:
- GITEA_URL=https://git.mosaicstack.dev
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
- GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
volumes:
- ./coordinator.yml:/app/coordinator.yml
- coordinator-data:/app/data
restart: unless-stopped
volumes:
coordinator-data:
```
### Systemd Service
```ini
[Unit]
Description=Mosaic Stack Coordinator
After=network.target
[Service]
Type=simple
User=mosaic
WorkingDirectory=/opt/coordinator
ExecStart=/opt/coordinator/venv/bin/python main.py
Restart=always
RestartSec=10
Environment="GITEA_URL=https://git.mosaicstack.dev"
Environment="GITEA_API_TOKEN=..."
Environment="GITEA_WEBHOOK_SECRET=..."
Environment="ANTHROPIC_API_KEY=..."
[Install]
WantedBy=multi-user.target
```
---
## Error Handling
### Webhook Signature Invalid
```python
# Return 401, log security alert
logger.security(f"Invalid webhook signature from {request.client.host}")
return Response(status_code=401)
```
### Issue Parse Failure
```python
# Comment on issue, use defaults
await gitea_client.comment_on_issue(
issue_number,
"⚠️ Could not parse issue metadata. Using defaults:\n"
"- estimated_context: 50000\n"
"- difficulty: medium\n"
"- assigned_agent: sonnet"
)
```
### Agent Failure
```python
# Comment on issue, unassign from coordinator
await gitea_client.comment_on_issue(
issue_number,
f"❌ Agent failed: {error}\n\n"
f"Unassigning from coordinator. Please review and reassign."
)
await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)
```
### Quality Gates Failure (Repeated)
```python
# After 3 attempts, give up
if attempt_count > 3:
await gitea_client.comment_on_issue(
issue_number,
"❌ Quality gates failed after 3 attempts.\n\n"
"Unassigning from coordinator. Manual intervention needed."
)
await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)
```
---
## Success Criteria
### Phase 0 Complete When:
- ✅ Coordinator user created in Gitea
- ✅ Webhook registered and verified
- ✅ Webhook receiver handles assignment events
- ✅ Issue parser extracts metadata correctly
- ✅ Queue manager tracks dependencies
- ✅ Test: Assign issue → Coordinator adds to queue → Comments on issue
### Integration Test:
```bash
# 1. Create test issue with metadata
# 2. Assign to @mosaic
# 3. Verify webhook fires
# 4. Verify comment added: "Added to queue..."
# 5. Verify queue contains issue
# 6. Success!
```
---
## Future Enhancements
### Bidirectional Status Updates
- Coordinator comments progress every 5 minutes
- Coordinator updates issue labels (in-progress, blocked, etc.)
- Coordinator creates checkboxes for acceptance criteria, checks them off
### Multi-Repo Support
- Register multiple repos
- Single coordinator handles all
- Cross-repo dependencies
### Priority Queuing
- Respect issue priority labels (p0 > p1 > p2 > p3)
- Allow user to bump priority via label change
### Cost Tracking
- Track token usage per issue
- Comment cost estimate before starting
- Comment actual cost after completion
---
## Conclusion
Assignment-based triggering provides:
-**Natural workflow** - Users assign issues like normal
-**Visual feedback** - See coordinator status in Gitea UI
-**Granular control** - Start/stop/pause per issue
-**Event-driven** - No polling, immediate response
-**Standard integration** - Uses Gitea's native webhooks
This is the foundation that enables autonomous orchestration.
---
**Document Version:** 1.0
**Created:** 2026-01-31
**Authors:** Jason Woltje + Claude Opus 4.5
**Status:** Proposed - Ready for Phase 0 implementation