• python模块之 aiomysql 异步mysql


    mysql安装教程
    mysql语法大全
    python 模块pymysql模块,连接mysql数据库

    一、介绍

    aiomysql 是一个基于 asyncio 的异步 MySQL 客户端库,用于在 Python 中与 MySQL 数据库进行交互。它提供了异步的数据库连接和查询操作,适用于异步编程环境

    1. 异步支持:aiomysql 是基于 asyncio 实现的,可以与 asyncio 框架无缝集成,充分利用异步编程的优势,提高应用程序的性能和并发能力。
    2. 高性能:aiomysql 使用了底层的 PyMySQL 库,通过异步操作和连接池技术,可以实现高性能的数据库访问。
    3. 支持事务:aiomysql 提供了事务管理的支持,可以执行原子性的数据库操作,保证数据的一致性。
    4. SQL 语句构建器:aiomysql 提供了一个 SQL 语句构建器,可以方便地构建和执行 SQL 查询。
    二、安装
    pip install aiomysql
    
    • 1
    三、方法及属性
    1. aiomysql.connect():连接mysql,conn = await aiomysql.connect(host=‘localhost’, port=3307, user=‘root’, password=‘root’)
    • host:MySQL 服务器的主机名或 IP 地址。
    • port:MySQL 服务器的端口号,默认为 3306。
    • user:连接 MySQL 服务器的用户名。
    • password:连接 MySQL 服务器的密码。
    • db:连接的数据库名称。
    • unix_socket:UNIX 域套接字路径。如果指定了 unix_socket,则会忽略 host 和 port 参数。
    • charset:连接的字符集,默认为 “utf8mb4”。
    • autocommit:自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。
    • connect_timeout:连接超时时间(以秒为单位),默认为 10 秒。
    • maxsize:连接池中的最大连接数,默认为 10。
    • minsize:连接池中的最小连接数,默认为 1。
    • ssl:SSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如 ssl={‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’}。
    • loop:事件循环对象。如果不指定,将使用默认的事件循环。
    • pool_recycle:连接池中连接的回收时间(以秒为单位)。当连接在连接池中的时间超过 pool_recycle 时,连接将被回收并重新创建。
    • echo:是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。
    • cursorclass:游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。
    • server_public_key:服务器公钥,用于 SSL/TLS 连接的服务器验证。
    1. aiomysql.create_pool():创建一个连接池。该方法用于创建一个异步连接池,接受一系列的连接参数,例如数据库主机、端口、用户名、密码等。pool = await aiomysql.create_poo(host=‘localhost’, port=3307, user=‘root’, password=‘root’)
    • host:MySQL 服务器的主机名或 IP 地址。
    • port:MySQL 服务器的端口号,默认为 3306。
    • user:连接 MySQL 服务器的用户名。
    • password:连接 MySQL 服务器的密码。
    • db:连接的数据库名称。
    • unix_socket:UNIX 域套接字路径。如果指定了 unix_socket,则
      忽略 host 和 port 参数。
    • charset:连接的字符集,默认为 “utf8mb4”。
    • autocommit:自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。
    • minsize:连接池中的最小连接数,默认为 1。
    • maxsize:连接池中的最大连接数,默认为 10。
    • connect_timeout:连接超时时间(以秒为单位),默认为 10 秒。
    • pool_recycle:连接池中连接的回收时间(以秒为单位)。当连接在连接池中的时间超过 pool_recycle 时,连接将被回收并重新创建。
    • ssl:SSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如 ssl={‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’}。
    • loop:事件循环对象。如果不指定,将使用默认的事件循环。
    • echo:是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。
    • cursorclass:游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。
    • server_public_key:服务器公钥,用于 SSL/TLS 连接的服务器验证
    1. pool.acquire():从连接池中获取一个连接。该方法用于从连接池中获取一个异步连接对象,可以用于执行数据库操作。coon = pool.acquire()
    2. pool.release(coon):释放一个连接。该方法用于将一个连接对象返回到连接池中,以便其他代码可以继续使用。
    3. pool.close() :用于关闭连接池。调用 close() 方法后,连接池将不再接受新的连接请求,并开始关闭池中的所有连接。
    4. pool.wait_closed():用于等待连接池中的所有连接关闭。在调用 close() 方法后,应该调用 wait_closed() 方法来确保所有连接都已关闭,然后才能结束程序。
    5. pool.size:表示连接池的当前大小,即池中当前可用的连接数。
    6. pool.maxsize:表示连接池的最大连接数。当连接池中的连接数达到最大值时,新的连接请求将被阻塞,直到有连接被释放。
    7. pool.minsize:表示连接池的最小连接数。连接池在初始化时会创建最小连接数的连接,并保持这些连接处于活动状态。
    8. conn.close():关闭连接。该方法用于关闭连接,释放资源。
    9. conn.host:MySQL 服务器的主机名或 IP 地址。
    10. conn.port:MySQL 服务器的端口号。
    11. conn.user:连接 MySQL 服务器的用户名。
    12. conn.db:当前连接的数据库名称。
    13. conn.server_status:MySQL 服务器的状态信息。
    14. conn.server_capabilities:MySQL 服务器的功能列表。
    15. conn.client_flag:客户端标志。
    16. conn.insert_id:最近插入行的自增 ID。
    17. conn.warning_count:最近一次执行的 SQL 语句产生的警告数量。
    18. conn.errorhandler:错误处理器。
    19. conn.autocommit:自动提交模式。该属性用于设置是否开启自动提交模式,默认为 False。
    20. conn.charset:字符集。该属性用于设置数据库连接的字符集,默认为 “utf8mb4”。
    21. conn.maxsize:最大连接数。该属性用于设置连接池中的最大连接数,默认为 10。
    22. conn.minsize:最小连接数。该属性用于设置连接池中的最小连接数,默认为 1。
    23. conn.timeout:连接超时时间。该属性用于设置连接的超时时间,默认为 10 秒。
    24. conn.echo:是否打印 SQL 语句。该属性用于设置是否在控制台打印执行的 SQL 语句,默认为 False。
    25. conn.loop:事件循环对象。该属性用于设置要使用的事件循环对象。
    26. conn.connection:当前连接对象。该属性用于获取当前的连接对象。
    27. conn.server_version:MySQL 服务器版本。该属性用于获取 MySQL 服务器的版本信息。
    28. conn.commit():提交事务。该方法用于提交当前事务的操作。
    29. conn.begin():提开始一个事务。
    30. conn.ping():检查连接是否存活。
    31. conn.select_db(dbName):切换数据库
    32. conn.escape_string(string):对字符串进行 MySQL 转义。
    33. conn.rollback():回滚事务。该方法用于回滚当前事务的操作。
    34. conn.cursor():创建一个游标对象。该方法用于创建一个异步游标对象,用于执行 SQL 查询和操作。cursor=conn.cursor()
    35. cursor.close():关闭游标对象
    36. cursor.execute(sql, args=None):执行 SQL 语句。该方法用于执行 SQL 语句,接受 SQL 语句字符串和参数,可以执行查询、插入、更新等操作。
    37. cursor.executemany(sql, args=None):执行多个 SQL 语句。该方法用于执行多个相同结构的 SQL 语句,接受 SQL 语句字符串和参数列表。
    38. cursor.fetchone():获取一条查询结果。该方法用于获取查询结果的下一行数据。
    39. cursor.fetchall():获取所有查询结果。该方法用于获取查询结果的所有行数据。
    40. cursor.fetchmany(size=None):获取查询结果的多行。
    41. cursor.rowcount:最近一次执行的 SQL 语句影响的行数。
    42. cursor.lastrowid:最近插入行的自增 ID。
    43. cursor.description:查询结果的字段描述信息。
    44. cursor.rownumber:当前行在查询结果中的索引。
    45. cursor.arraysize:获取或设置从数据库获取的行数。
    四、案例
    1. 基本使用 aiomysql.connect()
      import aiomysql
      import asyncio
      
      
      async def main():
          conn = await aiomysql.connect(host='localhost', port=3306, user='root', password='root',db='job')
          cursor = await conn.cursor()
          await cursor.execute('SELECT * FROM baidu_job;')
          result = await cursor.fetchall()
          print(result)
          await cursor.close()
          conn.close()
      
      
      asyncio.run(main())
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
    2. 连接池 aiomysql.create_pool(),建议使用连接池
      import aiomysql
      import asyncio
      
      async def main():
          pool = await aiomysql.create_pool(host='localhost', port=3306, user='root', password='root',db='job')
          async with pool.acquire() as conn:
              async with conn.cursor() as cursor:
                  await cursor.execute('SELECT * FROM baidu_job;')
                  result = await cursor.fetchall()
                  print(result)
      
      asyncio.run(main())
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    3. 连接池爬取汽车之家数据
      '''
      网址:https://www.che168.com/china/a0_0msdgscncgpi1ltocsp7exf4x0/?pvareaid=102179#currengpostion
      '''
      
      import asyncio
      import aiohttp
      import aiomysql
      from lxml import etree
      import random
      
      
      class Car:
          url = {
              'car_list': 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion',
              'car_detail': 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
          }
      
          headers = {
              'authority': 'www.che168.com',
              'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
              'accept-language': 'zh-CN,zh;q=0.9',
              'cache-control': 'no-cache',
              'pragma': 'no-cache',
              'referer': 'https://www.che168.com/dealer/481320/48787661.html?pvareaid=100519&userpid=0&usercid=0&offertype=&offertag=0&activitycartype=0&fromsxmlist=0',
              'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
              'sec-ch-ua-mobile': '?0',
              'sec-ch-ua-platform': '"Windows"',
              'sec-fetch-dest': 'document',
              'sec-fetch-mode': 'navigate',
              'sec-fetch-site': 'same-origin',
              'sec-fetch-user': '?1',
              'upgrade-insecure-requests': '1',
              'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
          }
      
          def __init__(self):
              self.pool = None
              self.request = None
      
          '''获取汽车列表'''
          async def get_car_list(self, page):
              print(self.url['car_list'].format(page))
              response = await self.request.get(self.url['car_list'].format(page))
              result = await response.text(encoding='GBK')
              return etree.HTML(result)
      
          '''处理汽车列表信息'''
          async def parse_car_list_info(self, html):
              car_list_html = html.xpath('//div[@id="goodStartSolrQuotePriceCore0"]//li[@name="lazyloadcpc"]')
              task_list = []
              for item in car_list_html:
                  car_info = {
                      'specid': item.xpath('./@specid')[0],
                      'infoid': item.xpath('./@infoid')[0]
                  }
                  task = asyncio.create_task(self.get_car_detail(car_info))
                  task_list.append(task)
              await asyncio.wait(task_list)
      
          '''获取第一页数据,并返回总页数'''
          async def get_first_page(self):
              html = await self.get_car_list(1)
              page_total = html.xpath('//div[@id="listpagination"]/a[last()-1]/text()')[0]
              await self.parse_car_list_info(html)
              return int(page_total)
      
          '''获取除第一页数据之外的其他页数据'''
          async def get_all_page(self,page):
              async with self.semaphore:
                  await asyncio.sleep(random.randint(500, 800) / 1000)
                  html = await self.get_car_list(page)
                  await self.parse_car_list_info(html)
      
          '''获取汽车详情'''
          async def get_car_detail(self, car_info):
              response = await self.request.get(self.url['car_detail'].format(car_info['specid']))
              result = await response.json()
              detail = result['result']['paramtypeitems']
              car_detail = {
                  'specid': car_info['specid'],
                  'infoid': car_info['infoid'],
                  'name': detail[0]['paramitems'][0]['value'],
                  'price': detail[0]['paramitems'][1]['value'],
                  'manufacturer': detail[0]['paramitems'][2]['value'],
                  'level': detail[0]['paramitems'][3]['value'],
                  'length': f'{detail[1]["paramitems"][0]["value"]}mm',
                  'width': f'{detail[1]["paramitems"][1]["value"]}mm',
                  'height':f'{detail[1]["paramitems"][2]["value"]}mm',
              }
              await self.insert_table(car_detail)
      
          '''异步建立mysql表'''
          async def create_table(self):
              sql = '''
              CREATE TABLE IF NOT EXISTS qichezhijia(
              Id INT UNIQUE,
              Specid INT,
              Name VARCHAR(255),
              Price VARCHAR(10),
              Manufacturer VARCHAR(255),
              Level VARCHAR(50),
              Length VARCHAR(10),
              Width VARCHAR(10),
              Height VARCHAR(10),
              PRIMARY KEY(Id)
              )
              '''
              async with self.pool.acquire() as conn:
                  async with conn.cursor() as cursor:
                      try:
                          await cursor.execute(sql)
                      except Exception as e:
                          print(f'创建表失败{e}')
      
          '''插入数据'''
          async def insert_table(self,car_detail):
              sql = '''
              INSERT INTO qichezhijia VALUES(%(infoid)s,%(specid)s,%(name)s,%(price)s,%(manufacturer)s,%(level)s,%(length)s,%(width)s,%(height)s)
              '''
              async with self.pool.acquire() as conn:
                  async with conn.cursor() as cursor:
                      try:
                          await cursor.execute(sql, car_detail)
                          await conn.commit()
                          print('数据插入成功')
                      except Exception as e:
                          print(f'插入数据失败{e},infoid={car_detail["infoid"]}')
      
          '''程序运行主函数'''
          async def main(self):
              async with aiomysql.create_pool(host='localhost', port=3306, user='root', password='root', db='car') as pool:
                  self.pool = pool
                  await self.create_table()
                  async with aiohttp.ClientSession(headers=self.headers) as request:
                      self.request = request
                      page_total = await self.get_first_page()
                      self.semaphore = asyncio.Semaphore(3)
                      task_list = []
                      for page in range(2,page_total+1):
                          task = asyncio.create_task(self.get_all_page(page))
                          task_list.append(task)
                      await asyncio.wait(task_list)
      
      
      if __name__ == '__main__':
          car = Car()
          asyncio.run(car.main())
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
  • 相关阅读:
    python自动化测试selenium(三)下拉选择框、警告框处理、页面截图
    ChatGLM 大模型应用构建 & Prompt 工程
    Python入门教程 | Python 模块和包
    猿如意|IntelliJ IDEA Community下载安装以及基础开发设置和快捷键设置的详细教程
    Windows下Git Bash的基本使用
    Android app专项测试之耗电量测试
    Vue路由的使用及node.js下载安装和环境搭建
    对双STA-双连接的一些思考
    『忘了再学』Shell基础 — 19、使用declare命令声明变量类型
    APP广告变现策略:如何实现盈利与用户体验的平衡?
  • 原文地址:https://blog.csdn.net/randy521520/article/details/132713887