• Flask-SQLAlchemy 快速入门


    1 安装

    pip install flask-sqlalchemy

    2 配置

    配置选项说明
    SQLALCHEMY_DATABASE_URI连接数据库(mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8)
    SQLALCHEMY_ECHO调试选项
    SQLALCHEMY_POOL_SIZE数据库池的大小默认为5
    SQLALCHEMY_TRACK_MODIFICATIONS追踪对象的修改并发送信号

    操作数据库首先要创建一个db对象,通常卸载exts.py文件中。

    from flask_sqlalchemy import SQLAlchemy
    
    db = SQLAlchemy()
    
    • 1
    • 2
    • 3

    数据库配置我们一般卸载config.py文件里。

    # 数据库配置
    HOSTNAME = '127.0.0.1'
    PORT = '3306'
    DATABASE = 'web_v1'
    USERNAME = 'root'
    PASSWORD = 'root'
    DB_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'.\
        format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)
    
    SQLALCHEMY_DATABASE_URI = DB_URI
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    SQLALCHEMY_ECHO = True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    写完数据库配置之后需要和app绑定,如下所示:
    
    • 1
    from flask import Flask
    import configs
    from exts import db
    
    app = Flask(__name__)
    # 加载配置文件
    app.config.from_object(configs)
    # db绑定app
    db.init_app(app)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3 基本数据类型

    数据类型说明
    Integer整型
    String字符串
    Text文本
    DateTime日期
    Float浮点型
    Boolean布尔值
    PickleType存储一个序列化后的Python对象
    LargeBinary巨长度的二进制数据

    4 表的创建

    首先,需要创建一个模型对象,我们一般存放在model.py中。
    表关系

    • 一个用户对应多篇文章(一对多)
    • 一篇文章对应多个标签,一个标签对应多个文章(多对多)
    1. 一对一关系中,需要设置relationship中的uselist=Flase,其他数据库操作一样。
    2. 一对多关系中,外键设置在多的一方中,关系(relationship)可设置在任意一方。
    3. 多对多关系中,需建立关系表,设置 secondary=关系表
    from exts import db
    
    
    # 用户表
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        username = db.Column(db.String(50), nullable=False, unique=True)
        email = db.Column(db.String(50), nullable=False, unique=True)
    
    # 关系表(多对多)
    article_tag_table = db.Table('article_tag',
                                 db.Column('article_id', db.Integer, db.ForeignKey('article.id'), primary_key=True),
                                 db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'), primary_key=True))
    
    
    # 文章表
    class Article(db.Model):
        __tablename__ = 'article'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        title = db.Column(db.String(100))
        content = db.Column(db.Text)
        author_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    
        author = db.relationship("User", backref="articles")
        tags = db.relationship("Tag", secondary=article_tag_table, backref='tags')
    
    # 标签表
    class Tag(db.Model):
        __tablename__ = 'tag'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        name = db.Column(db.String(50))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    5 表的映射

    创建好表之后要将表映射到数据库中。

    from flask_migrate import Migrate
    
    migrate = Migrate(app, db)
    
    • 1
    • 2
    • 3
    在命令行运行如下指令:
    
    • 1

    :::tips
    flask db inite
    flask db migrate
    flask db upgrade
    :::
    映射成功后可以在Navicat Premium中查看各个表。

    6 测试是否已经连接数据库

    from flask import Flask
    import configs
    from exts import db
    from model import User
    from flask_migrate import Migrate
    
    app = Flask(__name__)
    # 加载配置文件
    app.config.from_object(configs)
    # db绑定app
    db.init_app(app)
    migrate = Migrate(app, db)
    
    # 注意上下文
    with app.app_context():
        with db.engine.connect() as conn:
            rs = conn.execute("select 1")
            print(rs.fetchone())
    
    
    
    @app.route('/')
    def hello_world():  # put application's code here
        return 'Hello World!'
    
    
    if __name__ == '__main__':
        app.run()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    如果成功连接会看到返回(1,)
    
    • 1

    7 数据库的增删改查

    7.1增加

    增加一行数据的步骤:

    1. 首先创建ORM对象;
    2. 将ORM对象添加到db.session中;
    3. 将db.session中的改变同步到数据库中。
    @app.route('/insert/')
    def insert():  # put application's code here
        user = User(username='mark', email='123456@qq.com')
        db.session.add(user)
        db.session.commit()
        return 'success'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    执行完代码之后就可以在Navicat Premium查看数据。
    
    • 1

    7.2 查找

    首先,在类User中加入打印的格式。

    # 用户表
    class User(db.Model):
        __tablename__ = 'user'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        username = db.Column(db.String(50), nullable=False, unique=True)
        email = db.Column(db.String(50), nullable=False, unique=True)
    
        def __repr__(self):  # 自定义 交互模式 & print() 的对象打印
            return "(%s, %s, %s)" % (self.id, self.username, self.email)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    加入一些测试用的数据。
    
    • 1
    @app.route('/init/')
    def init_data():  # put application's code here
        user1 = User(username='wang', email='wang@163.com')
        user2 = User(username='luyao', email='luyao@189.com')
        user3 = User(username='qiqi', email='qiqi@126.com')
        user4 = User(username='jiji', email='jiji@163.com')
        user5 = User(username='kaikai', email='kaikai@itheima.com')
        user6 = User(username='dahai', email='dahai@gmail.com')
        user7 = User(username='guishi', email='guishi@gmail.com')
        user8 = User(username='shenmo', email='shenmo@itheima.com')
        user9 = User(username='guijianchou', email='guijianchou@163.com')
        user10 = User(username='goujianchou', email='goujianchou@163.com')
    
        # 一次添加多条数据
        db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
        db.session.commit()
        return 'success'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    查询所有用户数据:
    
    • 1
    @app.route('/Search/')
    def search():  # put application's code here
        print(User.query.all())
        return f'find it!'
    
    • 1
    • 2
    • 3
    • 4

    :::info
    [(1, mark, 123456@qq.com), (3, wang, wang@163.com), (4, luyao, luyao@189.com), (5, qiqi, qiqi@126.com), (6, jiji, jiji@163.com), (7, kaikai, kaikai@itheima.com), (8, dahai, dahai@gmail.com), (9, guishi, guishi@gmail.com), (10, shenmo, shenmo@itheima.com), (11, guijianchou, guijianchou@163.com), (12, goujianchou, goujianchou@163.com)]
    :::
    查询有多少个用户:

    @app.route('/Search/')
    def search():  # put application's code here
        print(User.query.count())
        return f'find it!'
    
    • 1
    • 2
    • 3
    • 4

    :::info
    11
    :::
    查询第一个用户:

    @app.route('/Search/')
    def search():  # put application's code here
        print(User.query.first())
        return f'find it!'
    
    • 1
    • 2
    • 3
    • 4

    :::info
    (1, mark, 123456@qq.com)
    :::
    查询id为4的用户:

    	# 方式1: 根据id查询  返回模型对象/None
        User.query.get(4)
    
        # 方式2: 等值过滤器 关键字实参设置字段值  返回BaseQuery对象
        # BaseQuery对象可以续接其他过滤器/执行器  如 all/count/first等
        User.query.filter_by(id=4).all()
    
        # 方式3: 复杂过滤器  参数为比较运算/函数引用等  返回BaseQuery对象
        User.query.filter(User.id == 4).first()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    @app.route('/Search/')
    def search():  # put application's code here
        print(User.query.get(4))
        # print(User.query.filter_by(id=4).all())
        # print(User.query.filter(User.id == 4).first()User.query.get(4))
        return f'find it!'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    :::info
    (4, luyao, luyao@189.com)
    :::
    一些特殊的查找条件:

    # 查询名字结尾字符为g的所有用户[开始 / 包含]
    User.query.filter(User.name.endswith("g")).all()
    User.query.filter(User.name.startswith("w")).all()
    User.query.filter(User.name.contains("n")).all()
    User.query.filter(User.name.like("w%n%g")).all()  # 模糊查询
    
    # 查询名字和邮箱都以li开头的所有用户[2种方式]
    User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()
    from sqlalchemy import and_
    User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()
    
    # 查询age是25 或者 `email`以`itheima.com`结尾的所有用户
    from sqlalchemy import or_
    User.query.filter(or_(User.age==25, User.email.endswith("itheima.com"))).all()
    
    # 查询名字不等于wang的所有用户[2种方式]
    from sqlalchemy import not_
    User.query.filter(not_(User.name == 'wang')).all()
    User.query.filter(User.name != 'wang').all()
    
    # 查询id为[1, 3, 5, 7, 9]的用户
    User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()
    
    # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
    User.query.order_by(User.age, User.id.desc()).limit(5).all()
    
    # 查询年龄从小到大第2-5位的数据   2 3 4 5
    User.query.order_by(User.age).offset(1).limit(4).all()
    
    # 分页查询, 每页3个, 查询第2页的数据  paginate(页码, 每页条数)
    pn = User.query.paginate(2, 3)
    pn.pages 总页数  pn.page 当前页码 pn.items 当前页的数据  pn.total 总条数
    
    # 查询每个年龄的人数    select age, count(name) from t_user group by age  分组聚合
    from sqlalchemy import func
    data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
    for item in data:
        # print(item[0], item[1])
        print(item.age, item.count)  # 建议通过label()方法给字段起别名, 以属性方式获取数据
    
    
    # 只查询所有人的姓名和邮箱  优化查询   User.query.all()  # 相当于select *
    from sqlalchemy.orm import load_only
    data = User.query.options(load_only(User.name, User.email)).all()  # flask-sqlalchem的语法
    for item in data:
        print(item.name, item.email)
    
    data = db.session.query(User.name, User.email).all()  # sqlalchemy本体的语法
    for item in data:
        print(item.name, item.email)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    7.3 删除

    主要有两种方案:

    1. 先查询,再删除。
      • 对应SQL中的 先select, 再delete
    @app.route('/del/')
    def delete():
        # 方式1: 先查后删除
        user = User.query.filter(User.username == 'luyao').first()
        # 删除数据
        db.session.delete(user)
        # 提交会话 增删改都要提交会话
        db.session.commit()
    
        return "success"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 基于过滤条件的删除
      • 对应SQL中的 delete xx where xx = xx (也称为 delete子查询 )
    @app.route('/del/')
    def delete():
        User.query.filter(User.username == 'jiji').delete()
        # 提交会话 增删改都要提交会话
        db.session.commit()
        return "success"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    8 刷新数据

    @app.route('/init/')
    def init_data():  # put application's code here
        user1 = User(username='wang', email='wang@163.com')
        user2 = User(username='luyao', email='luyao@189.com')
        user3 = User(username='qiqi', email='qiqi@126.com')
        user4 = User(username='jiji', email='jiji@163.com')
        user5 = User(username='kaikai', email='kaikai@itheima.com')
        user6 = User(username='dahai', email='dahai@gmail.com')
        user7 = User(username='guishi', email='guishi@gmail.com')
        user8 = User(username='shenmo', email='shenmo@itheima.com')
        user9 = User(username='guijianchou', email='guijianchou@163.com')
        user10 = User(username='goujianchou', email='goujianchou@163.com')
    
        # 一次添加多条数据
        db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
        
        # 主动执行flush操作, 立即执行SQL操作(数据库同步)
        db.session.flush()
        
        db.session.commit()
        return 'success'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    9 多表查询

    首先,可以通过如下所示的方法,生成一对多的表示例:

    @app.route('/init/')
    def init_data():  # put application's code here
        user = User(username='huihui', email='huihui@163.com')
    
        # 一次添加多条数据
        db.session.add(user)
        db.session.flush()  # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败
    
        adr1 = Address(detail='中关村3号', user_id=user.id)
        adr2 = Address(detail='华强北5号', user_id=user.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
        return 'success'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    关联查询

    • 先查询主表数据
    • 再通过外键字段查询 关联的从表数据
    @app.route('/demo/')
    def demo():
        """查询多表数据  需求: 查询姓名为"张三"的所有地址信息"""
    
        # 1.先根据姓名查找到主表主键
        user1 = User.query.filter_by(username='huihui').first()
    
        # 2.再根据主键到从表查询关联地址
        adrs = Address.query.filter_by(user_id=user1.id).all()
        for adr in adrs:
            print(adr.detail)
    
        return "demo"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    关系属性
    关系属性是 sqlalchemy 封装的一套查询关联数据的语法, 其目的为 让开发者使用 面向对象的形式 方便快捷的获取关联数据。

    from flask import Flask
    from flask_migrate import Migrate
    from model import User, Address
    import configs
    from exts import db
    
    app = Flask(__name__)
    # 加载配置文件
    app.config.from_object(configs)
    # db绑定app
    db.init_app(app)
    migrate = Migrate(app, db)
    
    
    @app.route('/')
    def hello_world():  # put application's code here
        return 'Hello World!'
    
    
    # @app.route('/init/')
    # def init_data():  # put application's code here
    #     user1 = User(username='wang', email='wang@163.com')
    #     user2 = User(username='luyao', email='luyao@189.com')
    #     user3 = User(username='qiqi', email='qiqi@126.com')
    #     user4 = User(username='jiji', email='jiji@163.com')
    #     user5 = User(username='kaikai', email='kaikai@itheima.com')
    #     user6 = User(username='dahai', email='dahai@gmail.com')
    #     user7 = User(username='guishi', email='guishi@gmail.com')
    #     user8 = User(username='shenmo', email='shenmo@itheima.com')
    #     user9 = User(username='guijianchou', email='guijianchou@163.com')
    #     user10 = User(username='goujianchou', email='goujianchou@163.com')
    #
    #     # 一次添加多条数据
    #     db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
    #     db.session.commit()
    #     return 'success'
    
    
    @app.route('/init/')
    def init_data():  # put application's code here
        user = User(username='huihui', email='huihui@163.com')
    
        # 一次添加多条数据
        db.session.add(user)
        db.session.flush()  # 需要手动执行flush操作, 让主表生成主键, 否则外键关联失败
    
        adr1 = Address(detail='中关村3号', user_id=user.id)
        adr2 = Address(detail='华强北5号', user_id=user.id)
        db.session.add_all([adr1, adr2])
        db.session.commit()
        return 'success'
    
    
    @app.route('/demo/')
    def demo():
        """查询多表数据  需求: 查询姓名为"张三"的所有地址信息"""
        # 先根据姓名查找用户主键
        user1 = User.query.filter_by(username='huihui').first()
    
        # 3.使用关系属性获取关系数据
        for address in user1.addresses:
            print(address.detail)
    
        return "demo"
    
    
    @app.route('/Search/')
    def search():  # put application's code here
        print(User.query.get(4))
        return f'find it!'
    
    
    @app.route('/del/')
    def delete():
        User.query.filter(User.username == 'jiji').delete()
        # 提交会话 增删改都要提交会话
        db.session.commit()
        return "success"
    
    
    if __name__ == '__main__':
        app.run()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
  • 相关阅读:
    数据库系统原理与应用教程(059)—— MySQL 练习题:操作题 1-10(三)
    原生js html5 canvas制作flappy bird压扁小鸟游戏
    云计算【第一阶段(27)】DHCP原理与配置以及FTP的介绍
    【MySQL】数据库的存储过程与存储函数通关教程(完整版)
    10分钟部署一套开源表单系统
    Dubbo项目搭建(简单部署)
    国内交互智能平板品类首创者希沃联手倍市得,进一步蓄力品牌发展势能
    堆(堆排序和模拟堆)
    如何有效建立客户关系,提高复购率与客户的终生价值
    二维码智慧门牌管理系统:打造智慧城市之路
  • 原文地址:https://blog.csdn.net/weixin_61823031/article/details/127764800