Implements Phase 0 foundation for non-AI coordinator. Key features: - User assigns issue to @mosaic bot user → triggers webhook - Webhook receiver processes assignment events - AI agent parses issue metadata (context, difficulty, agent) - Queue manager tracks dependencies and status - Orchestration loop spawns agents and monitors progress Benefits: - Natural Gitea workflow (just assign issues) - Visual feedback in Gitea UI - Granular control (assign what you want) - Event-driven (webhooks, not polling) - No CLI needed Phase 0 issues: #156-161 (6 issues, 290.6K tokens) Refs #142 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
23 KiB
Assignment-Based Trigger Architecture
Status: Proposed (Phase 0 - Foundation) Related Issues: #142 (Epic), Phase 0 issues Priority: Critical (P0) - Required before orchestration can function Milestone: M4.1-Coordinator (0.0.4)
Executive Summary
The coordinator uses issue assignment as the trigger mechanism. When a user assigns an issue to the special @mosaic user in Gitea, a webhook fires, the coordinator parses the issue metadata, adds it to the queue, and begins work.
Key insight: Assignment is the perfect trigger because it's:
- ✅ Natural Gitea workflow (users already assign issues)
- ✅ Visual feedback (can see what coordinator is working on)
- ✅ Webhook-friendly (Gitea sends events on assignment)
- ✅ Granular control (assign what you want, when you want)
- ✅ No CLI needed (just use Gitea UI)
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ GITEA REPOSITORY │
│ │
│ User assigns issue to @mosaic │
│ │ │
│ └──> Webhook: issue.assigned │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR WEBHOOK RECEIVER │
│ │
│ 1. Verify webhook signature │
│ 2. Check assignee == "mosaic" │
│ 3. Launch AI agent to parse issue │
│ 4. Add to queue │
│ 5. Comment on issue: "Added to queue..." │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ISSUE PARSER AGENT │
│ (AI - Sonnet/GLM) │
│ │
│ Input: Issue markdown body │
│ Output: Structured metadata │
│ - estimated_context (tokens) │
│ - difficulty (low/medium/high) │
│ - assigned_agent (opus/sonnet/glm) │
│ - dependencies (blocked_by) │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR QUEUE │
│ │
│ [Issue #154] COORD-001 (46.8K, medium, glm) → Ready │
│ [Issue #155] COORD-002 (49.4K, medium, glm) → Blocked │
│ [Issue #143] COORD-003 (40.3K, low, glm) → Blocked │
└─────────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ COORDINATOR ORCHESTRATION LOOP │
│ │
│ 1. Get next ready issue from queue │
│ 2. Spawn agent (type from metadata) │
│ 3. Monitor context usage │
│ 4. Run quality gates on completion │
│ 5. Comment progress to Gitea │
│ 6. Mark complete, move to next │
└─────────────────────────────────────────────────────────────┘
User Workflow
Initial Setup (One Time)
1. Create coordinator user in Gitea:
Gitea Admin → Users → Create User
Username: mosaic
Email: mosaic@mosaicstack.dev
Type: Bot account
2. Register webhook in repo:
Gitea Repo → Settings → Webhooks → Add Webhook
URL: https://coordinator.example.com/webhook/gitea
Secret: [shared secret]
Events: Issues (assigned, unassigned, closed, reopened)
3. Start coordinator service:
docker compose up -d coordinator
# or
python coordinator/main.py --config coordinator.yml
Daily Usage
Start work on issues:
Gitea UI → Issue #154 → Assign to @mosaic
Start entire milestone:
Gitea UI → Milestone M4.1-Coordinator → Bulk assign to @mosaic
Pause work:
Gitea UI → Issue #154 → Unassign from @mosaic
Check status:
Gitea UI → Issue #154 → View comments from @mosaic
Component Details
1. Webhook Receiver
Technology: FastAPI (Python)
Endpoints:
@app.post('/webhook/gitea')
async def handle_gitea_webhook(
request: Request,
x_gitea_signature: str = Header(...)
):
"""Handle Gitea webhook events."""
# 1. Verify signature
payload = await request.json()
if not verify_signature(payload, x_gitea_signature):
raise HTTPException(401, "Invalid signature")
# 2. Route by action
if payload['action'] == 'assigned':
await handle_issue_assigned(payload)
elif payload['action'] == 'unassigned':
await handle_issue_unassigned(payload)
elif payload['action'] == 'closed':
await handle_issue_closed(payload)
return {"status": "ok"}
async def handle_issue_assigned(payload: dict):
"""User assigned issue to coordinator."""
# Check if assigned to coordinator user
if payload['assignee']['username'] != COORDINATOR_USERNAME:
return # Not for us
issue = payload['issue']
# Launch parser agent
metadata = await parse_issue_metadata(issue)
# Add to queue
queue_manager.enqueue(
issue_number=issue['number'],
title=issue['title'],
metadata=metadata
)
# Comment on issue
await gitea_client.comment_on_issue(
issue['number'],
f"🤖 Added to coordinator queue.\n\n"
f"**Metadata:**\n"
f"- Estimated context: {metadata['estimated_context']:,} tokens\n"
f"- Difficulty: {metadata['difficulty']}\n"
f"- Assigned agent: {metadata['assigned_agent']}\n\n"
f"Starting work..."
)
# Trigger orchestration
await coordinator.process_queue()
Security:
def verify_signature(payload: dict, signature: str) -> bool:
"""Verify Gitea webhook signature."""
import hmac
import hashlib
secret = os.getenv('GITEA_WEBHOOK_SECRET').encode()
computed = hmac.new(
secret,
json.dumps(payload).encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed, signature)
2. Issue Parser Agent
Purpose: Extract structured metadata from issue markdown body
Agent type: Sonnet (cheap, good at parsing)
Input: Issue markdown body
## Objective
Implement context estimation formula...
## Context Estimate
- Files to modify: 3
- Implementation complexity: medium (20000 tokens)
- Test requirements: medium (10000 tokens)
- Documentation: light (2000 tokens)
- **Total estimated: 46800 tokens**
- **Recommended agent: glm**
## Difficulty
medium
## Dependencies
- Blocked by: None
- Blocks: #155 (COORD-002)
Output: Structured JSON
{
"estimated_context": 46800,
"difficulty": "medium",
"assigned_agent": "glm",
"blocks": ["155"],
"blocked_by": []
}
Implementation:
async def parse_issue_metadata(issue: dict) -> dict:
"""Parse issue body with AI agent to extract metadata."""
client = anthropic.Anthropic()
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1000,
messages=[{
"role": "user",
"content": f"""Parse this GitHub/Gitea issue and extract metadata.
Issue Title: {issue['title']}
Issue Body:
{issue['body']}
Extract and return ONLY a JSON object with:
{{
"estimated_context": <integer tokens>,
"difficulty": "low" | "medium" | "high",
"assigned_agent": "opus" | "sonnet" | "glm" | "haiku" | "minimax",
"blocks": [<array of issue numbers>],
"blocked_by": [<array of issue numbers>]
}}
If any field is missing, use reasonable defaults:
- estimated_context: 50000
- difficulty: "medium"
- assigned_agent: "sonnet"
- blocks: []
- blocked_by: []
"""
}]
)
# Parse JSON from response
metadata = json.loads(response.content[0].text)
return metadata
3. Queue Manager
Purpose: Track work queue, dependencies, status
Data structure:
@dataclass
class QueueItem:
issue_number: int
title: str
estimated_context: int
difficulty: str
assigned_agent: str
blocks: List[int]
blocked_by: List[int]
status: str # 'queued', 'ready', 'in_progress', 'completed', 'failed'
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
agent_id: Optional[str] = None
class QueueManager:
def __init__(self):
self.queue: List[QueueItem] = []
def enqueue(self, issue_number: int, title: str, metadata: dict):
"""Add issue to queue."""
item = QueueItem(
issue_number=issue_number,
title=title,
estimated_context=metadata['estimated_context'],
difficulty=metadata['difficulty'],
assigned_agent=metadata['assigned_agent'],
blocks=metadata['blocks'],
blocked_by=metadata['blocked_by'],
status='queued'
)
self.queue.append(item)
self._update_ready_status()
def dequeue(self, issue_number: int):
"""Remove issue from queue (user unassigned)."""
self.queue = [q for q in self.queue if q.issue_number != issue_number]
def get_next_ready(self) -> Optional[QueueItem]:
"""Get next issue with no blockers."""
for item in self.queue:
if item.status == 'ready':
return item
return None
def mark_complete(self, issue_number: int):
"""Mark issue complete, unblock dependents."""
for item in self.queue:
if item.issue_number == issue_number:
item.status = 'completed'
item.completed_at = datetime.now()
self._update_ready_status()
def _update_ready_status(self):
"""Update which issues are ready (no blockers)."""
for item in self.queue:
if item.status != 'queued':
continue
# Check if all blockers are complete
blockers_complete = all(
self._is_complete(blocker)
for blocker in item.blocked_by
)
if blockers_complete:
item.status = 'ready'
def _is_complete(self, issue_number: int) -> bool:
"""Check if issue is complete."""
for item in self.queue:
if item.issue_number == issue_number:
return item.status == 'completed'
return True # Not in queue = already complete
4. Orchestration Loop
Purpose: Process queue, spawn agents, monitor progress
class Coordinator:
def __init__(self, queue_manager: QueueManager):
self.queue = queue_manager
self.current_agent: Optional[Agent] = None
async def process_queue(self):
"""Main orchestration loop."""
while True:
# Get next ready issue
item = self.queue.get_next_ready()
if not item:
logger.info("No ready issues in queue")
break
# Mark as in progress
item.status = 'in_progress'
item.started_at = datetime.now()
# Comment on issue
await gitea_client.comment_on_issue(
item.issue_number,
f"🔄 Starting work with {item.assigned_agent} agent..."
)
try:
# Spawn agent
agent = await self.spawn_agent(item)
self.current_agent = agent
# Monitor until complete
await self.monitor_agent(agent, item)
# Mark complete
self.queue.mark_complete(item.issue_number)
# Comment success
await gitea_client.comment_on_issue(
item.issue_number,
f"✅ Work complete! All quality gates passed.\n\n"
f"Closing issue."
)
# Close issue
await gitea_client.close_issue(item.issue_number)
except Exception as e:
logger.error(f"Agent failed: {e}")
item.status = 'failed'
await gitea_client.comment_on_issue(
item.issue_number,
f"❌ Work failed: {e}\n\n"
f"Unassigning from coordinator. Please review."
)
# Unassign from coordinator
await gitea_client.unassign_issue(
item.issue_number,
COORDINATOR_USERNAME
)
async def spawn_agent(self, item: QueueItem) -> Agent:
"""Spawn AI agent for issue."""
# Build agent instructions
instructions = f"""
Complete issue #{item.issue_number}: {item.title}
Follow quality-rails:
- All code must pass: build, lint, test, coverage (85% min)
- Use TDD workflow (test first)
- Follow project CLAUDE.md guidelines
When complete, claim "done" and the coordinator will verify.
"""
# Spawn agent via API
agent = Agent(
type=item.assigned_agent,
instructions=instructions,
issue_number=item.issue_number
)
await agent.start()
return agent
async def monitor_agent(self, agent: Agent, item: QueueItem):
"""Monitor agent progress, context usage, quality gates."""
while not agent.is_complete():
# Check context usage
context = await agent.get_context_usage()
if context > 0.95:
# Rotate session
logger.info(f"Agent {agent.id} at 95% context, rotating")
new_agent = await self.rotate_session(agent, item)
agent = new_agent
elif context > 0.80:
# Compact session
logger.info(f"Agent {agent.id} at 80% context, compacting")
await agent.compact()
# Comment progress periodically
if time.time() % 300 == 0: # Every 5 minutes
await gitea_client.comment_on_issue(
item.issue_number,
f"🔄 Progress update:\n"
f"- Context usage: {context:.0%}\n"
f"- Status: Working..."
)
await asyncio.sleep(10) # Check every 10 seconds
# Agent claimed completion - verify with quality gates
if not await self.verify_quality_gates(item):
# Gates failed - force continuation
continuation = self.generate_continuation_prompt(item)
await agent.send_message(continuation)
# Continue monitoring
await self.monitor_agent(agent, item)
User Control Mechanisms
Start Work
Assign to @mosaic
- Single issue: Gitea UI → Issue → Assign to @mosaic
- Bulk: Gitea UI → Milestone → Bulk assign to @mosaic
- Result: Coordinator adds to queue, starts work
Pause Work
Unassign from @mosaic
- Gitea UI → Issue → Unassign from @mosaic
- Result: Coordinator removes from queue, stops agent
Cancel Work
Close issue
- Gitea UI → Issue → Close
- Result: Coordinator stops agent, marks complete
Prioritize
Assignment order
- Assign high-priority issues first
- Coordinator processes in assignment order (respecting dependencies)
Check Status
View comments
- Gitea UI → Issue → Comments from @mosaic
- Shows: Queue add, work start, progress updates, completion
Visual Feedback
User can see in Gitea:
M4.1-Coordinator (0.0.4) - 13 issues
✅ #154 [COORD-001] Implement context estimator
Assigned: @mosaic
Status: ✅ Complete (closed 2 hours ago)
🔄 #155 [COORD-002] Build basic context monitor
Assigned: @mosaic
Status: 🔄 In Progress (context: 45%)
Comments:
🤖 mosaic: Added to queue... (3 hours ago)
🤖 mosaic: Starting work with glm agent... (2 hours ago)
🤖 mosaic: Progress update: 45% context... (5 min ago)
⏸️ #143 [COORD-003] Validate 50% rule
Assigned: @mosaic
Status: ⏸️ Queued (blocked by #155)
Comments:
🤖 mosaic: Added to queue. Waiting for dependencies...
⚪ #144 [COORD-004] Implement agent profiles
Unassigned
Status: ⚪ Not started
Configuration
# coordinator.yml
gitea:
url: https://git.mosaicstack.dev
api_token: ${GITEA_API_TOKEN}
webhook_secret: ${GITEA_WEBHOOK_SECRET}
coordinator:
username: mosaic
email: mosaic@mosaicstack.dev
server:
host: 0.0.0.0
port: 8080
public_url: https://coordinator.example.com
agents:
opus:
context_limit: 200000
cost_per_mtok: 15.00
sonnet:
context_limit: 200000
cost_per_mtok: 3.00
glm:
context_limit: 128000
cost_per_mtok: 0.00 # Self-hosted
haiku:
context_limit: 200000
cost_per_mtok: 0.80
minimax:
context_limit: 128000
cost_per_mtok: 0.00 # Self-hosted
behavior:
comment_on_queue_add: true
comment_on_start: true
comment_frequency: 300 # seconds
close_issue_on_complete: true
unassign_on_failure: true
quality_gates:
enabled: true
gates: [build, lint, test, coverage]
coverage_minimum: 85
Deployment
Docker Compose
# docker-compose.yml (add to existing)
services:
coordinator:
build: ./coordinator
ports:
- "8080:8080"
environment:
- GITEA_URL=https://git.mosaicstack.dev
- GITEA_API_TOKEN=${GITEA_API_TOKEN}
- GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
volumes:
- ./coordinator.yml:/app/coordinator.yml
- coordinator-data:/app/data
restart: unless-stopped
volumes:
coordinator-data:
Systemd Service
[Unit]
Description=Mosaic Stack Coordinator
After=network.target
[Service]
Type=simple
User=mosaic
WorkingDirectory=/opt/coordinator
ExecStart=/opt/coordinator/venv/bin/python main.py
Restart=always
RestartSec=10
Environment="GITEA_URL=https://git.mosaicstack.dev"
Environment="GITEA_API_TOKEN=..."
Environment="GITEA_WEBHOOK_SECRET=..."
Environment="ANTHROPIC_API_KEY=..."
[Install]
WantedBy=multi-user.target
Error Handling
Webhook Signature Invalid
# Return 401, log security alert
logger.security(f"Invalid webhook signature from {request.client.host}")
return Response(status_code=401)
Issue Parse Failure
# Comment on issue, use defaults
await gitea_client.comment_on_issue(
issue_number,
"⚠️ Could not parse issue metadata. Using defaults:\n"
"- estimated_context: 50000\n"
"- difficulty: medium\n"
"- assigned_agent: sonnet"
)
Agent Failure
# Comment on issue, unassign from coordinator
await gitea_client.comment_on_issue(
issue_number,
f"❌ Agent failed: {error}\n\n"
f"Unassigning from coordinator. Please review and reassign."
)
await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)
Quality Gates Failure (Repeated)
# After 3 attempts, give up
if attempt_count > 3:
await gitea_client.comment_on_issue(
issue_number,
"❌ Quality gates failed after 3 attempts.\n\n"
"Unassigning from coordinator. Manual intervention needed."
)
await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)
Success Criteria
Phase 0 Complete When:
- ✅ Coordinator user created in Gitea
- ✅ Webhook registered and verified
- ✅ Webhook receiver handles assignment events
- ✅ Issue parser extracts metadata correctly
- ✅ Queue manager tracks dependencies
- ✅ Test: Assign issue → Coordinator adds to queue → Comments on issue
Integration Test:
# 1. Create test issue with metadata
# 2. Assign to @mosaic
# 3. Verify webhook fires
# 4. Verify comment added: "Added to queue..."
# 5. Verify queue contains issue
# 6. Success!
Future Enhancements
Bidirectional Status Updates
- Coordinator comments progress every 5 minutes
- Coordinator updates issue labels (in-progress, blocked, etc.)
- Coordinator creates checkboxes for acceptance criteria, checks them off
Multi-Repo Support
- Register multiple repos
- Single coordinator handles all
- Cross-repo dependencies
Priority Queuing
- Respect issue priority labels (p0 > p1 > p2 > p3)
- Allow user to bump priority via label change
Cost Tracking
- Track token usage per issue
- Comment cost estimate before starting
- Comment actual cost after completion
Conclusion
Assignment-based triggering provides:
- ✅ Natural workflow - Users assign issues like normal
- ✅ Visual feedback - See coordinator status in Gitea UI
- ✅ Granular control - Start/stop/pause per issue
- ✅ Event-driven - No polling, immediate response
- ✅ Standard integration - Uses Gitea's native webhooks
This is the foundation that enables autonomous orchestration.
Document Version: 1.0 Created: 2026-01-31 Authors: Jason Woltje + Claude Opus 4.5 Status: Proposed - Ready for Phase 0 implementation