Files
Jason Woltje d9bcdc4a8d feat: Initial agent-skills repo — 4 adapted skills for Mosaic Stack
Skills included:
- pr-reviewer: Adapted for Gitea/GitHub via platform-aware scripts
  (dropped fetch_pr_data.py and add_inline_comment.py, kept generate_review_files.py)
- code-review-excellence: Methodology and checklists (React, TS, Python, etc.)
- vercel-react-best-practices: 57 rules for React/Next.js performance
- tailwind-design-system: Tailwind CSS v4 patterns, CVA, design tokens

New shell scripts added to ~/.claude/scripts/git/:
- pr-diff.sh: Get PR diff (GitHub gh / Gitea API)
- pr-metadata.sh: Get PR metadata as normalized JSON

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:03:39 -06:00

1070 lines
24 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Python Code Review Guide
> Python 代码审查指南覆盖类型注解、async/await、测试、异常处理、性能优化等核心主题。
## 目录
- [类型注解](#类型注解)
- [异步编程](#异步编程)
- [异常处理](#异常处理)
- [常见陷阱](#常见陷阱)
- [测试最佳实践](#测试最佳实践)
- [性能优化](#性能优化)
- [代码风格](#代码风格)
- [Review Checklist](#review-checklist)
---
## 类型注解
### 基础类型注解
```python
# ❌ 没有类型注解IDE 无法提供帮助
def process_data(data, count):
return data[:count]
# ✅ 使用类型注解
def process_data(data: str, count: int) -> str:
return data[:count]
# ✅ 复杂类型使用 typing 模块
from typing import Optional, Union
def find_user(user_id: int) -> Optional[User]:
"""返回用户或 None"""
return db.get(user_id)
def handle_input(value: Union[str, int]) -> str:
"""接受字符串或整数"""
return str(value)
```
### 容器类型注解
```python
from typing import List, Dict, Set, Tuple, Sequence
# ❌ 不精确的类型
def get_names(users: list) -> list:
return [u.name for u in users]
# ✅ 精确的容器类型Python 3.9+ 可直接用 list[User]
def get_names(users: List[User]) -> List[str]:
return [u.name for u in users]
# ✅ 只读序列用 Sequence更灵活
def process_items(items: Sequence[str]) -> int:
return len(items)
# ✅ 字典类型
def count_words(text: str) -> Dict[str, int]:
words: Dict[str, int] = {}
for word in text.split():
words[word] = words.get(word, 0) + 1
return words
# ✅ 元组(固定长度和类型)
def get_point() -> Tuple[float, float]:
return (1.0, 2.0)
# ✅ 可变长度元组
def get_scores() -> Tuple[int, ...]:
return (90, 85, 92, 88)
```
### 泛型与 TypeVar
```python
from typing import TypeVar, Generic, List, Callable
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
# ✅ 泛型函数
def first(items: List[T]) -> T | None:
return items[0] if items else None
# ✅ 有约束的 TypeVar
from typing import Hashable
H = TypeVar('H', bound=Hashable)
def dedupe(items: List[H]) -> List[H]:
return list(set(items))
# ✅ 泛型类
class Cache(Generic[K, V]):
def __init__(self) -> None:
self._data: Dict[K, V] = {}
def get(self, key: K) -> V | None:
return self._data.get(key)
def set(self, key: K, value: V) -> None:
self._data[key] = value
```
### Callable 与回调函数
```python
from typing import Callable, Awaitable
# ✅ 函数类型注解
Handler = Callable[[str, int], bool]
def register_handler(name: str, handler: Handler) -> None:
handlers[name] = handler
# ✅ 异步回调
AsyncHandler = Callable[[str], Awaitable[dict]]
async def fetch_with_handler(
url: str,
handler: AsyncHandler
) -> dict:
return await handler(url)
# ✅ 返回函数的函数
def create_multiplier(factor: int) -> Callable[[int], int]:
def multiplier(x: int) -> int:
return x * factor
return multiplier
```
### TypedDict 与结构化数据
```python
from typing import TypedDict, Required, NotRequired
# ✅ 定义字典结构
class UserDict(TypedDict):
id: int
name: str
email: str
age: NotRequired[int] # Python 3.11+
def create_user(data: UserDict) -> User:
return User(**data)
# ✅ 部分必需字段
class ConfigDict(TypedDict, total=False):
debug: bool
timeout: int
host: Required[str] # 这个必须有
```
### Protocol 与结构化子类型
```python
from typing import Protocol, runtime_checkable
# ✅ 定义协议(鸭子类型的类型检查)
class Readable(Protocol):
def read(self, size: int = -1) -> bytes: ...
class Closeable(Protocol):
def close(self) -> None: ...
# 组合协议
class ReadableCloseable(Readable, Closeable, Protocol):
pass
def process_stream(stream: Readable) -> bytes:
return stream.read()
# ✅ 运行时可检查的协议
@runtime_checkable
class Drawable(Protocol):
def draw(self) -> None: ...
def render(obj: object) -> None:
if isinstance(obj, Drawable): # 运行时检查
obj.draw()
```
---
## 异步编程
### async/await 基础
```python
import asyncio
# ❌ 同步阻塞调用
def fetch_all_sync(urls: list[str]) -> list[str]:
results = []
for url in urls:
results.append(requests.get(url).text) # 串行执行
return results
# ✅ 异步并发调用
async def fetch_url(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls: list[str]) -> list[str]:
tasks = [fetch_url(url) for url in urls]
return await asyncio.gather(*tasks) # 并发执行
```
### 异步上下文管理器
```python
from contextlib import asynccontextmanager
from typing import AsyncIterator
# ✅ 异步上下文管理器类
class AsyncDatabase:
async def __aenter__(self) -> 'AsyncDatabase':
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.disconnect()
# ✅ 使用装饰器
@asynccontextmanager
async def get_connection() -> AsyncIterator[Connection]:
conn = await create_connection()
try:
yield conn
finally:
await conn.close()
async def query_data():
async with get_connection() as conn:
return await conn.fetch("SELECT * FROM users")
```
### 异步迭代器
```python
from typing import AsyncIterator
# ✅ 异步生成器
async def fetch_pages(url: str) -> AsyncIterator[dict]:
page = 1
while True:
data = await fetch_page(url, page)
if not data['items']:
break
yield data
page += 1
# ✅ 使用异步迭代
async def process_all_pages():
async for page in fetch_pages("https://api.example.com"):
await process_page(page)
```
### 任务管理与取消
```python
import asyncio
# ❌ 忘记处理取消
async def bad_worker():
while True:
await do_work() # 无法正常取消
# ✅ 正确处理取消
async def good_worker():
try:
while True:
await do_work()
except asyncio.CancelledError:
await cleanup() # 清理资源
raise # 重新抛出,让调用者知道已取消
# ✅ 超时控制
async def fetch_with_timeout(url: str) -> str:
try:
async with asyncio.timeout(10): # Python 3.11+
return await fetch_url(url)
except asyncio.TimeoutError:
return ""
# ✅ 任务组Python 3.11+
async def fetch_multiple():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_url("url1"))
task2 = tg.create_task(fetch_url("url2"))
# 所有任务完成后自动等待,异常会传播
return task1.result(), task2.result()
```
### 同步与异步混合
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
# ✅ 在异步代码中运行同步函数
async def run_sync_in_async():
loop = asyncio.get_event_loop()
# 使用线程池执行阻塞操作
result = await loop.run_in_executor(
None, # 默认线程池
blocking_io_function,
arg1, arg2
)
return result
# ✅ 在同步代码中运行异步函数
def run_async_in_sync():
return asyncio.run(async_function())
# ❌ 不要在异步代码中使用 time.sleep
async def bad_delay():
time.sleep(1) # 会阻塞整个事件循环!
# ✅ 使用 asyncio.sleep
async def good_delay():
await asyncio.sleep(1)
```
### 信号量与限流
```python
import asyncio
# ✅ 使用信号量限制并发
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(url: str) -> str:
async with semaphore:
return await fetch_url(url)
return await asyncio.gather(*[fetch_one(url) for url in urls])
# ✅ 使用 asyncio.Queue 实现生产者-消费者
async def producer_consumer():
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=100)
async def producer():
for item in items:
await queue.put(item)
await queue.put(None) # 结束信号
async def consumer():
while True:
item = await queue.get()
if item is None:
break
await process(item)
queue.task_done()
await asyncio.gather(producer(), consumer())
```
---
## 异常处理
### 异常捕获最佳实践
```python
# ❌ Catching too broad
try:
result = risky_operation()
except: # Catches everything, even KeyboardInterrupt!
pass
# ❌ 捕获 Exception 但不处理
try:
result = risky_operation()
except Exception:
pass # 吞掉所有异常,难以调试
# ✅ Catch specific exceptions
try:
result = risky_operation()
except ValueError as e:
logger.error(f"Invalid value: {e}")
raise
except IOError as e:
logger.error(f"IO error: {e}")
return default_value
# ✅ 多个异常类型
try:
result = parse_and_process(data)
except (ValueError, TypeError, KeyError) as e:
logger.error(f"Data error: {e}")
raise DataProcessingError(str(e)) from e
```
### 异常链
```python
# ❌ 丢失原始异常信息
try:
result = external_api.call()
except APIError as e:
raise RuntimeError("API failed") # 丢失了原因
# ✅ 使用 from 保留异常链
try:
result = external_api.call()
except APIError as e:
raise RuntimeError("API failed") from e
# ✅ 显式断开异常链(少见情况)
try:
result = external_api.call()
except APIError:
raise RuntimeError("API failed") from None
```
### 自定义异常
```python
# ✅ 定义业务异常层次结构
class AppError(Exception):
"""应用基础异常"""
pass
class ValidationError(AppError):
"""数据验证错误"""
def __init__(self, field: str, message: str):
self.field = field
self.message = message
super().__init__(f"{field}: {message}")
class NotFoundError(AppError):
"""资源未找到"""
def __init__(self, resource: str, id: str | int):
self.resource = resource
self.id = id
super().__init__(f"{resource} with id {id} not found")
# 使用
def get_user(user_id: int) -> User:
user = db.get(user_id)
if not user:
raise NotFoundError("User", user_id)
return user
```
### 上下文管理器中的异常
```python
from contextlib import contextmanager
# ✅ 正确处理上下文管理器中的异常
@contextmanager
def transaction():
conn = get_connection()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# ✅ 使用 ExceptionGroupPython 3.11+
def process_batch(items: list) -> None:
errors = []
for item in items:
try:
process(item)
except Exception as e:
errors.append(e)
if errors:
raise ExceptionGroup("Batch processing failed", errors)
```
---
## 常见陷阱
### 可变默认参数
```python
# ❌ Mutable default arguments
def add_item(item, items=[]): # Bug! Shared across calls
items.append(item)
return items
# 问题演示
add_item(1) # [1]
add_item(2) # [1, 2] 而不是 [2]
# ✅ Use None as default
def add_item(item, items=None):
if items is None:
items = []
items.append(item)
return items
# ✅ 或使用 dataclass 的 field
from dataclasses import dataclass, field
@dataclass
class Container:
items: list = field(default_factory=list)
```
### 可变类属性
```python
# ❌ Using mutable class attributes
class User:
permissions = [] # Shared across all instances!
# 问题演示
u1 = User()
u2 = User()
u1.permissions.append("admin")
print(u2.permissions) # ["admin"] - 被意外共享!
# ✅ Initialize in __init__
class User:
def __init__(self):
self.permissions = []
# ✅ 使用 dataclass
@dataclass
class User:
permissions: list = field(default_factory=list)
```
### 循环中的闭包
```python
# ❌ 闭包捕获循环变量
funcs = []
for i in range(3):
funcs.append(lambda: i)
print([f() for f in funcs]) # [2, 2, 2] 而不是 [0, 1, 2]
# ✅ 使用默认参数捕获值
funcs = []
for i in range(3):
funcs.append(lambda i=i: i)
print([f() for f in funcs]) # [0, 1, 2]
# ✅ 使用 functools.partial
from functools import partial
funcs = [partial(lambda x: x, i) for i in range(3)]
```
### is vs ==
```python
# ❌ 用 is 比较值
if x is 1000: # 可能不工作!
pass
# Python 会缓存小整数 (-5 到 256)
a = 256
b = 256
a is b # True
a = 257
b = 257
a is b # False
# ✅ 用 == 比较值
if x == 1000:
pass
# ✅ is 只用于 None 和单例
if x is None:
pass
if x is True: # 严格检查布尔值
pass
```
### 字符串拼接性能
```python
# ❌ 循环中拼接字符串
result = ""
for item in large_list:
result += str(item) # O(n²) 复杂度
# ✅ 使用 join
result = "".join(str(item) for item in large_list) # O(n)
# ✅ 使用 StringIO 构建大字符串
from io import StringIO
buffer = StringIO()
for item in large_list:
buffer.write(str(item))
result = buffer.getvalue()
```
---
## 测试最佳实践
### pytest 基础
```python
import pytest
# ✅ 清晰的测试命名
def test_user_creation_with_valid_email():
user = User(email="test@example.com")
assert user.email == "test@example.com"
def test_user_creation_with_invalid_email_raises_error():
with pytest.raises(ValidationError):
User(email="invalid")
# ✅ 使用参数化测试
@pytest.mark.parametrize("input,expected", [
("hello", "HELLO"),
("World", "WORLD"),
("", ""),
("123", "123"),
])
def test_uppercase(input: str, expected: str):
assert input.upper() == expected
# ✅ 测试异常
def test_division_by_zero():
with pytest.raises(ZeroDivisionError) as exc_info:
1 / 0
assert "division by zero" in str(exc_info.value)
```
### Fixtures
```python
import pytest
from typing import Generator
# ✅ 基础 fixture
@pytest.fixture
def user() -> User:
return User(name="Test User", email="test@example.com")
def test_user_name(user: User):
assert user.name == "Test User"
# ✅ 带清理的 fixture
@pytest.fixture
def database() -> Generator[Database, None, None]:
db = Database()
db.connect()
yield db
db.disconnect() # 测试后清理
# ✅ 异步 fixture
@pytest.fixture
async def async_client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient() as client:
yield client
# ✅ 共享 fixtureconftest.py
# conftest.py
@pytest.fixture(scope="session")
def app():
"""整个测试会话共享的 app 实例"""
return create_app()
@pytest.fixture(scope="module")
def db(app):
"""每个测试模块共享的数据库连接"""
return app.db
```
### Mock 与 Patch
```python
from unittest.mock import Mock, patch, AsyncMock
# ✅ Mock 外部依赖
def test_send_email():
mock_client = Mock()
mock_client.send.return_value = True
service = EmailService(client=mock_client)
result = service.send_welcome_email("user@example.com")
assert result is True
mock_client.send.assert_called_once_with(
to="user@example.com",
subject="Welcome!",
body=ANY,
)
# ✅ Patch 模块级函数
@patch("myapp.services.external_api.call")
def test_with_patched_api(mock_call):
mock_call.return_value = {"status": "ok"}
result = process_data()
assert result["status"] == "ok"
# ✅ 异步 Mock
async def test_async_function():
mock_fetch = AsyncMock(return_value={"data": "test"})
with patch("myapp.client.fetch", mock_fetch):
result = await get_data()
assert result == {"data": "test"}
```
### 测试组织
```python
# ✅ 使用类组织相关测试
class TestUserAuthentication:
"""用户认证相关测试"""
def test_login_with_valid_credentials(self, user):
assert authenticate(user.email, "password") is True
def test_login_with_invalid_password(self, user):
assert authenticate(user.email, "wrong") is False
def test_login_locks_after_failed_attempts(self, user):
for _ in range(5):
authenticate(user.email, "wrong")
assert user.is_locked is True
# ✅ 使用 mark 标记测试
@pytest.mark.slow
def test_large_data_processing():
pass
@pytest.mark.integration
def test_database_connection():
pass
# 运行特定标记的测试pytest -m "not slow"
```
### 覆盖率与质量
```python
# pytest.ini 或 pyproject.toml
[tool.pytest.ini_options]
addopts = "--cov=myapp --cov-report=term-missing --cov-fail-under=80"
testpaths = ["tests"]
# ✅ 测试边界情况
def test_empty_input():
assert process([]) == []
def test_none_input():
with pytest.raises(TypeError):
process(None)
def test_large_input():
large_data = list(range(100000))
result = process(large_data)
assert len(result) == 100000
```
---
## 性能优化
### 数据结构选择
```python
# ❌ 列表查找 O(n)
if item in large_list: # 慢
pass
# ✅ 集合查找 O(1)
large_set = set(large_list)
if item in large_set: # 快
pass
# ✅ 使用 collections 模块
from collections import Counter, defaultdict, deque
# 计数
word_counts = Counter(words)
most_common = word_counts.most_common(10)
# 默认字典
graph = defaultdict(list)
graph[node].append(neighbor)
# 双端队列(两端操作 O(1)
queue = deque()
queue.appendleft(item) # O(1) vs list.insert(0, item) O(n)
```
### 生成器与迭代器
```python
# ❌ 一次性加载所有数据
def get_all_users():
return [User(row) for row in db.fetch_all()] # 内存占用大
# ✅ 使用生成器
def get_all_users():
for row in db.fetch_all():
yield User(row) # 懒加载
# ✅ 生成器表达式
sum_of_squares = sum(x**2 for x in range(1000000)) # 不创建列表
# ✅ itertools 模块
from itertools import islice, chain, groupby
# 只取前 10 个
first_10 = list(islice(infinite_generator(), 10))
# 链接多个迭代器
all_items = chain(list1, list2, list3)
# 分组
for key, group in groupby(sorted(items, key=get_key), key=get_key):
process_group(key, list(group))
```
### 缓存
```python
from functools import lru_cache, cache
# ✅ LRU 缓存
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
return sum(i**2 for i in range(n))
# ✅ 无限缓存Python 3.9+
@cache
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# ✅ 手动缓存(需要更多控制时)
class DataService:
def __init__(self):
self._cache: dict[str, Any] = {}
self._cache_ttl: dict[str, float] = {}
def get_data(self, key: str) -> Any:
if key in self._cache:
if time.time() < self._cache_ttl[key]:
return self._cache[key]
data = self._fetch_data(key)
self._cache[key] = data
self._cache_ttl[key] = time.time() + 300 # 5 分钟
return data
```
### 并行处理
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# ✅ IO 密集型使用线程池
def fetch_all_urls(urls: list[str]) -> list[str]:
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_url, urls))
return results
# ✅ CPU 密集型使用进程池
def process_large_dataset(data: list) -> list:
with ProcessPoolExecutor() as executor:
results = list(executor.map(heavy_computation, data))
return results
# ✅ 使用 as_completed 获取最先完成的结果
from concurrent.futures import as_completed
with ThreadPoolExecutor() as executor:
futures = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result()
except Exception as e:
print(f"{url} failed: {e}")
```
---
## 代码风格
### PEP 8 要点
```python
# ✅ 命名规范
class MyClass: # 类名 PascalCase
MAX_SIZE = 100 # 常量 UPPER_SNAKE_CASE
def method_name(self): # 方法 snake_case
local_var = 1 # 变量 snake_case
# ✅ 导入顺序
# 1. 标准库
import os
import sys
from typing import Optional
# 2. 第三方库
import numpy as np
import pandas as pd
# 3. 本地模块
from myapp import config
from myapp.utils import helper
# ✅ 行长度限制79 或 88 字符)
# 长表达式的换行
result = (
long_function_name(arg1, arg2, arg3)
+ another_long_function(arg4, arg5)
)
# ✅ 空行规范
class MyClass:
"""类文档字符串"""
def method_one(self):
pass
def method_two(self): # 方法间一个空行
pass
def top_level_function(): # 顶层定义间两个空行
pass
```
### 文档字符串
```python
# ✅ Google 风格文档字符串
def calculate_area(width: float, height: float) -> float:
"""计算矩形面积。
Args:
width: 矩形的宽度(必须为正数)。
height: 矩形的高度(必须为正数)。
Returns:
矩形的面积。
Raises:
ValueError: 如果 width 或 height 为负数。
Example:
>>> calculate_area(3, 4)
12.0
"""
if width < 0 or height < 0:
raise ValueError("Dimensions must be positive")
return width * height
# ✅ 类文档字符串
class DataProcessor:
"""处理和转换数据的工具类。
Attributes:
source: 数据来源路径。
format: 输出格式('json''csv')。
Example:
>>> processor = DataProcessor("data.csv")
>>> processor.process()
"""
```
### 现代 Python 特性
```python
# ✅ f-stringPython 3.6+
name = "World"
print(f"Hello, {name}!")
# 带表达式
print(f"Result: {1 + 2 = }") # "Result: 1 + 2 = 3"
# ✅ 海象运算符Python 3.8+
if (n := len(items)) > 10:
print(f"List has {n} items")
# ✅ 位置参数分隔符Python 3.8+
def greet(name, /, greeting="Hello", *, punctuation="!"):
"""name 只能位置传参punctuation 只能关键字传参"""
return f"{greeting}, {name}{punctuation}"
# ✅ 模式匹配Python 3.10+
def handle_response(response: dict):
match response:
case {"status": "ok", "data": data}:
return process_data(data)
case {"status": "error", "message": msg}:
raise APIError(msg)
case _:
raise ValueError("Unknown response format")
```
---
## Review Checklist
### 类型安全
- [ ] 函数有类型注解(参数和返回值)
- [ ] 使用 `Optional` 明确可能为 None
- [ ] 泛型类型正确使用
- [ ] mypy 检查通过(无错误)
- [ ] 避免使用 `Any`,必要时添加注释说明
### 异步代码
- [ ] async/await 正确配对使用
- [ ] 没有在异步代码中使用阻塞调用
- [ ] 正确处理 `CancelledError`
- [ ] 使用 `asyncio.gather``TaskGroup` 并发执行
- [ ] 资源正确清理async context manager
### 异常处理
- [ ] 捕获特定异常类型,不使用裸 `except:`
- [ ] 异常链使用 `from` 保留原因
- [ ] 自定义异常继承自合适的基类
- [ ] 异常信息有意义,便于调试
### 数据结构
- [ ] 没有使用可变默认参数list、dict、set
- [ ] 类属性不是可变对象
- [ ] 选择正确的数据结构set vs list 查找)
- [ ] 大数据集使用生成器而非列表
### 测试
- [ ] 测试覆盖率达标(建议 ≥80%
- [ ] 测试命名清晰描述测试场景
- [ ] 边界情况有测试覆盖
- [ ] Mock 正确隔离外部依赖
- [ ] 异步代码有对应的异步测试
### 代码风格
- [ ] 遵循 PEP 8 风格指南
- [ ] 函数和类有 docstring
- [ ] 导入顺序正确(标准库、第三方、本地)
- [ ] 命名一致且有意义
- [ ] 使用现代 Python 特性f-string、walrus operator 等)
### 性能
- [ ] 避免循环中重复创建对象
- [ ] 字符串拼接使用 join
- [ ] 合理使用缓存(@lru_cache
- [ ] IO/CPU 密集型使用合适的并行方式