• Flutter 从源码扒一扒Stream机制


    Stream的基本使用

    //1、创建一个流控制对象,只要用来控制流的暂停、取消和订阅
    StreamController _controller = StreamController();
    
    //2、实现对一个流的订阅和监听事件
    _controller.stream.listen((event) {
    print("event==$event");
    });
    
    //3、添加一个事件
    _controller.add("123");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    StreamController类

    职责是一个抽象类,用于创建一个可以发送数据和接收数据的可监听对象。
    _StreamController 是StreamController的真正实现类
    工厂构造方法

    factory StreamController(
          {void onListen()?,
          void onPause()?,
          void onResume()?,
          FutureOr onCancel()?,
          bool sync = false}) {
        return sync
            ? _SyncStreamController(onListen, onPause, onResume, onCancel)
            : _AsyncStreamController(onListen, onPause, onResume, onCancel);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们使用StreamController的时候会这样
    StreamController controller = StreamController();
    但是StreamController是个抽象,不能被实例化,主要他使用了工厂构造,最后使用他的子类去实例化。
    还要一个字段 bool sync = false ,false表示的是异步的,true表示同步。不更改的话,默认是异步的

    _SyncStreamController和_AsyncStreamController就是StreamController的子类

    //异步控制器
    class _AsyncStreamController = _StreamController
        with _AsyncStreamControllerDispatch;
    
    //同步控制器
    class _SyncStreamController = _StreamController
        with _SyncStreamControllerDispatch;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以看到_AsyncStreamController和_SyncStreamController直接继承了_StreamController。

    而_StreamController是StreamController的真正的实现类

    //继承关系链
    abstract class _StreamController implements _StreamControllerBase 
     
    abstract class _StreamControllerBase
        implements
            StreamController,
            _StreamControllerLifecycle,
            _EventSink,
            _EventDispatch {
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    _controller.stream就是Stream对象

    Stream 提供了一种接收事件序列的方法。每个事件要么是数据事件(也称为流的元素),要么是错误事件(这是某件事情失败的通知)。

    这个流机制也算是一个生产者和消费者模式。
    生产者就是_controller.add(“123”); 消费者就是listen监听了
    那我们来看看stream内部是如何实现这个机制的?

    listen监听

    我们得先注册一下监听事件,添加对此流的订阅,才能接收到生产者的通知。

      StreamSubscription listen(void onData(T event)?,
          {Function? onError, void onDone()?, bool? cancelOnError});
     
     
       StreamSubscription listen(void onData(T data)?,
          {Function? onError, void onDone()?, bool? cancelOnError}) {
        cancelOnError ??= false;
        StreamSubscription subscription =
            _createSubscription(onData, onError, onDone, cancelOnError);
        _onListen(subscription);
        return subscription;
      }
     
    
    这个listen方法是Stream类的方法,返回的是一个订阅对象StreamSubscription。并且是空的实现,需要子类去实现它。
    
    
        
     
    我们看看是哪个类实现了Stream类。它有很多的实现类,我选一个主要的类_StreamImpl。
    
    _StreamImpl 是 Stream的继承类,它也是个抽象类,我去找它的最终实现类_ControllerStream
    
    abstract class _StreamImpl extends Stream
    
    class _ControllerStream extends _StreamImpl
    
    我们可以发现在_StreamController 有个方法,用来获取流对象的,正好是Stream的子类_ControllerStream
    
    Stream get stream => _ControllerStream(this);
    
    我们来看看_ControllerStream类
     
     StreamSubscription _createSubscription(void onData(T data)?,
              Function? onError, void onDone()?, bool cancelOnError) =>
          _controller._subscribe(onData, onError, onDone, cancelOnError);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    _controller就是抽象类_StreamControllerLifecycle的实例对象,
    _subscribe是个抽象方法,需要子类去实现它。

    abstract class _StreamController implements _StreamControllerBase
    abstract class _StreamControllerBase
     implements
         StreamController,
         _StreamControllerLifecycle,
         EventSink,
         _EventDispatch
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    从上面的代码可以看出,最终是_StreamController实现了_subscribe方法
    我们把_subscribe方法代码拿出来

    StreamSubscription _subscribe(void onData(T data)?, Function? onError,
          void onDone()?, bool cancelOnError) {
        if (!_isInitialState) {
          throw StateError("Stream has already been listened to.");
        }
        //创建一个订阅对象
        _ControllerSubscription subscription = _ControllerSubscription(
            this, onData, onError, onDone, cancelOnError);
    
        _PendingEvents? pendingEvents = _pendingEvents;
        _state |= _STATE_SUBSCRIBED;
        if (_isAddingStream) {
          _StreamControllerAddStreamState addState = _varData as dynamic;
          addState.varData = subscription;
          addState.resume();
        } else {
          _varData = subscription;
        }
        subscription._setPendingEvents(pendingEvents);
        subscription._guardCallback(() {
          _runGuarded(onListen);
        });
    
        return subscription;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这个方法是关键,从方法名字我们就可以知道这是一个订阅方法,实现对一个事件的订阅。
    我们看一下_ControllerSubscription,它的父类是StreamSubscription。

    StreamSubscription 类

    StreamSubscription 是一个抽象接口类
    文档上描述 订阅向listen提供事件,并保存用于处理事件的回调。订阅还可用于取消订阅事件,或暂时暂停流中的事件。

    下面是它的一些抽象方法

        ///取消订阅事件
      Future cancel();
    
     ///处理订阅事件
      void onData(void handleData(T data)?);
    
     ///完成订阅事件
      void onDone(void handleDone()?);
    
      ///暂停订阅事件
      void pause([Future? resumeSignal]);
    
       ///恢复订阅事件
      void resume();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    class _BufferingStreamSubscription
        implements StreamSubscription, _EventSink, _EventDispatch
        
    class _ControllerSubscription extends _BufferingStreamSubscription 
    
    • 1
    • 2
    • 3
    • 4

    发起数据通知

    接着我们再看看数据添加方法,这个方法就是用来发起数据通知的

       if (hasListener) {
         _sendData(value);
       } else if (_isInitialState) {
         _ensurePendingEvents().add(_DelayedData(value));
       }
     }
    
    // 这个_sendData方法是抽象类_EventDispatch的方法
     abstract class _EventDispatch {
       void _sendData(T data);
       void _sendError(Object error, StackTrace stackTrace);
       void _sendDone();
     }
       
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    经过查找发现 _AsyncStreamControllerDispatch类实现了_sendData的方法

    继承关系链

    abstract class _StreamController implements _StreamControllerBase
    abstract class _StreamControllerBase implements StreamController, _StreamControllerLifecycle,_EventSink,_EventDispatch {}
    
    _AsyncStreamControllerDispatch类实现的方法如下
      void _sendData(T data) {
        _subscription._addPending(_DelayedData(data));
      }
    
    
    _subscription 就是_ControllerSubscription的实例对象
    如下代码所示
    
      _ControllerSubscription get _subscription {
        assert(hasListener);
        Object? varData = _varData;
        if (_isAddingStream) {
          _StreamControllerAddStreamState streamState = varData as dynamic;
          varData = streamState.varData;
        }
        return varData as dynamic;
      }
    
      varData 这个很熟悉就是上面的_subscribe方法里面实例化ControllerSubscription对象赋值给varData,这个varData
      实际就是ControllerSubscription对象。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    继承关系链

    class _ControllerSubscription extends _BufferingStreamSubscription
    class _BufferingStreamSubscription implements StreamSubscription, _EventSink, _EventDispatch
    
    
    • 1
    • 2
    • 3

    接着我们再来看看 _subscription._addPending(_DelayedData(data));

    代码如下,这个方法是_BufferingStreamSubscription类实现的

       void _addPending(_DelayedEvent event) {
        var pending = _pending ??= _PendingEvents();
        pending.add(event);
        if (!_hasPending) {
          _state |= _STATE_HAS_PENDING;
          if (!_isPaused) {
            pending.schedule(this);
          }
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这个方法主要是添加一个处理事件,接着我们看看schedule方法

       void schedule(_EventDispatch dispatch) {
         if (isScheduled) return;
         assert(!isEmpty);
         if (_eventScheduled) {
           assert(_state == stateCanceled);
           _state = stateScheduled;
           return;
         }
         scheduleMicrotask(() {
           int oldState = _state;
           _state = stateUnscheduled;
           if (oldState == stateCanceled) return;
           handleNext(dispatch);
         });
         _state = stateScheduled;
       }
     放到微任务中执行handleNext
    
       void handleNext(_EventDispatch dispatch) {
         assert(!isScheduled);
         assert(!isEmpty);
         _DelayedEvent event = firstPendingEvent!;
         _DelayedEvent? nextEvent = event.next;
         firstPendingEvent = nextEvent;
         if (nextEvent == null) {
           lastPendingEvent = null;
         }
         event.perform(dispatch);
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    _DelayedEvent是个抽象类,perform方法是需要子类去实现
    它的子类就是_DelayedData

    class _DelayedData extends _DelayedEvent {
     final T value;
     _DelayedData(this.value);
     void perform(_EventDispatch dispatch) {
       dispatch._sendData(value);
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我们看_sendData这个方法,这个方法也是在_BufferingStreamSubscription类中实现的。
    整个代码看下来我们只需要关心 _zone.runUnaryGuarded(_onData, data); 这个是实现数据发送的关键。
    也是订阅者和被订阅者之间的中间角色,起着承上启下的作用。

     void _sendData(T data) {
        assert(!_isCanceled);
        assert(!_isPaused);
        assert(!_inCallback);
        bool wasInputPaused = _isInputPaused;
        _state |= _STATE_IN_CALLBACK;
        _zone.runUnaryGuarded(_onData, data);
        _state &= ~_STATE_IN_CALLBACK;
        _checkState(wasInputPaused);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Zone 被称做是沙箱,在 Dart 中,Zone 是用于隔离代码执行环境的概念。Zone 可以看作是一种执行上下文,它可以用于控制代码执行过程中的一些行为,比如异常处理、日志记录、资源管理等。Zone 中的 run 方法允许你在指定的 Zone 中运行一段代码块。
    这里不继续深究下去,有兴趣的可以看看。我们只需要知道调用runUnaryGuarded这个方法。它就会回调_onData函数,并将参数data传给_onData方法。
    _onData这个函数就是我们listen监听里的匿名函数,最终会回调那里去。

    最后结语

    虽然看不懂源码是一件枯燥无聊的事,但是多看几遍或许会变成一件有趣的事。如果要想深入学习flutter,阅读源码是必须要跨过的坎。
    当然我写的也不是很好,只是纯粹记录学习从步骤一步步看源码的过程。有不满的,可以提提意见,但不要乱喷。

  • 相关阅读:
    0108函数的连续性与间断点-函数与极限-高等数学
    MQTT服务采用nginx 代理TLS配置
    二分法
    七分钟学会 HTML 网页制作
    Android中activity详解
    C++日期类实现(联系类和对象)
    用c动态数组(实现权重矩阵可视化)实现手撸神经网络230902
    springboot集成mybatis步骤
    box-sizing: border-box;box-sizing:content-box;讲解
    海关外贸企业大数据风控平台产品应用
  • 原文地址:https://blog.csdn.net/hjjdehao/article/details/138065810