Files
agent-skills/skills/code-review-excellence/reference/python.md
Jason Woltje d9bcdc4a8d feat: Initial agent-skills repo — 4 adapted skills for Mosaic Stack
Skills included:
- pr-reviewer: Adapted for Gitea/GitHub via platform-aware scripts
  (dropped fetch_pr_data.py and add_inline_comment.py, kept generate_review_files.py)
- code-review-excellence: Methodology and checklists (React, TS, Python, etc.)
- vercel-react-best-practices: 57 rules for React/Next.js performance
- tailwind-design-system: Tailwind CSS v4 patterns, CVA, design tokens

New shell scripts added to ~/.claude/scripts/git/:
- pr-diff.sh: Get PR diff (GitHub gh / Gitea API)
- pr-metadata.sh: Get PR metadata as normalized JSON

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:03:39 -06:00

24 KiB
Raw Blame History

Python Code Review Guide

Python 代码审查指南覆盖类型注解、async/await、测试、异常处理、性能优化等核心主题。

目录


类型注解

基础类型注解

# ❌ 没有类型注解IDE 无法提供帮助
def process_data(data, count):
    return data[:count]

# ✅ 使用类型注解
def process_data(data: str, count: int) -> str:
    return data[:count]

# ✅ 复杂类型使用 typing 模块
from typing import Optional, Union

def find_user(user_id: int) -> Optional[User]:
    """返回用户或 None"""
    return db.get(user_id)

def handle_input(value: Union[str, int]) -> str:
    """接受字符串或整数"""
    return str(value)

容器类型注解

from typing import List, Dict, Set, Tuple, Sequence

# ❌ 不精确的类型
def get_names(users: list) -> list:
    return [u.name for u in users]

# ✅ 精确的容器类型Python 3.9+ 可直接用 list[User]
def get_names(users: List[User]) -> List[str]:
    return [u.name for u in users]

# ✅ 只读序列用 Sequence更灵活
def process_items(items: Sequence[str]) -> int:
    return len(items)

# ✅ 字典类型
def count_words(text: str) -> Dict[str, int]:
    words: Dict[str, int] = {}
    for word in text.split():
        words[word] = words.get(word, 0) + 1
    return words

# ✅ 元组(固定长度和类型)
def get_point() -> Tuple[float, float]:
    return (1.0, 2.0)

# ✅ 可变长度元组
def get_scores() -> Tuple[int, ...]:
    return (90, 85, 92, 88)

泛型与 TypeVar

from typing import TypeVar, Generic, List, Callable

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

# ✅ 泛型函数
def first(items: List[T]) -> T | None:
    return items[0] if items else None

# ✅ 有约束的 TypeVar
from typing import Hashable
H = TypeVar('H', bound=Hashable)

def dedupe(items: List[H]) -> List[H]:
    return list(set(items))

# ✅ 泛型类
class Cache(Generic[K, V]):
    def __init__(self) -> None:
        self._data: Dict[K, V] = {}

    def get(self, key: K) -> V | None:
        return self._data.get(key)

    def set(self, key: K, value: V) -> None:
        self._data[key] = value

Callable 与回调函数

from typing import Callable, Awaitable

# ✅ 函数类型注解
Handler = Callable[[str, int], bool]

def register_handler(name: str, handler: Handler) -> None:
    handlers[name] = handler

# ✅ 异步回调
AsyncHandler = Callable[[str], Awaitable[dict]]

async def fetch_with_handler(
    url: str,
    handler: AsyncHandler
) -> dict:
    return await handler(url)

# ✅ 返回函数的函数
def create_multiplier(factor: int) -> Callable[[int], int]:
    def multiplier(x: int) -> int:
        return x * factor
    return multiplier

TypedDict 与结构化数据

from typing import TypedDict, Required, NotRequired

# ✅ 定义字典结构
class UserDict(TypedDict):
    id: int
    name: str
    email: str
    age: NotRequired[int]  # Python 3.11+

def create_user(data: UserDict) -> User:
    return User(**data)

# ✅ 部分必需字段
class ConfigDict(TypedDict, total=False):
    debug: bool
    timeout: int
    host: Required[str]  # 这个必须有

Protocol 与结构化子类型

from typing import Protocol, runtime_checkable

# ✅ 定义协议(鸭子类型的类型检查)
class Readable(Protocol):
    def read(self, size: int = -1) -> bytes: ...

class Closeable(Protocol):
    def close(self) -> None: ...

# 组合协议
class ReadableCloseable(Readable, Closeable, Protocol):
    pass

def process_stream(stream: Readable) -> bytes:
    return stream.read()

# ✅ 运行时可检查的协议
@runtime_checkable
class Drawable(Protocol):
    def draw(self) -> None: ...

def render(obj: object) -> None:
    if isinstance(obj, Drawable):  # 运行时检查
        obj.draw()

异步编程

async/await 基础

import asyncio

# ❌ 同步阻塞调用
def fetch_all_sync(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        results.append(requests.get(url).text)  # 串行执行
    return results

# ✅ 异步并发调用
async def fetch_url(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(urls: list[str]) -> list[str]:
    tasks = [fetch_url(url) for url in urls]
    return await asyncio.gather(*tasks)  # 并发执行

异步上下文管理器

from contextlib import asynccontextmanager
from typing import AsyncIterator

# ✅ 异步上下文管理器类
class AsyncDatabase:
    async def __aenter__(self) -> 'AsyncDatabase':
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.disconnect()

# ✅ 使用装饰器
@asynccontextmanager
async def get_connection() -> AsyncIterator[Connection]:
    conn = await create_connection()
    try:
        yield conn
    finally:
        await conn.close()

async def query_data():
    async with get_connection() as conn:
        return await conn.fetch("SELECT * FROM users")

异步迭代器

from typing import AsyncIterator

# ✅ 异步生成器
async def fetch_pages(url: str) -> AsyncIterator[dict]:
    page = 1
    while True:
        data = await fetch_page(url, page)
        if not data['items']:
            break
        yield data
        page += 1

# ✅ 使用异步迭代
async def process_all_pages():
    async for page in fetch_pages("https://api.example.com"):
        await process_page(page)

任务管理与取消

import asyncio

# ❌ 忘记处理取消
async def bad_worker():
    while True:
        await do_work()  # 无法正常取消

# ✅ 正确处理取消
async def good_worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup()  # 清理资源
        raise  # 重新抛出,让调用者知道已取消

# ✅ 超时控制
async def fetch_with_timeout(url: str) -> str:
    try:
        async with asyncio.timeout(10):  # Python 3.11+
            return await fetch_url(url)
    except asyncio.TimeoutError:
        return ""

# ✅ 任务组Python 3.11+
async def fetch_multiple():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_url("url1"))
        task2 = tg.create_task(fetch_url("url2"))
    # 所有任务完成后自动等待,异常会传播
    return task1.result(), task2.result()

同步与异步混合

import asyncio
from concurrent.futures import ThreadPoolExecutor

# ✅ 在异步代码中运行同步函数
async def run_sync_in_async():
    loop = asyncio.get_event_loop()
    # 使用线程池执行阻塞操作
    result = await loop.run_in_executor(
        None,  # 默认线程池
        blocking_io_function,
        arg1, arg2
    )
    return result

# ✅ 在同步代码中运行异步函数
def run_async_in_sync():
    return asyncio.run(async_function())

# ❌ 不要在异步代码中使用 time.sleep
async def bad_delay():
    time.sleep(1)  # 会阻塞整个事件循环!

# ✅ 使用 asyncio.sleep
async def good_delay():
    await asyncio.sleep(1)

信号量与限流

import asyncio

# ✅ 使用信号量限制并发
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(url: str) -> str:
        async with semaphore:
            return await fetch_url(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])

# ✅ 使用 asyncio.Queue 实现生产者-消费者
async def producer_consumer():
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=100)

    async def producer():
        for item in items:
            await queue.put(item)
        await queue.put(None)  # 结束信号

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            await process(item)
            queue.task_done()

    await asyncio.gather(producer(), consumer())

异常处理

异常捕获最佳实践

# ❌ Catching too broad
try:
    result = risky_operation()
except:  # Catches everything, even KeyboardInterrupt!
    pass

# ❌ 捕获 Exception 但不处理
try:
    result = risky_operation()
except Exception:
    pass  # 吞掉所有异常,难以调试

# ✅ Catch specific exceptions
try:
    result = risky_operation()
except ValueError as e:
    logger.error(f"Invalid value: {e}")
    raise
except IOError as e:
    logger.error(f"IO error: {e}")
    return default_value

# ✅ 多个异常类型
try:
    result = parse_and_process(data)
except (ValueError, TypeError, KeyError) as e:
    logger.error(f"Data error: {e}")
    raise DataProcessingError(str(e)) from e

异常链

# ❌ 丢失原始异常信息
try:
    result = external_api.call()
except APIError as e:
    raise RuntimeError("API failed")  # 丢失了原因

# ✅ 使用 from 保留异常链
try:
    result = external_api.call()
except APIError as e:
    raise RuntimeError("API failed") from e

# ✅ 显式断开异常链(少见情况)
try:
    result = external_api.call()
except APIError:
    raise RuntimeError("API failed") from None

自定义异常

# ✅ 定义业务异常层次结构
class AppError(Exception):
    """应用基础异常"""
    pass

class ValidationError(AppError):
    """数据验证错误"""
    def __init__(self, field: str, message: str):
        self.field = field
        self.message = message
        super().__init__(f"{field}: {message}")

class NotFoundError(AppError):
    """资源未找到"""
    def __init__(self, resource: str, id: str | int):
        self.resource = resource
        self.id = id
        super().__init__(f"{resource} with id {id} not found")

# 使用
def get_user(user_id: int) -> User:
    user = db.get(user_id)
    if not user:
        raise NotFoundError("User", user_id)
    return user

上下文管理器中的异常

from contextlib import contextmanager

# ✅ 正确处理上下文管理器中的异常
@contextmanager
def transaction():
    conn = get_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

# ✅ 使用 ExceptionGroupPython 3.11+
def process_batch(items: list) -> None:
    errors = []
    for item in items:
        try:
            process(item)
        except Exception as e:
            errors.append(e)

    if errors:
        raise ExceptionGroup("Batch processing failed", errors)

常见陷阱

可变默认参数

# ❌ Mutable default arguments
def add_item(item, items=[]):  # Bug! Shared across calls
    items.append(item)
    return items

# 问题演示
add_item(1)  # [1]
add_item(2)  # [1, 2] 而不是 [2]

# ✅ Use None as default
def add_item(item, items=None):
    if items is None:
        items = []
    items.append(item)
    return items

# ✅ 或使用 dataclass 的 field
from dataclasses import dataclass, field

@dataclass
class Container:
    items: list = field(default_factory=list)

可变类属性

# ❌ Using mutable class attributes
class User:
    permissions = []  # Shared across all instances!

# 问题演示
u1 = User()
u2 = User()
u1.permissions.append("admin")
print(u2.permissions)  # ["admin"] - 被意外共享!

# ✅ Initialize in __init__
class User:
    def __init__(self):
        self.permissions = []

# ✅ 使用 dataclass
@dataclass
class User:
    permissions: list = field(default_factory=list)

循环中的闭包

# ❌ 闭包捕获循环变量
funcs = []
for i in range(3):
    funcs.append(lambda: i)

print([f() for f in funcs])  # [2, 2, 2] 而不是 [0, 1, 2]

# ✅ 使用默认参数捕获值
funcs = []
for i in range(3):
    funcs.append(lambda i=i: i)

print([f() for f in funcs])  # [0, 1, 2]

# ✅ 使用 functools.partial
from functools import partial

funcs = [partial(lambda x: x, i) for i in range(3)]

is vs ==

# ❌ 用 is 比较值
if x is 1000:  # 可能不工作!
    pass

# Python 会缓存小整数 (-5 到 256)
a = 256
b = 256
a is b  # True

a = 257
b = 257
a is b  # False

# ✅ 用 == 比较值
if x == 1000:
    pass

# ✅ is 只用于 None 和单例
if x is None:
    pass

if x is True:  # 严格检查布尔值
    pass

字符串拼接性能

# ❌ 循环中拼接字符串
result = ""
for item in large_list:
    result += str(item)  # O(n²) 复杂度

# ✅ 使用 join
result = "".join(str(item) for item in large_list)  # O(n)

# ✅ 使用 StringIO 构建大字符串
from io import StringIO

buffer = StringIO()
for item in large_list:
    buffer.write(str(item))
result = buffer.getvalue()

测试最佳实践

pytest 基础

import pytest

# ✅ 清晰的测试命名
def test_user_creation_with_valid_email():
    user = User(email="test@example.com")
    assert user.email == "test@example.com"

def test_user_creation_with_invalid_email_raises_error():
    with pytest.raises(ValidationError):
        User(email="invalid")

# ✅ 使用参数化测试
@pytest.mark.parametrize("input,expected", [
    ("hello", "HELLO"),
    ("World", "WORLD"),
    ("", ""),
    ("123", "123"),
])
def test_uppercase(input: str, expected: str):
    assert input.upper() == expected

# ✅ 测试异常
def test_division_by_zero():
    with pytest.raises(ZeroDivisionError) as exc_info:
        1 / 0
    assert "division by zero" in str(exc_info.value)

Fixtures

import pytest
from typing import Generator

# ✅ 基础 fixture
@pytest.fixture
def user() -> User:
    return User(name="Test User", email="test@example.com")

def test_user_name(user: User):
    assert user.name == "Test User"

# ✅ 带清理的 fixture
@pytest.fixture
def database() -> Generator[Database, None, None]:
    db = Database()
    db.connect()
    yield db
    db.disconnect()  # 测试后清理

# ✅ 异步 fixture
@pytest.fixture
async def async_client() -> AsyncGenerator[AsyncClient, None]:
    async with AsyncClient() as client:
        yield client

# ✅ 共享 fixtureconftest.py
# conftest.py
@pytest.fixture(scope="session")
def app():
    """整个测试会话共享的 app 实例"""
    return create_app()

@pytest.fixture(scope="module")
def db(app):
    """每个测试模块共享的数据库连接"""
    return app.db

Mock 与 Patch

from unittest.mock import Mock, patch, AsyncMock

# ✅ Mock 外部依赖
def test_send_email():
    mock_client = Mock()
    mock_client.send.return_value = True

    service = EmailService(client=mock_client)
    result = service.send_welcome_email("user@example.com")

    assert result is True
    mock_client.send.assert_called_once_with(
        to="user@example.com",
        subject="Welcome!",
        body=ANY,
    )

# ✅ Patch 模块级函数
@patch("myapp.services.external_api.call")
def test_with_patched_api(mock_call):
    mock_call.return_value = {"status": "ok"}

    result = process_data()

    assert result["status"] == "ok"

# ✅ 异步 Mock
async def test_async_function():
    mock_fetch = AsyncMock(return_value={"data": "test"})

    with patch("myapp.client.fetch", mock_fetch):
        result = await get_data()

    assert result == {"data": "test"}

测试组织

# ✅ 使用类组织相关测试
class TestUserAuthentication:
    """用户认证相关测试"""

    def test_login_with_valid_credentials(self, user):
        assert authenticate(user.email, "password") is True

    def test_login_with_invalid_password(self, user):
        assert authenticate(user.email, "wrong") is False

    def test_login_locks_after_failed_attempts(self, user):
        for _ in range(5):
            authenticate(user.email, "wrong")
        assert user.is_locked is True

# ✅ 使用 mark 标记测试
@pytest.mark.slow
def test_large_data_processing():
    pass

@pytest.mark.integration
def test_database_connection():
    pass

# 运行特定标记的测试pytest -m "not slow"

覆盖率与质量

# pytest.ini 或 pyproject.toml
[tool.pytest.ini_options]
addopts = "--cov=myapp --cov-report=term-missing --cov-fail-under=80"
testpaths = ["tests"]

# ✅ 测试边界情况
def test_empty_input():
    assert process([]) == []

def test_none_input():
    with pytest.raises(TypeError):
        process(None)

def test_large_input():
    large_data = list(range(100000))
    result = process(large_data)
    assert len(result) == 100000

性能优化

数据结构选择

# ❌ 列表查找 O(n)
if item in large_list:  # 慢
    pass

# ✅ 集合查找 O(1)
large_set = set(large_list)
if item in large_set:  # 快
    pass

# ✅ 使用 collections 模块
from collections import Counter, defaultdict, deque

# 计数
word_counts = Counter(words)
most_common = word_counts.most_common(10)

# 默认字典
graph = defaultdict(list)
graph[node].append(neighbor)

# 双端队列(两端操作 O(1)
queue = deque()
queue.appendleft(item)  # O(1) vs list.insert(0, item) O(n)

生成器与迭代器

# ❌ 一次性加载所有数据
def get_all_users():
    return [User(row) for row in db.fetch_all()]  # 内存占用大

# ✅ 使用生成器
def get_all_users():
    for row in db.fetch_all():
        yield User(row)  # 懒加载

# ✅ 生成器表达式
sum_of_squares = sum(x**2 for x in range(1000000))  # 不创建列表

# ✅ itertools 模块
from itertools import islice, chain, groupby

# 只取前 10 个
first_10 = list(islice(infinite_generator(), 10))

# 链接多个迭代器
all_items = chain(list1, list2, list3)

# 分组
for key, group in groupby(sorted(items, key=get_key), key=get_key):
    process_group(key, list(group))

缓存

from functools import lru_cache, cache

# ✅ LRU 缓存
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    return sum(i**2 for i in range(n))

# ✅ 无限缓存Python 3.9+
@cache
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# ✅ 手动缓存(需要更多控制时)
class DataService:
    def __init__(self):
        self._cache: dict[str, Any] = {}
        self._cache_ttl: dict[str, float] = {}

    def get_data(self, key: str) -> Any:
        if key in self._cache:
            if time.time() < self._cache_ttl[key]:
                return self._cache[key]

        data = self._fetch_data(key)
        self._cache[key] = data
        self._cache_ttl[key] = time.time() + 300  # 5 分钟
        return data

并行处理

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ✅ IO 密集型使用线程池
def fetch_all_urls(urls: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_url, urls))
    return results

# ✅ CPU 密集型使用进程池
def process_large_dataset(data: list) -> list:
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(heavy_computation, data))
    return results

# ✅ 使用 as_completed 获取最先完成的结果
from concurrent.futures import as_completed

with ThreadPoolExecutor() as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"{url} failed: {e}")

代码风格

PEP 8 要点

# ✅ 命名规范
class MyClass:  # 类名 PascalCase
    MAX_SIZE = 100  # 常量 UPPER_SNAKE_CASE

    def method_name(self):  # 方法 snake_case
        local_var = 1  # 变量 snake_case

# ✅ 导入顺序
# 1. 标准库
import os
import sys
from typing import Optional

# 2. 第三方库
import numpy as np
import pandas as pd

# 3. 本地模块
from myapp import config
from myapp.utils import helper

# ✅ 行长度限制79 或 88 字符)
# 长表达式的换行
result = (
    long_function_name(arg1, arg2, arg3)
    + another_long_function(arg4, arg5)
)

# ✅ 空行规范
class MyClass:
    """类文档字符串"""

    def method_one(self):
        pass

    def method_two(self):  # 方法间一个空行
        pass


def top_level_function():  # 顶层定义间两个空行
    pass

文档字符串

# ✅ Google 风格文档字符串
def calculate_area(width: float, height: float) -> float:
    """计算矩形面积。

    Args:
        width: 矩形的宽度(必须为正数)。
        height: 矩形的高度(必须为正数)。

    Returns:
        矩形的面积。

    Raises:
        ValueError: 如果 width 或 height 为负数。

    Example:
        >>> calculate_area(3, 4)
        12.0
    """
    if width < 0 or height < 0:
        raise ValueError("Dimensions must be positive")
    return width * height

# ✅ 类文档字符串
class DataProcessor:
    """处理和转换数据的工具类。

    Attributes:
        source: 数据来源路径。
        format: 输出格式('json' 或 'csv')。

    Example:
        >>> processor = DataProcessor("data.csv")
        >>> processor.process()
    """

现代 Python 特性

# ✅ f-stringPython 3.6+
name = "World"
print(f"Hello, {name}!")

# 带表达式
print(f"Result: {1 + 2 = }")  # "Result: 1 + 2 = 3"

# ✅ 海象运算符Python 3.8+
if (n := len(items)) > 10:
    print(f"List has {n} items")

# ✅ 位置参数分隔符Python 3.8+
def greet(name, /, greeting="Hello", *, punctuation="!"):
    """name 只能位置传参punctuation 只能关键字传参"""
    return f"{greeting}, {name}{punctuation}"

# ✅ 模式匹配Python 3.10+
def handle_response(response: dict):
    match response:
        case {"status": "ok", "data": data}:
            return process_data(data)
        case {"status": "error", "message": msg}:
            raise APIError(msg)
        case _:
            raise ValueError("Unknown response format")

Review Checklist

类型安全

  • 函数有类型注解(参数和返回值)
  • 使用 Optional 明确可能为 None
  • 泛型类型正确使用
  • mypy 检查通过(无错误)
  • 避免使用 Any,必要时添加注释说明

异步代码

  • async/await 正确配对使用
  • 没有在异步代码中使用阻塞调用
  • 正确处理 CancelledError
  • 使用 asyncio.gatherTaskGroup 并发执行
  • 资源正确清理async context manager

异常处理

  • 捕获特定异常类型,不使用裸 except:
  • 异常链使用 from 保留原因
  • 自定义异常继承自合适的基类
  • 异常信息有意义,便于调试

数据结构

  • 没有使用可变默认参数list、dict、set
  • 类属性不是可变对象
  • 选择正确的数据结构set vs list 查找)
  • 大数据集使用生成器而非列表

测试

  • 测试覆盖率达标(建议 ≥80%
  • 测试命名清晰描述测试场景
  • 边界情况有测试覆盖
  • Mock 正确隔离外部依赖
  • 异步代码有对应的异步测试

代码风格

  • 遵循 PEP 8 风格指南
  • 函数和类有 docstring
  • 导入顺序正确(标准库、第三方、本地)
  • 命名一致且有意义
  • 使用现代 Python 特性f-string、walrus operator 等)

性能

  • 避免循环中重复创建对象
  • 字符串拼接使用 join
  • 合理使用缓存(@lru_cache
  • IO/CPU 密集型使用合适的并行方式