• Python协程(asyncio)(三)异步应用对象


    Python实用教程_spiritx的博客-CSDN博客

    流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。

     asyncio 函数可以用来创建和处理异步网络流。

    1. import asyncio
    2. async def tcp_echo_client(message):
    3. reader, writer = await asyncio.open_connection(
    4. '127.0.0.1', 8888)
    5. print(f'Send: {message!r}')
    6. writer.write(message.encode())
    7. await writer.drain()
    8. data = await reader.read(100)
    9. print(f'Received: {data.decode()!r}')
    10. print('Close the connection')
    11. writer.close()
    12. await writer.wait_closed()
    13. asyncio.run(tcp_echo_client('Hello World!'))

    高级异步函数和对象

    异步网络

    打开异步连接

    语法为:

    coroutine asyncio.open_connection(host=Noneport=None*limit=Nonessl=Nonefamily=0proto=0flags=0sock=Nonelocal_addr=Noneserver_hostname=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonehappy_eyeballs_delay=Noneinterleave=None)

    asyncio.open_connection()是一个协程,用来建立网络连接并返回一对 (reader, writer) 对象。返回的 readerwriter 对象是 StreamReader 和 StreamWriter 类的实例。

    参数说明:

    • host:待连接的服务器地址。host 形参可被设为几种类型,它确定了服务器所应监听的位置:
      • 如果 host 是一个字符串,则 TCP 服务器会被绑定到 host 所指明的单一网络接口。
      • 如果 host 是一个字符串序列,则 TCP 服务器会被绑定到序列所指明的所有网络接口。
      • 如果 host 是一个空字符串或 None,则会应用所有接口并将返回包含多个套接字的列表(通常是一个 IPv4 的加一个 IPv6 的)。
    • port:待连接的服务器端口
    • limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。
    • ssl:如果给定该参数且不为假值,则会创建一个 SSL/TLS 传输(默认创建一个纯 TCP 传输)。 如果 ssl 是一个 ssl.SSLContext 对象,则会使用此上下文来创建传输对象;如果 ssl 为 True,则会使用从 ssl.create_default_context() 返回的默认上下文。
    • family:协议簇,一般为AF_INET or AF_INET6 
    • proto:协议
    • flags:标志,它们会被传递给 getaddrinfo() 来对 host 进行解析。如果要指定的话,这些都应该是来自于 socket 模块的对应常量。
    • sock:如果给出 sock,它应当是一个已存在、已连接并将被传输所使用的 socket.socket 对象。 如果给出了 sock,则 host, port, family, proto, flags, happy_eyeballs_delay, interleave 和 local_addr 都不应当被指定。
    • local_addr:如果给出 local_addr,它应当是一个用来在本地绑定套接字的 (local_host, local_port) 元组。 local_host 和 local_port 会使用 getaddrinfo() 来查找,这与 host 和 port 类似。
    • server_hostname:设置或重载目标服务器的证书将要匹配的主机名。 应当只在 ssl 不为 None 时传入。 默认情况下会使用 host 参数的值。 如果 host 为空那就没有默认值,你必须为 server_hostname 传入一个值。 如果 server_hostname 为空字符串,则主机名匹配会被禁用(这是一个严重的安全风险,使得潜在的中间人攻击成为可能)。
    • ssl_handshake_timeout:用于 TLS 连接的)在放弃连接之前要等待 TLS 握手完成的秒数。 如果参数为 None 则使用 (默认的) 60.0
    • ssl_shutdown_timeout:在SSL关闭之前等待连接完成处理的秒数。
    • happy_eyeballs_delay:如果给出 happy_eyeballs_delay,它将为此链接启用 Happy Eyeballs。 该函数应当为一个表示在开始下一个并行尝试之前要等待连接尝试完成的秒数的浮点数。 这也就是在 RFC 8305 中定义的 "连接尝试延迟"。 该 RFC 所推荐的合理默认值为 0.25 (250 毫秒)。
    • interleave:控制当主机名解析为多个 IP 地址时的地址重排序。 如果该参数为 0 或未指定,则不会进行重排序,这些地址会按 getaddrinfo() 所返回的顺序进行尝试。 如果指定了一个正整数,这些地址会按地址族交错排列,而指定的整数会被解读为 RFC 8305 所定义的 "首个地址族计数"。 如果 happy_eyeballs_delay 未指定则默认值为 0,否则为 1。

    使用asyncio.open_connection()我们依然需要像使用回调模式中那样,建立socket连接 > 设置为非阻塞IO > register到epoll/select中监听其IO状态,这在asyncio.open_connection()都实现了。

    reader, writer = await asyncio.open_connection(host, 80)

     启动异步网络服务

    语法为:

    coroutine asyncio.start_server(client_connected_cbhost=Noneport=None*limit=Nonefamily=socket.AF_UNSPECflags=socket.AI_PASSIVEsock=Nonebacklog=100ssl=Nonereuse_address=Nonereuse_port=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonestart_serving=True)

    参数说明:

    • client_connected_cb :当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task 被调度。
    • host:待连接的服务器地址。host 形参可被设为几种类型,它确定了服务器所应监听的位置:
      • 如果 host 是一个字符串,则 TCP 服务器会被绑定到 host 所指明的单一网络接口。
      • 如果 host 是一个字符串序列,则 TCP 服务器会被绑定到序列所指明的所有网络接口。
      • 如果 host 是一个空字符串或 None,则会应用所有接口并将返回包含多个套接字的列表(通常是一个 IPv4 的加一个 IPv6 的)。
    • port:监听端口。如果为 0 或者 None (默认),将选择一个随机的未使用的端口(注意,如果 host 解析到多个网络接口,将为每个接口选择一个不同的随机端口)。
    • limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。
    • family:协议簇,一般为AF_INET or AF_INET6 
    • proto:协议
    • flags:标志,它们会被传递给 getaddrinfo() 来对 host 进行解析。如果要指定的话,这些都应该是来自于 socket 模块的对应常量。
    • sock:预先存在的套接字对象。 如果指定了此参数,则不可再指定 host 和 port
    • backlog :传递给 listen() 的最大排队连接的数量(默认为100)。
    • ssl: 可被设置为一个 SSLContext 实例以在所接受的连接上启用 TLS。
    • reuse_address 告知内核要重用一个处于 TIME_WAIT 状态的本地套接字,而不是等待其自然超时失效。 如果未指定此参数则在 Unix 上将自动设置为 True
    • reuse_port 告知内核,只要在创建的时候都设置了这个标志,就允许此端点绑定到其它端点列表所绑定的同样的端口上。这个选项在 Windows 上是不支持的。
    • ssl_handshake_timeout 是(用于 TLS 服务器的)在放弃连接之前要等待 TLS 握手完成的秒数。 如果参数为 (默认值) None 则为 60.0 秒。
    • ssl_shutdown_timeout:在SSL关闭之前等待连接完成处理的秒数。
    • start_serving 设置成 True (默认值) 会导致创建server并立即开始接受连接。设置成 False ,用户需要等待 Server.start_serving() 或者 Server.serve_forever() 以使server开始接受连接。

    返回一个 Server 对象。

     Server对象

    class asyncio.Server

    Server 对象是异步上下文管理器。当用于 async with 语句时,异步上下文管理器可以确保 Server 对象被关闭,并且在 async with 语句完成后,不接受新的连接。

    1. srv = await loop.create_server(...)
    2. async with srv:
    3. # some code
    4. # At this point, srv is closed and no longer accepts new connections.

    Server 对象的主要方法和属性:

    方法和属性名说明
    coroutine start_serving()启动server,开始接受连接。
    coroutine serve_forever()启动server,开始接受连接,直到协程被取消。 serve_forever 任务的取消将导致服务器被关闭。如果服务器已经在接受连接了,这个方法可以被调用。每个 Server 对象,仅能有一个 serve_forever 任务。
    is_serving()如果服务器正在接受新连接的状态,返回 True 。
    close()停止服务:关闭监听的套接字并且设置 sockets 属性为 None 。用于表示已经连进来的客户端连接会保持打开的状态。
    服务器是被异步关闭的,使用 wait_closed() 协程来等待服务器关闭。
    coroutine wait_closed()等待 close() 方法执行完毕。
    get_loop()返回与服务器对象相关联的事件循环。
    sockets返回正在监听的socket对象,类型为asyncio.trsock.TransportSocket

    1. import asyncio
    2. async def handle_echo(reader, writer):
    3. data = await reader.read(100)
    4. message = data.decode()
    5. addr = writer.get_extra_info('peername')
    6. print(f"Received {message!r} from {addr!r}")
    7. print(f"Send: {message!r}")
    8. writer.write(data)
    9. await writer.drain()
    10. print("Close the connection")
    11. writer.close()
    12. await writer.wait_closed()
    13. async def main():
    14. server = await asyncio.start_server(
    15. handle_echo, '127.0.0.1', 8888)
    16. addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    17. print(f'Serving on {addrs}')
    18. async with server:
    19. await server.serve_forever()
    20. asyncio.run(main())
    1. async def client_connected(reader, writer):
    2. # Communicate with the client with
    3. # reader/writer streams. For example:
    4. await reader.readline()
    5. async def main(host, port):
    6. srv = await asyncio.start_server(
    7. client_connected, host, port)
    8. await srv.serve_forever()
    9. asyncio.run(main('127.0.0.1', 0))

    异步UNIX域连接

    打开异步UNIX域连接

    语法为:

    coroutine asyncio.open_unix_connection(path=None*limit=Nonessl=Nonesock=Noneserver_hostname=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=None)

    建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

    与 open_connection() 相似,但是是在 Unix 套接字上的操作。

    参数说明:

    • path: 所要求的 Unix 域套接字的名字,除非指定了 sock 形参。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。
    • 其他参数与open_connection()一样。

    启动UNIX域连接服务

    语法为:

    coroutine asyncio.start_unix_server(client_connected_cbpath=None*limit=Nonesock=Nonebacklog=100ssl=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonestart_serving=True)

    启动一个 Unix 套接字服务。

    与 start_server() 相似,但是是在 Unix 套接字上的操作。

    参数与start_server()一样。

    异步流

    StreamReader

    用于读取数据的流对象。

    class asyncio.StreamReader

    对象支持asynchronous iterable(异步可迭代对象,一个可以在 async for 语句中使用的对象。 必须通过它的 __aiter__() 方法返回一个 asynchronous iterator)。

    不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 对象。

    主要的方法和属性:

    方法和属性名说明
    coroutine read(n=- 1)从流中读取n个bytes。如果n未提供或为-1,读到EOF,返回所有的bytes;如果n为0,返回空的bytes对象;如果n为正整数,返回最多n个bytes,如果到达EOF,提前返回。
    coroutine readline()读取一行,其中“行”指的是以 \n 结尾的字节序列。
    如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。
    如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。
    coroutine readexactly(n)精确读取 n 个 bytes,不会超过也不能少于。如果在读取完 n 个byte之前读取到EOF,则会引发 IncompleteReadError 异常。使用 IncompleteReadError.partial 属性来获取到达流结束之前读取的 bytes 字符串。
    coroutine readuntil(separator=b'\n')从流中读取数据直至遇到 separator
    成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。
    如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError 异常,数据将留在内部缓冲区中并可以再次读取。
    如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError 异常,并重置内部缓冲区。 IncompleteReadError.partial 属性可能包含指定separator的一部分。
    at_eof()如果缓冲区为空并且 feed_eof() 被调用,则返回 True 

    StreamWriter

    用于写入数据的流对象

    class asyncio.StreamWriter

    这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。
    不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。

    主要方法和属性:

    属性和方法名说明
    write(data)

    此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

    此方法应当与 drain() 方法一起使用:

    stream.write(data)
    await stream.drain()
    coroutine drain()等待直到可以适当地恢复写入到流。这是一个与下层的 IO 写缓冲区进行交互的流程控制方法。 当缓冲区大小达到最高水位(最大上限)时,drain() 会阻塞直到缓冲区大小减少至最低水位以便恢复写入。 当没有要等待的数据时,drain() 会立即返回。
    writelines(data)

    此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

    此方法应当与 drain() 方法一起使用:

    stream.writelines(lines)
    await stream.drain()
    close()

    关闭流以及下层的套接字,建议和wait_closed()一起使用:

    stream.close()
    await stream.wait_closed()
    coroutine wait_closed()等待直到流被关闭。
    can_write_eof()如果下层的传输支持 write_eof() 方法则返回True,否则返回 False。
    write_eof()在已缓冲的写入数据被刷新后关闭流的写入端。
    transport返回下层的 asyncio 传输
    get_extra_info(namedefault=None)访问可选的传输信息
    coroutine start_tls(sslcontext\*server_hostname=Nonessl_handshake_timeout=None)升级存在的流为TLS,参数与open_connection中的含义一致
    is_closing()如果流已被关闭或正在被关闭则返回 True

    异步子进程

    创建异步子进程

    语法为:

    coroutine asyncio.create_subprocess_exec(program*argsstdin=Nonestdout=Nonestderr=Nonelimit=None**kwds)

    返回一个 Process 实例。

    参数说明:

    • program:子进程要执行的程序
    • args 必须是个由下列形式的字符串组成的列表:
      • str;
      • 或者由 文件系统编码格式 编码的 bytes。
    • limit:对包裹Process.stdout and Process.stderr 的流的buffer限制
    • stdin 可以是以下对象之一:
      • 一个文件类对象,表示要使用 connect_write_pipe() 连接到子进程的标准输入流的管道
      • subprocess.PIPE 常量(默认),将创建并连接一个新的管道。
      • None 值,这将使得子进程继承来自此进程的文件描述符
      • subprocess.DEVNULL 常量,这表示将使用特殊的 os.devnull 文件
    • stdout 可以是以下对象之一:
      • 一个文件类对象,表示要使用 connect_write_pipe() 连接到子进程的标准输出流的管道
      • subprocess.PIPE 常量(默认),将创建并连接一个新的管道。
      • None 值,这将使得子进程继承来自此进程的文件描述符
      • subprocess.DEVNULL 常量,这表示将使用特殊的 os.devnull 文件
    • stderr 可以是以下对象之一:
      • 一个文件类对象,表示要使用 connect_write_pipe() 连接到子进程的标准错误流的管道
      • subprocess.PIPE 常量(默认),将创建并连接一个新的管道。
      • None 值,这将使得子进程继承来自此进程的文件描述符
      • subprocess.DEVNULL 常量,这表示将使用特殊的 os.devnull 文件
      • subprocess.STDOUT 常量,将把标准错误流连接到进程的标准输出流

    coroutine asyncio.create_subprocess_shell(cmdstdin=Nonestdout=Nonestderr=Nonelimit=None**kwds)

    运行 cmd shell 命令。

    子进程对象

    class asyncio.subprocess.Process

    一个用于包装 create_subprocess_exec() and create_subprocess_shell() 函数创建的 OS 进程的对象。

    主要方法和属性:

    方法和属性名说明
    coroutine wait()等待子进程终结。
    coroutine communicate(input=None)

    与进程交互:
    发送数据到 stdin (如果 input 不为 None);
    从 stdout 和 stderr 读取数据,直至到达 EOF;
    等待进程终结。
    可选的 input 参数为将被发送到子进程的数据 (bytes 对象)。
    返回一个元组 (stdout_data, stderr_data)。
    如果在将 input 写入到 stdin 时引发了 BrokenPipeError 或 ConnectionResetError 异常,异常会被忽略。 此条件会在进程先于所有数据被写入到 stdin 之前退出时发生。
    如果想要将数据发送到进程的 stdin,则创建进程时必须使用 stdin=PIPE。 类似地,要在结果元组中获得任何不为 None 的值,则创建进程时必须使用 stdout=PIPE 和/或 stderr=PIPE 参数。
    注意,数据读取在内存中是带缓冲的,因此如果数据量过大或不受则不要使用此方法。

    使用 communicate() 方法而非 process.stdin.write(), await process.stdout.read() 或 await process.stderr.read()。 这可以避免由于流暂停读取或写入并阻塞子进程而导致的死锁。

    send_signal(signal)将信号 signal 发送给子进程。
    terminate()停止子进程。
    kill()杀掉子进程。
    在 POSIX 系统中此方法会发送 SIGKILL 给子进程。
    在 Windows 上此方法是 terminate() 的别名。
    stdin标准输入流 (StreamWriter) 或者如果进程创建时设置了 stdin=None 则为 None
    stdout标准输出流 (StreamReader) 或者如果进程创建时设置了 stdout=None 则为 None
    stderr标准错误流 (StreamReader) 或者如果进程创建时设置了 stderr=None 则为 None
    pid进程标识号(PID)。这个属性将是所生成的 shell 的 PID。
    returncode当进程退出时返回其代号。
    None 值表示进程尚未终止。
    一个负值 -N 表示子进程被信号 N 中断 (仅 POSIX).
    1. import asyncio
    2. import sys
    3. async def get_date():
    4. code = 'import datetime; print(datetime.datetime.now())'
    5. # Create the subprocess; redirect the standard output
    6. # into a pipe.
    7. proc = await asyncio.create_subprocess_exec(
    8. sys.executable, '-c', code,
    9. stdout=asyncio.subprocess.PIPE)
    10. # Read one line of output.
    11. data = await proc.stdout.readline()
    12. line = data.decode('ascii').rstrip()
    13. # Wait for the subprocess exit.
    14. await proc.wait()
    15. return line
    16. date = asyncio.run(get_date())
    17. print(f"Current date: {date}")

    常用的常量

    subprocess.DEVNULL
    可被 Popen 的 stdin, stdout 或者 stderr 参数使用的特殊值, 表示使用特殊文件 os.devnull.

    subprocess.PIPE
    可被 Popen 的 stdin, stdout 或者 stderr 参数使用的特殊值, 表示打开标准流的管道. 常用于 Popen.communicate().

    subprocess.STDOUT
    可被 Popen 的 stderr 参数使用的特殊值, 表示标准错误与标准输出使用同一句柄。

    异步队列

    asyncio 队列被设计成与 queue 模块类似。尽管 asyncio队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
    注意asyncio 的队列没有 timeout 形参;请使用 asyncio.wait_for() 函数为队列添加超时操作。

    异步FIFO队列

    class asyncio.Queue(maxsize=0)

    如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于 0 的整数,则当队列达到 maxsize 时, await put() 将阻塞至某个元素被 get() 取出。
    不像标准库中的并发型 queue ,队列的尺寸一直是已知的,可以通过调用 qsize() 方法返回。

    这个类不是线程安全的

    主要的方法和属性:

    方法和属性名说明
    maxsize队列中可存放的元素数量。
    full()如果有 maxsize 个条目在队列中,则返回 True 。
    如果队列用 maxsize=0 (默认)初始化,则 full() 永远不会返回 True 。
    coroutine put(item)添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。
    put_nowait(item)不阻塞的放一个元素入队列。
    如果没有立即可用的空闲槽,引发 QueueFull 异常。
    qsize()返回队列用的元素数量。
    empty()如果队列为空返回 True ,否则返回 False 。
    coroutine get()从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。
    get_nowait()立即返回一个队列中的元素,如果队列内没有值,将引发异常 QueueEmpty 。
    task_done()表明前面排队的任务已经完成,即get出来的元素相关操作已经完成。
    由队列使用者控制。每个 get() 用于获取一个任务,任务最后调用 task_done() 告诉队列,这个任务已经完成。
    如果 join() 当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个 put() 进队列的条目的 task_done() 都被收到)。
    如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 。
    coroutine join()阻塞至队列中所有的元素都被接收和处理完毕。
    当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。

    异步优先级队列

    class asyncio.PriorityQueue

    异步FIFO队列的变体,按优先级顺序取出条目 (最小的先取出)。条目通常是 (priority_number, data) 形式的元组。

    后进先出队列

    class asyncio.LifoQueue

    异步FIFO队列的变体,先取出最近添加的条目(后进,先出)。

    异常

    exception asyncio.QueueEmpty
    当队列为空的时候,调用 get_nowait() 方法而引发这个异常。

    exception asyncio.QueueFull
    当队列中条目数量已经达到它的 maxsize 的时候,调用 put_nowait() 方法而引发的异常。

    1. import asyncio
    2. import random
    3. import time
    4. async def worker(name, queue):
    5. while True:
    6. # Get a "work item" out of the queue.
    7. sleep_for = await queue.get()
    8. # Sleep for the "sleep_for" seconds.
    9. await asyncio.sleep(sleep_for)
    10. # Notify the queue that the "work item" has been processed.
    11. queue.task_done()
    12. print(f'{name} has slept for {sleep_for:.2f} seconds')
    13. async def main():
    14. # Create a queue that we will use to store our "workload".
    15. queue = asyncio.Queue()
    16. # Generate random timings and put them into the queue.
    17. total_sleep_time = 0
    18. for _ in range(20):
    19. sleep_for = random.uniform(0.05, 1.0)
    20. total_sleep_time += sleep_for
    21. queue.put_nowait(sleep_for)
    22. # Create three worker tasks to process the queue concurrently.
    23. tasks = []
    24. for i in range(3):
    25. task = asyncio.create_task(worker(f'worker-{i}', queue))
    26. tasks.append(task)
    27. # Wait until the queue is fully processed.
    28. started_at = time.monotonic()
    29. await queue.join()
    30. total_slept_for = time.monotonic() - started_at
    31. # Cancel our worker tasks.
    32. for task in tasks:
    33. task.cancel()
    34. # Wait until all worker tasks are cancelled.
    35. await asyncio.gather(*tasks, return_exceptions=True)
    36. print('====')
    37. print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    38. print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
    39. asyncio.run(main())

    低层网络函数和对象

    如果能使用上面学习的函数和对象,尽可能去使用,本节所要学习的流相关的函数和对象都是与事件循环相关的,都是低层的函数和对象,需要较深入的了解asynicio和网络相关的知识。

    协议

    asyncio 提供了一组抽象基类,它们应当被用于实现网络协议。 这些类被设计为与 传输 配合使用。
    抽象基础协议类的子类可以实现其中的部分或全部方法。 所有这些方法都是回调:它们由传输或特定事件调用,例如当数据被接收的时候。 基础协议方法应当由相应的传输来调用。

    基础协议

    class asyncio.BaseProtocol

    带有所有协议的共享方法的基础协议。

    所有 asyncio 协议均可实现基础协议回调。

    连接回调

    连接回调会在所有协议上被调用,每个成功的连接将恰好调用一次。 所有其他协议回调只能在以下两个方法之间被调用。

    BaseProtocol.connection_made(transport)

    连接建立时被调用。

    transport 参数是代表连接的传输。 此协议负责将引用保存至对应的传输。

    BaseProtocol.connection_lost(exc)

    连接丢失或关闭时将被调用。

    方法的参数是一个异常对象或为 None。 后者意味着收到了常规的 EOF,或者连接被连接的一端取消或关闭。

    流程控制回调

    流程控制回调可由传输来调用以暂停或恢复协议所执行的写入操作。

    BaseProtocol.pause_writing()

    当传输的缓冲区升至高水位以上时将被调用。

    BaseProtocol.resume_writing()

    当传输的缓冲区降到低水位以下时将被调用。

    如果缓冲区大小等于高水位值,则 pause_writing() 不会被调用:缓冲区大小必须要高于该值。
    相反地,resume_writing() 会在缓冲区大小等于或小于低水位值时被调用。 这些结束条件对于当两个水位取零值时也能确保符合预期的行为是很重要的。

    可以通过WriteTransport.set_write_buffer_limits()设置缓冲区的high 和 low 高低标记位。

    流式协议

    class asyncio.Protocol(BaseProtocol)

    用于实现流式协议(TCP, Unix 套接字等等)的基类。

    事件方法,例如 loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() 和 loop.connect_write_pipe() 都接受返回流式协议的工厂。

    数据接收回调

    Protocol.data_received(data)
    当收到数据时被调用。 data 为包含入站数据的非空字节串对象。
    数据是否会被缓冲、分块或重组取决于具体传输。 通常,你不应依赖于特定的语义而应使你的解析具有通用性和灵活性。 但是,数据总是要以正确的顺序被接收。
    此方法在连接打开期间可以被调用任意次数。

    Protocol.eof_received()
    当发出信号的另一端不再继续发送数据时(例如通过调用 transport.write_eof(),如果另一端也使用 asyncio 的话)被调用。
    此方法可能返回假值 (包括 None),在此情况下传输将会自行关闭。 相反地,如果此方法返回真值,将以所用的协议来确定是否要关闭传输。 由于默认实现是返回 None,因此它会隐式地关闭连接。
    某些传输,包括 SSL 在内,并不支持半关闭的连接,在此情况下从该方法返回真值将导致连接被关闭。

    eof_received()最多只能调用一次,一旦调用后,data_received()就不能再继续调用了。

    状态机:

    1. start -> connection_made
    2. [-> data_received]*
    3. [-> eof_received]?
    4. -> connection_lost -> end

    缓冲流协议

    class asyncio.BufferedProtocol(BaseProtocol)

    用于实现可对接收缓冲区进行手动控制的流式协议的基类。

    带缓冲的协议可与任何支持 流式协议 的事件循环方法配合使用。

    BufferedProtocol 实现允许显式手动分配和控制接收缓冲区。 随后事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。 这对于接收大量数据的协议来说会有明显的性能提升。 复杂的协议实现能显著地减少缓冲区分配的数量。

    BufferedProtocol的方法:

    BufferedProtocol.get_buffer(sizehint)
    调用后会分配新的接收缓冲区。
    sizehint 是推荐的返回缓冲区最小尺寸。 返回小于或大于 sizehint 推荐尺寸的缓冲区也是可接受的。 当设为 -1 时,缓冲区尺寸可以是任意的。 返回尺寸为零的缓冲区则是错误的。
    get_buffer() 必须返回一个实现了 缓冲区协议 的对象。

    BufferedProtocol.buffer_updated(nbytes)
    用接收的数据更新缓冲区时被调用。
    nbytes 是被写入到缓冲区的字节总数。

    BufferedProtocol.eof_received()
    请查看 protocol.eof_received() 方法的文档。

    在连接期间 get_buffer() 可以被调用任意次数。 但是,protocol.eof_received() 最多只能被调用一次,如果被调用,则在此之后 get_buffer() 和 buffer_updated() 不能再被调用。

    状态机:

    1. start -> connection_made
    2. [-> get_buffer
    3. [-> buffer_updated]?
    4. ]*
    5. [-> eof_received]?
    6. -> connection_lost -> end

    数据报协议

    class asyncio.DatagramProtocol(BaseProtocol)

    用于实现数据报(UDP)协议的基类。

    数据报协议实例应当由传递给 loop.create_datagram_endpoint() 方法的协议工厂来构造。

    DatagramProtocol.datagram_received(data, addr)
    当接收到数据报时被调用。 data 是包含传入数据的字节串对象。 addr 是发送数据的对等端地址;实际的格式取决于具体传输。

    DatagramProtocol.error_received(exc)
    当前一个发送或接收操作引发 OSError 时被调用。 exc 是 OSError 的实例。
    此方法会在当传输(例如UDP)检测到无法将数据报传给接收方等极少数情况下被调用。 而在大多数情况下,无法送达的数据报将被静默地丢弃。

    在 BSD 系统(macOS, FreeBSD 等等)上,数据报协议不支持流控制,因为没有可靠的方式来检测因写入多过包所导致的发送失败。
    套接字总是显示为 'ready' 且多余的包会被丢弃。 有一定的可能性会引发 OSError 并设置 errno 为 errno.ENOBUFS;如果此异常被引发,它将被报告给 DatagramProtocol.error_received(),在其他情况下则会被忽略。

    子进程协议

    class asyncio.SubprocessProtocol(BaseProtocol)

    用于实现与子进程通信(单向管道)的协议的基类。

    子进程协议实例应当由传递给 loop.subprocess_exec() 和 loop.subprocess_shell() 方法的协议工厂函数来构造。

    SubprocessProtocol.pipe_data_received(fddata)

    当子进程向其 stdout 或 stderr 管道写入数据时被调用。

    • fd 是以整数表示的管道文件描述符。
    • data 是包含已接收数据的非空字节串对象。

    SubprocessProtocol.pipe_connection_lost(fdexc)

    与子进程通信的其中一个管道关闭时被调用。

    • fd 以整数表示的已关闭文件描述符。

    SubprocessProtocol.process_exited()

    子进程退出时被调用。

    只能在pipe_data_received() and pipe_connection_lost() 函数之前调用。

    传输

     传输属于 asyncio 模块中的类,用来抽象各种通信通道。
    传输对象总是由 异步IO事件循环 实例化。
    异步IO实现TCP、UDP、SSL和子进程管道的传输。传输上可用的方法由传输的类型决定。
    传输类属于 线程不安全 。

    基础传输

    class asyncio.BaseTransport

    所有传输的基类。包含所有异步IO传输共用的方法。

    主要方法:

    方法和属性名说明
    BaseTransport.is_closing()返回 True ,如果传输正在关闭或已经关闭。
    BaseTransport.close()关闭传输。如果传输有写入缓冲,会将缓冲的数据异步写入,但不再接收新的数据,所有缓冲数据写入后,会调用protocol.connection_lost(),参数为None。传输关闭后就不能再使用。
    BaseTransport.set_protocol(protocol)

    设置一个新协议。

    只有两种协议都写明支持切换才能完成切换协议。

    BaseTransport.get_protocol()返回当前协议。
    BaseTransport.get_extra_info(namedefault=None)返回 传输或它使用的相关资源信息。

    name 是表示要获取传输特定信息的字符串。

    default 是在信息不可用或传输不支持第三方事件循环实现或当前平台查询时返回的值。

    BaseTransport.get_extra_info(namedefault=None)

    传输可查询信息类别:

    • 套接字:
      • 'peername': 套接字链接时的远端地址,socket.socket.getpeername() 方法的结果 (出错时为 None )
      • 'socket': socket.socket 实例
      • 'sockname': 套接字本地地址, socket.socket.getsockname() 方法的结果
    • SSL套接字
      • 'compression': 用字符串指定压缩算法,或者链接没有压缩时为 None ;ssl.SSLSocket.compression() 的结果。
      • 'cipher': 一个三值的元组,包含使用密码的名称,定义使用的SSL协议的版本,使用的加密位数。 ssl.SSLSocket.cipher() 的结果。
      • 'peercert': 远端凭证; ssl.SSLSocket.getpeercert() 结果。
      • 'sslcontext': ssl.SSLContext 实例
      • 'ssl_object': ssl.SSLObject 或 ssl.SSLSocket 实例
    • 管道:
      • 'pipe': 管道对象
      • 子进程:
      • 'subprocess': subprocess.Popen 实例
    1. sock = transport.get_extra_info('socket')
    2. if sock is not None:
    3. print(sock.getsockopt(...))

    只读传输

    class asyncio.ReadTransport(BaseTransport)
    只读链接的基础传输。

    ReadTransport 类的实例由 loop.connect_read_pipe() 事件循环方法返回,也被子进程相关的方法如 loop.subprocess_exec() 使用。

    主要方法和属性:

    方法和属性名说明
    ReadTransport.is_reading()如果传输接收到新数据时返回 True 。
    ReadTransport.pause_reading()暂停传输的接收端。protocol.data_received() 方法将不会收到数据,除非 resume_reading()  被调用。
    ReadTransport.resume_reading()恢复接收端。如果有数据可读取时,协议方法 protocol.data_received() 将再次被调用。

    只写传输

    class asyncio.WriteTransport(BaseTransport)
    只写链接的基础传输。

    WriteTransport 类的实例由 loop.connect_write_pipe() 事件循环方法返回,也被子进程相关的方法如 loop.subprocess_exec() 使用。

    主要方法和属性:

    方法和属性名说明
    WriteTransport.abort()立即关闭传输,不会等待已提交的操作处理完毕。已缓存的数据将会丢失。不会接收更多数据。 最终 None 将作为协议的 protocol.connection_lost() 方法的参数被调用。
    WriteTransport.can_write_eof()​​​​​​​如果传输支持 write_eof() 返回 True 否则返回 False 。
    WriteTransport.write_eof()在刷新所有已缓冲数据之后关闭传输的写入端。 数据仍可以被接收。如果传输(例如 SSL)不支持半关闭的连接,此方法会引发 NotImplementedError。
    WriteTransport.get_write_buffer_size()返回传输使用输出缓冲区的当前大小。
    WriteTransport.get_write_buffer_limits()获取写入流控制 high 和 low 高低标记位。返回元组 (low, high) , low 和 high 为正字节数。
    WriteTransport.set_write_buffer_limits(high=Nonelow=None)

    设置写入流控制 high 和 low 高低标记位。

    这两个值(以字节数表示)控制何时调用协议的 protocol.pause_writing() 和 protocol.resume_writing() 方法。 如果指明,则低水位必须小于或等于高水位。 high 和 low 都不能为负值。
    pause_writing() 会在缓冲区尺寸大于或等于 high 值时被调用。 如果写入已经被暂停,resume_writing() 会在缓冲区尺寸小于或等于 low 值时被调用。
    默认值是实现专属的。 如果只给出了高水位值,则低水位值默认为一个小于或等于高水位值的实现传属值。 将 high 设为零会强制将 low 也设为零,并使得 pause_writing() 在缓冲区变为非空的任何时刻被调用。 将 low 设为零会使得 resume_writing() 在缓冲区为空时只被调用一次。 对于上下限都使用零值通常是不够优化的,因为它减少了并发执行 I/O 和计算的机会。

    WriteTransport.write(data)将一些 data 字节串写入传输。此方法不会阻塞;它会缓冲数据并安排其被异步地发出。
    WriteTransport.writelines(list_of_data)将数据字节串的列表(或任意可迭代对象)写入传输。 这在功能上等价于在可迭代对象产生的每个元素上调用 write(),但其实现可能更为高效。

    双向传输

    class asyncio.Transport(WriteTransport, ReadTransport)
    接口代表一个双向传输,如TCP链接。
    用户不用直接实例化传输;调用一个功能函数,给它传递协议工厂和其它需要的信息就可以创建传输和协议。
    传输 类实例由如 loop.create_connection() 、 loop.create_unix_connection() 、 loop.create_server() 、 loop.sendfile() 等这类事件循环方法使用或返回。

    数据报传输

    class asyncio.DatagramTransport(BaseTransport)
    数据报(UDP)传输链接。
    DatagramTransport 类实例由事件循环方法 loop.create_datagram_endpoint() 返回。

    主要方法和属性:

    方法和属性名说明
    DatagramTransport.sendto(dataaddr=None)将 data 字节串发送到 addr (基于传输的目标地址) 所给定的远端对等方。 如果 addr 为 None,则将数据发送到传输创建时给定的目标地址。
    此方法不会阻塞;它会缓冲数据并安排其被异步地发出。
    DatagramTransport.abort()立即关闭传输,不会等待已提交的操作执行完毕。 已缓存的数据将会丢失。 不会接收更多的数据。 协议的 protocol.connection_lost() 方法最终将附带 None 作为参数被调用。

    子进程传输

    表示父进程和子进程之间连接的抽象。
    SubprocessTransport 类的实例由事件循环方法 loop.subprocess_shell() 和 loop.subprocess_exec() 返回。

    主要方法和属性:

    方法和属性名说明
    SubprocessTransport.get_pid()将子进程的进程 ID 以整数形式返回。
    SubprocessTransport.get_pipe_transport(fd)返回对应于整数文件描述符 fd 的通信管道的传输:
    0: 标准输入 (stdin) 的可读流式传输,如果子进程创建时未设置 stdin=PIPE 则为 None
    1: 标准输出 (stdout) 的可写流式传输,如果子进程创建时未设置 stdout=PIPE 则为 None
    2: 标准错误 (stderr) 的可写流式传输,如果子进程创建时未设置 stderr=PIPE 则为 None
    其他 fd: None
    SubprocessTransport.get_returncode()返回整数形式的进程返回码,或者如果还未返回则为 None,这类似于 subprocess.Popen.returncode 属性。
    SubprocessTransport.kill()杀死子进程。
    SubprocessTransport.send_signal(signal)发送 signal 编号到子进程
    SubprocessTransport.terminate()停止子进程。在 POSIX 系统中,此方法会发送 SIGTERM 到子进程。 在 Windows 中,则会调用 Windows API 函数 TerminateProcess() 来停止子进程。
    SubprocessTransport.close()通过调用 kill() 方法来杀死子进程。如果子进程尚未返回,并关闭 stdinstdout 和 stderr 管道的传输。

    打开网络连接

    loop.create_connection()

    coroutine loop.create_connection(protocol_factoryhost=Noneport=None*ssl=Nonefamily=0proto=0flags=0sock=Nonelocal_addr=Noneserver_hostname=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonehappy_eyeballs_delay=Noneinterleave=None)

    打开一个流式传输连接,连接到由 host 和 port 指定的地址。参数的含义与asyncio.open_connection()相同。

    这个方法会尝试在后台创建连接。当创建成功,返回 (transport, protocol) 组合。
    底层操作的大致的执行顺序是这样的:

    • 创建连接并为其创建一个 传输。
    • protocol_factory 必须是实现了流式协议的对象
    • 协议实例通过调用其 connection_made() 方法与传输进行配对。
    • 成功时返回一个 (transport, protocol) 元组。
    • 创建的传输是一个具体实现相关的双向流。

    loop.create_datagram_endpoint()

    coroutine loop.create_datagram_endpoint(protocol_factorylocal_addr=Noneremote_addr=None*family=0proto=0flags=0reuse_port=Noneallow_broadcast=Nonesock=None)

    创建一个数据报连接。

    参数说明:

    • 参数名与 loop.create_connection()一样含义也基本相同。
    • family:协议簇,支持 AF_INET, AF_INET6, or AF_UNIX,获得的socket的类型为SOCK_DGRAM
    • protocol_factory 必须是实现了数据报协议的对象。
    • local_addr:本地地址。如果给出 local_addr,它应当是一个用来在本地绑定套接字的 (local_host, local_port) 元组。 local_host 和 local_port 是使用 getaddrinfo() 来查找的。
    • remote_addr:如果指定,是一个 (remote_host, remote_port) 元组,用于同一个远程地址连接。remote_host 和 remote_port 是使用 getaddrinfo() 来查找的。
    • proto, flags 是可选的协议和标志,其会被传递给 getaddrinfo() 来完成 host 的解析。如果要指定的话,这些都应该是来自于 socket 模块的对应常量。
    • reuse_port:允许重复绑定端口,如果不允许,内核释放端口通常需要一定的时间,可能再次重启服务时会提示端口被占用,不能绑定。
    • allow_broadcast 告知内核允许此端点向广播地址发送消息。​​​​​​​
    • sock 可选择通过指定此值用于使用一个预先存在的,已经处于连接状态的 socket.socket 对象,并将其提供给此传输对象使用。如果指定了这个值, local_addr 和 remote_addr 就应该被忽略 (必须为 None)。

    UDP 回显客户端的例子:

    1. import asyncio
    2. class EchoClientProtocol:
    3. def __init__(self, message, on_con_lost):
    4. self.message = message
    5. self.on_con_lost = on_con_lost
    6. self.transport = None
    7. def connection_made(self, transport):
    8. self.transport = transport
    9. print('Send:', self.message)
    10. self.transport.sendto(self.message.encode())
    11. def datagram_received(self, data, addr):
    12. print("Received:", data.decode())
    13. print("Close the socket")
    14. self.transport.close()
    15. def error_received(self, exc):
    16. print('Error received:', exc)
    17. def connection_lost(self, exc):
    18. print("Connection closed")
    19. self.on_con_lost.set_result(True)
    20. async def main():
    21. # Get a reference to the event loop as we plan to use
    22. # low-level APIs.
    23. loop = asyncio.get_running_loop()
    24. on_con_lost = loop.create_future()
    25. message = "Hello World!"
    26. transport, protocol = await loop.create_datagram_endpoint(
    27. lambda: EchoClientProtocol(message, on_con_lost),
    28. remote_addr=('127.0.0.1', 9999))
    29. try:
    30. await on_con_lost
    31. finally:
    32. transport.close()
    33. asyncio.run(main())

    loop.create_unix_connection()

    coroutine loop.create_unix_connection(protocol_factorypath=None*ssl=Nonesock=Noneserver_hostname=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=None)

    创建Unix 域连接

    协议簇必须是AF_UNIX,socket的类型是SOCK_STREAM

    成功时返回一个 (transport, protocol) 元组。

    path 是所要求的 Unix 域套接字的名字,除非指定了 sock 形参。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。

    创建网络服务

    loop.create_server()

    coroutine loop.create_server(protocol_factoryhost=Noneport=None*family=socket.AF_UNSPECflags=socket.AI_PASSIVEsock=Nonebacklog=100ssl=Nonereuse_address=Nonereuse_port=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonestart_serving=True)

    创建一个基于TCP的网络服务(socket类型为SOCK_STREAM),返回一个 Server 对象。

    参数:

    • protocol_factory :实现了流式协议的对象。
    • host:待连接的服务器地址。host 形参可被设为几种类型,它确定了服务器所应监听的位置:
      • 如果 host 是一个字符串,则 TCP 服务器会被绑定到 host 所指明的单一网络接口。
      • 如果 host 是一个字符串序列,则 TCP 服务器会被绑定到序列所指明的所有网络接口。
      • 如果 host 是一个空字符串或 None,则会应用所有接口并将返回包含多个套接字的列表(通常是一个 IPv4 的加一个 IPv6 的)。
    • port:监听端口。如果为 0 或者 None (默认),将选择一个随机的未使用的端口(注意,如果 host 解析到多个网络接口,将为每个接口选择一个不同的随机端口)。
    • family:协议簇,一般为AF_INET or AF_INET6 
    • flags:标志,它们会被传递给 getaddrinfo() 来对 host 进行解析。如果要指定的话,这些都应该是来自于 socket 模块的对应常量。
    • sock:预先存在的套接字对象。 如果指定了此参数,则不可再指定 host 和 port
    • backlog :传递给 listen() 的最大排队连接的数量(默认为100)。
    • ssl: 可被设置为一个 SSLContext 实例以在所接受的连接上启用 TLS。
    • reuse_address 告知内核要重用一个处于 TIME_WAIT 状态的本地套接字,而不是等待其自然超时失效。 如果未指定此参数则在 Unix 上将自动设置为 True
    • reuse_port 告知内核,只要在创建的时候都设置了这个标志,就允许此端点绑定到其它端点列表所绑定的同样的端口上。这个选项在 Windows 上是不支持的。
    • ssl_handshake_timeout 是(用于 TLS 服务器的)在放弃连接之前要等待 TLS 握手完成的秒数。 如果参数为 (默认值) None 则为 60.0 秒。
    • ssl_shutdown_timeout:在SSL关闭之前等待连接完成处理的秒数。
    • start_serving 设置成 True (默认值) 会导致创建server并立即开始接受连接。设置成 False ,用户需要等待 Server.start_serving() 或者 Server.serve_forever() 以使server开始接受连接。

    loop.create_unix_server()

    coroutine loop.create_unix_server(protocol_factorypath=None*sock=Nonebacklog=100ssl=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=Nonestart_serving=True)

    与create_server()非常相似,但是协议簇为AF_UNIX。

    参数:

    • protocol_factory :实现了流式协议的对象。
    • path 是必要的 Unix 域套接字名称,除非提供了 sock 参数。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。
    • 其他参数与create_server()相同的含义上也相同。

    loop.connect_accepted_socket()

    coroutine loop.connect_accepted_socket(protocol_factorysock*ssl=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=None)

    将已被接受的连接包装成一个传输/协议对。

    此方法可被服务器用来接受 asyncio 以外的连接,但是使用 asyncio 来处理它们。

    参数:

    • protocol_factory :实现了流式协议的对象。
    • sock 是一个预先存在的套接字对象,它是由 socket.accept 返回的。
    • ssl 可被设置为一个 SSLContext 以在接受的连接上启用 SSL。
    • ssl_handshake_timeout 是(为一个SSL连接)在中止连接前,等待SSL握手完成的时间【单位秒】。如果为 None (缺省) 则是 60.0 秒。
    • ssl_shutdown_timeout:在SSL关闭之前等待连接完成处理的秒数。

    返回一个 (transport, protocol) 对。

    传输文件

    coroutine loop.sendfile(transportfileoffset=0count=None*fallback=True)

    将 file 通过 transport 发送。 返回所发送的字节总数。

    如果可用的话,该方法将使用高性能的 os.sendfile()。
    file 必须是个二进制模式打开的常规文件对象。
    offset 指明从何处开始读取文件。 如果指定了 count,它是要传输的字节总数而不再一直发送文件直至抵达 EOF。 文件位置总是会被更新,即使此方法引发了错误,并可以使用 file.tell() 来获取实际发送的字节总数。
    fallback 设为 True 会使得 asyncio 在平台不支持 sendfile 系统调用时手动读取并发送文件(例如 Windows 或 Unix 上的 SSL 套接字)。
    如果系统不支持 sendfile 系统调用且 fallback 为 False 则会引发 SendfileNotAvailableError。

    TLS升级

    coroutine loop.start_tls(transportprotocolsslcontext*server_side=Falseserver_hostname=Nonessl_handshake_timeout=Nonessl_shutdown_timeout=None)

    将现有基于传输的连接升级到 TLS。

    参数:

    • transport 和 protocol 实例的方法与 create_server() 和 create_connection() 所返回的类似。
    • sslcontext :一个已经配置好的 SSLContext 实例。
    • 当服务端连接已升级时 (如 create_server() 所创建的对象) server_side 会传入 True。
    • server_hostname :设置或者覆盖目标服务器证书中相对应的主机名。
    • ssl_handshake_timeout 是(用于 TLS 连接的)在放弃连接之前要等待 TLS 握手完成的秒数。 如果参数为 None 则使用 (默认的) 60.0。
    • ssl_shutdown_timeout:在SSL关闭之前等待连接完成处理的秒数。

    监控文件描述符

    loop.add_reader(fd, callback, *args)
    开始监视 fd 文件描述符以获取读取的可用性,一旦 fd 可用于读取,使用指定的参数调用 callback 。

    loop.remove_reader(fd)
    移除对fd的读监控

    loop.add_writer(fd, callback, *args)
    开始监视 fd 文件描述符的写入可用性,一旦 fd 可用于写入,使用指定的参数调用 callback 。

    loop.remove_writer(fd)
    移除对fd的写监控

    直接使用socket对象

    通常,使用基于传输的 API 的协议实现,例如 loop.create_connection() 和 loop.create_server() 比直接使用套接字的实现更快。 但是,在某些应用场景下性能并不非常重要,直接使用 socket 对象会更方便。

    coroutine loop.sock_recv(sock, nbytes)
    从 sock 接收至多 nbytes。 socket.recv() 的异步版本。返回接收到的数据【bytes对象类型】。sock 必须是个非阻塞socket。

    coroutine loop.sock_recv_into(sock, buf)
    从 sock 接收数据放入 buf 缓冲区。 模仿了阻塞型的 socket.recv_into() 方法。返回写入缓冲区的字节数。sock 必须是个非阻塞socket。

    coroutine loop.sock_recvfrom(sock, bufsize)

    socket.sock_recvfrom()的异步版本。sock 必须是个非阻塞socket。

    coroutine loop.sock_recvfrom_into(sockbufnbytes=0)

    socket.sock_recvfrom_into()的异步版本,sock 必须是个非阻塞socket。

    coroutine loop.sock_sendall(sockdata)

    将 data 发送到 sock 套接字。 socket.sendall() 的异步版本。
    此方法会持续发送数据到套接字直至 data 中的所有数据发送完毕或是有错误发生。 当成功时会返回 None。 当发生错误时,会引发一个异常。 此外,没有办法能确定有多少数据或是否有数据被连接的接收方成功处理。
    sock 必须是个非阻塞socket。

    coroutine loop.sock_sendto(sock, data, address)
    socket.sendto()的异步版本sock 必须是个非阻塞socket。

    coroutine loop.sock_connect(sock, address)
    将 sock 连接到位于 address 的远程套接字。socket.connect() 的异步版本。sock 必须是个非阻塞socket。

    coroutine loop.sock_accept(sock)
    接受一个连接。 模仿了阻塞型的 socket.accept() 方法。
    此 scoket 必须绑定到一个地址上并且监听连接。返回值是一个 (conn, address) 对,其中 conn 是一个 新*的套接字对象,用于在此连接上收发数据,*address 是连接的另一端的套接字所绑定的地址。
    sock 必须是个非阻塞socket。

    coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)
    在可能的情况下使用高性能的 os.sendfile 发送文件。 返回所发送的字节总数。
    socket.sendfile() 的异步版本。
    sock 必须为非阻塞型的 socket.SOCK_STREAM socket。
    file 必须是个用二进制方式打开的常规文件对象。
    offset 指明从何处开始读取文件。 如果指定了 count,它是要传输的字节总数而不再一直发送文件直至抵达 EOF。 文件位置总是会被更新,即使此方法引发了错误,并可以使用 file.tell() 来获取实际发送的字节总数。
    当 fallback 被设为 True 时,会使用 asyncio 在平台不支持 sendfile 系统调用时手动读取并发送文件(例如 Windows 或 Unix 上的 SSL 套接字)。
    如果系统不支持 sendfile 并且 fallback 为 False ,引发 SendfileNotAvailableError 异常。
    sock 必须是个非阻塞socket。

    DNS

    coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)
    异步版的 socket.getaddrinfo() 。

    coroutine loop.getnameinfo(sockaddr, flags=0)
    异步版的 socket.getnameinfo() 。

    异步管道

    coroutine loop.connect_read_pipe(protocol_factory, pipe)
    在事件循环中注册 pipe 的读取端。

    参数:

    • protocol_factory 必须为一个返回 asyncio 协议 实现的可调用对象。
    • pipe 是个 类似文件型的管道对象.

    返回一对 (transport, protocol),其中 transport 支持 ReadTransport 接口而 protocol 是由 protocol_factory 所实例化的对象。
    使用 SelectorEventLoop 事件循环, pipe 被设置为非阻塞模式。

    coroutine loop.connect_write_pipe(protocol_factory, pipe)
    在事件循环中注册 pipe 的写入端。

    参数说明:

    • protocol_factory 必须为一个返回 asyncio 协议 实现的可调用对象。
    • pipe 是个 类似文件型的管道对象.

    返回一对 (transport, protocol),其中 transport 支持 WriteTransport 接口而 protocol 是由 protocol_factory 所实例化的对象。
    使用 SelectorEventLoop 事件循环, pipe 被设置为非阻塞模式。

    备注 在 Windows 中 SelectorEventLoop 不支持上述方法。 对于 Windows 请改用 ProactorEventLoop。

    UNIX信号

    loop.add_signal_handler(signum, callback, *args)
    设置 callback 作为 signum 信号的处理程序。
    此回调将与该事件循环中其他加入队列的回调和可运行协程一起由 loop 发起调用。 不同与使用 signal.signal() 注册的信号处理程序,使用此函数注册的回调可以与事件循环进行交互。
    如果信号数字非法或者不可捕获,就抛出一个 ValueError 。如果建立处理器的过程中出现问题,会抛出一个 RuntimeError 。
    和 signal.signal() 一样,这个函数只能在主线程中调用。

    loop.remove_signal_handler(sig)
    移除 sig 信号的处理程序。
    如果信号处理程序被移除则返回 True,否则如果给定信号未设置处理程序则返回 False。

    线程池和进程池

    awaitable loop.run_in_executor(executorfunc*args)

    安排在指定的执行器中调用 func 。

    参数说明:

    • executor:指定的执行器executor将取代默认的执行器,要求必须是class concurrent.futures.Executor或其子类。
    • func:待执行的函数
    • args:函数的参数
    1. import asyncio
    2. import concurrent.futures
    3. def blocking_io():
    4. # File operations (such as logging) can block the
    5. # event loop: run them in a thread pool.
    6. with open('/dev/urandom', 'rb') as f:
    7. return f.read(100)
    8. def cpu_bound():
    9. # CPU-bound operations will block the event loop:
    10. # in general it is preferable to run them in a
    11. # process pool.
    12. return sum(i * i for i in range(10 ** 7))
    13. async def main():
    14. loop = asyncio.get_running_loop()
    15. ## Options:
    16. # 1. Run in the default loop's executor:
    17. result = await loop.run_in_executor(
    18. None, blocking_io)
    19. print('default thread pool', result)
    20. # 2. Run in a custom thread pool:
    21. with concurrent.futures.ThreadPoolExecutor() as pool:
    22. result = await loop.run_in_executor(
    23. pool, blocking_io)
    24. print('custom thread pool', result)
    25. # 3. Run in a custom process pool:
    26. with concurrent.futures.ProcessPoolExecutor() as pool:
    27. result = await loop.run_in_executor(
    28. pool, cpu_bound)
    29. print('custom process pool', result)
    30. if __name__ == '__main__':
    31. asyncio.run(main())

    loop.set_default_executor(executor)

    设置缺省的执行器。要求必须是class concurrent.futures.Executor或其子类。

    错误处理

    允许自定义事件循环中如何去处理异常。

    loop.set_exception_handler(handler)
    将 handler 设置为新的事件循环异常处理器。
    如果 handler 为 None,将设置默认的异常处理程序。 在其他情况下,handler 必须是一个可调用对象且签名匹配 (loop, context),其中 loop 是对活动事件循环的引用,而 context 是一个包含异常详情的 dict (请查看 call_exception_handler() 文档来获取关于上下文的更多信息)。

    loop.get_exception_handler()
    返回当前的异常处理器,如果没有设置异常处理器,则返回 None 。

    loop.default_exception_handler(context)
    默认的异常处理器。
    此方法会在发生异常且未设置异常处理程序时被调用。 此方法也可以由想要具有不同于默认处理程序的行为的自定义异常处理程序来调用。
    context 参数和 call_exception_handler() 中的同名参数完全相同。

    loop.call_exception_handler(context)
    调用当前事件循环的异常处理器。
    context 是个包含下列键的 dict 对象(未来版本的Python可能会引入新键):

    • 'message': 错误消息;
    • 'exception' (可选): 异常对象;
    • 'future' (可选): asyncio.Future 实例;
    • 'task' (可选): asyncio.Task 实例;
    • 'handle' (可选): asyncio.Handle 实例;
    • 'protocol' (可选): Protocol 实例;
    • 'transport' (可选): Transport 实例;
    • 'socket' (可选): socket.socket 实例;
    • 'asyncgen' (可选): 异步生成器,它导致了这个异常

    开启调试模式

     loop.get_debug()
    获取事件循环调试模式设置(bool)。
    如果环境变量 PYTHONASYNCIODEBUG 是一个非空字符串,就返回 True ,否则就返回 False 。

    loop.set_debug(enabled: bool)
    设置事件循环的调试模式。

  • 相关阅读:
    python医患档案管理系统vue+elementui+django
    【最新版】Git安装详细教程
    Linux磁盘管理(CentOS)
    java中的静态代理、jdk动态代理以及CGLIB 动态代理
    Git代码仓库管理系统的配置方法之:Gitlab
    Docker Compose初使用
    Kafka 典型问题与排查以及相关优化
    mysql里的事务
    influxdb2的使用
    w10系统 如何使用 C++、cmake、opencv、
  • 原文地址:https://blog.csdn.net/spiritx/article/details/133199619