流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。
asyncio 函数可以用来创建和处理异步网络流。
- import asyncio
-
- async def tcp_echo_client(message):
- reader, writer = await asyncio.open_connection(
- '127.0.0.1', 8888)
-
- print(f'Send: {message!r}')
- writer.write(message.encode())
- await writer.drain()
-
- data = await reader.read(100)
- print(f'Received: {data.decode()!r}')
-
- print('Close the connection')
- writer.close()
- await writer.wait_closed()
-
- asyncio.run(tcp_echo_client('Hello World!'))
语法为:
coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)
asyncio.open_connection()
是一个协程,用来建立网络连接并返回一对 (reader, writer) 对象。返回的 reader
和writer
对象是 StreamReader
和 StreamWriter
类的实例。
参数说明:
None
时传入。 默认情况下会使用 host 参数的值。 如果 host 为空那就没有默认值,你必须为 server_hostname 传入一个值。 如果 server_hostname 为空字符串,则主机名匹配会被禁用(这是一个严重的安全风险,使得潜在的中间人攻击成为可能)。None
则使用 (默认的) 60.0
。使用asyncio.open_connection()
我们依然需要像使用回调模式中那样,建立socket连接 > 设置为非阻塞IO > register到epoll/select中监听其IO状态
,这在asyncio.open_connection()
都实现了。
reader, writer = await asyncio.open_connection(host, 80)
语法为:
coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
参数说明:
TIME_WAIT
状态的本地套接字,而不是等待其自然超时失效。 如果未指定此参数则在 Unix 上将自动设置为 True
。None
则为 60.0
秒。返回一个 Server 对象。
class asyncio.Server
Server 对象是异步上下文管理器。当用于 async with
语句时,异步上下文管理器可以确保 Server 对象被关闭,并且在 async with
语句完成后,不接受新的连接。
- srv = await loop.create_server(...)
-
- async with srv:
- # some code
-
- # At this point, srv is closed and no longer accepts new connections.
Server 对象的主要方法和属性:
方法和属性名 | 说明 |
coroutine start_serving() | 启动server,开始接受连接。 |
coroutine serve_forever() | 启动server,开始接受连接,直到协程被取消。 serve_forever 任务的取消将导致服务器被关闭。如果服务器已经在接受连接了,这个方法可以被调用。每个 Server 对象,仅能有一个 serve_forever 任务。 |
is_serving() | 如果服务器正在接受新连接的状态,返回 True 。 |
close() | 停止服务:关闭监听的套接字并且设置 sockets 属性为 None 。用于表示已经连进来的客户端连接会保持打开的状态。 服务器是被异步关闭的,使用 wait_closed() 协程来等待服务器关闭。 |
coroutine wait_closed() | 等待 close() 方法执行完毕。 |
get_loop() | 返回与服务器对象相关联的事件循环。 |
sockets | 返回正在监听的socket对象,类型为asyncio.trsock.TransportSocket |
- import asyncio
-
- async def handle_echo(reader, writer):
- data = await reader.read(100)
- message = data.decode()
- addr = writer.get_extra_info('peername')
-
- print(f"Received {message!r} from {addr!r}")
-
- print(f"Send: {message!r}")
- writer.write(data)
- await writer.drain()
-
- print("Close the connection")
- writer.close()
- await writer.wait_closed()
-
- async def main():
- server = await asyncio.start_server(
- handle_echo, '127.0.0.1', 8888)
-
- addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
- print(f'Serving on {addrs}')
-
- async with server:
- await server.serve_forever()
-
- asyncio.run(main())
- async def client_connected(reader, writer):
- # Communicate with the client with
- # reader/writer streams. For example:
- await reader.readline()
-
- async def main(host, port):
- srv = await asyncio.start_server(
- client_connected, host, port)
- await srv.serve_forever()
-
- asyncio.run(main('127.0.0.1', 0))
语法为:
coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
建立一个 Unix 套接字连接并返回 (reader, writer)
这对返回值。
与 open_connection() 相似,但是是在 Unix 套接字上的操作。
参数说明:
语法为:
coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
启动一个 Unix 套接字服务。
与 start_server() 相似,但是是在 Unix 套接字上的操作。
参数与start_server()一样。
用于读取数据的流对象。
class asyncio.StreamReader
对象支持asynchronous iterable(异步可迭代对象,一个可以在 async for 语句中使用的对象。 必须通过它的 __aiter__() 方法返回一个 asynchronous iterator)。
不推荐直接实例化 StreamReader 对象,建议使用 open_connection() 和 start_server() 来获取 StreamReader 对象。
主要的方法和属性:
方法和属性名 | 说明 |
coroutine read(n=- 1) | 从流中读取n个bytes。如果n未提供或为-1,读到EOF,返回所有的bytes;如果n为0,返回空的bytes对象;如果n为正整数,返回最多n个bytes,如果到达EOF,提前返回。 |
coroutine readline() | 读取一行,其中“行”指的是以 \n 结尾的字节序列。 如果读到EOF而没有找到 \n ,该方法返回部分读取的数据。 如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。 |
coroutine readexactly(n) | 精确读取 n 个 bytes,不会超过也不能少于。如果在读取完 n 个byte之前读取到EOF,则会引发 IncompleteReadError 异常。使用 IncompleteReadError.partial 属性来获取到达流结束之前读取的 bytes 字符串。 |
coroutine readuntil(separator=b'\n') | 从流中读取数据直至遇到 separator 成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。 如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError 异常,数据将留在内部缓冲区中并可以再次读取。 如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError 异常,并重置内部缓冲区。 IncompleteReadError.partial 属性可能包含指定separator的一部分。 |
at_eof() | 如果缓冲区为空并且 feed_eof() 被调用,则返回 True |
用于写入数据的流对象
class asyncio.StreamWriter
这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。
不建议直接实例化 StreamWriter;而应改用 open_connection() 和 start_server()。
主要方法和属性:
属性和方法名 | 说明 |
write(data) | 此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。 此方法应当与 stream.write(data) await stream.drain() |
coroutine drain() | 等待直到可以适当地恢复写入到流。这是一个与下层的 IO 写缓冲区进行交互的流程控制方法。 当缓冲区大小达到最高水位(最大上限)时,drain() 会阻塞直到缓冲区大小减少至最低水位以便恢复写入。 当没有要等待的数据时,drain() 会立即返回。 |
writelines(data) | 此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。 此方法应当与 stream.writelines(lines) await stream.drain() |
close() | 关闭流以及下层的套接字,建议和wait_closed()一起使用: stream.close() await stream.wait_closed() |
coroutine wait_closed() | 等待直到流被关闭。 |
can_write_eof() | 如果下层的传输支持 write_eof() 方法则返回True,否则返回 False。 |
write_eof() | 在已缓冲的写入数据被刷新后关闭流的写入端。 |
transport | 返回下层的 asyncio 传输 |
get_extra_info(name, default=None) | 访问可选的传输信息 |
coroutine start_tls(sslcontext, \*, server_hostname=None, ssl_handshake_timeout=None) | 升级存在的流为TLS,参数与open_connection中的含义一致 |
is_closing() | 如果流已被关闭或正在被关闭则返回 True |
语法为:
coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
返回一个 Process 实例。
参数说明:
Process.stdout
and Process.stderr
的流的buffer限制coroutine asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
运行 cmd shell 命令。
class asyncio.subprocess.Process
一个用于包装 create_subprocess_exec() and create_subprocess_shell() 函数创建的 OS 进程的对象。
主要方法和属性:
方法和属性名 | 说明 |
coroutine wait() | 等待子进程终结。 |
coroutine communicate(input=None) | 与进程交互: 使用 communicate() 方法而非 process.stdin.write(), await process.stdout.read() 或 await process.stderr.read()。 这可以避免由于流暂停读取或写入并阻塞子进程而导致的死锁。 |
send_signal(signal) | 将信号 signal 发送给子进程。 |
terminate() | 停止子进程。 |
kill() | 杀掉子进程。 在 POSIX 系统中此方法会发送 SIGKILL 给子进程。 在 Windows 上此方法是 terminate() 的别名。 |
stdin | 标准输入流 (StreamWriter ) 或者如果进程创建时设置了 stdin=None 则为 None 。 |
stdout | 标准输出流 (StreamReader ) 或者如果进程创建时设置了 stdout=None 则为 None 。 |
stderr | 标准错误流 (StreamReader ) 或者如果进程创建时设置了 stderr=None 则为 None 。 |
pid | 进程标识号(PID)。这个属性将是所生成的 shell 的 PID。 |
returncode | 当进程退出时返回其代号。 None 值表示进程尚未终止。 一个负值 -N 表示子进程被信号 N 中断 (仅 POSIX). |
- import asyncio
- import sys
-
- async def get_date():
- code = 'import datetime; print(datetime.datetime.now())'
-
- # Create the subprocess; redirect the standard output
- # into a pipe.
- proc = await asyncio.create_subprocess_exec(
- sys.executable, '-c', code,
- stdout=asyncio.subprocess.PIPE)
-
- # Read one line of output.
- data = await proc.stdout.readline()
- line = data.decode('ascii').rstrip()
-
- # Wait for the subprocess exit.
- await proc.wait()
- return line
-
- date = asyncio.run(get_date())
- print(f"Current date: {date}")
subprocess.DEVNULL
可被 Popen 的 stdin, stdout 或者 stderr 参数使用的特殊值, 表示使用特殊文件 os.devnull.
subprocess.PIPE
可被 Popen 的 stdin, stdout 或者 stderr 参数使用的特殊值, 表示打开标准流的管道. 常用于 Popen.communicate().
subprocess.STDOUT
可被 Popen 的 stderr 参数使用的特殊值, 表示标准错误与标准输出使用同一句柄。
asyncio 队列被设计成与 queue 模块类似。尽管 asyncio队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
注意asyncio 的队列没有 timeout 形参;请使用 asyncio.wait_for() 函数为队列添加超时操作。
class asyncio.Queue(maxsize=0)
如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于 0 的整数,则当队列达到 maxsize 时, await put() 将阻塞至某个元素被 get() 取出。
不像标准库中的并发型 queue ,队列的尺寸一直是已知的,可以通过调用 qsize() 方法返回。
这个类不是线程安全的
主要的方法和属性:
方法和属性名 | 说明 |
maxsize | 队列中可存放的元素数量。 |
full() | 如果有 maxsize 个条目在队列中,则返回 True 。 如果队列用 maxsize=0 (默认)初始化,则 full() 永远不会返回 True 。 |
coroutine put(item) | 添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。 |
put_nowait(item) | 不阻塞的放一个元素入队列。 如果没有立即可用的空闲槽,引发 QueueFull 异常。 |
qsize() | 返回队列用的元素数量。 |
empty() | 如果队列为空返回 True ,否则返回 False 。 |
coroutine get() | 从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。 |
get_nowait() | 立即返回一个队列中的元素,如果队列内没有值,将引发异常 QueueEmpty 。 |
task_done() | 表明前面排队的任务已经完成,即get出来的元素相关操作已经完成。 由队列使用者控制。每个 get() 用于获取一个任务,任务最后调用 task_done() 告诉队列,这个任务已经完成。 如果 join() 当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个 put() 进队列的条目的 task_done() 都被收到)。 如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 。 |
coroutine join() | 阻塞至队列中所有的元素都被接收和处理完毕。 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。 |
class asyncio.PriorityQueue
异步FIFO队列的变体,按优先级顺序取出条目 (最小的先取出)。条目通常是 (priority_number, data)
形式的元组。
class asyncio.LifoQueue
异步FIFO队列的变体,先取出最近添加的条目(后进,先出)。
exception asyncio.QueueEmpty
当队列为空的时候,调用 get_nowait() 方法而引发这个异常。
exception asyncio.QueueFull
当队列中条目数量已经达到它的 maxsize 的时候,调用 put_nowait() 方法而引发的异常。
- import asyncio
- import random
- import time
-
-
- async def worker(name, queue):
- while True:
- # Get a "work item" out of the queue.
- sleep_for = await queue.get()
-
- # Sleep for the "sleep_for" seconds.
- await asyncio.sleep(sleep_for)
-
- # Notify the queue that the "work item" has been processed.
- queue.task_done()
-
- print(f'{name} has slept for {sleep_for:.2f} seconds')
-
-
- async def main():
- # Create a queue that we will use to store our "workload".
- queue = asyncio.Queue()
-
- # Generate random timings and put them into the queue.
- total_sleep_time = 0
- for _ in range(20):
- sleep_for = random.uniform(0.05, 1.0)
- total_sleep_time += sleep_for
- queue.put_nowait(sleep_for)
-
- # Create three worker tasks to process the queue concurrently.
- tasks = []
- for i in range(3):
- task = asyncio.create_task(worker(f'worker-{i}', queue))
- tasks.append(task)
-
- # Wait until the queue is fully processed.
- started_at = time.monotonic()
- await queue.join()
- total_slept_for = time.monotonic() - started_at
-
- # Cancel our worker tasks.
- for task in tasks:
- task.cancel()
- # Wait until all worker tasks are cancelled.
- await asyncio.gather(*tasks, return_exceptions=True)
-
- print('====')
- print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
- print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
-
-
- asyncio.run(main())
如果能使用上面学习的函数和对象,尽可能去使用,本节所要学习的流相关的函数和对象都是与事件循环相关的,都是低层的函数和对象,需要较深入的了解asynicio和网络相关的知识。
asyncio 提供了一组抽象基类,它们应当被用于实现网络协议。 这些类被设计为与 传输 配合使用。
抽象基础协议类的子类可以实现其中的部分或全部方法。 所有这些方法都是回调:它们由传输或特定事件调用,例如当数据被接收的时候。 基础协议方法应当由相应的传输来调用。
class asyncio.BaseProtocol
带有所有协议的共享方法的基础协议。
所有 asyncio 协议均可实现基础协议回调。
连接回调会在所有协议上被调用,每个成功的连接将恰好调用一次。 所有其他协议回调只能在以下两个方法之间被调用。
BaseProtocol.connection_made(transport)
连接建立时被调用。
transport 参数是代表连接的传输。 此协议负责将引用保存至对应的传输。
BaseProtocol.connection_lost(exc)
连接丢失或关闭时将被调用。
方法的参数是一个异常对象或为 None。 后者意味着收到了常规的 EOF,或者连接被连接的一端取消或关闭。
流程控制回调可由传输来调用以暂停或恢复协议所执行的写入操作。
BaseProtocol.pause_writing()
当传输的缓冲区升至高水位以上时将被调用。
BaseProtocol.resume_writing()
当传输的缓冲区降到低水位以下时将被调用。
如果缓冲区大小等于高水位值,则 pause_writing() 不会被调用:缓冲区大小必须要高于该值。
相反地,resume_writing() 会在缓冲区大小等于或小于低水位值时被调用。 这些结束条件对于当两个水位取零值时也能确保符合预期的行为是很重要的。
可以通过WriteTransport.set_write_buffer_limits()设置缓冲区的high 和 low 高低标记位。
class asyncio.Protocol(BaseProtocol)
用于实现流式协议(TCP, Unix 套接字等等)的基类。
事件方法,例如 loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() 和 loop.connect_write_pipe() 都接受返回流式协议的工厂。
Protocol.data_received(data)
当收到数据时被调用。 data 为包含入站数据的非空字节串对象。
数据是否会被缓冲、分块或重组取决于具体传输。 通常,你不应依赖于特定的语义而应使你的解析具有通用性和灵活性。 但是,数据总是要以正确的顺序被接收。
此方法在连接打开期间可以被调用任意次数。
Protocol.eof_received()
当发出信号的另一端不再继续发送数据时(例如通过调用 transport.write_eof(),如果另一端也使用 asyncio 的话)被调用。
此方法可能返回假值 (包括 None),在此情况下传输将会自行关闭。 相反地,如果此方法返回真值,将以所用的协议来确定是否要关闭传输。 由于默认实现是返回 None,因此它会隐式地关闭连接。
某些传输,包括 SSL 在内,并不支持半关闭的连接,在此情况下从该方法返回真值将导致连接被关闭。
eof_received()最多只能调用一次,一旦调用后,data_received()就不能再继续调用了。
状态机:
- start -> connection_made
- [-> data_received]*
- [-> eof_received]?
- -> connection_lost -> end
class asyncio.BufferedProtocol(BaseProtocol)
用于实现可对接收缓冲区进行手动控制的流式协议的基类。
带缓冲的协议可与任何支持 流式协议 的事件循环方法配合使用。
BufferedProtocol
实现允许显式手动分配和控制接收缓冲区。 随后事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。 这对于接收大量数据的协议来说会有明显的性能提升。 复杂的协议实现能显著地减少缓冲区分配的数量。
BufferedProtocol的方法:
BufferedProtocol.get_buffer(sizehint)
调用后会分配新的接收缓冲区。
sizehint 是推荐的返回缓冲区最小尺寸。 返回小于或大于 sizehint 推荐尺寸的缓冲区也是可接受的。 当设为 -1 时,缓冲区尺寸可以是任意的。 返回尺寸为零的缓冲区则是错误的。
get_buffer() 必须返回一个实现了 缓冲区协议 的对象。
BufferedProtocol.buffer_updated(nbytes)
用接收的数据更新缓冲区时被调用。
nbytes 是被写入到缓冲区的字节总数。
BufferedProtocol.eof_received()
请查看 protocol.eof_received() 方法的文档。
在连接期间 get_buffer() 可以被调用任意次数。 但是,protocol.eof_received() 最多只能被调用一次,如果被调用,则在此之后 get_buffer() 和 buffer_updated() 不能再被调用。
状态机:
- start -> connection_made
- [-> get_buffer
- [-> buffer_updated]?
- ]*
- [-> eof_received]?
- -> connection_lost -> end
class asyncio.DatagramProtocol(BaseProtocol)
用于实现数据报(UDP)协议的基类。
数据报协议实例应当由传递给 loop.create_datagram_endpoint() 方法的协议工厂来构造。
DatagramProtocol.datagram_received(data, addr)
当接收到数据报时被调用。 data 是包含传入数据的字节串对象。 addr 是发送数据的对等端地址;实际的格式取决于具体传输。
DatagramProtocol.error_received(exc)
当前一个发送或接收操作引发 OSError 时被调用。 exc 是 OSError 的实例。
此方法会在当传输(例如UDP)检测到无法将数据报传给接收方等极少数情况下被调用。 而在大多数情况下,无法送达的数据报将被静默地丢弃。
在 BSD 系统(macOS, FreeBSD 等等)上,数据报协议不支持流控制,因为没有可靠的方式来检测因写入多过包所导致的发送失败。
套接字总是显示为 'ready' 且多余的包会被丢弃。 有一定的可能性会引发 OSError 并设置 errno 为 errno.ENOBUFS;如果此异常被引发,它将被报告给 DatagramProtocol.error_received(),在其他情况下则会被忽略。
class asyncio.SubprocessProtocol(BaseProtocol)
用于实现与子进程通信(单向管道)的协议的基类。
子进程协议实例应当由传递给 loop.subprocess_exec() 和 loop.subprocess_shell() 方法的协议工厂函数来构造。
SubprocessProtocol.pipe_data_received(fd, data)
当子进程向其 stdout 或 stderr 管道写入数据时被调用。
SubprocessProtocol.pipe_connection_lost(fd, exc)
与子进程通信的其中一个管道关闭时被调用。
SubprocessProtocol.process_exited()
子进程退出时被调用。
只能在pipe_data_received() and pipe_connection_lost() 函数之前调用。
传输属于 asyncio 模块中的类,用来抽象各种通信通道。
传输对象总是由 异步IO事件循环 实例化。
异步IO实现TCP、UDP、SSL和子进程管道的传输。传输上可用的方法由传输的类型决定。
传输类属于 线程不安全 。
class asyncio.BaseTransport
所有传输的基类。包含所有异步IO传输共用的方法。
主要方法:
方法和属性名 | 说明 |
BaseTransport.is_closing() | 返回 True ,如果传输正在关闭或已经关闭。 |
BaseTransport.close() | 关闭传输。如果传输有写入缓冲,会将缓冲的数据异步写入,但不再接收新的数据,所有缓冲数据写入后,会调用protocol.connection_lost(),参数为None。传输关闭后就不能再使用。 |
BaseTransport.set_protocol(protocol) | 设置一个新协议。 只有两种协议都写明支持切换才能完成切换协议。 |
BaseTransport.get_protocol() | 返回当前协议。 |
BaseTransport.get_extra_info(name, default=None) | 返回 传输或它使用的相关资源信息。 name 是表示要获取传输特定信息的字符串。 default 是在信息不可用或传输不支持第三方事件循环实现或当前平台查询时返回的值。 |
BaseTransport.get_extra_info(name, default=None)
传输可查询信息类别:
- sock = transport.get_extra_info('socket')
- if sock is not None:
- print(sock.getsockopt(...))
class asyncio.ReadTransport(BaseTransport)
只读链接的基础传输。
ReadTransport 类的实例由 loop.connect_read_pipe() 事件循环方法返回,也被子进程相关的方法如 loop.subprocess_exec() 使用。
主要方法和属性:
方法和属性名 | 说明 |
ReadTransport.is_reading() | 如果传输接收到新数据时返回 True 。 |
ReadTransport.pause_reading() | 暂停传输的接收端。protocol.data_received() 方法将不会收到数据,除非 resume_reading() 被调用。 |
ReadTransport.resume_reading() | 恢复接收端。如果有数据可读取时,协议方法 protocol.data_received() 将再次被调用。 |
class asyncio.WriteTransport(BaseTransport)
只写链接的基础传输。
WriteTransport 类的实例由 loop.connect_write_pipe() 事件循环方法返回,也被子进程相关的方法如 loop.subprocess_exec() 使用。
主要方法和属性:
方法和属性名 | 说明 |
WriteTransport.abort() | 立即关闭传输,不会等待已提交的操作处理完毕。已缓存的数据将会丢失。不会接收更多数据。 最终 None 将作为协议的 protocol.connection_lost() 方法的参数被调用。 |
WriteTransport.can_write_eof() | 如果传输支持 write_eof() 返回 True 否则返回 False 。 |
WriteTransport.write_eof() | 在刷新所有已缓冲数据之后关闭传输的写入端。 数据仍可以被接收。如果传输(例如 SSL)不支持半关闭的连接,此方法会引发 NotImplementedError。 |
WriteTransport.get_write_buffer_size() | 返回传输使用输出缓冲区的当前大小。 |
WriteTransport.get_write_buffer_limits() | 获取写入流控制 high 和 low 高低标记位。返回元组 (low, high) , low 和 high 为正字节数。 |
WriteTransport.set_write_buffer_limits(high=None, low=None) | 设置写入流控制 high 和 low 高低标记位。 这两个值(以字节数表示)控制何时调用协议的 protocol.pause_writing() 和 protocol.resume_writing() 方法。 如果指明,则低水位必须小于或等于高水位。 high 和 low 都不能为负值。 |
WriteTransport.write(data) | 将一些 data 字节串写入传输。此方法不会阻塞;它会缓冲数据并安排其被异步地发出。 |
WriteTransport.writelines(list_of_data) | 将数据字节串的列表(或任意可迭代对象)写入传输。 这在功能上等价于在可迭代对象产生的每个元素上调用 write(),但其实现可能更为高效。 |
class asyncio.Transport(WriteTransport, ReadTransport)
接口代表一个双向传输,如TCP链接。
用户不用直接实例化传输;调用一个功能函数,给它传递协议工厂和其它需要的信息就可以创建传输和协议。
传输 类实例由如 loop.create_connection() 、 loop.create_unix_connection() 、 loop.create_server() 、 loop.sendfile() 等这类事件循环方法使用或返回。
class asyncio.DatagramTransport(BaseTransport)
数据报(UDP)传输链接。
DatagramTransport 类实例由事件循环方法 loop.create_datagram_endpoint() 返回。
主要方法和属性:
方法和属性名 | 说明 |
DatagramTransport.sendto(data, addr=None) | 将 data 字节串发送到 addr (基于传输的目标地址) 所给定的远端对等方。 如果 addr 为 None,则将数据发送到传输创建时给定的目标地址。 此方法不会阻塞;它会缓冲数据并安排其被异步地发出。 |
DatagramTransport.abort() | 立即关闭传输,不会等待已提交的操作执行完毕。 已缓存的数据将会丢失。 不会接收更多的数据。 协议的 protocol.connection_lost() 方法最终将附带 None 作为参数被调用。 |
表示父进程和子进程之间连接的抽象。
SubprocessTransport 类的实例由事件循环方法 loop.subprocess_shell() 和 loop.subprocess_exec() 返回。
主要方法和属性:
方法和属性名 | 说明 |
SubprocessTransport.get_pid() | 将子进程的进程 ID 以整数形式返回。 |
SubprocessTransport.get_pipe_transport(fd) | 返回对应于整数文件描述符 fd 的通信管道的传输: 0: 标准输入 (stdin) 的可读流式传输,如果子进程创建时未设置 stdin=PIPE 则为 None 1: 标准输出 (stdout) 的可写流式传输,如果子进程创建时未设置 stdout=PIPE 则为 None 2: 标准错误 (stderr) 的可写流式传输,如果子进程创建时未设置 stderr=PIPE 则为 None 其他 fd: None |
SubprocessTransport.get_returncode() | 返回整数形式的进程返回码,或者如果还未返回则为 None,这类似于 subprocess.Popen.returncode 属性。 |
SubprocessTransport.kill() | 杀死子进程。 |
SubprocessTransport.send_signal(signal) | 发送 signal 编号到子进程 |
SubprocessTransport.terminate() | 停止子进程。在 POSIX 系统中,此方法会发送 SIGTERM 到子进程。 在 Windows 中,则会调用 Windows API 函数 TerminateProcess() 来停止子进程。 |
SubprocessTransport.close() | 通过调用 kill() 方法来杀死子进程。如果子进程尚未返回,并关闭 stdin, stdout 和 stderr 管道的传输。 |
coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)
打开一个流式传输连接,连接到由 host 和 port 指定的地址。参数的含义与asyncio.open_connection()相同。
这个方法会尝试在后台创建连接。当创建成功,返回 (transport, protocol) 组合。
底层操作的大致的执行顺序是这样的:
coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)
创建一个数据报连接。
参数说明:
UDP 回显客户端的例子:
- import asyncio
-
-
- class EchoClientProtocol:
- def __init__(self, message, on_con_lost):
- self.message = message
- self.on_con_lost = on_con_lost
- self.transport = None
-
- def connection_made(self, transport):
- self.transport = transport
- print('Send:', self.message)
- self.transport.sendto(self.message.encode())
-
- def datagram_received(self, data, addr):
- print("Received:", data.decode())
-
- print("Close the socket")
- self.transport.close()
-
- def error_received(self, exc):
- print('Error received:', exc)
-
- def connection_lost(self, exc):
- print("Connection closed")
- self.on_con_lost.set_result(True)
-
-
- async def main():
- # Get a reference to the event loop as we plan to use
- # low-level APIs.
- loop = asyncio.get_running_loop()
-
- on_con_lost = loop.create_future()
- message = "Hello World!"
-
- transport, protocol = await loop.create_datagram_endpoint(
- lambda: EchoClientProtocol(message, on_con_lost),
- remote_addr=('127.0.0.1', 9999))
-
- try:
- await on_con_lost
- finally:
- transport.close()
-
-
- asyncio.run(main())
coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
创建Unix 域连接
协议簇必须是AF_UNIX,socket的类型是SOCK_STREAM
成功时返回一个 (transport, protocol)
元组。
path 是所要求的 Unix 域套接字的名字,除非指定了 sock 形参。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。
coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
创建一个基于TCP的网络服务(socket类型为SOCK_STREAM),返回一个 Server 对象。
参数:
TIME_WAIT
状态的本地套接字,而不是等待其自然超时失效。 如果未指定此参数则在 Unix 上将自动设置为 True
。None
则为 60.0
秒。coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)
与create_server()非常相似,但是协议簇为AF_UNIX。
参数:
coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
将已被接受的连接包装成一个传输/协议对。
此方法可被服务器用来接受 asyncio 以外的连接,但是使用 asyncio 来处理它们。
参数:
返回一个 (transport, protocol)
对。
coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)
将 file 通过 transport 发送。 返回所发送的字节总数。
如果可用的话,该方法将使用高性能的 os.sendfile()。
file 必须是个二进制模式打开的常规文件对象。
offset 指明从何处开始读取文件。 如果指定了 count,它是要传输的字节总数而不再一直发送文件直至抵达 EOF。 文件位置总是会被更新,即使此方法引发了错误,并可以使用 file.tell() 来获取实际发送的字节总数。
fallback 设为 True 会使得 asyncio 在平台不支持 sendfile 系统调用时手动读取并发送文件(例如 Windows 或 Unix 上的 SSL 套接字)。
如果系统不支持 sendfile 系统调用且 fallback 为 False 则会引发 SendfileNotAvailableError。
coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)
将现有基于传输的连接升级到 TLS。
参数:
loop.add_reader(fd, callback, *args)
开始监视 fd 文件描述符以获取读取的可用性,一旦 fd 可用于读取,使用指定的参数调用 callback 。
loop.remove_reader(fd)
移除对fd的读监控
loop.add_writer(fd, callback, *args)
开始监视 fd 文件描述符的写入可用性,一旦 fd 可用于写入,使用指定的参数调用 callback 。
loop.remove_writer(fd)
移除对fd的写监控
通常,使用基于传输的 API 的协议实现,例如 loop.create_connection() 和 loop.create_server() 比直接使用套接字的实现更快。 但是,在某些应用场景下性能并不非常重要,直接使用 socket 对象会更方便。
coroutine loop.sock_recv(sock, nbytes)
从 sock 接收至多 nbytes。 socket.recv() 的异步版本。返回接收到的数据【bytes对象类型】。sock 必须是个非阻塞socket。
coroutine loop.sock_recv_into(sock, buf)
从 sock 接收数据放入 buf 缓冲区。 模仿了阻塞型的 socket.recv_into() 方法。返回写入缓冲区的字节数。sock 必须是个非阻塞socket。
coroutine loop.sock_recvfrom(sock, bufsize)
socket.sock_recvfrom()的异步版本。sock 必须是个非阻塞socket。
coroutine loop.sock_recvfrom_into(sock, buf, nbytes=0)
socket.sock_recvfrom_into()的异步版本,sock 必须是个非阻塞socket。
coroutine loop.sock_sendall(sock, data)
将 data 发送到 sock 套接字。 socket.sendall() 的异步版本。
此方法会持续发送数据到套接字直至 data 中的所有数据发送完毕或是有错误发生。 当成功时会返回 None。 当发生错误时,会引发一个异常。 此外,没有办法能确定有多少数据或是否有数据被连接的接收方成功处理。
sock 必须是个非阻塞socket。
coroutine loop.sock_sendto(sock, data, address)
socket.sendto()的异步版本sock 必须是个非阻塞socket。
coroutine loop.sock_connect(sock, address)
将 sock 连接到位于 address 的远程套接字。socket.connect() 的异步版本。sock 必须是个非阻塞socket。
coroutine loop.sock_accept(sock)
接受一个连接。 模仿了阻塞型的 socket.accept() 方法。
此 scoket 必须绑定到一个地址上并且监听连接。返回值是一个 (conn, address) 对,其中 conn 是一个 新*的套接字对象,用于在此连接上收发数据,*address 是连接的另一端的套接字所绑定的地址。
sock 必须是个非阻塞socket。
coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)
在可能的情况下使用高性能的 os.sendfile 发送文件。 返回所发送的字节总数。
socket.sendfile() 的异步版本。
sock 必须为非阻塞型的 socket.SOCK_STREAM socket。
file 必须是个用二进制方式打开的常规文件对象。
offset 指明从何处开始读取文件。 如果指定了 count,它是要传输的字节总数而不再一直发送文件直至抵达 EOF。 文件位置总是会被更新,即使此方法引发了错误,并可以使用 file.tell() 来获取实际发送的字节总数。
当 fallback 被设为 True 时,会使用 asyncio 在平台不支持 sendfile 系统调用时手动读取并发送文件(例如 Windows 或 Unix 上的 SSL 套接字)。
如果系统不支持 sendfile 并且 fallback 为 False ,引发 SendfileNotAvailableError 异常。
sock 必须是个非阻塞socket。
coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)
异步版的 socket.getaddrinfo() 。
coroutine loop.getnameinfo(sockaddr, flags=0)
异步版的 socket.getnameinfo() 。
coroutine loop.connect_read_pipe(protocol_factory, pipe)
在事件循环中注册 pipe 的读取端。
参数:
返回一对 (transport, protocol),其中 transport 支持 ReadTransport 接口而 protocol 是由 protocol_factory 所实例化的对象。
使用 SelectorEventLoop 事件循环, pipe 被设置为非阻塞模式。
coroutine loop.connect_write_pipe(protocol_factory, pipe)
在事件循环中注册 pipe 的写入端。
参数说明:
返回一对 (transport, protocol),其中 transport 支持 WriteTransport 接口而 protocol 是由 protocol_factory 所实例化的对象。
使用 SelectorEventLoop 事件循环, pipe 被设置为非阻塞模式。
备注 在 Windows 中 SelectorEventLoop 不支持上述方法。 对于 Windows 请改用 ProactorEventLoop。
loop.add_signal_handler(signum, callback, *args)
设置 callback 作为 signum 信号的处理程序。
此回调将与该事件循环中其他加入队列的回调和可运行协程一起由 loop 发起调用。 不同与使用 signal.signal() 注册的信号处理程序,使用此函数注册的回调可以与事件循环进行交互。
如果信号数字非法或者不可捕获,就抛出一个 ValueError 。如果建立处理器的过程中出现问题,会抛出一个 RuntimeError 。
和 signal.signal() 一样,这个函数只能在主线程中调用。
loop.remove_signal_handler(sig)
移除 sig 信号的处理程序。
如果信号处理程序被移除则返回 True,否则如果给定信号未设置处理程序则返回 False。
awaitable loop.run_in_executor(executor, func, *args)
安排在指定的执行器中调用 func 。
参数说明:
- import asyncio
- import concurrent.futures
-
- def blocking_io():
- # File operations (such as logging) can block the
- # event loop: run them in a thread pool.
- with open('/dev/urandom', 'rb') as f:
- return f.read(100)
-
- def cpu_bound():
- # CPU-bound operations will block the event loop:
- # in general it is preferable to run them in a
- # process pool.
- return sum(i * i for i in range(10 ** 7))
-
- async def main():
- loop = asyncio.get_running_loop()
-
- ## Options:
-
- # 1. Run in the default loop's executor:
- result = await loop.run_in_executor(
- None, blocking_io)
- print('default thread pool', result)
-
- # 2. Run in a custom thread pool:
- with concurrent.futures.ThreadPoolExecutor() as pool:
- result = await loop.run_in_executor(
- pool, blocking_io)
- print('custom thread pool', result)
-
- # 3. Run in a custom process pool:
- with concurrent.futures.ProcessPoolExecutor() as pool:
- result = await loop.run_in_executor(
- pool, cpu_bound)
- print('custom process pool', result)
-
- if __name__ == '__main__':
- asyncio.run(main())
loop.set_default_executor(executor)
设置缺省的执行器。要求必须是class concurrent.futures.Executor或其子类。
允许自定义事件循环中如何去处理异常。
loop.set_exception_handler(handler)
将 handler 设置为新的事件循环异常处理器。
如果 handler 为 None,将设置默认的异常处理程序。 在其他情况下,handler 必须是一个可调用对象且签名匹配 (loop, context),其中 loop 是对活动事件循环的引用,而 context 是一个包含异常详情的 dict (请查看 call_exception_handler() 文档来获取关于上下文的更多信息)。
loop.get_exception_handler()
返回当前的异常处理器,如果没有设置异常处理器,则返回 None 。
loop.default_exception_handler(context)
默认的异常处理器。
此方法会在发生异常且未设置异常处理程序时被调用。 此方法也可以由想要具有不同于默认处理程序的行为的自定义异常处理程序来调用。
context 参数和 call_exception_handler() 中的同名参数完全相同。
loop.call_exception_handler(context)
调用当前事件循环的异常处理器。
context 是个包含下列键的 dict 对象(未来版本的Python可能会引入新键):
loop.get_debug()
获取事件循环调试模式设置(bool)。
如果环境变量 PYTHONASYNCIODEBUG 是一个非空字符串,就返回 True ,否则就返回 False 。
loop.set_debug(enabled: bool)
设置事件循环的调试模式。