• Guava-EventBus 源码解析



    EventBus 采用发布订阅者模式的实现方式,它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化,观察者模式都比较熟悉,这里会简单介绍一下,重点介绍的是如何泛化的进行方法的注册以及调用,还有在单个线程和多线程不同的实现方式。

    #发布订阅者模式
    观察者模式又名发布订阅者模式,类结构图如下:

    观察者类结构图

    package com.dfan.设计模式.观察者模式;
    import java.util.Arrays;
    import java.util.List;
    
    public class ObeserverMode {
    
        static class TestResult {
            private List<IObserver> iObservers;
    
    
            public TestResult() {
            }
    
    
            public void register(IObserver iObserver){
                this.iObservers.add(iObserver);
            }
            public void declaration() {
                System.out.println("this is result");
            }
            public void notifyRunners(List<IObserver> iObservers) {
                for(IObserver iObserver : iObservers) {
                    iObserver.run(this);
                }
            }
            public  void notifyRunner(IObserver iObserver){
                iObserver.run(this);
            }
        }
    
        static class UITestResult extends TestResult{
    
    
            public UITestResult() {
                super();
            }
    
            public void declaration() {
                System.out.println("i am ui test result");
            }
    
            public  void notifyRunner(IObserver iObserver){
                iObserver.run(this);
            }
        }
    
        interface IObserver{
            void run(TestResult testResult) ;
        }
    
        static class TestObserver implements IObserver {
            private TestResult testResult;
    
            public TestResult createTestResult(){
                return new TestResult();
            }
    
            public void run(TestResult testResult) {
                this.testResult = testResult;
                System.out.println("this is test obeserver");
                testResult.declaration();
            }
    
        }
    
        static class TestObserver1 implements IObserver {
            private TestResult testResult;
    
            public TestResult createTestResult(){
                return new TestResult();
            }
    
            public void run(TestResult testResult) {
                testResult = createTestResult();
    
                System.out.println("this is test obeserver 1");
                testResult.declaration();
            }
    
        }
    
        public static void main(String[] args) {
            IObserver testRunner = new TestObserver();
            UITestResult uiTestResult = new UITestResult();
            uiTestResult.notifyRunner(testRunner);
    
            System.out.println("华丽分割线");
            List<IObserver> observers = Arrays.asList(new TestObserver1(), new TestObserver());
            uiTestResult.notifyRunners(observers);
        }
    }
    

    #Guava EventBus
    Guva中EventBus的机制就是观察者模式,因此符合观察者模式的一般结构:

    监听者:监听来自被监听者的变更事件,完成动作变更
    被监听者:发送变更事件给监听者,使监听者监听到变更事件后,完成动作变更

    EventBus的用法简单总结为一句话就是:

    订阅者向EventBus进行事件注册(register),表示对这个事件关心;
    EventBus会向所有订阅发布者事件的订阅者进行事件的发送(post)

    EventBus 区分 同步模式和异步模式,下面将根据这两个点进行展开
    ##同步模式
    ###向EventBus进行注册

    /**
       * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
       */
      private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
        Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
        Class<?> clazz = listener.getClass();
        for (Method method : getAnnotatedMethods(clazz)) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          Class<?> eventType = parameterTypes[0];
          methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
        }
        return methodsInListener;
      }
      
    /**
       * Registers all subscriber methods on the given listener object.
       */
      void register(Object listener) {
        Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    
        for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
          Class<?> eventType = entry.getKey();
          Collection<Subscriber> eventMethodsInListener = entry.getValue();
    
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
    
          if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
            eventSubscribers = MoreObjects.firstNonNull(
                subscribers.putIfAbsent(eventType, newSet), newSet);
          }
    
          eventSubscribers.addAll(eventMethodsInListener);
        }
      }
    
    • 其中 findAllSubscribers方法 目的是获取所有添加注解@Subscriber的方法,并将根据当前EventBusListener、以及加有@Subscriber注解的方法生成的Subscribe作为 Multimap, Subscriber>的value返回(其中key为方法[注释]的入参)

    • registerSubscriber注册到集合private final ConcurrentMap, CopyOnWriteArraySet> subscribers = Maps.newConcurrentMap();中,其中该Map的key为EventType(即方法[注释]入参)。

    EventBus发送事件给所有订阅者

    /**
       * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
       * the time this method is called.
       */
      Iterator<Subscriber> getSubscribers(Object event) {
        ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
    
        List<Iterator<Subscriber>> subscriberIterators =
            Lists.newArrayListWithCapacity(eventTypes.size());
    
        for (Class<?> eventType : eventTypes) {
          CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
          if (eventSubscribers != null) {
            // eager no-copy snapshot
            subscriberIterators.add(eventSubscribers.iterator());
          }
        }
    
        return Iterators.concat(subscriberIterators.iterator());
      }
      
    /**
       * Posts an event to all registered subscribers.  This method will return
       * successfully after the event has been posted to all subscribers, and
       * regardless of any exceptions thrown by subscribers.
       *
       * 

    If no subscribers have been subscribed for {@code event}'s class, and * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a * DeadEvent and reposted. * * @param event event to post. */ public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }

    • getSubscribers 根据刚才提到的参数类型会查找对应的Subscribe,而且不止查指定的类型,还会对这个类型的继承体系上的其他参数类型也会查,比如对于String类型,他会找Serializable,CharSequence,Comparable,Object四种类型,
      举个例子说明下这种情况,在这个例子中,会有两个task被执行,分别是task1和task3
    public class EventBusSyncEx {
        static class SimpleListener1 {
            /**
             *订阅方式,通过@Subscribe进行事件订阅,方法名随意
             **/
            @Subscribe
            public void task1(String s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("listener1 do task , String param:" + s);
            }
            @Subscribe
            public void task3(Object s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("listener1 do task , Object param:" + s);
            }
            @Subscribe
            public void task1(Integer s) {
                System.out.println("listener1 do task , int  param:" + s);
            }
        }
        public static class SimpleEventBusExample {
    
            public static void main(String[] args) {
                EventBus eventBus = new EventBus();
                eventBus.register(new SimpleListener1());
                System.out.println("Post Simple EventBus Example");
                eventBus.post("Simple EventBus Example");
            }
        }
    }
    
    • subscribers 就是刚才注册subscriber的集合(Map),通过getSubscribers获取到了Subscribe之后,下面就是要根据这个Event的type来执行对应的Event了,首先这里引入一个属性dispatcher (事件分发器 : 用于分发事件给订阅对象的事件处理器,该对象在EventBus构造方法内部初始化,默认的实现是,该分发器将事件存入队列).

      PerThreadQueuedDispatcher: 默认实现,该分发器将事件存入队列,并保证在同一个线程上发送的事件能够按照他们发布的顺序被分发给所有的订阅者。

      private static final class PerThreadQueuedDispatcher extends Dispatcher 
      // This dispatcher matches the original dispatch behavior of EventBus.
      /**
       * Per-thread queue of events to dispatch.
       */
      private final ThreadLocal<Queue<Event>> queue =
          new ThreadLocal<Queue<Event>>() {
            @Override
            protected Queue<Event> initialValue() {
              return Queues.newArrayDeque();
            }
          };
      
      /**
       * Per-thread dispatch state, used to avoid reentrant event dispatching.
       */
      private final ThreadLocal<Boolean> dispatching =
          new ThreadLocal<Boolean>() {
            @Override
            protected Boolean initialValue() {
              return false;
            }
          };
      
      @Override
      void dispatch(Object event, Iterator<Subscriber> subscribers) {
        checkNotNull(event);
        checkNotNull(subscribers);
        Queue<Event> queueForThread = queue.get();
        queueForThread.offer(new Event(event, subscribers));
      
        if (!dispatching.get()) {
          dispatching.set(true);
          try {
            Event nextEvent;
            while ((nextEvent = queueForThread.poll()) != null) {
              while (nextEvent.subscribers.hasNext()) {
                nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
              }
            }
          } finally {
            dispatching.remove();
            queue.remove();
          }
        }
      }
      
      private static final class Event {
        private final Object event;
        private final Iterator<Subscriber> subscribers;
      
        private Event(Object event, Iterator<Subscriber> subscribers) {
          this.event = event;
          this.subscribers = subscribers;
        }
      }
      }
      
      • 这段代码有三个关键点需要注意:
        1. PerThreadQueuedDispatcher 通过Queue来对Event进行存储
        2. Queue以及 dispatching都是ThreadLocal变量,也就意味着每个线程维护自己的一个变量,即线程安全的
        3. nextEvent.subscribers.next().dispatchEvent(nextEvent.event);调用了 Subscribe的dispatchEvent (类似于文中开篇所讲的Observer模式中的被监听者中的iObserver.run(this),只是Observer模式中,是在被监听者中执行的,而EventBus中是在dispatcher中执行的) ,如果继续跟进代码会发现,这个dispatchEvent实际工作就是直接通过反射执行了Method方法(method.invoke(target, checkNotNull(event));)

    至此,EventBus的同步执行方式已经分析完成


    异步模式

    异步模式, 它与同步模式的EventBus的主要区别有两点:

    1. 声明EventBus时,声明为 AsyncEventBus, 而AsyncEventBus的构造函数必须要传入一个 Executor
    2. 在Dispatcher上,AsyncEventBus 采用的事件分发器为 LegacyAsyncDispatcher
    /**
       * Dispatches {@code event} to this subscriber using the proper executor.
       */
      final void dispatchEvent(final Object event) {
        executor.execute(new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
      }
      
    private static final class LegacyAsyncDispatcher extends Dispatcher {
    
        /**
         * Global event queue.
         */
        private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
            Queues.newConcurrentLinkedQueue();
    
        @Override
        void dispatch(Object event, Iterator<Subscriber> subscribers) {
          checkNotNull(event);
          while (subscribers.hasNext()) {
            queue.add(new EventWithSubscriber(event, subscribers.next()));
          }
    
          EventWithSubscriber e;
          while ((e = queue.poll()) != null) {
            e.subscriber.dispatchEvent(e.event);
          }
        }
    
        private static final class EventWithSubscriber {
          private final Object event;
          private final Subscriber subscriber;
    
          private EventWithSubscriber(Object event, Subscriber subscriber) {
            this.event = event;
            this.subscriber = subscriber;
          }
        }
      }
    

    之所以异步模式传入Executor就是在通过 dispatchEvent 进行多线程的创建 new ThreadPoolExecutor().excute(new Runnable)
    而之所以使用 LegacyAsyncDispatcher 目的还有一个就是这个 Dispatcher中使用的queue是ConcurrentLinkedQueue, 之所以使用这个Queue,后面会有专门的一个讲解。

  • 相关阅读:
    vue15
    npm install失败的分析与解决方案,以及修复完成的代码地址
    Java进阶篇--并发容器之CopyOnWriteArrayList
    【文末送书】全栈开发流程——后端连接数据源(二)
    influxdb时序库之Python操作
    说实话,确实有点难
    Vue事件绑定(v-on用法)
    何超秘书长带队参访中国系统 共商元宇宙落地应用与数字体育强国
    微软黑科技如何加速游戏开发,读这篇就够了
    云存储空间的动态分配技术
  • 原文地址:https://blog.csdn.net/dyf4281/article/details/139710636