• Sqlalchemy异步操作不完全指北


    异步SQLAlchemy

    SQLAlchemy作为一款通用的Python Orm工具,在最近的版本也支持了异步操作。但网上很多资料都不是很齐全,API也不是很好查询的情况下,我便有了整理一份基础文档的想法。文章主要会以CRUD为入口,解决大家最基本的需求。

    engine的区别

    在普通的SQLAlchemy中,建立engine对象,我们会采用下面的方式:

    复制代码
    • 1
    • 2
    from sqlalchemy import create_engine engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_recycle=1500)

    而异步的方式如下:

    复制代码
    • 1
    • 2
    from sqlalchemy.ext.asyncio import create_async_engine async_engine = create_async_engine(ASYNC_SQLALCHEMY_URI, pool_recycle=1500)

    session的区别

    我们一般用sessionmaker来建立session,不过异步的有点区别:

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import sessionmaker # 同步session Session = sessionmaker(engine) # 异步session 区别在于需要指定对应的class_ async_session = sessionmaker(async_engine, class_=AsyncSession)

    建立会话

    我们还是以代码的形式展示:

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    # 同步 with Session() as session: # 里面是具体的sql操作 pass # 异步 async with Session() as session: # 里面是异步的操作,区别就是从with变成了async with 也就意味着方法必须是async修饰的 pass

    以上是关于建立连接,处理会话的一些区别,接着我们讲对应的CRUD操作。

    查询

    这里依旧会给出新老版本的对比:

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    # 注意Session为同步Session,为了区分,异步session为async_session # model则为具体的Model类 # 异步查询方式 from sqlalchemy import select async def query(): async with async_session() as session: sql = select(model).where(model.id == 1) print(sql) # 这里可以打印出sql result = await session.execute(sql) # 第一条数据 data = result.scalars().first() # 所有数据 # data = result.scalars().all() # 同步查询方式一 def query(): with Session() as session: # 查询id=1的第一条数据 result对应的就是model的实例 如果没有则是None result = session.query(model).filter_by(id=1).first() # 查询所有数据 result对应的数据为List[model],即model数组 # result = session.query(model).filter_by(name="zhangsan").all() # 同步查询方式二 def query(): with Session() as session: # 查询id=1的第一条数据 result对应的就是model的实例 如果没有则是None result = session.query(model).filter(model.id == 1).first() # 查询所有数据 result对应的数据为List[model],即model数组 # result = session.query(model).filter(model.name == "zhangsan").all()

    新增

    这里开始就只讲异步的操作了。

    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    async def insert(data): async with async_session() as session: async with session.begin(): session.add(data) # 刷新自带的主键 await session.flush() # 释放这个data数据 session.expunge(data) return data

    先说一下session.begin,这个你可以理解为一个事务操作,当采用session的begin方法后,你可以发现我们不需要调用commit方法也能把修改存入数据库。

    expunge方法,是用例释放这个实例,SQLAlchemy有个特点,当你的session会话结束以后,它会销毁你插入的这种临时数据,你再想访问这个data就访问不了了。所以我们可以释放这个数据。(expunge的作用)

    编辑

    一般编辑有2种方式:

    • 查询出对应的数据,在数据上修改
    • 根据key-value的形式,修改对应数据的字段
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    from sqlalchemy import select, update # 方式一 async def update_record(model): async with async_session() as session: async with session.begin(): result = await session.execute(select(model).where(id=1)) now = result.scalars().first() if now is None: raise Exception("记录不存在") now.name = "李四" now.age = 23 # 这里测试过,如果去掉flush会导致数据不更新 await session.flush() session.expunge(now) return now # 方式二 async def update_by_map(): async with async_session() as session: async with session.begin(): # 更新id为1的数据,并把name改为李四 age改为23 sql = update(model).where(model.id == 1).values(name="李四", age=23) await session.execute(sql)

    删除

    删除的话,软删除大家都是update,所以不需要多说,物理删除的话,也有两种方式:

    • 查到以后删除之
    • 直接根据条件删除(这种我没有仔细研究,我选的是第一种方式,容错率高点)
    复制代码
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    async def delete_by_id(): async with async_session() as session: async with session.begin(): result = await session.execute(select(model).where(model.id == 2)) original = result.scalars().first() if original is None: raise Exception("记录不存在") # 如果是多条 # session.delete(original) # for item in result: # session.delete(item)

    今天的异步内容就整理到这里,我个人觉得还是很实用的,希望对大家有帮助~~~

  • 相关阅读:
    b 树和 b+树的理解
    【华为机试 Python实现】图的遍历
    【UE】在游戏运行时,通过选择uasset来生成静态网格体
    龙芯loongarch64服务器安装Rustup,解决“error: can‘t find Rust compiler”
    VA01/VA02/VA03 销售订单根据定价和步骤校验权限隐藏价格(二)
    神经内分泌肿瘤如何分级,神经系统分级调节概念
    HTML-CSS练习例子
    拉新、复购、供应链,双11中小商家“三难”如何破?
    WiFi网络分析工具Airtool for Mac
    Java 接口
  • 原文地址:https://www.cnblogs.com/we8fans/p/16126577.html