stack/docs/3-architecture/assignment-based-trigger-architecture.md
Jason Woltje 3be60ccd18 docs: Add assignment-based trigger architecture
Implements Phase 0 foundation for non-AI coordinator.

Key features:
- User assigns issue to @mosaic bot user → triggers webhook
- Webhook receiver processes assignment events
- AI agent parses issue metadata (context, difficulty, agent)
- Queue manager tracks dependencies and status
- Orchestration loop spawns agents and monitors progress

Benefits:
- Natural Gitea workflow (just assign issues)
- Visual feedback in Gitea UI
- Granular control (assign what you want)
- Event-driven (webhooks, not polling)
- No CLI needed

Phase 0 issues: #156-161 (6 issues, 290.6K tokens)

Refs #142

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 15:30:21 -06:00


Assignment-Based Trigger Architecture

Status: Proposed (Phase 0 - Foundation)
Related Issues: #142 (Epic), Phase 0 issues
Priority: Critical (P0) - Required before orchestration can function
Milestone: M4.1-Coordinator (0.0.4)


Executive Summary

The coordinator uses issue assignment as the trigger mechanism. When a user assigns an issue to the special @mosaic user in Gitea, a webhook fires, the coordinator parses the issue metadata, adds it to the queue, and begins work.

Key insight: Assignment works well as the trigger because it offers:

  • Natural Gitea workflow (users already assign issues)
  • Visual feedback (can see what coordinator is working on)
  • Webhook-friendly (Gitea sends events on assignment)
  • Granular control (assign what you want, when you want)
  • No CLI needed (just use Gitea UI)

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                    GITEA REPOSITORY                         │
│                                                             │
│  User assigns issue to @mosaic                             │
│         │                                                   │
│         └──> Webhook: issue.assigned                       │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│              COORDINATOR WEBHOOK RECEIVER                   │
│                                                             │
│  1. Verify webhook signature                               │
│  2. Check assignee == "mosaic"                             │
│  3. Launch AI agent to parse issue                         │
│  4. Add to queue                                            │
│  5. Comment on issue: "Added to queue..."                  │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                   ISSUE PARSER AGENT                        │
│                   (AI - Sonnet/GLM)                         │
│                                                             │
│  Input: Issue markdown body                                │
│  Output: Structured metadata                               │
│    - estimated_context (tokens)                            │
│    - difficulty (low/medium/high)                          │
│    - assigned_agent (opus/sonnet/glm)                      │
│    - dependencies (blocked_by)                             │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                    COORDINATOR QUEUE                        │
│                                                             │
│  [Issue #154] COORD-001 (46.8K, medium, glm) → Ready       │
│  [Issue #155] COORD-002 (49.4K, medium, glm) → Blocked     │
│  [Issue #143] COORD-003 (40.3K, low, glm)    → Blocked     │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│              COORDINATOR ORCHESTRATION LOOP                 │
│                                                             │
│  1. Get next ready issue from queue                        │
│  2. Spawn agent (type from metadata)                       │
│  3. Monitor context usage                                  │
│  4. Run quality gates on completion                        │
│  5. Comment progress to Gitea                              │
│  6. Mark complete, move to next                            │
└─────────────────────────────────────────────────────────────┘

User Workflow

Initial Setup (One Time)

1. Create coordinator user in Gitea:

Gitea Admin → Users → Create User
  Username: mosaic
  Email: mosaic@mosaicstack.dev
  Type: Bot account

2. Register webhook in repo:

Gitea Repo → Settings → Webhooks → Add Webhook
  URL: https://coordinator.example.com/webhook/gitea
  Secret: [shared secret]
  Events: Issues (assigned, unassigned, closed, reopened)

3. Start coordinator service:

docker compose up -d coordinator
# or
python coordinator/main.py --config coordinator.yml

Daily Usage

Start work on issues:

Gitea UI → Issue #154 → Assign to @mosaic

Start entire milestone:

Gitea UI → Milestone M4.1-Coordinator → Bulk assign to @mosaic

Pause work:

Gitea UI → Issue #154 → Unassign from @mosaic

Check status:

Gitea UI → Issue #154 → View comments from @mosaic

Component Details

1. Webhook Receiver

Technology: FastAPI (Python)

Endpoints:

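import asyncio
import hashlib
import hmac
import json
import os

from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()
COORDINATOR_USERNAME = "mosaic"
background_tasks: set = set()  # Keeps references to running orchestration tasks


@app.post('/webhook/gitea')
async def handle_gitea_webhook(
    request: Request,
    x_gitea_signature: str = Header(...)
):
    """Handle Gitea webhook events."""

    # 1. Verify signature against the raw request bytes. Re-serializing the
    #    parsed JSON can change whitespace and would break the HMAC check.
    body = await request.body()
    if not verify_signature(body, x_gitea_signature):
        raise HTTPException(401, "Invalid signature")
    payload = json.loads(body)

    # 2. Route by action
    action = payload.get('action')
    if action == 'assigned':
        await handle_issue_assigned(payload)
    elif action == 'unassigned':
        await handle_issue_unassigned(payload)
    elif action == 'closed':
        await handle_issue_closed(payload)

    return {"status": "ok"}


async def handle_issue_assigned(payload: dict):
    """User assigned issue to coordinator."""

    issue = payload['issue']

    # Check if assigned to coordinator user. Where the assignee appears in
    # the payload is an assumption: prefer a top-level field if present,
    # otherwise fall back to the issue's assignee list.
    assignee = payload.get('assignee')
    if assignee is not None:
        usernames = {assignee.get('username')}
    else:
        usernames = {a.get('username') for a in issue.get('assignees') or []}
    if COORDINATOR_USERNAME not in usernames:
        return  # Not for us

    # Launch parser agent
    metadata = await parse_issue_metadata(issue)

    # Add to queue
    queue_manager.enqueue(
        issue_number=issue['number'],
        title=issue['title'],
        metadata=metadata
    )

    # Comment on issue
    await gitea_client.comment_on_issue(
        issue['number'],
        f"🤖 Added to coordinator queue.\n\n"
        f"**Metadata:**\n"
        f"- Estimated context: {metadata['estimated_context']:,} tokens\n"
        f"- Difficulty: {metadata['difficulty']}\n"
        f"- Assigned agent: {metadata['assigned_agent']}\n\n"
        f"Starting work..."
    )

    # Trigger orchestration in the background so the webhook returns promptly.
    # Awaiting the whole queue here would hold the HTTP response open until
    # every agent finished.
    task = asyncio.create_task(coordinator.process_queue())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

Security:

def verify_signature(body: bytes, signature: str) -> bool:
    """Verify Gitea webhook signature (hex HMAC-SHA256 of the raw body)."""
    secret = os.getenv('GITEA_WEBHOOK_SECRET')
    if not secret:
        return False  # Reject everything if no secret is configured

    computed = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

    # Compare as bytes so a non-ASCII header value cannot raise TypeError
    return hmac.compare_digest(computed.encode(), signature.encode())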
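
The signature check depends on hashing exactly the bytes that were sent. The following is a minimal sketch of that property, using only the standard library; the secret, the sample body, and the helper names `sign_body` and `is_valid` are illustrative assumptions, not part of the coordinator.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def is_valid(secret: str, body: bytes, signature: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected.encode(), signature.encode())


# Hypothetical delivery: compact JSON, as a sender might serialize it.
secret = "example-secret"
raw_body = b'{"action":"assigned","number":154}'
signature = sign_body(secret, raw_body)

# Verifying the bytes that actually arrived succeeds.
print(is_valid(secret, raw_body, signature))

# Parsing and re-serializing changes the bytes (json.dumps inserts a space
# after ':' and ','), so the same signature no longer matches.
reserialized = json.dumps(json.loads(raw_body)).encode()
print(is_valid(secret, reserialized, signature))
```

The same `sign_body` helper can be used to build signed test deliveries when exercising the receiver locally.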

2. Issue Parser Agent

Purpose: Extract structured metadata from issue markdown body

Agent type: Sonnet (cheap, good at parsing)

Input: Issue markdown body

## Objective

Implement context estimation formula...

## Context Estimate

- Files to modify: 3
- Implementation complexity: medium (20000 tokens)
- Test requirements: medium (10000 tokens)
- Documentation: light (2000 tokens)
- **Total estimated: 46800 tokens**
- **Recommended agent: glm**

## Difficulty

medium

## Dependencies

- Blocked by: None
- Blocks: #155 (COORD-002)

Output: Structured JSON

{
  "estimated_context": 46800,
  "difficulty": "medium",
  "assigned_agent": "glm",
  "blocks": [155],
  "blocked_by": []
}
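
Because the metadata comes from a model, it may be worth validating before it reaches the queue. The sketch below is one possible approach, not the project's implementation: `normalize_metadata` is a hypothetical helper that applies the defaults listed in the parser prompt and coerces issue references such as `"155"` or `"#155"` to integers.

```python
from typing import Any

# Defaults and allowed values taken from the parser prompt in this document.
DEFAULTS = {
    "estimated_context": 50000,
    "difficulty": "medium",
    "assigned_agent": "sonnet",
}
DIFFICULTIES = ("low", "medium", "high")
AGENTS = ("opus", "sonnet", "glm", "haiku", "minimax")


def _issue_numbers(values: Any) -> list:
    """Coerce entries such as 155, "155" or "#155" to integers."""
    numbers = []
    for value in values if isinstance(values, list) else []:
        text = str(value).lstrip("#")
        if text.isdigit():
            numbers.append(int(text))
    return numbers


def normalize_metadata(raw: dict) -> dict:
    """Return metadata with invalid or missing fields replaced by defaults."""
    context = raw.get("estimated_context")
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(context, bool) or not isinstance(context, int) or context <= 0:
        context = DEFAULTS["estimated_context"]

    difficulty = raw.get("difficulty")
    if difficulty not in DIFFICULTIES:
        difficulty = DEFAULTS["difficulty"]

    agent = raw.get("assigned_agent")
    if agent not in AGENTS:
        agent = DEFAULTS["assigned_agent"]

    return {
        "estimated_context": context,
        "difficulty": difficulty,
        "assigned_agent": agent,
        "blocks": _issue_numbers(raw.get("blocks")),
        "blocked_by": _issue_numbers(raw.get("blocked_by")),
    }


print(normalize_metadata({"estimated_context": 46800, "assigned_agent": "glm",
                          "blocks": ["#155"]}))
```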

Implementation:

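import json

import anthropic


async def parse_issue_metadata(issue: dict) -> dict:
    """Parse issue body with AI agent to extract metadata."""

    # The async client is required because the call below is awaited.
    client = anthropic.AsyncAnthropic()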

    response = await client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1000,
        messages=[{
            "role": "user",
            "content": f"""Parse this GitHub/Gitea issue and extract metadata.

Issue Title: {issue['title']}
Issue Body:
{issue['body']}

Extract and return ONLY a JSON object with:
{{
  "estimated_context": <integer tokens>,
  "difficulty": "low" | "medium" | "high",
  "assigned_agent": "opus" | "sonnet" | "glm" | "haiku" | "minimax",
  "blocks": [<array of issue numbers>],
  "blocked_by": [<array of issue numbers>]
}}

If any field is missing, use reasonable defaults:
- estimated_context: 50000
- difficulty: "medium"
- assigned_agent: "sonnet"
- blocks: []
- blocked_by: []
"""
        }]
    )

    # Parse JSON from response
    metadata = json.loads(response.content[0].text)

    return metadata
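
A model may wrap its answer in a Markdown code fence or add a sentence around the JSON, in which case a bare `json.loads` on the response text raises. A minimal sketch of a more tolerant extraction step follows; `extract_json_object` is a hypothetical helper, and returning `None` is meant to let the caller fall back to the defaults described under Error Handling.

```python
import json
import re
from typing import Optional

# Built from single backticks so this example can itself sit inside a fence.
FENCE = "`" * 3
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_json_object(text: str) -> Optional[dict]:
    """Return the JSON object contained in text, or None if none parses."""
    # Prefer the contents of a fenced block if the model produced one.
    fenced = _FENCED.search(text)
    candidate = fenced.group(1) if fenced else text

    # Take everything from the first '{' to the last '}'.
    start = candidate.find("{")
    end = candidate.rfind("}")
    if start == -1 or end <= start:
        return None

    try:
        parsed = json.loads(candidate[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


reply = "Here is the metadata:\n" + FENCE + 'json\n{"difficulty": "low"}\n' + FENCE
print(extract_json_object(reply))
print(extract_json_object("I could not find any metadata."))
```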

3. Queue Manager

Purpose: Track work queue, dependencies, status

Data structure:

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class QueueItem:
    issue_number: int
    title: str
    estimated_context: int
    difficulty: str
    assigned_agent: str
    blocks: List[int]
    blocked_by: List[int]
    status: str  # 'queued', 'ready', 'in_progress', 'completed', 'failed'
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    agent_id: Optional[str] = None


class QueueManager:
    def __init__(self):
        self.queue: List[QueueItem] = []

    def enqueue(self, issue_number: int, title: str, metadata: dict):
        """Add issue to queue."""
        item = QueueItem(
            issue_number=issue_number,
            title=title,
            estimated_context=metadata['estimated_context'],
            difficulty=metadata['difficulty'],
            assigned_agent=metadata['assigned_agent'],
            blocks=metadata['blocks'],
            blocked_by=metadata['blocked_by'],
            status='queued'
        )

        self.queue.append(item)
        self._update_ready_status()

    def dequeue(self, issue_number: int):
        """Remove issue from queue (user unassigned)."""
        self.queue = [q for q in self.queue if q.issue_number != issue_number]

    def get_next_ready(self) -> Optional[QueueItem]:
        """Get next issue with no blockers."""
        for item in self.queue:
            if item.status == 'ready':
                return item
        return None

    def mark_complete(self, issue_number: int):
        """Mark issue complete, unblock dependents."""
        for item in self.queue:
            if item.issue_number == issue_number:
                item.status = 'completed'
                item.completed_at = datetime.now()

        self._update_ready_status()

    def _update_ready_status(self):
        """Update which issues are ready (no blockers)."""
        for item in self.queue:
            if item.status != 'queued':
                continue

            # Check if all blockers are complete
            blockers_complete = all(
                self._is_complete(blocker)
                for blocker in item.blocked_by
            )

            if blockers_complete:
                item.status = 'ready'

    def _is_complete(self, issue_number: int) -> bool:
        """Check if issue is complete."""
        for item in self.queue:
            if item.issue_number == issue_number:
                return item.status == 'completed'
        return True  # Not in queue = already complete
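
To make the dependency rule concrete, here is a small self-contained sketch that reproduces the readiness check for the three-issue chain used elsewhere in this document (#154, then #155, then #143). The function `ready_issues` and its dictionary-based input are simplifications for illustration, not the `QueueManager` API.

```python
def ready_issues(blocked_by: dict, completed: set) -> list:
    """Return queued issues whose blockers are all complete, in queue order.

    As in `_is_complete`, a blocker that is not in the queue counts as
    already complete.
    """
    ready = []
    for issue, blockers in blocked_by.items():
        if issue in completed:
            continue
        if all(b in completed or b not in blocked_by for b in blockers):
            ready.append(issue)
    return ready


# Issue number -> list of issues it is blocked by.
queue = {154: [], 155: [154], 143: [155]}

print(ready_issues(queue, completed=set()))       # [154]
print(ready_issues(queue, completed={154}))       # [155]
print(ready_issues(queue, completed={154, 155}))  # [143]
```

One consequence of treating unknown blockers as complete is that an issue assigned before its blocker is enqueued becomes ready immediately, so assignment order matters.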

4. Orchestration Loop

Purpose: Process queue, spawn agents, monitor progress

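import asyncio
import logging
import time
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)


class Coordinator: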
    def __init__(self, queue_manager: QueueManager):
        self.queue = queue_manager
        self.current_agent: Optional[Agent] = None

    async def process_queue(self):
        """Main orchestration loop."""

        while True:
            # Get next ready issue
            item = self.queue.get_next_ready()

            if not item:
                logger.info("No ready issues in queue")
                break

            # Mark as in progress
            item.status = 'in_progress'
            item.started_at = datetime.now()

            # Comment on issue
            await gitea_client.comment_on_issue(
                item.issue_number,
                f"🔄 Starting work with {item.assigned_agent} agent..."
            )

            try:
                # Spawn agent
                agent = await self.spawn_agent(item)
                self.current_agent = agent

                # Monitor until complete
                await self.monitor_agent(agent, item)

                # Mark complete
                self.queue.mark_complete(item.issue_number)

                # Comment success
                await gitea_client.comment_on_issue(
                    item.issue_number,
                    f"✅ Work complete! All quality gates passed.\n\n"
                    f"Closing issue."
                )

                # Close issue
                await gitea_client.close_issue(item.issue_number)

            except Exception as e:
                logger.error(f"Agent failed: {e}")
                item.status = 'failed'

                await gitea_client.comment_on_issue(
                    item.issue_number,
                    f"❌ Work failed: {e}\n\n"
                    f"Unassigning from coordinator. Please review."
                )

                # Unassign from coordinator
                await gitea_client.unassign_issue(
                    item.issue_number,
                    COORDINATOR_USERNAME
                )

    async def spawn_agent(self, item: QueueItem) -> Agent:
        """Spawn AI agent for issue."""

        # Build agent instructions
        instructions = f"""
Complete issue #{item.issue_number}: {item.title}

Follow quality-rails:
- All code must pass: build, lint, test, coverage (85% min)
- Use TDD workflow (test first)
- Follow project CLAUDE.md guidelines

When complete, claim "done" and the coordinator will verify.
"""

        # Spawn agent via API
        agent = Agent(
            type=item.assigned_agent,
            instructions=instructions,
            issue_number=item.issue_number
        )

        await agent.start()

        return agent

    async def monitor_agent(self, agent: Agent, item: QueueItem):
        """Monitor agent progress, context usage, quality gates."""

        last_comment = time.monotonic()

        while not agent.is_complete():
            # Check context usage
            context = await agent.get_context_usage()

            if context > 0.95:
                # Rotate session
                logger.info(f"Agent {agent.id} at 95% context, rotating")
                new_agent = await self.rotate_session(agent, item)
                agent = new_agent

            elif context > 0.80:
                # Compact session
                logger.info(f"Agent {agent.id} at 80% context, compacting")
                await agent.compact()

            # Comment progress every 5 minutes. Elapsed time is tracked
            # explicitly because a float timestamp polled every 10 seconds
            # is almost never an exact multiple of 300.
            now = time.monotonic()
            if now - last_comment >= 300:
                last_comment = now
                await gitea_client.comment_on_issue(
                    item.issue_number,
                    f"🔄 Progress update:\n"
                    f"- Context usage: {context:.0%}\n"
                    f"- Status: Working..."
                )

            await asyncio.sleep(10)  # Check every 10 seconds

        # Agent claimed completion - verify with quality gates
        if not await self.verify_quality_gates(item):
            # Gates failed - force continuation
            continuation = self.generate_continuation_prompt(item)
            await agent.send_message(continuation)

            # Continue monitoring
            await self.monitor_agent(agent, item)
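
The context thresholds in the monitor loop can be isolated into a pure function, which makes them easy to unit test without a running agent. This is a sketch under the assumption that usage is reported as a fraction between 0 and 1; `ContextAction` and `context_action` are hypothetical names.

```python
import enum


class ContextAction(enum.Enum):
    CONTINUE = "continue"
    COMPACT = "compact"
    ROTATE = "rotate"


def context_action(
    usage: float,
    compact_at: float = 0.80,
    rotate_at: float = 0.95,
) -> ContextAction:
    """Map a context usage fraction to the action the monitor would take.

    Comparisons are strict, matching the `>` checks in the monitor loop.
    """
    if usage > rotate_at:
        return ContextAction.ROTATE
    if usage > compact_at:
        return ContextAction.COMPACT
    return ContextAction.CONTINUE


for usage in (0.45, 0.85, 0.97):
    print(f"{usage:.0%} -> {context_action(usage).value}")
```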

User Control Mechanisms

Start Work

Assign to @mosaic

  • Single issue: Gitea UI → Issue → Assign to @mosaic
  • Bulk: Gitea UI → Milestone → Bulk assign to @mosaic
  • Result: Coordinator adds to queue, starts work

Pause Work

Unassign from @mosaic

  • Gitea UI → Issue → Unassign from @mosaic
  • Result: Coordinator removes from queue, stops agent

Cancel Work

Close issue

  • Gitea UI → Issue → Close
  • Result: Coordinator stops agent, marks complete

Prioritize

Assignment order

  • Assign high-priority issues first
  • Coordinator processes in assignment order (respecting dependencies)

Check Status

View comments

  • Gitea UI → Issue → Comments from @mosaic
  • Shows: Queue add, work start, progress updates, completion

Visual Feedback

User can see in Gitea:

M4.1-Coordinator (0.0.4) - 13 issues

✅ #154 [COORD-001] Implement context estimator
     Assigned: @mosaic
     Status: ✅ Complete (closed 2 hours ago)

🔄 #155 [COORD-002] Build basic context monitor
     Assigned: @mosaic
     Status: 🔄 In Progress (context: 45%)
     Comments:
       🤖 mosaic: Added to queue... (3 hours ago)
       🤖 mosaic: Starting work with glm agent... (2 hours ago)
       🤖 mosaic: Progress update: 45% context... (5 min ago)

⏸️ #143 [COORD-003] Validate 50% rule
     Assigned: @mosaic
     Status: ⏸️ Queued (blocked by #155)
     Comments:
       🤖 mosaic: Added to queue. Waiting for dependencies...

⚪ #144 [COORD-004] Implement agent profiles
     Unassigned
     Status: ⚪ Not started

Configuration

# coordinator.yml

gitea:
  url: https://git.mosaicstack.dev
  api_token: ${GITEA_API_TOKEN}
  webhook_secret: ${GITEA_WEBHOOK_SECRET}

coordinator:
  username: mosaic
  email: mosaic@mosaicstack.dev

server:
  host: 0.0.0.0
  port: 8080
  public_url: https://coordinator.example.com

agents:
  opus:
    context_limit: 200000
    cost_per_mtok: 15.00
  sonnet:
    context_limit: 200000
    cost_per_mtok: 3.00
  glm:
    context_limit: 128000
    cost_per_mtok: 0.00 # Self-hosted
  haiku:
    context_limit: 200000
    cost_per_mtok: 0.80
  minimax:
    context_limit: 128000
    cost_per_mtok: 0.00 # Self-hosted

behavior:
  comment_on_queue_add: true
  comment_on_start: true
  comment_frequency: 300 # seconds
  close_issue_on_complete: true
  unassign_on_failure: true

quality_gates:
  enabled: true
  gates: [build, lint, test, coverage]
  coverage_minimum: 85
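
The `${GITEA_API_TOKEN}` style placeholders above are not expanded by a YAML parser on its own, so the coordinator would need to substitute them when loading the file. The following is one minimal way to do that with the standard library before handing the text to a YAML loader; `expand_env` is a hypothetical helper and the choice to fail on unset variables is an assumption.

```python
import os
import re
from typing import Mapping, Optional

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace each ${NAME} in text with env[NAME].

    Raises KeyError if a referenced variable is not set, so a missing
    secret fails at startup rather than at the first API call.
    """
    source = os.environ if env is None else env

    def replace(match: "re.Match") -> str:
        name = match.group(1)
        if name not in source:
            raise KeyError(f"Missing environment variable: {name}")
        return source[name]

    return _PLACEHOLDER.sub(replace, text)


config_text = "gitea:\n  api_token: ${GITEA_API_TOKEN}\n"
print(expand_env(config_text, {"GITEA_API_TOKEN": "example-token"}))
```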

Deployment

Docker Compose

# docker-compose.yml (add to existing)

services:
  coordinator:
    build: ./coordinator
    ports:
      - "8080:8080"
    environment:
      - GITEA_URL=https://git.mosaicstack.dev
      - GITEA_API_TOKEN=${GITEA_API_TOKEN}
      - GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    volumes:
      - ./coordinator.yml:/app/coordinator.yml
      - coordinator-data:/app/data
    restart: unless-stopped

volumes:
  coordinator-data:

Systemd Service

[Unit]
Description=Mosaic Stack Coordinator
After=network.target

[Service]
Type=simple
User=mosaic
WorkingDirectory=/opt/coordinator
ExecStart=/opt/coordinator/venv/bin/python main.py
Restart=always
RestartSec=10

Environment="GITEA_URL=https://git.mosaicstack.dev"
Environment="GITEA_API_TOKEN=..."
Environment="GITEA_WEBHOOK_SECRET=..."
Environment="ANTHROPIC_API_KEY=..."

[Install]
WantedBy=multi-user.target

Error Handling

Webhook Signature Invalid

# Return 401, log security alert (standard loggers have no `security` method)
logger.warning(f"Invalid webhook signature from {request.client.host}")
return Response(status_code=401)

Issue Parse Failure

# Comment on issue, use defaults
await gitea_client.comment_on_issue(
    issue_number,
    "⚠️ Could not parse issue metadata. Using defaults:\n"
    "- estimated_context: 50000\n"
    "- difficulty: medium\n"
    "- assigned_agent: sonnet"
)

Agent Failure

# Comment on issue, unassign from coordinator
await gitea_client.comment_on_issue(
    issue_number,
    f"❌ Agent failed: {error}\n\n"
    f"Unassigning from coordinator. Please review and reassign."
)
await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)

Quality Gates Failure (Repeated)

# After 3 failed attempts, give up
if attempt_count >= 3:
    await gitea_client.comment_on_issue(
        issue_number,
        "❌ Quality gates failed after 3 attempts.\n\n"
        "Unassigning from coordinator. Manual intervention needed."
    )
    await gitea_client.unassign_issue(issue_number, COORDINATOR_USERNAME)

Success Criteria

Phase 0 Complete When:

  • Coordinator user created in Gitea
  • Webhook registered and verified
  • Webhook receiver handles assignment events
  • Issue parser extracts metadata correctly
  • Queue manager tracks dependencies
  • Test: Assign issue → Coordinator adds to queue → Comments on issue

Integration Test:

# 1. Create test issue with metadata
# 2. Assign to @mosaic
# 3. Verify webhook fires
# 4. Verify comment added: "Added to queue..."
# 5. Verify queue contains issue
# 6. Success!

Future Enhancements

Bidirectional Status Updates

  • Coordinator comments progress every 5 minutes
  • Coordinator updates issue labels (in-progress, blocked, etc.)
  • Coordinator creates checkboxes for acceptance criteria, checks them off

Multi-Repo Support

  • Register multiple repos
  • Single coordinator handles all
  • Cross-repo dependencies

Priority Queuing

  • Respect issue priority labels (p0 > p1 > p2 > p3)
  • Allow user to bump priority via label change
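
As a rough illustration of how label-based priority could combine with the current assignment-order behavior, the sketch below uses a stable sort so that issues with equal priority keep their queue order. The label names follow the p0 to p3 scheme above; the function names and the sample labels are assumptions.

```python
PRIORITY_RANK = {"p0": 0, "p1": 1, "p2": 2, "p3": 3}
UNLABELED_RANK = len(PRIORITY_RANK)  # Issues without a priority label go last


def priority_of(labels: list) -> int:
    """Return the best (lowest) rank among an issue's priority labels."""
    ranks = [
        PRIORITY_RANK[label.lower()]
        for label in labels
        if label.lower() in PRIORITY_RANK
    ]
    return min(ranks, default=UNLABELED_RANK)


def order_by_priority(items: list) -> list:
    """Sort (issue_number, labels) pairs by priority.

    sorted() is stable, so issues with equal priority keep assignment order.
    """
    ordered = sorted(items, key=lambda item: priority_of(item[1]))
    return [number for number, _labels in ordered]


# Hypothetical labels, listed in assignment order.
assigned = [(154, ["p2"]), (155, ["bug"]), (143, ["p0"]), (144, ["P2", "docs"])]
print(order_by_priority(assigned))  # [143, 154, 144, 155]
```

Dependency checks would still apply on top of this ordering: a high-priority issue that is blocked stays blocked.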

Cost Tracking

  • Track token usage per issue
  • Comment cost estimate before starting
  • Comment actual cost after completion
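
A cost estimate follows directly from the `cost_per_mtok` values in the configuration: tokens divided by one million, times the per-million rate. The sketch below copies those rates and treats them as a single blended price per agent, as the configuration does; `estimate_cost` is a hypothetical helper.

```python
from decimal import Decimal

# Rates copied from the agents section of coordinator.yml (per million tokens).
COST_PER_MTOK = {
    "opus": Decimal("15.00"),
    "sonnet": Decimal("3.00"),
    "haiku": Decimal("0.80"),
    "glm": Decimal("0.00"),      # Self-hosted
    "minimax": Decimal("0.00"),  # Self-hosted
}


def estimate_cost(tokens: int, agent: str) -> Decimal:
    """Return the estimated cost for the given token count and agent."""
    return Decimal(tokens) * COST_PER_MTOK[agent] / Decimal(1_000_000)


# Example: a 46,800-token issue under different agent assignments.
for agent in ("opus", "sonnet", "glm"):
    print(f"{agent}: ${estimate_cost(46_800, agent):.4f}")
```

`Decimal` is used so that small per-issue amounts add up without floating-point drift when totals are reported.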

Conclusion

Assignment-based triggering provides:

  • Natural workflow - Users assign issues like normal
  • Visual feedback - See coordinator status in Gitea UI
  • Granular control - Start/stop/pause per issue
  • Event-driven - No polling, immediate response
  • Standard integration - Uses Gitea's native webhooks

This is the foundation that enables autonomous orchestration.


Document Version: 1.0
Created: 2026-01-31
Authors: Jason Woltje + Claude Opus 4.5
Status: Proposed - Ready for Phase 0 implementation