from sqlalchemy.orm.query import Query
from sqlalchemy import text
# 查询User的所有字段,并按照id升序排序
query_obj = session.query(User).order_by(text("id asc"))
# 获取query对象中的数据
# 1. 遍历 获取每一个(记录)对象
# 2. query_obj.first()/.all()/.one()/.one_or_none() 获取对象
#
# 查询name, fullname字段
query_obj = session.query(User.name, User.fullname)
# 对于query_obj 可遍历、也可直接调用获取数据对象的方法
# 结果[('tom', '李四'), ('jack', '张三')]
# 查询User所有字段 + name字段
query_obj = session.query(User, User.name).all()
# 元组列表 [(tom, 'tom'), (jack, 'jack')]
# 元组第一个元素为User对象,第二个为name值
# 为字段指定别名 相当于select name as user_name from user_t;
query_obj = session.query(User.name.label("user_name")).all()
# 为模型类 指定别名
from sqlalchemy.orm import aliased
user_alias = aliased(User, name="user_alias")
# 根据别名查询
query_obj = session.query(user_alias).all()
# 查询User的所有字段的数据,按照id降序排序,并分页获取第一条
result = session.query(User).order_by(-User.id)[:1] # 分片 分页
# 按照id升序排序,并分页获取第二条数据
result2 = session.query(User).order_by(User.id).offset(1).limit(1).all()
# 等值过滤
session.query(User.name).filter_by(fullname="Ed Jones")
# 复杂过滤
session.query(User.name).filter(User.fullname == "Ed Jones")
# 链式过滤
session.query(User).filter(User.name == "ed").filter(User.fullname == "Ed Jones")
# __eq__()
user = session.query(User).filter(User.id==1).one()
# 等价于
user = session.query(User).filter(User.id.__eq__(1)).one()
# __ne__ 类似的用法
# like 模糊查询(区分大小写) % 多个字符通配符 ilike 不分大小写
user_list = session.query(User).filter(User.name.like("lau%")).all()
# in_() 列表范围查询/query对象 not_in()
query.filter(User.name.in_(["ed", "wendy", "jack"])).all()
# 嵌套查询
query.filter(User.name.in_(session.query(User.name).filter(User.name.like("%ed%"))))
# use tuple_() for 复合查询, 多列一起 in_()
from sqlalchemy import tuple_
query.filter(
tuple_(User.name, User.nickname).in_([("ed", "edsnickname"), ("wendy", "windy")])).all()
# is_() is_not()
query.filter(User.name == None)
# 等价于
query.filter(User.name.is_(None))
# use and_() 多个条件与
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
# 等价于
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
# 链式
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
# 多个条件或
from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))
from sqlalchemy import text
# 查询出 id < 3 的所有数据,并按照id 升序排序
session.query(User).filter(User.id<3).order_by(User.id).all()
# text sql
session.query(User).filter(text("id<3")).order_by(text("id asc")).all()
# 参数化的文本sql
# 查询id > 3 且 name like lau% 的所有数据
session.query(User).filter(text("id<:id and name ilike :name")).params(
id=3, name="lau%").order_by(-User.id).all()
# 完整的text sql -> from_statement() 后面不能再跟条件
session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name="ed").all()
# text sql 指定字段
stmt = text("SELECT name, id, fullname, nickname FROM users where name=:name")
stmt = stmt.columns(User.name, User.id, User.fullname, User.nickname)
session.query(User).from_statement(stmt).params(name="ed").all()
6. 聚合查询
# 统计查询的条数 query.count()
session.query(User).filter(User.name.like("%ed")).count()
# 聚合函数
from sqlalchemy import func
# 分组查询,label指定别名
r = session.query(func.count(User.sex).label("num"), User.sex.label("user_sex")).group_by(User.sex).all()
# 统计行数
session.query(func.count("*")).select_from(User).scalar() # scalar() 获取第一行的第一列的值
pass
# 创建User对象
jack = User(name="jack", fullname="Jack Bean", sex="male")
jack.addresses
# []
# 添加地址
jack.addresses.append(Address(email_address="北京天安门"))
jack.addresses.append(Address(email_address="山东济南"))
# 一旦添加一个地址对象,用户、地址对象就建立双向关系
address0 = jack.addresses[0]
address0.user # 可以获取 jack 用户对象
# 只需将jack对象,添加到数据库中,地址自动添加
session.add(jack)
session.commit()
result = session.query(User, Address)
.filter(User.id == Address.user_id) # 字段等值过滤 连接
.all()
# [(jack, 北京), (jack, 河南), (tom, 武汉)]
# 对应sql
SELECT user_t.id AS u_id,
user_t.name AS u_name,
user_t.sex AS u_sex,
address_t.id AS addr_id,
address_t.email_address AS addr_email_address,
address_t.user_id AS addr_user_id
FROM user_t, address_t
WHERE user_t.id = address_t.user_id
AND address.email_address = ?
# 有一个外键的情况下
session.query(User) # 仅查询User中的数据
.join(Address) # 自动根据外键连接
.filter(Address.email_address == "北京")
.all()
# [jack]
session.query(User, Address) # 仅查询User中的数据
.join(Address) # 自动根据外键连接
.filter(Address.email_address == "北京")
.all()
# [(jack, 北京)]
# 有多个外键或者无外键, 使用关系连接
result = session.query(User, Address) # 查询两张表,返回Query对象
.join(User.addresses) # 通过User.addresses 关系 连接两张表
.filter(Address.email_address.match("北京")) # query对象过滤 包含‘北京’
.all() # 获取所有的数据
# [(jack, 北京)] 两个对象的元组 列表
r = session.query(User, Address)
.join(User.addresses.and_(~Address.title.match("北京"))) # 指定关系连接,同时指定过滤条件,地址不包含‘北京’
.all()
# [(jack, 河南), (tom, 武汉)]
query = session.query(User, Address) # 查询多个表
.select_from(Address) # Address作为起始表 开始连接
.join(Address.user) # 关系连接
.filter(User.name.match("jack")) # 条件过滤
.all()
# [(jack, 北京), (jack, 河南)]
# 左外连接
result = session.query(User, Address)
.outerjoin(User.addresses) # 左连接 以左边表的所有记录为主,可多 不可少
.all()
# [(jack, 北京), (jack, 河南), (tom, 武汉), (lauf, None), (LAUF, None), (laufing, None)]
from sqlalchemy.orm import aliased
add1 = aliased(Address, name="add1")
add2 = aliased(Address, name="add2")
result = session.query(User, add1)
.join(add1, User.addresses) # 连接add1 相当于 join(User.addresses.of_type(add1))
.join(add2, User.addresses) # 连接add2
.all()
# 查询每个用户有几个地址 sql
select user_t.id, sub_t.addr_count from user_t left outer join
SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN
(SELECT user_id, count(*) AS address_count
FROM addresses GROUP BY user_id) AS adr_count
ON users.id=adr_count.user_id