Python Web框架Flask实战:从零构建RESTful API服务
0x00 引言:为什么选择Flask?
Python Web框架的生态中,Django以"全家桶"著称,而Flask则走了截然不同的道路——它是一个微框架(Microframework),核心只提供路由、请求处理和模板渲染,其余功能全靠扩展(Extension)实现。
这种设计哲学意味着:
- 自由度高:你可以选择自己喜欢的ORM、认证方案、序列化库
- 学习曲线低:核心API简洁,几行代码就能启动一个Web服务
- 适合微服务:轻量级的架构天然适合构建API服务
- 易于调试:代码路径清晰,没有"黑魔法"
本文将从程序员的视角,系统讲解Flask的核心机制、项目架构、数据库集成、认证授权、错误处理到生产部署的全链路实战。
0x01 Flask核心机制
1.1 最简应用:Hello World
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return 'Hello, World!'
if __name__ == '__main__':
app.run(debug=True)三行代码背后的机制:
Flask(__name__):创建应用实例,__name__用于定位资源文件路径@app.route('/'):注册URL规则到视图函数的映射- [
app.run](http://app.run)():启动内置的Werkzeug开发服务器
1.2 请求-响应生命周期
"""
Flask请求处理流程:
客户端请求 → WSGI服务器 → Flask应用
│
┌────────┼────────┐
│ 请求上下文 │
│ 应用上下文 │
└────────┼────────┘
│
┌────────┼────────┐
│ before_request │ → 请求前置钩子
└────────┼────────┘
│
┌────────┼────────┐
│ URL路由匹配 │ → 查找视图函数
└────────┼────────┘
│
┌────────┼────────┐
│ 视图函数执行 │ → 处理业务逻辑
└────────┼────────┘
│
┌────────┼────────┐
│ after_request │ → 请求后置钩子
└────────┼────────┘
│
┌────────┼────────┐
│ 返回响应 │ → Response对象
└────────┼────────┘
"""1.3 上下文机制:Flask的核心设计
Flask使用线程局部变量(Thread-Local)实现请求隔离。
应用上下文(Application Context)
from flask import current_app
with app.app_context():
print(current_app.config['DEBUG']) # 在请求外访问应用配置请求上下文(Request Context)
from flask import request, g
@app.route('/api/user')
def get_user():
# request对象:当前HTTP请求
user_id = request.args.get('id')
# g对象:请求级别的全局存储
g.current_user = User.query.get(user_id)
return jsonify(g.current_user.to_dict())上下文的实现原理:
# Flask内部使用Werkzeug的LocalStack
from werkzeug.local import LocalStack
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
# 每个线程/协程有独立的栈
# 请求进入时push,请求结束时pop
# 这就是为什么可以直接使用 request, g 等全局变量1.4 路由系统
# 基本路由
@app.route('/users')
def list_users():
return 'User list'
# 动态参数
@app.route('/users/<int:user_id>')
def get_user(user_id):
return f'User {user_id}'
# 支持多种HTTP方法
@app.route('/users', methods=['GET', 'POST'])
def users():
if request.method == 'POST':
return create_user()
return list_users()
# URL变量转换器
# <string:name> 默认,字符串
# <int:id> 整数
# <float:price> 浮点数
# <path:subpath> 路径(包含斜杠)
# <uuid:code> UUID
# 自定义转换器
from werkzeug.routing import BaseConverter
class ListConverter(BaseConverter):
def to_python(self, value):
return value.split(',')
def to_url(self, values):
return ','.join(str(v) for v in values)
app.url_map.converters['list'] = ListConverter
@app.route('/users/<list:ids>')
def get_users(ids):
# /users/1,2,3 → ids = ['1', '2', '3']
return jsonify(ids)0x02 项目架构:从单文件到工厂模式
2.1 应用工厂模式(Application Factory)
生产级Flask应用应使用工厂模式,避免循环导入。
项目结构:
myapp/
├── app/
│ ├── __init__.py # 应用工厂
│ ├── config.py # 配置文件
│ ├── extensions.py # 扩展初始化
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── post.py
│ ├── api/ # API蓝图
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── users.py
│ │ └── posts.py
│ ├── services/ # 业务逻辑
│ │ ├── __init__.py
│ │ ├── user_service.py
│ │ └── post_service.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ ├── decorators.py
│ └── validators.py
├── migrations/ # 数据库迁移
├── tests/ # 测试
├── requirements.txt
├── .env
└── run.py应用工厂实现:
# app/__init__.py
from flask import Flask
from app.config import config_map
from app.extensions import db, migrate, jwt, cors, ma
def create_app(config_name='development'):
"""应用工厂函数"""
app = Flask(__name__)
# 1. 加载配置
app.config.from_object(config_map[config_name])
# 2. 初始化扩展
register_extensions(app)
# 3. 注册蓝图
register_blueprints(app)
# 4. 注册错误处理
register_error_handlers(app)
# 5. 注册钩子
register_hooks(app)
return app
def register_extensions(app):
"""初始化扩展"""
db.init_app(app)
migrate.init_app(app, db)
jwt.init_app(app)
cors.init_app(app)
ma.init_app(app)
def register_blueprints(app):
"""注册蓝图"""
from app.api.auth import auth_bp
from app.api.users import users_bp
from app.api.posts import posts_bp
app.register_blueprint(auth_bp, url_prefix='/api/auth')
app.register_blueprint(users_bp, url_prefix='/api/users')
app.register_blueprint(posts_bp, url_prefix='/api/posts')
def register_error_handlers(app):
"""注册全局错误处理"""
@app.errorhandler(404)
def not_found(error):
return {'error': 'Resource not found', 'status': 404}, 404
@app.errorhandler(500)
def internal_error(error):
db.session.rollback()
return {'error': 'Internal server error', 'status': 500}, 500
def register_hooks(app):
"""注册请求钩子"""
@app.before_request
def before_request():
# 请求日志
app.logger.info(f'{request.method} {request.path}')
@app.after_request
def after_request(response):
# 添加通用响应头
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-Content-Type-Options'] = 'nosniff'
return response2.2 配置管理
# app/config.py
import os
from datetime import timedelta
class BaseConfig:
"""基础配置"""
SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-secret-key')
SQLALCHEMY_TRACK_MODIFICATIONS = False
JSON_SORT_KEYS = False
# JWT配置
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY', 'jwt-secret')
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
class DevelopmentConfig(BaseConfig):
"""开发环境"""
DEBUG = True
SQLALCHEMY_DATABASE_URI = os.environ.get(
'DEV_DATABASE_URL',
'sqlite:///dev.db'
)
SQLALCHEMY_ECHO = True # 打印SQL语句
class TestingConfig(BaseConfig):
"""测试环境"""
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
class ProductionConfig(BaseConfig):
"""生产环境"""
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
SQLALCHEMY_POOL_SIZE = 20
SQLALCHEMY_MAX_OVERFLOW = 40
SQLALCHEMY_POOL_TIMEOUT = 10
config_map = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig
}2.3 扩展管理
# app/extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from flask_marshmallow import Marshmallow
# 在工厂函数外部创建扩展实例(延迟初始化)
db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()
cors = CORS()
ma = Marshmallow()0x03 数据库集成:SQLAlchemy ORM
3.1 模型定义
# app/models/user.py
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(256), nullable=False)
avatar = db.Column(db.String(256))
bio = db.Column(db.Text)
is_active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow,
onupdate=datetime.utcnow)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic',
cascade='all, delete-orphan')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email,
'avatar': self.avatar,
'bio': self.bio,
'created_at': self.created_at.isoformat()
}
def __repr__(self):
return f'<User {self.username}>'
# app/models/post.py
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
status = db.Column(db.String(20), default='draft') # draft, published
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow,
onupdate=datetime.utcnow)
# 多对多关系
tags = db.relationship('Tag', secondary='post_tags', backref='posts')
def to_dict(self):
return {
'id': self.id,
'title': self.title,
'content': self.content,
'status': self.status,
'author': self.author.to_dict(),
'tags': [tag.name for tag in self.tags],
'created_at': self.created_at.isoformat()
}
# 关联表
post_tags = db.Table('post_tags',
db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)
class Tag(db.Model):
__tablename__ = 'tags'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)3.2 数据库查询
# 基础CRUD操作
# 创建
user = User(username='alice', email='alice@example.com')
user.set_password('secure_password')
db.session.add(user)
db.session.commit()
# 查询
user = User.query.get(1) # 按主键
user = User.query.filter_by(username='alice').first() # 按条件
users = User.query.filter(User.is_active == True).all()
# 更新
user.bio = 'Updated bio'
db.session.commit()
# 删除
db.session.delete(user)
db.session.commit()
# 复杂查询
from sqlalchemy import and_, or_, func
# 分页
page = User.query.filter(
User.is_active == True
).order_by(
User.created_at.desc()
).paginate(page=1, per_page=20, error_out=False)
# 聚合
user_count = db.session.query(func.count(User.id)).scalar()
# 联合查询
results = db.session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post, User.id == Post.user_id).group_by(
User.id
).having(
func.count(Post.id) > 5
).all()3.3 数据库迁移
# 初始化迁移仓库
flask db init
# 生成迁移脚本
flask db migrate -m "create users and posts tables"
# 执行迁移
flask db upgrade
# 回滚
flask db downgrade0x04 RESTful API设计
4.1 蓝图与路由
# app/api/users.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.models.user import User
from app.services.user_service import UserService
from app.utils.validators import validate_user_input
users_bp = Blueprint('users', __name__)
@users_bp.route('/', methods=['GET'])
def list_users():
"""获取用户列表"""
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
search = request.args.get('search', '')
pagination = UserService.get_users(page, per_page, search)
return jsonify({
'users': [u.to_dict() for u in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'per_page': pagination.per_page,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
})
@users_bp.route('/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""获取单个用户"""
user = User.query.get_or_404(user_id)
return jsonify(user.to_dict())
@users_bp.route('/', methods=['POST'])
def create_user():
"""创建用户"""
data = request.get_json()
# 参数验证
errors = validate_user_input(data)
if errors:
return jsonify({'errors': errors}), 400
# 检查重复
if User.query.filter_by(username=data['username']).first():
return jsonify({'error': 'Username already exists'}), 409
user = UserService.create_user(data)
return jsonify(user.to_dict()), 201
@users_bp.route('/<int:user_id>', methods=['PUT'])
@jwt_required()
def update_user(user_id):
"""更新用户"""
current_user_id = get_jwt_identity()
if current_user_id != user_id:
return jsonify({'error': 'Forbidden'}), 403
data = request.get_json()
user = UserService.update_user(user_id, data)
return jsonify(user.to_dict())
@users_bp.route('/<int:user_id>', methods=['DELETE'])
@jwt_required()
def delete_user(user_id):
"""删除用户"""
current_user_id = get_jwt_identity()
if current_user_id != user_id:
return jsonify({'error': 'Forbidden'}), 403
UserService.delete_user(user_id)
return '', 2044.2 服务层(Service Layer)
# app/services/user_service.py
from app.extensions import db
from app.models.user import User
class UserService:
@staticmethod
def get_users(page, per_page, search=''):
query = User.query.filter(User.is_active == True)
if search:
query = query.filter(
User.username.ilike(f'%{search}%') |
User.email.ilike(f'%{search}%')
)
return query.order_by(
User.created_at.desc()
).paginate(page=page, per_page=per_page, error_out=False)
@staticmethod
def create_user(data):
user = User(
username=data['username'],
email=data['email'],
bio=data.get('bio', '')
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
return user
@staticmethod
def update_user(user_id, data):
user = User.query.get_or_404(user_id)
if 'username' in data:
user.username = data['username']
if 'email' in data:
user.email = data['email']
if 'bio' in data:
user.bio = data['bio']
if 'password' in data:
user.set_password(data['password'])
db.session.commit()
return user
@staticmethod
def delete_user(user_id):
user = User.query.get_or_404(user_id)
user.is_active = False # 软删除
db.session.commit()4.3 序列化与验证(Marshmallow)
# app/schemas/user_schema.py
from marshmallow import Schema, fields, validate, validates, ValidationError
class UserCreateSchema(Schema):
username = fields.Str(
required=True,
validate=[
validate.Length(min=3, max=80),
validate.Regexp(r'^[a-zA-Z0-9_]+$',
error='Only letters, numbers and underscores')
]
)
email = fields.Email(required=True)
password = fields.Str(
required=True,
validate=validate.Length(min=8, max=128),
load_only=True # 不序列化输出
)
bio = fields.Str(validate=validate.Length(max=500))
@validates('username')
def validate_username(self, value):
if User.query.filter_by(username=value).first():
raise ValidationError('Username already exists')
class UserResponseSchema(Schema):
id = fields.Int(dump_only=True)
username = fields.Str()
email = fields.Email()
bio = fields.Str()
avatar = fields.Str()
created_at = fields.DateTime(format='iso')
# 使用示例
user_create_schema = UserCreateSchema()
user_response_schema = UserResponseSchema()
@users_bp.route('/', methods=['POST'])
def create_user():
try:
data = user_create_schema.load(request.get_json())
except ValidationError as err:
return jsonify({'errors': err.messages}), 400
user = UserService.create_user(data)
return jsonify(user_response_schema.dump(user)), 2010x05 认证与授权
5.1 JWT认证
# app/api/auth.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt
)
from app.models.user import User
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录"""
data = request.get_json()
username = data.get('username')
password = data.get('password')
user = User.query.filter_by(username=username).first()
if not user or not user.check_password(password):
return jsonify({'error': 'Invalid credentials'}), 401
if not user.is_active:
return jsonify({'error': 'Account is disabled'}), 403
# 生成令牌
access_token = create_access_token(
identity=user.id,
additional_claims={
'username': user.username,
'email': user.email
}
)
refresh_token = create_refresh_token(identity=user.id)
return jsonify({
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
})
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
"""刷新令牌"""
current_user_id = get_jwt_identity()
access_token = create_access_token(identity=current_user_id)
return jsonify({'access_token': access_token})
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_me():
"""获取当前用户信息"""
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
return jsonify(user.to_dict())5.2 基于角色的访问控制(RBAC)
# app/utils/decorators.py
from functools import wraps
from flask import jsonify
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
from app.models.user import User
def role_required(*roles):
"""角色权限装饰器"""
def wrapper(fn):
@wraps(fn)
def decorated(*args, **kwargs):
verify_jwt_in_request()
user_id = get_jwt_identity()
user = User.query.get(user_id)
if not user or user.role not in roles:
return jsonify({'error': 'Insufficient permissions'}), 403
return fn(*args, **kwargs)
return decorated
return wrapper
# 使用
@users_bp.route('/admin/users', methods=['GET'])
@role_required('admin', 'superadmin')
def admin_list_users():
users = User.query.all()
return jsonify([u.to_dict() for u in users])0x06 错误处理与日志
6.1 统一异常处理
# app/utils/exceptions.py
class APIError(Exception):
"""自定义API异常"""
def __init__(self, message, status_code=400, payload=None):
super().__init__()
self.message = message
self.status_code = status_code
self.payload = payload
def to_dict(self):
result = {
'error': self.message,
'status': self.status_code
}
if self.payload:
result['details'] = self.payload
return result
class NotFoundError(APIError):
def __init__(self, message='Resource not found'):
super().__init__(message, 404)
class UnauthorizedError(APIError):
def __init__(self, message='Unauthorized'):
super().__init__(message, 401)
class ForbiddenError(APIError):
def __init__(self, message='Forbidden'):
super().__init__(message, 403)
class ConflictError(APIError):
def __init__(self, message='Resource conflict'):
super().__init__(message, 409)
# 注册到app
def register_error_handlers(app):
@app.errorhandler(APIError)
def handle_api_error(error):
return jsonify(error.to_dict()), error.status_code
@app.errorhandler(404)
def handle_404(error):
return jsonify({'error': 'Not found', 'status': 404}), 404
@app.errorhandler(500)
def handle_500(error):
db.session.rollback()
app.logger.error(f'Internal error: {error}')
return jsonify({'error': 'Internal server error', 'status': 500}), 5006.2 日志配置
# app/utils/logger.py
import logging
from logging.handlers import RotatingFileHandler
import os
def setup_logging(app):
"""配置日志"""
log_level = logging.DEBUG if app.debug else logging.INFO
# 文件日志
if not os.path.exists('logs'):
os.makedirs('logs')
file_handler = RotatingFileHandler(
'logs/app.log',
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s [%(name)s] %(message)s'
))
file_handler.setLevel(log_level)
# 控制台日志
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s %(message)s'
))
console_handler.setLevel(log_level)
app.logger.addHandler(file_handler)
app.logger.addHandler(console_handler)
app.logger.setLevel(log_level)0x07 中间件与请求钩子
7.1 请求限流
# app/utils/rate_limit.py
from functools import wraps
from flask import request, jsonify
import time
# 简单的内存限流(生产环境应使用Redis)
request_counts = {}
def rate_limit(max_requests=100, window=60):
"""请求限流装饰器"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
key = f'{request.remote_addr}:{request.endpoint}'
now = time.time()
if key not in request_counts:
request_counts[key] = []
# 清除过期记录
request_counts[key] = [
t for t in request_counts[key]
if t > now - window
]
if len(request_counts[key]) >= max_requests:
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': window
}), 429
request_counts[key].append(now)
return f(*args, **kwargs)
return decorated
return decorator
# 使用
@app.route('/api/data')
@rate_limit(max_requests=60, window=60)
def get_data():
return jsonify({'data': 'value'})7.2 请求日志中间件
# 请求计时中间件
@app.before_request
def start_timer():
g.start_time = time.time()
@app.after_request
def log_request(response):
if hasattr(g, 'start_time'):
elapsed = time.time() - g.start_time
app.logger.info(
f'{request.method} {request.path} '
f'→ {response.status_code} '
f'({elapsed:.3f}s)'
)
return response0x08 文件上传与处理
import os
import uuid
from flask import Blueprint, request, jsonify, current_app, send_from_directory
from werkzeug.utils import secure_filename
uploads_bp = Blueprint('uploads', __name__)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf', 'doc', 'docx'}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@uploads_bp.route('/upload', methods=['POST'])
@jwt_required()
def upload_file():
"""文件上传"""
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not allowed'}), 400
# 检查文件大小(10MB)
file.seek(0, os.SEEK_END)
size = file.tell()
file.seek(0)
if size > 10 * 1024 * 1024:
return jsonify({'error': 'File too large (max 10MB)'}), 400
# 生成安全文件名
ext = file.filename.rsplit('.', 1)[1].lower()
filename = f'{uuid.uuid4().hex}.{ext}'
# 保存文件
upload_dir = current_app.config['UPLOAD_FOLDER']
os.makedirs(upload_dir, exist_ok=True)
file.save(os.path.join(upload_dir, filename))
return jsonify({
'filename': filename,
'url': f'/uploads/{filename}',
'size': size
}), 201
@uploads_bp.route('/uploads/<filename>')
def serve_file(filename):
"""文件访问"""
return send_from_directory(
current_app.config['UPLOAD_FOLDER'],
filename
)0x09 测试
9.1 单元测试
# tests/conftest.py
import pytest
from app import create_app
from app.extensions import db as _db
@pytest.fixture(scope='session')
def app():
"""创建测试应用"""
app = create_app('testing')
with app.app_context():
_db.create_all()
yield app
_db.drop_all()
@pytest.fixture(scope='function')
def db(app):
"""每个测试函数使用独立的数据库事务"""
with app.app_context():
_db.session.begin_nested()
yield _db
_db.session.rollback()
@pytest.fixture
def client(app):
"""测试客户端"""
return app.test_client()
@pytest.fixture
def auth_headers(client):
"""认证头"""
# 创建测试用户并登录
client.post('/api/auth/register', json={
'username': 'testuser',
'email': 'test@example.com',
'password': 'testpass123'
})
response = client.post('/api/auth/login', json={
'username': 'testuser',
'password': 'testpass123'
})
token = response.json['access_token']
return {'Authorization': f'Bearer {token}'}
# tests/test_users.py
class TestUsersAPI:
def test_create_user(self, client):
"""测试创建用户"""
response = client.post('/api/users/', json={
'username': 'alice',
'email': 'alice@example.com',
'password': 'secure123'
})
assert response.status_code == 201
assert response.json['username'] == 'alice'
def test_create_user_duplicate(self, client):
"""测试重复用户名"""
client.post('/api/users/', json={
'username': 'bob',
'email': 'bob@example.com',
'password': 'secure123'
})
response = client.post('/api/users/', json={
'username': 'bob',
'email': 'bob2@example.com',
'password': 'secure123'
})
assert response.status_code == 409
def test_get_user(self, client, auth_headers):
"""测试获取用户"""
response = client.get('/api/users/1', headers=auth_headers)
assert response.status_code == 200
def test_list_users_pagination(self, client):
"""测试分页"""
response = client.get('/api/users/?page=1&per_page=10')
assert response.status_code == 200
assert 'total' in response.json
assert 'pages' in response.json0x10 性能优化与缓存
10.1 Redis缓存
# app/utils/cache.py
import json
import redis
from functools import wraps
from flask import request
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cached(timeout=300, key_prefix='api'):
"""缓存装饰器"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
cache_key = f'{key_prefix}:{request.path}:{request.query_string.decode()}'
# 尝试读取缓存
cached_data = redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 执行函数
result = f(*args, **kwargs)
# 写入缓存
redis_client.setex(
cache_key,
timeout,
json.dumps(result.get_json())
)
return result
return decorated
return decorator
# 使用
@users_bp.route('/<int:user_id>')
@cached(timeout=60)
def get_user(user_id):
user = User.query.get_or_404(user_id)
return jsonify(user.to_dict())10.2 数据库查询优化
# 避免N+1查询
# 错误:
posts = Post.query.all()
for post in posts:
print(post.author.username) # 每次都查询数据库
# 正确:使用预加载
posts = Post.query.options(
db.joinedload(Post.author)
).all()
# 或使用子查询加载
posts = Post.query.options(
db.subqueryload(Post.tags)
).all()0x11 生产部署
11.1 Gunicorn + Nginx
Gunicorn配置:
# gunicorn.conf.py
import multiprocessing
bind = '127.0.0.1:8000'
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = 'gevent'
timeout = 120
keepalive = 5
# 日志
accesslog = '/var/log/gunicorn/access.log'
errorlog = '/var/log/gunicorn/error.log'
loglevel = 'info'Nginx配置:
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /static {
alias /var/www/myapp/static;
expires 30d;
}
}启动命令:
gunicorn -c gunicorn.conf.py "app:create_app('production')"11.2 Docker部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV FLASK_APP=run.py
ENV FLASK_ENV=production
EXPOSE 8000
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app('production')"]# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/myapp
- SECRET_KEY=your-secret-key
- JWT_SECRET_KEY=your-jwt-secret
depends_on:
- db
- redis
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=myapp
volumes:
- pgdata:/var/lib/postgresql/data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
pgdata:0x12 总结
12.1 Flask项目清单
✅ 架构设计
- 使用应用工厂模式
- 蓝图分离模块
- 服务层封装业务逻辑
- 统一异常处理
✅ 数据层
- SQLAlchemy ORM
- 数据库迁移(Flask-Migrate)
- 查询优化(预加载、索引)
✅ 安全
- JWT认证
- RBAC授权
- 输入验证
- CORS配置
- 请求限流
✅ 运维
- 结构化日志
- 健康检查端点
- Docker容器化
- Gunicorn + Nginx部署
12.2 Flask vs Django选型
| 维度 | Flask | Django |
|---|---|---|
| 定位 | 微框架 | 全栈框架 |
| 学习曲线 | 低 | 中高 |
| 灵活性 | 极高 | 中等 |
| 内置功能 | 最少 | 完整 |
| 适合场景 | API服务、微服务 | CMS、全栈应用 |
| ORM | 自选(SQLAlchemy) | 内置Django ORM |
| Admin后台 | 无 | 内置 |
记住:Flask的精髓在于"选择自由",但自由也意味着你需要为每个决策负责。
参考资料:
- Flask官方文档
- SQLAlchemy官方文档
- Flask-JWT-Extended文档
- Flask Web Development (O'Reilly)
- Grinberg, M. (2018). "Flask Web Development: Developing Web Applications with Python"