Flask 是一款基于 Werkzeug 和 Jinja2 实现的轻量级 Web 框架,灵活轻便易上手,有非常多的扩展功能。
pip install Flask
本文 Flask==2.1.2
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return '<h1>Hello World!</h1>'
@app.route('/user/<name>')
def user(name):
return f'<h1>Hello, {name}!</h1>'
@app.route('/number/<int:id>')
def number(id):
return f'<h1>Hello, {id}!</h1>'
if __name__ == '__main__':
app.run(debug=True)

访问 http://127.0.0.1:5000/user/XerCis

访问 http://127.0.0.1:5000/number/1

Ctrl + C 退出程序
处理 URL 到 Python 函数的映射程序称为路由,如 @app.route('/'),将触发视图函数 index()
创建路由方式有以下两种:
装饰器 app.route() 相当于函数 app.add_url_rule()
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
return '<h1>Hello World!</h1>'
def user(name):
return f'<h1>Hello, {name}!</h1>'
app.add_url_rule('/user/<name>', view_func=user)
print(app.url_map)
# Map([<Rule '/' (HEAD, OPTIONS, GET) -> index>,
# <Rule '/static/<filename>' (HEAD, OPTIONS, GET) -> static>,
# <Rule '/user/<name>' (HEAD, OPTIONS, GET) -> user>])
实现代码分层

auth/__init__.py
from flask import Blueprint
auth = Blueprint('auth', __name__)
from . import views
auth/views.py
from . import auth
@auth.route('/login')
def login():
return 'success'
main/__init__.py
from flask import Blueprint
main = Blueprint('main', __name__)
from . import views
main/views.py
from . import main
@main.route('/')
def index():
return '<h1>Hello World!</h1>'
app.py
from flask import Flask
from main import main as main_blueprint
from auth import auth as auth_blueprint
app = Flask(__name__)
app.register_blueprint(main_blueprint)
app.register_blueprint(auth_blueprint, url_prefix='/auth')
print(app.url_map)
# Map([<Rule '/auth/login' (OPTIONS, GET, HEAD) -> auth.login>,
# <Rule '/' (OPTIONS, GET, HEAD) -> main.index>,
# <Rule '/static/<filename>' (OPTIONS, GET, HEAD) -> static>])
Flask 使用上下文临时把某些对象变为全局可访问,如参数 request
from flask import Flask, request
app = Flask(__name__)
@app.route('/')
def index():
user_agent = request.headers.get('User-Agent')
return f'<p>Your browser is {user_agent}</p>'
if __name__ == '__main__':
app.run(debug=True)

| 变量名 | 上下文 | 说明 |
|---|---|---|
| current_app | 程序上下文 | 当前激活程序的实例 |
| g | 程序上下文 | 处理请求时的临时存储对象 |
| request | 请求上下文 | 请求对象,封装了客户端的 HTTP 请求内容 |
| session | 请求上下文 | 用户会话,存储请求间的键值对字典 |
在请求前或后执行代码,如请求前创建数据库连接或认证发起请求的用户
为避免在每个视图函数中使用重复代码,Flask 提供了注册通用函数的功能
请求钩子用装饰器实现:
before_first_request:在处理第一个请求前运行before_request:每次请求前运行after_request:如果没有未处理的异常抛出,在每次请求后运行teardown_request:即使有未处理的异常抛出,也在每次请求后运行Flask 响应的状态码默认为 200,表示成功处理
如果需要不同状态码,,可作为第二个返回值,如状态码 400,表示请求无效
或返回 Response 对象,可用 make_response() 函数构造
重定向的响应码为 302,可使用 redirect() 替代返回三个值或 Response 对象的形式
网页或页面没找到的响应码为 404,常通过 abort() 处理错误
from flask import Flask, make_response, redirect, abort
app = Flask(__name__)
@app.route('/')
def index():
return '<h1>Bad Request</h1>', 400
@app.route('/error')
def error():
response = make_response('<h1>This document carries a cookie!</h1>')
response.set_cookie('answer', '42')
return response
@app.route('/user/<int:id>')
def get_user(id):
if id % 2 == 0:
abort(404)
else:
return '<h1>Hello</h1>'
@app.route('/redirect')
def _redirect():
return redirect('http://www.baidu.com')
if __name__ == '__main__':
app.run(debug=True)
访问:
为避免混乱,业务逻辑和显示逻辑应分开
模板是一个包含响应文本的文件,占位符表示动态部分,Flask 使用 Jinja2 这个强大的模板引擎进行渲染
项目结构,Flask 默认在 templates 子文件夹下寻找模板

templates/index.html
<h1>Hello World!</h1>
templates/user.html
<h1>Hello, {{ name }}!</h1>
main.py
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/user/<name>')
def user(name):
return render_template('user.html', name=name)
if __name__ == '__main__':
app.run(debug=True)
访问:
{{ x }}{{ d['key'] }}、{{ obj.f() }}Hello, {{ name|capitalize }}templates/index.html
<p>{{ d['key'] }}</p>
<p>{{ l[3] }}</p>
<p>{{ l[index] }}</p>
<p>{{ obj.f() }}</p>
<p>Hello, {{ name|capitalize }}</p>
main.py
from flask import Flask, render_template
app = Flask(__name__)
class A():
def f(self):
return 'a'
@app.route('/')
def index():
d = {'key': 'Hello'}
l = [0, 1, 2, 3, 4]
index = 4
obj = A()
name = 'xercis'
return render_template('index.html', d=d, l=l, index=index, obj=obj, name=name)
if __name__ == '__main__':
app.run(debug=True)

条件 templates/condition.html
{% if x > 0 %}
x is positive
{% elif x == 0 %}
x is zero
{% else %}
x is negative
{% endif %}
循环 templates/loop.html
<ul>
{% for comment in comments %}
<li>{{ comment }}</li>
{% endfor %}
</ul>
宏,类似于 Python 中的函数,templates/macro.html
{% macro render_comment(comment) %}
<li>{{ comment }}</li>
{% endmacro %}
<ul>
{% for comment in comments %}
{{ render_comment(comment) }}
{% endfor %}
</ul>
还有模板继承,此处略。
main.py
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/condition/<x>')
def condition(x):
x = int(x)
return render_template('condition.html', x=x)
@app.route('/loop')
def loop():
comments = 'abcde'
return render_template('loop.html', comments=comments)
@app.route('/macro')
def macro():
comments = 'abcde'
return render_template('macro.html', comments=comments)
if __name__ == '__main__':
app.run(debug=True)
访问:
from flask import Flask, abort
app = Flask(__name__)
@app.route('/a')
def a():
abort(404)
@app.route('/b')
def b():
abort(500)
@app.errorhandler(404)
def page_not_found(e):
return '<h1>Page not found</h1>', 404
@app.errorhandler(500)
def internal_server_error(e):
return '<h1>internal server error</h1>', 500
if __name__ == '__main__':
app.run(debug=True)
访问:
安装
pip install flask-bootstrap
pip install flask-wtf
快速渲染表单 templates/index.html
{% extends "bootstrap/base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block content %}
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
<button type="button" class="close" data-dismiss="alert">×</button>
{{ message }}
</div>
{% endfor %}
<h1>Hello, {% if name %}{{ name }}{% else %}Stranger{% endif %}!</h1>
{{ wtf.quick_form(form) }}
{% endblock %}
main.py
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired
from flask import Flask, render_template, session, redirect, url_for, flash
app = Flask(__name__)
app.config['SECRET_KEY'] = 'hard to guess string' # 生成加密令牌的密钥
from flask_bootstrap import Bootstrap
bootstrap = Bootstrap(app)
class NameForm(FlaskForm):
name = StringField('What is your name?', validators=[DataRequired()])
submit = SubmitField('Submit')
@app.route('/', methods=['GET', 'POST'])
def index():
form = NameForm()
if form.validate_on_submit():
old_name = session.get('name')
if old_name is not None and old_name != form.name.data:
flash('Looks like you have changed your name!')
session['name'] = form.name.data
return redirect(url_for('index'))
return render_template('index.html', form=form, name=session.get('name'))
if __name__ == '__main__':
app.run(debug=True)

订单管理程序有数据表如下
安装
pip install flask-sqlalchemy
| 数据库引擎 | URL |
|---|---|
| MySQL | mysql://scott:tiger@localhost/foo mysql+mysqldb://scott:tiger@localhost/foo mysql+pymysql://scott:tiger@localhost/foo |
| PostgreSQL | postgresql://scott:tiger@localhost/mydatabase postgresql+psycopg2://scott:tiger@localhost/mydatabase |
| SQLite | sqlite:///C:\path\to\foo.db |
| 类型 | Python 类型 | 描述 |
|---|---|---|
| BigInteger | int | 大整数 |
| Boolean | bool | 布尔类型 |
| Date | datetime.date | 日期 |
| DateTime | datetime.datetime | 日期时间 |
| Enum | str | 枚举 |
| Float | float | 浮点数 |
| Integer | int | 整数 |
| Interval | datetime.timedelta | 时间间隔 |
| LargeBinary | str | 二进制文件 |
| MatchType | MATCH操作符的返回类型 | |
| Numeric | decimal.Decimal | 数值的基类 |
| PickleType | 任意 Python 对象 | pickle序列化的Python对象 |
| SchemaType | Mark a type as possibly requiring schema-level DDL for usage. | |
| SmallInteger | int | 较小整数 |
| String | str | 字符串的基类 |
| Text | str | 可变大小字符串 |
| Time | datetime.time | 时间 |
| Unicode | str | 可变大小Unicode字符串 |
| UnicodeText | str | 长度无界的Unicode字符串 |
| 选项明 | 描述 |
|---|---|
| name | 数据库中的列名 |
| type_ | 列类型 |
| autoincrement | 整数主键列自动递增 |
| default | 默认值 |
| index | 索引 |
| nullable | 是否允许为空 |
| unique | 是否为唯一值 |
| 过滤器 | 描述 |
|---|---|
filter() | 筛选条件 |
filter_by() | 关键字形式的筛选条件 |
limit() | 数量限制 |
offset() | 偏移量 |
order_by() | 排序 |
group_by() | 分组 |
| 执行函数 | 描述 |
|---|---|
all() | 以列表的形式返回 |
first() | 第一条结果 |
first_or_404() | 第一条结果或404 |
get() | 主键对应的行 |
get_or_404() | 主键对应的行或404 |
count() | 结果数量 |
paginate() | 返回一个 Paginate 对象,包含指定范围的结果 |
from pathlib import Path
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(Path(__file__).parent / 'data.sqlite') # 数据库连接
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 不跟踪对象修改
db = SQLAlchemy(app)
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
users = db.relationship('User', backref='role', lazy='dynamic')
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
def __repr__(self):
return '<User %r>' % self.username
if __name__ == '__main__':
db.drop_all() # 删除旧数据
db.create_all() # 创建数据库和表
# 插入行
admin_role = Role(name='Admin')
mod_role = Role(name='Moderator')
user_role = Role(name='User')
user_john = User(username='john', role=admin_role)
user_susan = User(username='susan', role=user_role)
user_david = User(username='david', role=user_role)
# db.session.add(admin_role)
# db.session.add(mod_role)
# db.session.add(user_role)
# db.session.add(user_john)
# db.session.add(user_susan)
# db.session.add(user_david)
db.session.add_all([admin_role, mod_role, user_role, user_john, user_susan, user_david])
db.session.commit()
print(admin_role.id) # 1
print(mod_role.id) # 2
print(user_role.id) # 3
# 修改行
print(admin_role.name) # Admin
admin_role.name = 'Administrator'
db.session.add(admin_role)
db.session.commit()
print(admin_role.name) # Administrator
# 删除行
db.session.delete(mod_role)
db.session.commit()
print(mod_role.id) # 2
# 查询行
print(Role.query.all()) # [<Role 'Administrator'>, <Role 'User'>]
print(User.query.all()) # [<User 'john'>, <User 'susan'>, <User 'david'>]
print(User.query.filter_by(role=user_role).all()) # [<User 'susan'>, <User 'david'>]
print(str(User.query.filter_by(role=user_role)))
# SELECT users.id AS users_id, users.username AS users_username, users.role_id AS users_role_id FROM users
users = user_role.users
print(users) # [<User 'susan'>, <User 'david'>]
print(users[0].role) # <Role 'User'>
print(user_role.users.order_by(User.username).all()) # [<User 'david'>, <User 'susan'>]
print(user_role.users.count()) # 2
安装
pip install flask-mail
推荐阅读:Python通过163和QQ收发邮件
异步发邮件
from threading import Thread
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config['MAIL_SERVER'] = 'smtp.163.com' # 服务器
app.config['MAIL_PORT'] = 465 # 启用SSL发信,端口一般是465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = 'abc@163.com' # 用户名
app.config['MAIL_PASSWORD'] = 'MXHQPFWUFEXCVMOQ' # 授权密码
mail = Mail(app) # 这句不能在app配置初始化前
def send_async_email(app, message):
"""异步发送电子邮件"""
with app.app_context():
mail.send(message)
@app.route('/')
def index():
subject = '关于Python日志库Loguru库的问题请教'
message = Message(
subject,
recipients=['12345678@qq.com'],
html='<h1>请问如何轻松记录日志?</h1>',
sender=app.config['MAIL_USERNAME']
)
# mail.send(message) # 这句会卡几秒,影响用户体验
thr = Thread(target=send_async_email, args=[app, message])
thr.start()
return '发送成功\n'
if __name__ == '__main__':
app.run(debug=True)
生产环境中最好用 Celery
大多数用户在不同的网站中使用相同的密码,想保证数据库中用户密码的安全,不能存储密码本身,而要存储密码的散列值,通常使用散列值加盐的方法。
推荐阅读:
使用 Werkzeug 实现密码散列值加盐
from werkzeug.security import generate_password_hash, check_password_hash
password = '123456'
password_hash = generate_password_hash(password)
print(password_hash)
print(check_password_hash(password_hash, password))
# pbkdf2:sha256:260000$mcvf9pIsr5Dg625I$f9060c13862c0b2570407eb27fba8a59d68b8d56aaf6845d46b64d8788894768
# True
安装
pip install flask-login
pip install email_validator
Flask-Login 要求实现的用户模型方法
| 方法 | 描述 |
|---|---|
| is_authenticated() | 用户是否已登录 |
| is_active() | 是否允许用户登录 |
| is_anonymous() | 普通用户返回 False |
| get_id() |
用户认证
from . import db, login_manager
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(64), unique=True, index=True)
username = db.Column(db.String(64), unique=True, index=True)
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
password_hash = db.Column(db.String(128))
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def __repr__(self):
return '<User %r>' % self.username
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
具体代码
git checkout 8e

git clone https://github.com/miguelgrinberg/flasky.git
cd flasky
git checkout 7a

工程结构部分


前端能对状态码 404 和 500 进行路由,前后端分离的话,应由前端渲染,对应 static 和 templates 文件夹就不需要了
区分版本很重要,因为无法强制更新手机 APP,可让客户端的参数带上当前版本。
| 状态码 | 名 | 描述 |
|---|---|---|
| 200 | OK | 成功 |
| 201 | Created | 成功并创建了新资源 |
| 400 | Bad request | 请求不可用 |
| 401 | Unauthorized | 未授权 |
| 403 | Forbidden | 无权访问 |
| 404 | Not found | 对应资源不存在 |
| 405 | Method not allowed | 不支持该方法 |
| 500 | Internal server error | 服务器内部错误 |
404 和 500 状态码可能会让客户端困惑,因此可以在错误处理程序中改写响应,这种技术称为内容协商,发送 JSON 格式响应。
after_app_request() 将慢查询写入日志,视情况发送邮件(前者获取慢查询,后者发邮件)from flask import Flask, send_file
app = Flask(__name__)
@app.route('/download/')
def download():
file = '测试.txt'
with open(file, mode='w') as f:
f.write('Hello World!')
return send_file(file, as_attachment=True)
if __name__ == '__main__':
app.run()
访问 http://127.0.0.1:5000/download/
POST:request.form.get(key, default=None, type=None)
GET:request.args.get(key, default=None, type=None)
所有:request.values.get(key, default=None, type=None)
from werkzeug.utils import redirect
安装
pip install flask-bootstrap
templates/user.html
{% extends "bootstrap/base.html" %}
{% block title %}Flasky{% endblock %}
{% block navbar %}
<div class="navbar navbar-inverse" role="navigation">
<div class="container">
<div class="navbar-header">
<button type="button" class="navbar-toggle"
data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="/">Flasky</a>
</div>
<div class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li><a href="/">Home</a></li>
</ul>
</div>
</div>
</div>
{% endblock %}
{% block content %}
<div class="container">
<div class="page-header">
<h1>Hello, {{ name }}!</h1>
</div>
</div>
{% endblock %}
main.py
from flask import Flask, render_template
from flask_bootstrap import Bootstrap
app = Flask(__name__)
Bootstrap(app)
@app.route('/user/<name>')
def user(name):
return render_template('user.html', name=name)
if __name__ == '__main__':
app.run(debug=True)
访问 http://127.0.0.1:5000/user/XerCis

如果用户来自世界各地,需要统一时间单位,一般使用地理位置无关的协调世界时( Coordinated Universal Time, UTC),在渲染的时候转换为当地时间。
安装
pip install flask-moment
templates/index.html
{{ moment.include_moment() }}
<p>The local date and time is {{ moment(current_time).format('LLL') }}.</p>
<p>That was {{ moment(current_time).fromNow(refresh=True) }}</p>
main.py
from datetime import datetime
from flask import Flask, render_template
from flask_moment import Moment
app = Flask(__name__)
moment = Moment(app)
@app.route('/')
def index():
return render_template('index.html', current_time=datetime.utcnow())
if __name__ == '__main__':
app.run(debug=True)

安装
pip install flask-wtf
安装
pip install flask-sqlalchemy
安装
pip install flask-migrate
安装
pip install flask-mail
安装
pip install flask-login
REST Web 服务要求无状态,服务器再两次请求间不能记住客户端的任何信息。
可通过 HTTP 认证发送密令,将密令包含在请求的 Authorization 中。
安装
pip install flask-httpauth
可结合装饰器 auth.login_required() 和 api.before_request() 进行所有路由的自动认证,甚至能认证角色。
安装
pip install flask-talisman
安装
pip install flask-restful
安装
pip install flask-openid
安装
pip install flask_whooshalchemy
安装
pip install flask-admin

安装
pip install flask-security
安装
pip install flask-jwt-extended
安装
pip install flask-limiter
安装
pip install flask-babel
安装
pip install flask-caching
安装
pip install flask-debugtoolbar

安装
pip install flask-redis
安装
pip install flask-sqlacodegen
安装
pip install flask-sse
安装
pip install flask-socketio
安装
pip install flask-uploads
安装
pip install flask-dropzone
安装
pip install flask-ckeditor
安装
pip install flask-flatPages
安装
pip install flasgger
安装
pip install apiflask
代码
from apiflask import APIFlask, Schema, abort
from apiflask.fields import Integer, String
from apiflask.validators import Length, OneOf
app = APIFlask(__name__)
pets = [
{'id': 0, 'name': 'Kitty', 'category': 'cat'},
{'id': 1, 'name': 'Coco', 'category': 'dog'}
]
class PetInSchema(Schema):
"""入参"""
name = String(required=True, validate=Length(0, 10))
category = String(required=True, validate=OneOf(['dog', 'cat']))
class PetOutSchema(Schema):
"""出参"""
id = Integer()
name = String()
category = String()
@app.get('/')
def say_hello():
# 返回dict相当于使用jsonify()
return {'message': 'Hello!'}
@app.get('/pets/<int:pet_id>')
@app.output(PetOutSchema)
def get_pet(pet_id):
if pet_id > len(pets) - 1:
abort(404)
# 也可以直接返回ORM/ODM实例,会自动将对象序列化为JSON格式
return pets[pet_id]
@app.patch('/pets/<int:pet_id>')
@app.input(PetInSchema(partial=True), location='json_or_form')
@app.output(PetOutSchema)
def update_pet(pet_id, data):
# 经过验证和解析的输入数据将作为dict注入视图函数
if pet_id > len(pets) - 1:
abort(404)
for attr, value in data.items():
pets[pet_id][attr] = value
return pets[pet_id]
if __name__ == '__main__':
app.run()
测试
curl http://127.0.0.1:5000/
curl http://127.0.0.1:5000/pets/0
curl -X PATCH http://127.0.0.1:5000/pets/0 --data "{'name':'Butterfly', 'category':'cat'}"
文档

