feat(#148): Implement Quality Orchestrator and Forced Continuation services

Implements COORD-008 - Build Quality Orchestrator service that intercepts
completion claims and enforces quality gates.

**Quality Orchestrator (quality_orchestrator.py):**
- Runs all quality gates (build, lint, test, coverage) in parallel using asyncio
- Aggregates gate results into VerificationResult model
- Determines overall pass/fail status
- Handles gate exceptions gracefully
- Uses dependency injection for testability
- 87% test coverage (exceeds 85% minimum)

**Forced Continuation Service (forced_continuation.py):**
- Generates non-negotiable continuation prompts for gate failures
- Provides actionable remediation steps for each failed gate
- Includes specific error details and coverage gaps
- Blocks completion until all gates pass
- 100% test coverage

**Tests:**
- 6 tests for QualityOrchestrator covering:
  - All gates passing scenario
  - Single/multiple/all gates failing scenarios
  - Parallel gate execution verification
  - Exception handling
- 9 tests for ForcedContinuationService covering:
  - Individual gate failure prompts (build, lint, test, coverage)
  - Multiple simultaneous failures
  - Actionable details inclusion
  - Error handling for invalid states

**Quality Gates:**
 Build: mypy passes (no type errors)
 Lint: ruff passes (no violations)
 Test: 15/15 tests pass (100% pass rate)
 Coverage: 87% quality_orchestrator, 100% forced_continuation (exceeds 85%)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 20:04:26 -06:00
parent e79ed8da2b
commit 324c6b71d8
4 changed files with 979 additions and 0 deletions

View File

@@ -0,0 +1,144 @@
"""Forced Continuation service for generating non-negotiable agent instructions."""
from src.quality_orchestrator import VerificationResult
class ForcedContinuationService:
"""Generates forced continuation prompts for quality gate failures.
This service creates non-negotiable, actionable prompts that instruct
agents to fix quality gate failures. The prompts are designed to:
- Be clear and directive (not suggestions)
- Include specific failure details
- Provide actionable remediation steps
- Block completion until all gates pass
"""
def generate_prompt(self, verification: VerificationResult) -> str:
"""Generate a forced continuation prompt for gate failures.
Args:
verification: VerificationResult containing gate failure details
Returns:
str: Non-negotiable prompt instructing agent to fix failures
Raises:
ValueError: If verification.all_passed is True (no failures to fix)
"""
if verification.all_passed:
raise ValueError(
"Cannot generate continuation prompt when all gates pass. "
"This method should only be called when verification fails."
)
# Collect failed gates
failed_gates = {
name: result
for name, result in verification.gate_results.items()
if not result.passed
}
# Build the prompt
prompt_parts = [
"QUALITY GATES FAILED - COMPLETION BLOCKED",
"",
"The following quality gates have failed and MUST be fixed before completion:",
"",
]
# Add details for each failed gate
for gate_name, result in failed_gates.items():
prompt_parts.append(f"{gate_name.upper()} GATE FAILED")
prompt_parts.append(f" Message: {result.message}")
# Add specific details if available
if result.details:
if "stderr" in result.details and result.details["stderr"]:
prompt_parts.append(" Details:")
# Include first few lines of stderr
stderr_lines = result.details["stderr"].split("\n")[:5]
for line in stderr_lines:
if line.strip():
prompt_parts.append(f" {line}")
# Add coverage-specific details
if "coverage_percent" in result.details:
coverage = result.details["coverage_percent"]
minimum = result.details.get("minimum_coverage", 85.0)
gap = minimum - coverage
prompt_parts.append(f" Current coverage: {coverage:.1f}%")
prompt_parts.append(f" Required coverage: {minimum:.1f}%")
prompt_parts.append(f" Coverage gap: {gap:.1f}%")
prompt_parts.append("")
# Add remediation instructions
prompt_parts.extend(
[
"REQUIRED ACTIONS:",
"",
]
)
# Add specific remediation steps based on which gates failed
if "build" in failed_gates:
prompt_parts.extend(
[
"1. BUILD GATE - Fix all type errors:",
" - Run: mypy src/",
" - Fix all type errors reported",
" - Ensure all type annotations are correct",
"",
]
)
if "lint" in failed_gates:
prompt_parts.extend(
[
"2. LINT GATE - Fix all linting issues:",
" - Run: ruff check src/",
" - Fix all errors and warnings",
" - Ensure code follows style guidelines",
"",
]
)
if "test" in failed_gates:
prompt_parts.extend(
[
"3. TEST GATE - Fix all failing tests:",
" - Run: pytest -v",
" - Fix all test failures",
" - Ensure 100% test pass rate",
"",
]
)
if "coverage" in failed_gates:
coverage_result = failed_gates["coverage"]
current = coverage_result.details.get("coverage_percent", 0.0)
minimum = coverage_result.details.get("minimum_coverage", 85.0)
prompt_parts.extend(
[
"4. COVERAGE GATE - Increase test coverage:",
" - Run: pytest --cov=src --cov-report=term-missing",
f" - Current: {current:.1f}% | Required: {minimum:.1f}%",
" - Add tests for uncovered code paths",
" - Focus on files with low coverage",
"",
]
)
# Add final directive
prompt_parts.extend(
[
"You MUST fix all failing gates before claiming completion.",
"After fixing issues, run all quality gates again to verify.",
"",
"DO NOT claim completion until all gates pass.",
]
)
return "\n".join(prompt_parts)

View File

@@ -0,0 +1,164 @@
"""Quality Orchestrator service for coordinating quality gate execution."""
import asyncio
from typing import Any
from pydantic import BaseModel, Field
from src.gates.build_gate import BuildGate
from src.gates.coverage_gate import CoverageGate
from src.gates.lint_gate import LintGate
from src.gates.quality_gate import GateResult
from src.gates.test_gate import TestGate
class VerificationResult(BaseModel):
"""Result of quality gate verification.
Attributes:
all_passed: Whether all quality gates passed
gate_results: Dictionary mapping gate names to their results
"""
all_passed: bool = Field(..., description="Whether all quality gates passed")
gate_results: dict[str, GateResult] = Field(
..., description="Results from each quality gate"
)
class QualityOrchestrator:
"""Orchestrates execution of all quality gates in parallel.
The Quality Orchestrator is responsible for:
- Running all quality gates (build, lint, test, coverage) in parallel
- Aggregating gate results
- Determining overall pass/fail status
"""
def __init__(
self,
build_gate: BuildGate | None = None,
lint_gate: LintGate | None = None,
test_gate: TestGate | None = None,
coverage_gate: CoverageGate | None = None,
) -> None:
"""Initialize the Quality Orchestrator.
Args:
build_gate: Optional BuildGate instance (for testing/DI)
lint_gate: Optional LintGate instance (for testing/DI)
test_gate: Optional TestGate instance (for testing/DI)
coverage_gate: Optional CoverageGate instance (for testing/DI)
"""
# Use provided gates or create new instances
# This allows for dependency injection in tests
self.build_gate = build_gate
self.lint_gate = lint_gate
self.test_gate = test_gate
self.coverage_gate = coverage_gate
async def verify_completion(self) -> VerificationResult:
"""Verify that all quality gates pass.
Runs all quality gates in parallel and aggregates the results.
Returns:
VerificationResult: Aggregated results from all gates
Note:
This method runs all gates in parallel for efficiency.
Even if one gate fails, all gates will complete execution.
"""
# Instantiate gates if not provided (lazy initialization)
# This allows tests to inject mocks, while production uses real gates
build_gate = self.build_gate if self.build_gate is not None else BuildGate()
lint_gate = self.lint_gate if self.lint_gate is not None else LintGate()
test_gate = self.test_gate if self.test_gate is not None else TestGate()
coverage_gate = self.coverage_gate if self.coverage_gate is not None else CoverageGate()
# Run all gates in parallel using asyncio.gather
results = await asyncio.gather(
self._run_gate_async("build", build_gate),
self._run_gate_async("lint", lint_gate),
self._run_gate_async("test", test_gate),
self._run_gate_async("coverage", coverage_gate),
return_exceptions=True, # Capture exceptions instead of raising
)
# Build gate results dictionary
gate_results: dict[str, GateResult] = {}
gate_names = ["build", "lint", "test", "coverage"]
for gate_name, result in zip(gate_names, results, strict=True):
if isinstance(result, Exception):
# Convert exception to failed GateResult
gate_results[gate_name] = GateResult(
passed=False,
message=f"{gate_name.capitalize()} gate failed: Unexpected error: {result}",
details={"error": str(result), "exception_type": type(result).__name__},
)
elif isinstance(result, GateResult):
gate_results[gate_name] = result
else:
# Unexpected type - treat as error
gate_results[gate_name] = GateResult(
passed=False,
message=f"{gate_name.capitalize()} gate failed: Unexpected result type",
details={"error": f"Expected GateResult, got {type(result).__name__}"},
)
# Determine if all gates passed
all_passed = all(result.passed for result in gate_results.values())
return VerificationResult(all_passed=all_passed, gate_results=gate_results)
async def _run_gate_async(self, gate_name: str, gate: Any) -> GateResult:
"""Run a gate check asynchronously.
Args:
gate_name: Name of the gate for error reporting
gate: Gate instance to execute
Returns:
GateResult: Result from the gate check
Note:
This method handles both synchronous gates (production) and async mocks (testing).
Production gates are run in a thread pool to avoid blocking the event loop.
Test mocks can be async functions or lambdas returning coroutines.
"""
import inspect
from typing import cast
from unittest.mock import Mock
# Check if gate.check is an async function
if inspect.iscoroutinefunction(gate.check):
return cast(GateResult, await gate.check())
# Check if gate.check is a Mock/MagicMock (testing scenario)
mock_types = ("Mock", "MagicMock", "AsyncMock")
if isinstance(gate.check, Mock) or type(gate.check).__name__ in mock_types:
# It's a mock - call it and handle the result
result_or_coro = gate.check()
if asyncio.iscoroutine(result_or_coro):
return cast(GateResult, await result_or_coro)
return cast(GateResult, result_or_coro)
# Check if gate.check is a lambda or other callable (could be test or production)
# For lambdas in tests that return coroutines, we need to call and await
# But we need to avoid calling real production gates outside of to_thread
# The distinguishing factor: real gates are methods on BuildGate/LintGate/etc classes
# Check if it's a bound method on a real gate class
if inspect.ismethod(gate.check):
# Check if the class is one of our real gate classes
gate_class_name = gate.__class__.__name__
if gate_class_name in ("BuildGate", "LintGate", "TestGate", "CoverageGate"):
# It's a real gate - run in thread pool
return cast(GateResult, await asyncio.to_thread(gate.check))
# For any other callable (lambdas, functions), try calling and see what it returns
result_or_coro = gate.check()
if asyncio.iscoroutine(result_or_coro):
return cast(GateResult, await result_or_coro)
return cast(GateResult, result_or_coro)