• Dart笔记:stream_channel 包用法


    标题1
    标题2

    作者李俊才 (jcLee95)https://blog.csdn.net/qq_28550263
    邮箱 :291148484@163.com
    本文地址https://blog.csdn.net/qq_28550263/article/details/133426961


    【介绍】stream_channel是一个用于处理流(stream)通信的库。它提供了一种在不同部分之间传输数据的机制,特别适用于处理异步事件流。stream_channel库的主要目标是提供简单而强大的工具,以便不同组件、模块、或者甚至不同Flutter Widget之间可以进行实时的事件或数据交流。

    目 录


    1. 概述

    1.1 stream_channel是什么

    stream_channel包提供了StreamChannel接口,它代表了一个双向通信通道。每个StreamChannel都提供了一个用于接收数据的Stream和一个用于发送数据的StreamSink。stream_channel包还包含用于处理StreamChannel和双向通信的实用工具。

    StreamChannel有助于将通信逻辑与底层协议分离。例如,test 包在浏览器套件的 WebSocket 连接和 VM 测试的隔离连接中都重用了其测试套件通信协议。

    stream_channel 库的主要有:

    • 实时通信:在Flutter应用程序的不同部分之间实时传输数据,例如将数据从一个屏幕传递到另一个屏幕或从一个Widget传递到另一个Widget。
    • 异步事件处理:处理异步事件流,例如从网络请求、传感器数据、或者定时器获得的事件流中提取和处理数据。
    • 多通道管理:通过MultiChannel类,可以在单个底层传输层上复用多个虚拟通道,使不同类型的数据可以通过不同的通道进行传输。
    • 与Isolate通信:通过IsolateChannel,在Flutter应用程序的不同Isolate之间进行通信,以实现并发处理和数据传递。

    1.2 简单回顾: Stream

    Stream 表示一个异步的数据流,它可以产生一系列的数据事件(通常是某种类型的对象)供订阅者处理。流通常用于处理连续的事件或数据,例如读取文件、接收网络请求或监视用户输入。

    相比于Future

    功能上看
    • Future 是一次性的,它代表了一个异步操作的结果,一旦操作完成,就不能再次使用。用起来就像这样:
      Future<int> fetchValue() async {
        // 异步操作
        return 666;
      }
      
      fetchValue().then((value) {
        print(value); // 处理结果
      }).catchError((error) {
        print(error); // 处理错误
      });
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    • Stream 是持续的,它可以生成多个事件,而不会销毁。可以订阅一个 Stream 并监听其事件,每次有新事件生成时,会触发订阅者的回调函数。用起来就像这样:
      // 创建一个异步生成器函数,它返回一个 Stream,用于生成一个整数计数器的事件流。
      Stream<int> createCounterStream() async* {
        // 使用 for 循环生成 0 到 4 的整数。
        for (int i = 0; i < 5; i++) {
          // 在生成每个整数之前,等待 1 秒钟的延迟。
          await Future.delayed(Duration(seconds: 1));
          // 使用 yield 关键字将整数添加到事件流中。
          yield i;
        }
      }
      
      // 调用 createCounterStream 函数以获取事件流。
      final stream = createCounterStream();
      
      // 使用 stream.listen() 订阅事件流,每当有新事件生成时,将调用回调函数。
      stream.listen((value) {
        // 处理每个事件,这里将事件的值打印到控制台。
        print(value); // 处理每个事件
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19

    关于StreamSink

    StreamSink 多是从一个 StreamController 实例的sink属性上获取使用的,就如controller.sink,而不会单独使用。

    StreamSink 用于将数据写入一个异步数据流 (Stream)。它是 Dart 异步编程中的一个重要组件,通常用于数据的输出、推送或写入操作。例如:

    import 'dart:async';
    
    void main() {
      var controller = StreamController<int>();
    
      // 获取 StreamSink
      var sink = controller.sink;
    
      // 向流中添加数据
      sink.add(1);
      sink.add(2);
      sink.add(3);
    
      // 关闭流
      sink.close();
    
      // 监听流的数据
      controller.stream.listen((data) {
        print(data); // 打印 1、2、3
      });
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    场景上看
    • Future 适合表示一次性操作,例如发起一个网络请求、读取一个文件、执行一个计算密集型任务等。它通常用于等待某个操作完成并获取其结果;
    • Stream 适合表示连续的事件流,例如实时数据更新、用户输入事件、从多个来源获取数据等。它通常用于监视一系列事件并对它们进行处理。

    1.3 加入依赖

    flutter pub add stream_channel
    
    • 1

    2. 双向通信通道 StreamChannel 类

    StreamChannel类是一个抽象类,表示一个双向通信通道。它定义了用于与通道进行交互的方法和属性,包括用于从通道读取数据的stream和用于向通道写入数据的sinkStreamChannel用于在Dart中实现双向通信,例如在网络通信中使用。

    StreamChannel类的属性和方法

    stream属性
    • 语法格式Stream get stream;
    • 功能:获取用于从通道接收数据的单订阅流。
    • 返回值Stream - 用于从通道接收数据的单订阅流。
    • 用法示例
    StreamChannel<String> channel = ...; // 创建一个StreamChannel
    Stream<String> dataStream = channel.stream; // 获取用于接收数据的流
    
    • 1
    • 2
    sink属性
    • 语法格式StreamSink get sink;
    • 功能:获取用于向通道发送数据的流式数据接收器。
    • 返回值StreamSink - 用于向通道发送数据的流式数据接收器。
    • 用法示例
    StreamChannel<String> channel = ...; // 创建一个StreamChannel
    StreamSink<String> dataSink = channel.sink; // 获取用于发送数据的流式数据接收器
    
    • 1
    • 2
    StreamChannel构造函数
    • 语法格式factory StreamChannel(Stream stream, StreamSink sink) => _StreamChannel(stream, sink);
    • 功能:创建一个新的StreamChannel,用于双向通信,基于提供的streamsink
    • 参数
      • stream:用于从通道接收数据的单订阅流。
      • sink:用于向通道发送数据的流式数据接收器。
    • 返回值StreamChannel - 新创建的StreamChannel实例。
    • 用法示例
    Stream<String> inputStream = ...; // 创建一个输入流
    StreamSink<String> outputStream = ...; // 创建一个输出流
    StreamChannel<String> channel = StreamChannel(inputStream, outputStream); // 创建StreamChannel
    
    • 1
    • 2
    • 3
    StreamChannel.withGuarantees构造函数
    • 语法格式factory StreamChannel.withGuarantees(Stream stream, StreamSink sink, {bool allowSinkErrors = true}) => GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
    • 功能:创建一个新的StreamChannel,用于双向通信,同时强制实施StreamChannel文档中列出的保证。
    • 参数
      • stream:用于从通道接收数据的单订阅流。
      • sink:用于向通道发送数据的流式数据接收器。
      • allowSinkErrors:一个布尔值,指定是否允许将错误传递给sink。默认为true
    • 返回值StreamChannel - 新创建的StreamChannel实例,具有保证。
    • 用法示例
    Stream<String> inputStream = ...; // 创建一个输入流
    StreamSink<String> outputStream = ...; // 创建一个输出流
    StreamChannel<String> channel = StreamChannel.withGuarantees(inputStream, outputStream, allowSinkErrors: false); // 创建具有保证的StreamChannel
    
    • 1
    • 2
    • 3
    StreamChannel.withCloseGuarantee构造函数
    • 语法格式factory StreamChannel.withCloseGuarantee(Stream stream, StreamSink sink) => CloseGuaranteeChannel(stream, sink);
    • 功能:创建一个新的StreamChannel,用于双向通信,特别强制实施通道关闭的保证。
    • 参数
      • stream:用于从通道接收数据的单订阅流。
      • sink:用于向通道发送数据的流式数据接收器。
    • 返回值StreamChannel - 新创建的StreamChannel实例,具有通道关闭的保证。
    • 用法示例
    Stream<String> inputStream = ...; // 创建一个输入流
    StreamSink<String> outputStream = ...; // 创建一个输出流
    StreamChannel<String> channel = StreamChannel.withCloseGuarantee(inputStream, outputStream); // 创建具有通道关闭保证的StreamChannel
    
    • 1
    • 2
    • 3
    pipe方法
    • 语法格式void pipe(StreamChannel other);
    • 功能:连接当前StreamChannel到另一个StreamChannel,使两者之间的数据传输直接相互转发。
    • 参数
      • other:另一个StreamChannel,用于连接到当前通道。
    • 返回值:无。
    • 用法示例
    StreamChannel<String> channel1 = ...; // 创建第一个StreamChannel
    StreamChannel<String> channel2 = ...; // 创建第二个StreamChannel
    channel1.pipe(channel2); // 将第一个通道的数据传输到第二个通道,反之亦然
    
    • 1
    • 2
    • 3
    transform方法
    • 语法格式StreamChannel transform(StreamChannelTransformer transformer);
    • 功能:使用指定的StreamChannelTransformer对当前StreamChannel进行转换。
    • 参数
      • transformer:要应用于当前通道的转换器。
    • 返回值StreamChannel - 转换后的StreamChannel实例。
    • 用法示例
    StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
    StreamChannel<int> channel2 = channel1.transform(IntToStringTransformer()); // 使用转换器将String类型的通道转换为Int类型的通道
    
    • 1
    • 2
    transformStream方法
    • 语法格式StreamChannel transformStream(StreamTransformer transformer);
    • 功能:仅对当前通道的stream部分应用指定的StreamTransformer
    • 参数
      • transformer:要应用于stream的转换器。
    • 返回值StreamChannel - 转换后的StreamChannel实例。
    • 用法示例
    StreamChannel<String>
    
     channel1 = ...; // 创建一个StreamChannel
    StreamChannel<String> channel2 = channel1.transformStream(StringTransformer()); // 使用转换器仅对stream部分进行转换
    
    • 1
    • 2
    • 3
    • 4
    transformSink方法
    • 语法格式StreamChannel transformSink(StreamSinkTransformer transformer);
    • 功能:仅对当前通道的sink部分应用指定的StreamSinkTransformer
    • 参数
      • transformer:要应用于sink的转换器。
    • 返回值StreamChannel - 转换后的StreamChannel实例。
    • 用法示例
    StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
    StreamChannel<String> channel2 = channel1.transformSink(StringSinkTransformer()); // 使用转换器仅对sink部分进行转换
    
    • 1
    • 2
    changeStream方法
    • 语法格式StreamChannel changeStream(Stream Function(Stream) change);
    • 功能:返回当前通道的副本,其中stream部分被替换为由change函数返回的值。
    • 参数
      • change:一个函数,用于更改stream部分。
    • 返回值StreamChannel - 具有更改后的stream部分的StreamChannel实例。
    • 用法示例
    StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
    StreamChannel<String> channel2 = channel1.changeStream((stream) => stream.where((data) => data.isNotEmpty)); // 过滤掉stream中的空数据
    
    • 1
    • 2
    changeSink方法
    • 语法格式StreamChannel changeSink(StreamSink Function(StreamSink) change);
    • 功能:返回当前通道的副本,其中sink部分被替换为由change函数返回的值。
    • 参数
      • change:一个函数,用于更改sink部分。
    • 返回值StreamChannel - 具有更改后的sink部分的StreamChannel实例。
    • 用法示例
    StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
    StreamChannel<String> channel2 = channel1.changeSink((sink) => MyCustomSink(sink)); // 使用自定义的sink替换原有的sink
    
    • 1
    • 2
    cast方法
    • 语法格式StreamChannel cast();
    • 功能:返回当前通道的副本,将通道的泛型类型强制转换为S
    • 返回值StreamChannel - 具有泛型类型为SStreamChannel实例。
    • 用法示例
    StreamChannel<dynamic> channel1 = ...; // 创建一个通用类型的StreamChannel
    StreamChannel<int> channel2 = channel1.cast<int>(); // 强制将通道类型转换为int类型
    
    • 1
    • 2

    这些方法和属性组合使得StreamChannel类能够实现双向通信,并提供了多种方法来操作和转换通道的数据流。这些方法使得StreamChannel在Dart中成为一个有用的工具,用于处理双向数据流通信。

    3. IsolateChannel类

    IsolateChannel类是一个实现了StreamChannel接口的通道,用于在不同的隔离体(isolate)之间进行通信,通常用于与另一个隔离体进行数据交换。它基于两个Isolate之间的ReceivePortSendPort实现,允许双向通信。

    3.1 IsolateChannel类的构造函数

    IsolateChannel.connectReceive构造函数
    • 语法格式factory IsolateChannel.connectReceive(ReceivePort receivePort);
    • 功能:连接到使用IsolateChannel.connectSend创建的远程通道。
    • 参数
      • receivePort:用于接收消息的ReceivePort
    • 返回值IsolateChannel - 已连接到远程通道的IsolateChannel实例。
    • 用法示例
    ReceivePort remoteReceivePort = ...; // 创建远程ReceivePort
    IsolateChannel<String> channel = IsolateChannel.connectReceive(remoteReceivePort);
    
    • 1
    • 2
    IsolateChannel.connectSend构造函数
    • 语法格式factory IsolateChannel.connectSend(SendPort sendPort);
    • 功能:连接到使用IsolateChannel.connectReceive创建的远程通道。
    • 参数
      • sendPort:用于发送消息的SendPort
    • 返回值IsolateChannel - 已连接到远程通道的IsolateChannel实例。
    • 用法示例
    SendPort remoteSendPort = ...; // 创建远程SendPort
    IsolateChannel<String> channel = IsolateChannel.connectSend(remoteSendPort);
    
    • 1
    • 2
    IsolateChannel构造函数
    • 语法格式factory IsolateChannel(ReceivePort receivePort, SendPort sendPort);
    • 功能:创建一个IsolateChannel,用于在两个隔离体之间传递消息。
    • 参数
      • receivePort:用于接收消息的本地ReceivePort
      • sendPort:用于发送消息的远程SendPort
    • 返回值IsolateChannel - 已创建的IsolateChannel实例。
    • 用法示例
    ReceivePort localReceivePort = ReceivePort(); // 创建本地ReceivePort
    SendPort remoteSendPort = ...; // 获取远程SendPort
    IsolateChannel<String> channel = IsolateChannel(localReceivePort, remoteSendPort);
    
    • 1
    • 2
    • 3

    3.2 IsolateChannel类的属性

    stream属性
    • 语法格式Stream get stream;
    • 功能:获取从远程通道接收到的消息的输入流。
    • 返回值Stream - 输入流,用于接收从远程通道发送的消息。
    • 用法示例
    IsolateChannel<String> channel = ...; // 创建一个IsolateChannel
    Stream<String> inputStream = channel.stream; // 获取输入流
    
    • 1
    • 2
    sink属性
    • 语法格式StreamSink get sink;
    • 功能:获取发送消息到远程通道的输出流。
    • 返回值StreamSink - 输出流,用于发送消息到远程通道。
    • 用法示例
    IsolateChannel<String> channel = ...; // 创建一个IsolateChannel
    StreamSink<String> outputStream = channel.sink; // 获取输出流
    
    • 1
    • 2

    IsolateChannel类允许在不同的隔离体之间进行通信,通过stream属性接收远程通道发送的消息,通过sink属性发送消息到远程通道。它提供了两种构造函数用于建立连接,分别是IsolateChannel.connectReceiveIsolateChannel.connectSend,并且可以通过IsolateChannel构造函数创建一个新的IsolateChannel来实现双向通信。

    4.MultiChannel类

    MultiChannel类是一个抽象类,用于多路复用多个虚拟通道(Virtual Channel)在单一的底层传输层之上。它允许在一个通道上创建多个虚拟通道,每个虚拟通道都可以独立传输数据。虚拟通道可用于在两个端点之间进行双向通信,通过底层通道进行消息传递。

    4.1 MultiChannel类的构造函数

    MultiChannel构造函数
    • 语法格式factory MultiChannel(StreamChannel inner) => _MultiChannel(inner);
    • 功能:创建一个新的MultiChannel,用于在内部传输层上发送和接收消息。
    • 参数
      • inner:用于在内部传输层上发送和接收消息的底层通道,必须接受类似JSON的对象。
    • 返回值MultiChannel - 新创建的MultiChannel实例。
    • 用法示例
    StreamChannel<dynamic> innerChannel = ...; // 创建一个内部通道
    MultiChannel<String> channel = MultiChannel(innerChannel); // 创建MultiChannel
    
    • 1
    • 2

    4.2 MultiChannel类的属性

    stream属性
    • 语法格式Stream get stream;
    • 功能:获取默认的输入流,连接到远程通道的输出。
    • 返回值Stream - 默认的输入流。
    • 用法示例
    MultiChannel<String> channel = ...; // 创建一个MultiChannel
    Stream<String> inputStream = channel.stream; // 获取默认的输入流
    
    • 1
    • 2
    sink属性
    • 语法格式StreamSink get sink;
    • 功能:获取默认的输出流,连接到远程通道的输入。如果关闭此输出流,则远程输入流将关闭,但其他虚拟通道将保持打开,并且可以创建新的虚拟通道。
    • 返回值StreamSink - 默认的输出流。
    • 用法示例
    MultiChannel<String> channel = ...; // 创建一个MultiChannel
    StreamSink<String> outputStream = channel.sink; // 获取默认的输出流
    
    • 1
    • 2

    4.3 MultiChannel类的方法

    virtualChannel方法
    • 语法格式VirtualChannel virtualChannel([int? id]);
    • 功能:创建一个新的虚拟通道(Virtual Channel)。
    • 参数
      • id(可选):虚拟通道的标识符。如果未提供,将创建一个新的虚拟通道。如果提供,将创建与远程通道上具有相同标识符的虚拟通道。
    • 返回值VirtualChannel - 新创建的虚拟通道(Virtual Channel)。
    • 用法示例
    MultiChannel<String> multiChannel = ...; // 创建一个MultiChannel
    VirtualChannel<String> virtual = multiChannel.virtualChannel(); // 创建新的虚拟通道
    
    • 1
    • 2

    这些方法和属性使得MultiChannel类能够实现多路复用多个虚拟通道在单一底层传输层上进行通信,从而实现双向通信,并允许在一个通道上创建多个虚拟通道以独立传输数据。这种模式对于网络通信和消息传递非常有用。

    5. Disconnector类

    Disconnector类是一个StreamChannelTransformer,用于允许调用者强制断开通道连接。通过这个转换器,可以实现对通道的断开操作,导致通道的stream会发出done事件,而sink会忽略后续的输入。同时,内部的sink也会被关闭,以通知远程端断开连接。

    5.1 Disconnector类的属性

    isDisconnected属性
    • 语法格式bool get isDisconnected;
    • 功能:判断是否已经调用了disconnect方法来断开通道连接。
    • 返回值bool - 如果已经断开连接,则为true;否则为false
    • 用法示例
    Disconnector<String> disconnector = ...; // 创建一个Disconnector
    bool disconnected = disconnector.isDisconnected; // 判断是否已断开连接
    
    • 1
    • 2

    5.2 Disconnector类的方法

    disconnect方法
    • 语法格式Future disconnect();
    • 功能:断开所有已经被转换的通道连接。
    • 返回值Future - 表示断开连接的未来对象,当所有内部sinkStreamSink.close完成后,该未来对象将完成。
    • 用法示例
    Disconnector<String> disconnector = ...; // 创建一个Disconnector
    Future<void> disconnectFuture = disconnector.disconnect(); // 断开连接并获取未来对象
    
    • 1
    • 2
    bind方法
    • 语法格式StreamChannel bind(StreamChannel channel);
    • 功能:将Disconnector应用于给定的通道,返回一个已经应用了断开连接逻辑的通道。
    • 参数
      • channel:要应用Disconnector的通道。
    • 返回值StreamChannel - 已应用了断开连接逻辑的通道。
    • 用法示例
    Disconnector<String> disconnector = ...; // 创建一个Disconnector
    StreamChannel<String> channel = ...; // 创建一个通道
    StreamChannel<String> transformedChannel = disconnector.bind(channel); // 应用Disconnector到通道
    
    • 1
    • 2
    • 3

    5.3 _DisconnectorSink类

    _DisconnectorSink类是Disconnector内部使用的辅助类,它是StreamSink的包装器,用于实现强制断开连接的功能。

    5.3.1 _DisconnectorSink类的属性

    done属性
    • 语法格式Future get done;
    • 功能:获取底层sink的done属性。
    • 返回值Future - 表示底层sink的done属性。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    Future<void> doneFuture = disconnectorSink.done; // 获取done属性的未来对象
    
    • 1
    • 2

    5.3.2 _DisconnectorSink类的方法

    add方法
    • 语法格式void add(T data);
    • 功能:向底层sink中添加数据,如果已经断开连接则不会添加。
    • 参数
      • data:要添加的数据。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    disconnectorSink.add("Hello"); // 向底层sink中添加数据
    
    • 1
    • 2
    addError方法
    • 语法格式void addError(Object error, [StackTrace? stackTrace]);
    • 功能:向底层sink中添加错误信息,如果已经断开连接则不会添加。
    • 参数
      • error:要添加的错误对象。
      • stackTrace:可选参数,表示错误的堆栈跟踪信息。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    disconnectorSink.addError(Exception("An error occurred")); // 向底层sink中添加错误信息
    
    • 1
    • 2
    addStream方法
    • 语法格式Future addStream(Stream stream);
    • 功能:将一个流中的数据添加到底层sink中,如果已经断开连接则不会添加。
    • 参数
      • stream:要添加的流。
    • 返回值Future - 表示添加流的未来对象,当添加完成后,未来对象将完成。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    Stream<String> dataStream = ...; // 创建一个数据流
    Future<void> addStreamFuture = disconnectorSink.addStream(dataStream); // 将流中的数据添加到底层sink中
    
    • 1
    • 2
    • 3
    close方法
    • 语法格式Future close();
    • 功能:关闭底层sink,同时标记通道已经关闭。
    • 返回值Future - 表示关闭底层sink的未来对象,当关闭完成后,未来对象将完成。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    Future<void> closeFuture = disconnectorSink.close(); // 关闭底层sink
    
    • 1
    • 2
    _disconnect方法
    • 语法格式Future _disconnect();
    • 功能:断开底层sink,停止转发事件。
    • 返回值Future - 表示断开底层sink的未来对象。
    • 用法示例
    _DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
    Future<void> disconnectFuture = disconnectorSink._disconnect(); // 断开底层sink
    
    • 1
    • 2

    6. 分析示例代码

    以下是官方的讲解示例代码:

    import 'dart:async';
    import 'dart:convert';
    import 'dart:io';
    import 'dart:isolate';
    
    import 'package:stream_channel/isolate_channel.dart';
    import 'package:stream_channel/stream_channel.dart';
    
    Future<void> main() async {
      // 一个 StreamChannel 在最简单的情况下,是一个包装了 Stream 和 StreamSink 的对象。
      // 例如,可以创建一个包装标准输入输出的通道:
      var stdioChannel = StreamChannel(stdin, stdout);
      stdioChannel.sink.add('Hello!\n'.codeUnits);
    
      // 就像可以使用 StreamTransformer 转换 Stream 一样,可以使用 StreamChannelTransformer 转换 StreamChannel
      // 例如,我们可以将标准输入处理为字符串:
      var stringChannel = stdioChannel
          .transform(StreamChannelTransformer.fromCodec(utf8))
          .transformStream(LineSplitter());
      stringChannel.sink.add('world!\n');
    
      // 可以通过扩展 StreamChannelMixin 来实现 StreamChannel,但使用 StreamChannelController 更加简单。
      // 控制器有两个 StreamChannel 成员:'local' 和 'foreign'。
      // 控制器的创建者应该使用 'local' 通道,而接收者通常不会直接访问底层控制器,而是使用 'foreign' 通道。
      var ctrl = StreamChannelController<String>();
      ctrl.local.stream.listen((event) {
        // 在这里执行有用的操作...
      });
    
      // 还可以将一个通道的事件传递给另一个通道。
      ctrl
        ..foreign.pipe(stringChannel)
        ..local.sink.add('Piped!\n');
      await ctrl.local.sink.close();
    
      // 通过调用 'StreamChannel.withGuarantees()',可以创建一个提供所有保证的 StreamChannel
      var dummyCtrl0 = StreamChannelController<String>();
      var guaranteedChannel = StreamChannel.withGuarantees(
          dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
    
      // 要关闭 StreamChannel,使用 'sink.close()'。
      await guaranteedChannel.sink.close();
    
      // MultiChannel 可以在单个底层传输层上复用多个虚拟通道。
      // 例如,通过某种机制,应用程序可以将来自不同客户端的事件分开处理,即使通过标准 I/O 监听也可以支持多个客户端。
      //
      // MultiChannel 将事件拆分成编号通道,这些通道是 VirtualChannel 的实例。
      var dummyCtrl1 = StreamChannelController<String>();
      var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
      var channel1 = multiChannel.virtualChannel();
      await multiChannel.sink.close();
    
      // 客户端/对等方还应该创建自己的 MultiChannel,连接到底层传输,使用相应的 ID 处理其各自通道中的事件。
      // 如何在不同端点之间传递通道 ID 取决于。
      var dummyCtrl2 = StreamChannelController<String>();
      var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
      var channel2 = multiChannel2.virtualChannel(channel1.id);
      await channel2.sink.close();
      await multiChannel2.sink.close();
    
      // 多个 Dart 应用程序的实例可以轻松通过 `SendPort`/`ReceivePort` 对来进行通信,这是通过 `IsolateChannel` 类实现的。
      // 通常,一个端点将创建一个 `ReceivePort`,然后调用 `IsolateChannel.connectReceive` 构造函数。
      // 另一个端点将获得相应的 `SendPort`,然后调用 `IsolateChannel.connectSend`。
      var recv = ReceivePort();
      var recvChannel = IsolateChannel.connectReceive(recv);
      var sendChannel = IsolateChannel.connectSend(recv.sendPort);
    
      // 必须手动关闭 `IsolateChannel` 的 sink。
      await recvChannel.sink.close();
      await sendChannel.sink.close();
    
      // 可以使用 `Disconnector` 转换器使通道在远程传输端断开连接。
      var disconnector = Disconnector<String>();
      var disconnectable = stringChannel.transform(disconnector);
      disconnectable.sink.add('Still connected!');
      await disconnector.disconnect();
    
      // 此外:
      //   * 'DelegatingStreamController' 类可扩展为构建包装其他 'StreamChannel' 对象的基础。
      //   * 'jsonDocument' 转换器将事件转换为 JSON 格式,并使用 'dart:convert' 中的 'json' 编解码器。
      //   * 'package:json_rpc_2' 直接构建在 'package:stream_channel' 之上,因此可以使用任何兼容的传输来创建交互式客户端/服务器或点对点应用程序(例如语言服务器、微服务等)。
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    代码输出结果为:

    Hello!
    world!
    Piped!
    
    • 1
    • 2
    • 3

    part.1 创建标准输入输出通道

    var stdioChannel = StreamChannel(stdin, stdout);
    stdioChannel.sink.add('Hello!\n'.codeUnits);
    
    • 1
    • 2

    在这里,创建了一个StreamChannel对象stdioChannel,将标准输入stdin和标准输出stdout包装在通道中。然后,通过sink将字符串’Hello!\n’转换为UTF-8编码的字节流发送到标准输出。

    part.2 转换输入为字符串

    var stringChannel = stdioChannel
        .transform(StreamChannelTransformer.fromCodec(utf8))
        .transformStream(LineSplitter());
    stringChannel.sink.add('world!\n');
    
    • 1
    • 2
    • 3
    • 4

    这部分代码将stdioChannel通过transform方法进行了两次转换。首先,使用StreamChannelTransformer.fromCodec(utf8)将输入数据流编码为UTF-8字符串,然后使用transformStream(LineSplitter())将输入数据流按行拆分。最后,通过sink将字符串’world!\n’发送到通道中。

    part.3 使用 StreamChannelController

    var ctrl = StreamChannelController<String>();
    ctrl.local.stream.listen((event) {
      // 在这里执行有用的操作...
    });
    
    • 1
    • 2
    • 3
    • 4

    这段代码创建了一个StreamChannelController对象ctrl,它用于管理通道。StreamChannelController包含两个StreamChannel成员:local和foreign。通常,创建者会使用local通道,而接收者通常不会直接访问底层控制器,而是使用foreign通道。在此示例中,我们订阅了ctrl.local.stream以处理来自通道的事件。

    part.4 传递事件到另一个通道

    ctrl
      ..foreign.pipe(stringChannel)
      ..local.sink.add('Piped!\n');
    await ctrl.local.sink.close();
    
    • 1
    • 2
    • 3
    • 4

    这部分代码将一个通道的事件传递到另一个通道。首先,通过pipe方法将ctrl.foreign通道中的事件传递到stringChannel中。然后,通过local.sink将字符串’Piped!\n’发送到ctrl.local通道中,并最后关闭ctrl.local.sink。

    part.5 创建具有保证的 StreamChannel

    var dummyCtrl0 = StreamChannelController<String>();
    var guaranteedChannel = StreamChannel.withGuarantees(
        dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
    await guaranteedChannel.sink.close();
    
    • 1
    • 2
    • 3
    • 4

    这部分代码演示了如何使用StreamChannel.withGuarantees方法创建一个具有所有保证的StreamChannel。它接受一个输入流和一个输出流,然后通过sink.close()方法关闭了通道。

    part.6 使用 MultiChannel

    var dummyCtrl1 = StreamChannelController<String>();
    var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
    var channel1 = multiChannel.virtualChannel();
    await multiChannel.sink.close();
    
    • 1
    • 2
    • 3
    • 4

    这段代码演示了如何使用MultiChannel,它可以在单个底层传输层上复用多个虚拟通道。首先,创建了一个MultiChannel对象multiChannel,然后通过virtualChannel()方法创建了一个虚拟通道channel1。最后,通过sink.close()关闭了multiChannel。

    part.7 创建另一个 MultiChannel

    var dummyCtrl2 = StreamChannelController<String>();
    var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
    var channel2 = multiChannel2.virtualChannel(channel1.id);
    await channel2.sink.close();
    await multiChannel2.sink.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这段代码创建了另一个MultiChannel对象multiChannel2,并使用virtualChannel(channel1.id)创建了一个虚拟通道channel2,其ID与channel1相同。然后,通过sink.close()分别关闭了channel2和multiChannel2。

    part.8 使用 IsolateChannel 进行 Dart 应用程序通信

    var recv = ReceivePort();
    var recvChannel = IsolateChannel.connectReceive(recv);
    var sendChannel = IsolateChannel.connectSend(recv.sendPort);
    await recvChannel.sink.close();
    await sendChannel.sink.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这部分代码演示了如何使用IsolateChannel进行Dart应用程序的通信。首先,创建了一个ReceivePort对象recv,然后使用IsolateChannel.connectReceive和IsolateChannel.connectSend构造函数创建了两个IsolateChannel通道,用于发送和接收消息。最后,通过sink.close()方法手动关闭了这两个通道。

    part.9 使用 Disconnector 断开连接

    var disconnector = Disconnector<String>();
    var disconnectable = stringChannel.transform(disconnector);
    disconnectable.sink.add('Still connected!');
    await disconnector.disconnect();
    
    • 1
    • 2
    • 3
    • 4

    这段代码创建了一个Disconnector对象disconnector,然后使用transform方法将stringChannel与disconnector进行转换,使通道可以在远程传输端断开连接。然后,通过sink.add向通道发送消息,最后使用disconnector.disconnect()方法断开连接。

    F. 附录

    F.1 StreamChannel 接口

    表示双向通信通道

    用户应该考虑流(stream)发出"done"事件作为通道关闭的规范指示。如果他们希望关闭通道,他们应该关闭sink——取消流订阅是不足够的。协议错误可能通过流或sink.done发出,具体取决于它们的根本原因。请注意,如果在调用sink.close之前通道关闭,sink可能会静默丢弃事件。

    强烈建议实现混入或扩展StreamChannelMixin以获取各种实例方法的默认实现。如果同时为StreamChannelMixin添加实现,则不会将新方法视为破坏性更改。

    实现必须提供以下保证
    • 流是单订阅的,并且必须遵循所有单订阅流的保证。
    • 关闭sink 会导致流在发出更多事件之前关闭。
    • 流关闭后,sink 会自动关闭。如果发生这种情况,sink 方法应该静默丢弃它们的参数,直到调用sink.close为止。
    • 如果流在有侦听器之前关闭,如果可能的话,sink 应该静默丢弃事件。
    • 取消流的订阅对 sink 没有影响。即使在取消订阅后,通道仍必须能够响应另一端关闭通道的情况。
    • sink要么将错误转发给另一端,要么在添加错误后立即关闭,并将该错误转发给 sink.done 的未来。

    这些保证允许用户与所有实现进行统一交互,并确保关闭流的任一端都会产生一致的行为。

    源码

    /// 一个表示双向通信通道的抽象类。
    ///
    /// 用户应该将 [stream] 发出 "done" 事件视为通道已关闭的标志。如果他们希望关闭通道,他们应该关闭 [sink]——取消流订阅不足够。协议错误可能通过流或 [sink].done 发出,具体取决于其根本原因。请注意,在调用 [sink].close 之前,如果通道在之前关闭,sink 可能会悄悄丢弃事件。
    ///
    /// 强烈建议实现混合或扩展 [StreamChannelMixin],以获取各种实例方法的默认实现。如果还为此接口添加了新的方法,则不会被视为破坏性更改,前提是也将实现添加到 [StreamChannelMixin]。
    ///
    /// 实现必须提供以下保证:
    ///
    /// * 该流是单订阅的,并且必须遵循单订阅流的所有保证。
    ///
    /// * 关闭 sink 会导致流在发出更多事件之前关闭。
    ///
    /// * 在流关闭后,sink 会自动关闭。如果发生这种情况,sink 方法应该悄悄地丢弃它们的参数,直到调用 [sink].close。
    ///
    /// * 如果流在有侦听器之前关闭,sink 应尽可能悄悄地丢弃事件。
    ///
    /// * 取消流的订阅对 sink 没有影响。通道必须仍然能够响应另一端关闭通道,即使已取消订阅。
    ///
    /// * sink 要么将错误转发到另一端,要么在添加错误后立即关闭并将该错误转发到 [sink].done 未来。
    ///
    /// 这些保证使用户能够统一地与所有实现交互,并确保关闭流的任一端都会产生一致的行为。
    abstract class StreamChannel<T> {
      /// 从另一端发出值的单订阅流。
      Stream<T> get stream;
    
      /// 用于将值发送到另一端的 sink。
      StreamSink<T> get sink;
    
      /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
      ///
      /// 请注意,此流/接收器对必须提供 [StreamChannel] 文档中列出的保证。如果它们没有本地提供这些保证,则应使用 [StreamChannel.withGuarantees]。
      factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
          _StreamChannel<T>(stream, sink);
    
      /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
      ///
      /// 与 [StreamChannel.new] 不同,这强制执行 [StreamChannel] 文档中列出的保证。这使其比直接包装流和接收器要低效一些,因此应该在本机提供保证时使用 [StreamChannel.new]。
      ///
      /// 如果 [allowSinkErrors] 为 `false`,则不允许将错误传递给 [sink]。如果有任何错误,连接将关闭,并且错误将转发到 [sink].done。
      factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
              {bool allowSinkErrors = true}) =>
          GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
    
      /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
      ///
      /// 这特别强调了第二个保证:关闭 sink 会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回流的事件之间添加了异步间隙时,例如通过使用 [StreamTransformer] 进行变换,这个保证将无效。这是保留该特定保证的一种较轻量级方式,比 [StreamChannel.withGuarantees] 要轻。
      factory StreamChannel.withCloseGuarantee(
              Stream<T> stream, StreamSink<T> sink) =>
          CloseGuaranteeChannel(stream, sink);
    
      /// 连接此通道到 [other],以便由任何一端发出的值都直接发送到另一端。
      void pipe(StreamChannel<T> other);
    
      /// 使用 [transformer] 进行转换。
      ///
      /// 这与调用 `transformer.bind(channel)` 相同。
      StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
    
      /// 仅使用 [transformer] 转换此通道的 [stream] 组件。
      StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
    
      /// 仅使用 [transformer] 转换此通道的 [sink] 组件。
      StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
    
      /// 返回一个具有 [stream] 替换为 [change] 返回值的副本。
      StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
    
      /// 返回一个具有 [sink] 替换为 [change] 返回值的副本。
      StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
    
      /// 返回一个泛型类型强制转换为 [S] 的副本。
      ///
      /// 如果 [stream] 发出的任何事件不是类型 [S],它们将转换为 [TypeError] 事件(在某些 SDK 版本中为 `CastError`)。类似地,如果向 [sink] 添加任何不是类型 [S] 的事件,将引发 [TypeError]。
      StreamChannel<S> cast<S>();
    }
    
    /// 一个实现 [StreamChannel] 的类,只需将流和接收器作为参数。
    ///
    /// 这与 [StreamChannel] 不同,因此它可以使用 [StreamChannelMixin]。
    class _StreamChannel<T> extends StreamChannelMixin<T> {
      
      final Stream<T> stream;
      
      final StreamSink<T> sink;
    
      _StreamChannel(this.stream, this.sink);
    }
    
    /// [StreamChannelMixin] 是一个混入(mixin),它以 [stream] 和 [sink] 为基础实现了 [StreamChannel] 的实例方法。
    abstract class StreamChannelMixin<T> implements StreamChannel<T> {
      /// 将此通道的输出与另一个通道 [other] 相关联,以便由任一通道发出的值都直接发送到另一通道。
      
      void pipe(StreamChannel<T> other) {
        stream.pipe(other.sink);
        other.stream.pipe(sink);
      }
    
      /// 使用 [transformer] 转换此通道。
      ///
      /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],将 [transformer] 绑定到当前通道。
      
      StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) =>
          transformer.bind(this);
    
      /// 仅使用 [transformer] 转换此通道的 [stream] 组件。
      ///
      /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [stream] 部分。
      
      StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
          changeStream(transformer.bind);
    
      /// 仅使用 [transformer] 转换此通道的 [sink] 组件。
      ///
      /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [sink] 部分。
      
      StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
          changeSink(transformer.bind);
    
      /// 使用 [change] 函数的返回值替换此通道的 [stream]。
      ///
      /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [stream] 替换为 [change] 的返回值。
      
      StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change) =>
          StreamChannel.withCloseGuarantee(change(stream), sink);
    
      /// 使用 [change] 函数的返回值替换此通道的 [sink]。
      ///
      /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [sink] 替换为 [change] 的返回值。
      
      StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change) =>
          StreamChannel.withCloseGuarantee(stream, change(sink));
    
      /// 将此通道的泛型类型强制转换为 [S]。
      ///
      /// 此方法将当前通道的 [stream] 组件的类型强制转换为 [S],并创建一个新的通道,其中 [sink] 仍然与原始通道共享。
      
      StreamChannel<S> cast<S>() => StreamChannel(
          stream.cast(), StreamController(sync: true)..stream.cast<T>().pipe(sink));
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140

    F.2 StreamChannelController

    /// 用于公开新 [StreamChannel] 的控制器。
    ///
    /// 这个控制器公开了两个连接的 [StreamChannel],[local] 和 [foreign]。用户的代码应该使用 [local] 来发出和接收事件。然后可以返回 [foreign] 供其他人使用。例如,这是 [new IsolateChannel] 的简化版本的实现:
    ///
    /// ```dart
    /// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
    ///   var controller = new StreamChannelController(allowForeignErrors: false);
    ///
    ///   // 将接收端口的所有事件传输到本地 sink 中...
    ///   receivePort.pipe(controller.local.sink);
    ///
    ///   // ...将本地流中的所有事件传输到发送端口。
    ///   controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
    ///
    ///   // 然后返回外部用户使用的外部控制器。
    ///   return controller.foreign;
    /// }
    /// ```
    class StreamChannelController<T> {
      /// 本地通道。
      ///
      /// 创建此 [StreamChannelController] 的用户应该直接使用此通道来发送和接收事件。
      StreamChannel<T> get local => _local;
      late final StreamChannel<T> _local;
    
      /// 外部通道。
      ///
      /// 这个通道应该返回给外部用户,以便他们与 [local] 进行通信。
      StreamChannel<T> get foreign => _foreign;
      late final StreamChannel<T> _foreign;
    
      /// 创建一个 [StreamChannelController]。
      ///
      /// 如果 [sync] 为 true,则添加到任一通道的 sink 的事件会同步分派到另一通道的 stream。只有在这些事件的来源已经是异步的情况下才应这样做。
      ///
      /// 如果 [allowForeignErrors] 为 `false`,则不允许将错误传递给外部通道的 sink。如果传递了任何错误,连接将关闭,并且错误将转发到外部通道的 [StreamSink.done] 未来。这确保了本地流永远不会发出错误。
      StreamChannelController({bool allowForeignErrors = true, bool sync = false}) {
        var localToForeignController = StreamController<T>(sync: sync);
        var foreignToLocalController = StreamController<T>(sync: sync);
        _local = StreamChannel<T>.withGuarantees(
            foreignToLocalController.stream, localToForeignController.sink);
        _foreign = StreamChannel<T>.withGuarantees(
            localToForeignController.stream, foreignToLocalController.sink,
            allowSinkErrors: allowForeignErrors);
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    F.3 StreamChannelCompleter

    /// [channel],其中源和目标稍后提供。
    ///
    /// [channel] 是一个正常的通道,可以立即监听它,并且可以立即添加事件,但在调用 [setChannel] 之前,它不会发出任何事件,并且添加到它的所有事件都将被缓冲。
    class StreamChannelCompleter<T> {
      /// 此通道流的完成器。
      final _streamCompleter = StreamCompleter<T>();
    
      /// 此通道汇的完成器。
      final _sinkCompleter = StreamSinkCompleter<T>();
    
      /// 此完成器的通道。
      StreamChannel<T> get channel => _channel;
      late final StreamChannel<T> _channel;
    
      /// 是否已调用 [setChannel]。
      bool _set = false;
    
      /// 将 `Future` 转换为 `StreamChannel`。
      ///
      /// 这使用通道完成器创建一个通道,并在未来完成时将源通道设置为未来的结果。
      ///
      /// 如果未来以错误完成,则返回的通道的流将只包含该错误。汇将默默地丢弃所有事件。
      static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
        var completer = StreamChannelCompleter();
        channelFuture.then(completer.setChannel, onError: completer.setError);
        return completer.channel;
      }
    
      StreamChannelCompleter() {
        _channel = StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink);
      }
    
      /// 设置通道为 [channel] 的源和目标。
      ///
      /// 最多可以设置一次通道。
      ///
      /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。
      void setChannel(StreamChannel<T> channel) {
        if (_set) throw StateError('通道已经设置过了。');
        _set = true;
    
        _streamCompleter.setSourceStream(channel.stream);
        _sinkCompleter.setDestinationSink(channel.sink);
      }
    
      /// 指示连接通道时发生错误。
      ///
      /// 这使流发出 [error] 并关闭。它使汇丢弃其所有事件。
      ///
      /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。
      void setError(Object error, [StackTrace? stackTrace]) {
        if (_set) throw StateError('通道已经设置过了。');
        _set = true;
    
        _streamCompleter.setError(error, stackTrace);
        _sinkCompleter.setDestinationSink(NullStreamSink());
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    F.4 StreamChannelTransformer

    /// [StreamChannelTransformer] 转换了传递给 [StreamChannel] 的事件以及由其发出的事件。
    ///
    /// 这与 [StreamTransformer] 和 [StreamSinkTransformer] 的原理相同。
    /// 每个转换器定义了一个 [bind] 方法,该方法接受原始的 [StreamChannel] 并返回转换后的版本。
    ///
    /// 转换器必须能够多次调用 [bind]。如果一个子类明确实现了 [bind],它应确保返回的流遵循第二个流通道保证:关闭汇会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回的流之间添加异步间隙时,例如通过 [StreamTransformer] 进行转换时,此保证将失效。可以使用 [StreamChannel.withCloseGuarantee] 轻松保留此保证。
    class StreamChannelTransformer<S, T> {
      /// 在通道的流上使用的转换器。
      final StreamTransformer<T, S> _streamTransformer;
    
      /// 在通道的汇上使用的转换器。
      final StreamSinkTransformer<S, T> _sinkTransformer;
    
      /// 从现有的流和汇转换器创建一个 [StreamChannelTransformer]。
      const StreamChannelTransformer(
          this._streamTransformer, this._sinkTransformer);
    
      /// 从编解码器的编码器和解码器创建一个 [StreamChannelTransformer]。
      ///
      /// 内部通道汇的所有输入都使用 [Codec.encoder] 进行编码,而其流的所有输出都使用 [Codec.decoder] 进行解码。
      StreamChannelTransformer.fromCodec(Codec<S, T> codec)
          : this(codec.decoder,
                StreamSinkTransformer.fromStreamTransformer(codec.encoder));
    
      /// 转换发送到和由 [channel] 发出的事件。
      ///
      /// 创建一个新的通道。当事件传递给返回的通道的汇时,转换器将对其进行转换并将转换后的版本传递给 `channel.sink`。当事件从 `channel.stream` 发出时,转换器将对其进行转换并将转换后的版本传递给返回的通道的流。
      StreamChannel<S> bind(StreamChannel<T> channel) =>
          StreamChannel<S>.withCloseGuarantee(
              channel.stream.transform(_streamTransformer),
              _sinkTransformer.bind(channel.sink));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    Mysql笔记2
    算法通关村第九关-白银挑战二分查找与高频搜索树
    csmall-passport(Day15)
    车云汇元宇宙:开启虚拟与现实融合的汽车养护新篇章
    湘南学院2023级成考新生课程学习安排及成绩评定标准
    共同富裕-三大维度-各省份、城市、农村基尼系数-附带多种计算方法
    ubuntu 软件包管理之一
    基于SSH开发学生信息管理系统(简单的增删改查)
    程序员的哲学
    tp5.1发送阿里云短信验证码
  • 原文地址:https://blog.csdn.net/qq_28550263/article/details/133426961