• 后端思维篇:如何抽一个观察者模板


    前言

    今天跟大家聊聊什么是观察者模式,如何应用到工作实践中,以及如何抽取一个观察者模板。

    1. 观察者模式定义

    观察者模式,也可以称之为发布订阅模式,它在GoF 的《设计模式》中,是这么定义的:

    Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically。

    翻译过来就是:观察者模式定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被完成业务的更新。

    观察者模式属于行为模式,一个对象(被观察者)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。它的主要成员就是观察者和被观察者。

    被观察者(Observerable):目标对象,状态发生变化时,将通知所有的观察者。

    观察者(observer):接受被观察者的状态变化通知,执行预先定义的业务。

    2. 观察者模式的应用场景

    哪些场景我们可以考虑使用观察者模式呢?

    我们日常生活中,其实就有观察者模式类似的例子。比如,我们订阅了报社一年的报纸。每天报社印刷好报纸,就送到我们手中。我们就是观察者,报社就是被观察者。

    而日常开发中,观察者模式的使用场景主要表现在:完成一件事情后,通知处理某个逻辑。如,登陆成功发个IM消息,支付成功发个邮件消息或者发个抽奖消息,用户评论成功给他发个积分等等。

    举个详细点的例子吧,登陆注册应该是最常见的业务场景了,我们就拿注册来说事,大家经常会遇到类似的场景,就是用户注册成功后,我们给用户发一条IM消息,又或者发个邮件等等,因此经常有如下的代码:

    void register(User user){
      insertRegisterUser(user);
      sendIMMessage();
      sendEmail();
    }

    这块代码会有什么问题呢?如果产品又加需求:现在注册成功的用户,再给用户发一条短信通知。于是你又得改register方法的代码了。。。这是不是违反了开闭原则啦。

    void register(User user){
      insertRegisterUser(user);
      sendIMMessage();
      sendMobileMessage();
      sendEmail();
    }

    并且,如果调发短信的接口失败了,是不是又影响到用户注册了?!这时候,是不是得加个异步方法,异步发通知消息才好??其实这种场景,我们可以使用异步非阻塞的观察者模式优化的。

    3. 如何实现一个简单的观察者模式

    我们先来看下,简单的观察者模式如何实现。可以这么定义

    • 一个主题接口Subject(声明添加、删除、通知观察者方法)。
    • 一个Observer观察者接口。
    • 一个创建主题的类ObserverableImpl​(即被观察者),实现了Subject接口。
    • 各个观察者的差异化实现。

    为了通俗易懂,可以这样理解观察者模式:就是被观察者(ObserverableImpl)做了一件事情,或者说发布了一个主题(Subject),然后这件事情通知到各个相关的不同的人(不同的观察者,Observer的差异化实现者)。

    一个主题接口。

    public interface Subject {
    
        /**
         * 添加观察者
         * @param observer
         */
        void addServer(Observer observer);
    
        /**
         * 移除观察者
         * @param observer
         */
        void removeServer(Observer observer);
    
        /**
         * 通知观察者
         * @param msg
         */
        void notifyAllObservers(String msg);
    
    }

    一个Observer接口。

    /**
     * 观察者
     *
     */
    public interface Observer {
        /**
         * 更新消息
         * @param msg
         */
        void update(String msg);
    }

    一个创建主题的类ObserverableImpl(即被观察者),同时有观察者列表的属性(其实就是说观察者要事先注册到被观察者)。

    public class ObserverableImpl implements Subject {
    
        /**
         * 存储被观察者
         */
        private List<Observer> observers = new ArrayList<Observer>();
    
        @Override
        public void addServer(Observer observer) {
            observers.add(observer);
        }
    
        @Override
        public void removeServer(Observer observer) {
            observers.remove(observer);
        }
    
        @Override
        public void notifyAllObservers(String msg) {
            for (Observer observer : observers) {
                observer.update(msg);
            }
        }
    }

    观察者的差异化实现,以及使用。

    public class ObserverOneImpl implements Observer {
        @Override
        public void update(String msg) {
            System.out.println("ObserverOne is notified,"+msg);
        }
    }
    
    public class ObserverTwoImpl implements Observer {
    
        @Override
        public void update(String msg) {
            System.out.println("ObserverTwo is notified,"+msg);
        }
    }
    
    public class ObserverDemoTest {
        public static void main(String[] args) {
            Subject subject = new ObserverableImpl();
            //添加观察者
            subject.addObserver(new ObserverOneImpl());
            subject.addObserver(new ObserverTwoImpl());
            //通知
            subject.notifyAllObservers("关注公众号:捡田螺的小男孩");
        }
    }
    //输出
    ObserverOne is notified,关注公众号:捡田螺的小男孩
    ObserverTwo is notified,关注公众号:捡田螺的小男孩

    就这样,我们实现了观察者模式啦,是不是很简单?不过上面的代码,只能算是观察者模式的模板代码,只能反映大体的设计思路。接下来,我们看下在工作中,是如何使用观察者模式的。

    4. 工作中,如何使用观察者模式的

    观察者模式的实现有两种方式,同步阻塞方式和异步非阻塞方式。第3小节就是一个同步阻塞方式的观察者模式。我们来看下,日常工作的例子:用户注册成功发消息的例子,如何实现。本小节分同步阻塞、异步阻塞、spring观察者模式三个方向探讨。

    • 同步阻塞方式的观察模式
    • 异步非阻塞方式的观察者模式
    • spring观察者模式应用

    4.1 同步阻塞方式的观察模式

    我们可以把用户注册,当做被观察者实现的逻辑,然后发消息就是观察者的实现逻辑。

    假设有两个观察者,分  别是发QQ消息和手机消息,于是有以下代码:

    public interface RegisterObserver {
        void sendMsg(String msg);
    }
    @Service
    public class ObserverMobileImpl implements RegisterObserver {
        @Override
        public void sendMsg(String msg) {
            System.out.println("发送手机短信消息"+msg);
        }
    }
    @Service
    public class ObserverQQImpl implements RegisterObserver {
        @Override
        public void sendMsg(String msg) {
            System.out.println("发送QQ消息"+msg);
        }
    }

    直接可以通过spring的ApplicationContextAware,初始化观察者列表,然后用户注册成功,通知观察者即可。代码如下:

    @RestController
    public class UserController implements ApplicationContextAware{
    
         @Autowired
         private UserService userService;
    
         //观察者列表
         private Collection<RegisterObserver> regObservers;
    
         @RequestMapping("register")
         public String register(UserParam userParam) {
              //注册成功过(类似于被观察者,做了某件事)
              userService.addUser(userParam);
              //然后就开始通知各个观察者。
              for(RegisterObserver temp:regObservers){
                   temp.sendMsg("注冊成功");
              }
              return "SUCCESS";
         }
    
         //利用spring的ApplicationContextAware,初始化所有观察者
         @Override
         public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
         }
    }

    ==可以发现,观察者模式,就是将不同的行为代码解耦,也就是说将观察者和被观察者代码解耦。但是这里大家会发现,这是同步阻塞式的观察者模式,是有缺点的,比如发QQ消息异常,就会影响用户注册,或者发消息因为某些原因耗时,就影响了用户注册,所以可以考虑异步非阻塞的观察者模式。

    4.2 异步非阻塞方式的观察者模式

    如何实现异步非阻塞,最简单就是另开个线程嘛,即新开个线程或者线程池异步跑观察者通知。代码如下:

    @RestController
    public class UserController implements ApplicationContextAware{
    
         @Autowired
         private UserService userService;
    
         private Collection<RegisterObserver> regObservers;
    
         private Executor executor = Executors.newFixedThreadPool(10);
    
         @RequestMapping("register")
         public String register(UserParam userParam) {
              userService.addUser(userParam);
              //异步通知每个观察者
              for (RegisterObserver temp : regObservers) {
                   executor.execute(() -> {
                        temp.sendMsg("注冊成功");
                   });
              }
    
              return "SUCCESS";
         }
    
         @Override
         public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
         }
    }

    线程池实现的异步非阻塞方式,还是可以的,但是异步执行逻辑都耦合在了register()函数中,不是很优雅,也增加了这部分业务代码的维护成本。一般日常工作中,我们会用spring那一套观察者模式等。

    4.3 spring观察者模式应用

    spring的观察者模式使用也是比较简单的,就是先定义个事件,继承于ApplicationEvent:

    public class MessageEvent extends ApplicationEvent {
    
        public MessageEvent(Object source) {
            super(source);
        }
    }

    然后定义一个事件监听器MessageListener,类似于观察者,它实现ApplicationListener接口。

    @Component
    public class MessageListener implements ApplicationListener<MessageEvent> {
        @Override
        public void onApplicationEvent(MessageEvent messageEvent) {
           System.out.println("用户注册成功,执行监听事件"+messageEvent.getSource());
        }
    }

    用户注册成功后,applicationEventPublisher(类似于被观察者)发布事件即可,代码如下:

    @RestController
    public class UserController implements ApplicationContextAware{
    
         @Autowired
         private UserService userService;
         
         @Autowired
         private ApplicationEventPublisher applicationEventPublisher;
    
        @RequestMapping("springListenRegister")
        public String springListenRegister(UserParam userParam) {
            System.out.println("开始注册");
            userService.addUser(userParam);
            //用户注册成功,发布事件
            applicationEventPublisher.publishEvent(new MessageEvent("666"));
            return "SUCCESS";
        }

    运行结果:

    开始注册
    用户注册成功,执行监听事件666

    这个也是同步阻塞的方式实现的,等下下个小节先介绍完spring观察者模式的原理,田螺哥再来教大家如何抽取一个通用的异步非阻塞观察者模式哈。

    5. Spring观察者模式原理

    Spring 中实现的观察者模式包含三部分:分别是Event事件(相当于消息)、Listener监听者(相当于观察者)、Publisher发送者(相当于被观察者)。用个图表示就是这样:

    这个ApplicationEvent是放到哪里的,监听者AppliactionListener是如何监听到的。接下来,我们来看下spring框架的观察者原理是怎样哈。

    我们先来看下ApplicationEventPublisher源代码(被观察者/发布者)

    @FunctionalInterface
    public interface ApplicationEventPublisher {
    
     default void publishEvent(ApplicationEvent event) {
      publishEvent((Object) event);
     }
    
     void publishEvent(Object event);
    
    }

    ApplicationEventPublisher它只是一个函数式接口,我们再看下它接口方法的实现。它的具体实现类是AbstractApplicationContext,这个类代码有点多,我把关键部分代码贴出来了:

    public abstract class AbstractApplicationContext extends ... {
      //监听者(观察者列表)
      private final Set<ApplicationListener<?>> applicationListeners;
      
      //构造器,初始化观察者列表
      public AbstractApplicationContext() {
        this.applicationListeners = new LinkedHashSet();
        //...
      }
      
      //发布事件
      public void publishEvent(ApplicationEvent event) {
        this.publishEvent(event, (ResolvableType)null);
      }
    
      public void publishEvent(Object event) {
        this.publishEvent(event, (ResolvableType)null);
      }
    
      //发布事件接口实现
      protected void publishEvent(Object event, ResolvableType eventType) {
        //...
        Object applicationEvent;
        if (event instanceof ApplicationEvent) {
          //如果event是ApplicationEvent对象,或者是它的子类
          applicationEvent = (ApplicationEvent)event;
        } else {
          // 如果不是ApplicationEvent对象或者它的子类,则将其包装成PayloadApplicationEvent事件,并获取对应的事件类型
          applicationEvent = new PayloadApplicationEvent(this, event);
          if (eventType == null) {
            eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
          }
        }
    
        if (this.earlyApplicationEvents != null) {
          this.earlyApplicationEvents.add(applicationEvent);
        } else {
          //真正的消息发送,是通过它。获取ApplicationEventMulticaster,调用multicastEvent方法广播事件
          this.getApplicationEventMulticaster().multicastEvent(
                (ApplicationEvent)applicationEvent, eventType);
        }
    
        //如果当前命名空间还有父亲节点,也需要给父亲推送该消息
        if (this.parent != null) {
          if (this.parent instanceof AbstractApplicationContext) {
            ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
          } else {
            this.parent.publishEvent(event);
          }
        }
      }
      
      //添加观察者(监听者)
      public void addApplicationListener(ApplicationListener<?> listener) {
        Assert.notNull(listener, "ApplicationListener must not be null");
        if (this.applicationEventMulticaster != null) {
        this.applicationEventMulticaster.addApplicationListener(listener);
        } else {
          this.applicationListeners.add(listener);
        }  
      }
      
      //观察者列表
      public Collection<ApplicationListener<?>> getApplicationListeners() {
        return this.applicationListeners;
      }
      
      // 注册监听器
      protected void registerListeners() {
      //把提前存储好的监听器添加到监听器容器中到ApplicationEventMulticaster
      for (ApplicationListener<?> listener : getApplicationListeners()) {
       getApplicationEventMulticaster().addApplicationListener(listener);
      }
         //获取类型是ApplicationListener的beanName集合,此处不会去实例化bean
      String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
      for (String listenerBeanName : listenerBeanNames) {
       getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
      }
    
    
      Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
      this.earlyApplicationEvents = null;
        //如果存在earlyEventsToProcess,提前处理这些事件
      if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
       for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
        getApplicationEventMulticaster().multicastEvent(earlyEvent);
       }
      }
     }
    }

    通过以上代码,我们可以发现,真正的消息发送,实际上是通过事件广播器ApplicationEventMulticaster 这个接口来完成的。multicastEvent是主要方法,这个方法的实现在类SimpleApplicationEventMulticaster中,我们一起来看下源码:

    public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
      ...
      
      //线程池
      @Nullable
     protected Executor getTaskExecutor() {
      return this.taskExecutor;
     }
      
      public void setTaskExecutor(@Nullable Executor taskExecutor) {
      this.taskExecutor = taskExecutor;
     }
    
      
     @Override
     public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
      ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        // 根据event类型获取适合的监听器
      Executor executor = getTaskExecutor();
      for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
       if (executor != null) {
            //如果executor不为空,异步调用执行监听器中的方法
        executor.execute(() -> invokeListener(listener, event));
       }
       else {
            //调用监听器的方法
        invokeListener(listener, event);
       }
      }
     }
      
     protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
      ErrorHandler errorHandler = getErrorHandler();
        //如果存在ErrorHandler,调用监听器方法(会用try...catch包一下)
      if (errorHandler != null) {
       try {
        doInvokeListener(listener, event);
       }
       catch (Throwable err) {
            //如果抛出异常则调用ErrorHandler来处理异常。
        errorHandler.handleError(err);
       }
      }
      else {
          否则直接调用监听器方法
       doInvokeListener(listener, event);
      }
     }
      ...
      }

    可以发现,默认情况下,spring实现的观察者模式,同步阻塞的。如果想异步执行事件,可以自定义SimpleApplicationEventMulticaster,然后构造一下executor线程池就好啦。代码如下:

    /**
     * 公众号:捡田螺的小男孩
     */
    @Component
    public class ListenerConfig {
    
        //把线程池赋值进去
        @Bean
        public SimpleApplicationEventMulticaster applicationEventMulticaster() {
            SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
            simpleApplicationEventMulticaster.setTaskExecutor(simpleAsyncTaskExecutor());
            return simpleApplicationEventMulticaster;
        }
    
        @Bean
        public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
            return new SimpleAsyncTaskExecutor();
        } 
    }

    demo跑一下,运行结果:

    注册开始
    当前线程名称http-nio-8080-exec-1
    注册结束
    用户注册成功2,执行监听事件666Sat Jun 18 11:44:07 GMT+08:00 2022
    当前线程名称:SimpleAsyncTaskExecutor-20
    当前线程名称:SimpleAsyncTaskExecutor-19
    用户注册成功,执行监听事件666Sat Jun 18 11:44:12 GMT+08:00 2022

    如果手动新建SimpleApplicationEventMulticaster,并设置taskExecutor的话,所有的监听响应事件,都是异步执行的哦。而有些有些场景我们希望同步执行的,这时候这种实现方式就不好了。

    其实spring提供了@Async注解,可以用来实现异步。具体怎么实现呢?其实很简单,只需要在配置类加上@EnableAsync,接着在需要异步执行的监听实现方法。加上@Async即可。代码实现如下:

    /**
     * 关注公众号:捡田螺的小男孩
     * 更多实战干货
     */
    @Component
    @EnableAsync //配置类加上```@EnableAsync```
    public class ListenerConfig2 {
    
        @Bean
        public SimpleApplicationEventMulticaster applicationEventMulticaster() {
            SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
            return simpleApplicationEventMulticaster;
        }
        
    }
    
    @Component
    public class MessageAsyncListener3 implements ApplicationListener<MessageEvent> {
    
        @Async //方法异步注解
        @Override
        public void onApplicationEvent(MessageEvent messageEvent) {
            System.out.println("用户注册成功3,执行监听事件" + messageEvent.getSource() + new Date());
            System.out.println("当前线程名称:"+Thread.currentThread().getName());
        }
    }

    日常开发中,异步执行也可以自己手动通过线程池来开启啦。回到我们本文的后端思维主题,如果每个开发,都自己定义观察者模式的实现,这种代码会很混乱,所以最好是实现一个可扩展,通用的观察者模板。

    6. 基于spring观察者模式,抽取一个模板

    接下来的最后小节,跟大家一起基于spring的观察者模式,一步一步实现并抽取个模板哈。

    我们要基于spring实现观察者模式的话,就包括这三步:

    定义Event事件(相当于消息),一般定义一个Event对象,继承ApplicationEvent。

    定义Listener监听者(相当于观察者),实现接口ApplicationListener。

    Publisher发送者(相当于被观察者),通过ApplicationEventPublisher发布。

    6.1 定义Event事件对象

    既然我们要抽取观察者模板,那肯定不是每个人自己写自己的Event,然后都去继承ApplicationEvent。

    我们可以自己定义一个项目相关的,通用的BaseEvent类,然后一些相关通用的信息属性可以放进去,比如eventId或者流水号bizSeq什么的,都可以,看你们项目需要哈。以下代码,我定义一个空空如也的BaseEvent

    /**
     * 关注公众号:捡田螺的小男孩
     * 更多实战干货
     * @desc : 事件基础对象
     */
    public class BaseEvent extends ApplicationEvent {
    
        public BaseEvent(Object source) {
            super(source);
        }
    
        public BaseEvent() {
            this("");
        }
    }

    如果你的观察者模式,是注册成功之后,发个消息的,你就可以声明一个消息类事件对象RegisterMessageEvent,继承通用的BaseEvent即可。然后属性可以自定义就好,比如messageId。

    public class RegisterMessageEvent  extends BaseEvent{
    
        private String msgId;
    
        public RegisterMessageEvent(String msgId) {
            super();
            this.msgId = msgId;
        }
    
        public String getMsgId() {
            return msgId;
        }
    
        public void setMsgId(String msgId) {
            this.msgId = msgId;
        }
        
    }

    同理,如果你想定义一个用户送礼物成功,然后发个广播,可以定义一个GiftSendEvent。

    public class GiftSendEvent extends BaseEvent {
    
        private String giftId;
    
        public GiftSendEvent(String giftId) {
            this.giftId = giftId;
        }
    
        public String getGiftId() {
            return giftId;
        }
    
        public void setGiftId(String giftId) {
            this.giftId = giftId;
        }
    }

    其他业务场景类似,只要想接入你的观察者模板,只需要自己定义事件对象,继承于你的BaseEvent即可,是不是成就感满满啦。

    6.2  定义Listener监听者(观察者啦)

    定义完Event事件,我们就可以开始定义监听者了。我们定义的监听者,只需要实现接口ApplicationListener接口即可。如果每个人也是各写各的,这些就很乱,毫无模板规范可言了。

    我们可以封装一下嘛,如何封装呢?很简单,可以先声明一个IEventListener,让它继承ApplicationListener接口,我们都知道,接口也是可以继承的哈。如下:

    public interface IEventListener extends ApplicationListener {
    
    }

    监听者的实现,关键在于实现ApplicationListener的onApplicationEvent的接口方法即可。又因为未来别的业务场景接入观察者模式,都是按你的模板来,所以各个Event事件对象,都是继承于你的BaseEvent的,所以我们可以把<T extends BaseEvent>的泛型加进去,如下:

    /**
     * 关注公众号:捡田螺的小男孩
     * @desc : 更多干货
     */
    public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T>{
        void onApplicationEvent(T event);
    }

    有些时候,可能会有这种场景,就是执行监听逻辑,只对部分数据(或者说部分特殊用户才执行)。既然我们是抽取监听模板,考虑到可扩展性,我们可以优化下IEventListener的代码。我们可以声明一个support的方法,默认是执行的,子类可以覆盖重写(让子类去控制是否执行这个监听逻辑),如下:

    public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
    
        void onApplicationEvent(T event);
    
        //接口里面,加了default,就可以写方法实现
        default boolean support(T event) {
            return true;
        }
    }

    然后呢,只有support方法返回true,才执行监听的逻辑,我们还可以定义一个handler方法,给子类去实现自己的业务逻辑,代码如下:

    /**
     * 关注公众号:捡田螺的小男孩
     * @desc : 更多干货
     */
    public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
    
        default void onApplicationEvent(T event){
            if (support(event)) {
                handler(event);
            }
        }
    
        default boolean support(T event) {
            return true;
        }
    
        //真正实现业务逻辑的接口,给子类去实现。
        void handler(T event);
    
    }

    对着以上的代码模板,小伙伴们是否还有一些优化的想法呢?

    如果方法产生了异常,我们是不是可以注意一下异常处理呢?以上小节,SimpleApplicationEventMulticaster源码分析的时候,不知道大家有没有细看,其实它就用了一个很巧妙的异常处理,我们可以借鉴一下,哈哈哈,这就是看源码的一个小小的好处了。

    我们可以给onApplicationEvent的实现try...catch...一下,如果catch住异常的话,可以定义一个handlerException异常处理方法,给子类自定义去实现,当然,异常可以默认不处理嘛,代码如下:

    /**
     * 关注公众号:捡田螺的小男孩
     * @desc : 更多干货
     */
    public interface IEventListener<T extends BaseEvent> extends ApplicationListener <T> {
    
        /**
         * 观察者的业务逻辑处理
         * @param event
         */
        default void onApplicationEvent(T event){
            try {
                if (support(event)) {
                    handler(event);
                }
            } catch (Throwable e) {
                /**
                 * 
                 */
                handleException(e);
            }
        }
    
        /**
         * 默认执行观察者的逻辑的
         * @param event
         * @return
         */
        default boolean support(T event) {
            return true;
        }
    
        /**
         *  观察者的逻辑,交给不同子类自定义实现
         * @param event
         */
        void handler(T event);
    
        /**
         * 异常默认不处理
         * @param exception
         */
        default void handleException(Throwable exception) {
        }
    
    }

    最后呢,不同业务不同的监听者(观察者),直接实现你的IEventListener就好啦,比如注册成功那个,我们声明一个RegisterMessageListenerImpl类,如下:

    @Service
    public class RegisterMessageListenerImpl implements IEventListener<RegisterMessageEvent> {
        
        @Override
        public void handler(RegisterMessageEvent event) {
            System.out.println("用户注册成功register,执行监听事件" + event.getSource() + new Date());
        }
    }

    6.3 定义Publisher发送者模板

    我们的观察者模板,最后一步就是定义发送者模板。最简单的发送,就是利用ApplicationContext直接发送就好。

    @Component
    public class EventPublish {
    
        @Autowired
        private ApplicationContext applicationContext;
    
        void publish(BaseEvent event) {
            applicationContext.publishEvent(event);
        }
        
    }

    为什么可以直接使用applicationContext来发送,文章开始介绍,我们不是用ApplicationEventPublisher来发送嘛?其实是因为applicationContext继承了ApplicationEventPublisher接口

    这个只是同步阻塞方式的观察者模式,一般来说,一个通用的观察者模板。也需要提供异步非阻塞方式的观察者模板。本文第5小节,我们知道了,在配置类加上@EnableAsync,在需要异步执行的监听加上@Async,即可实现异步。

    为了方便管理,和API语义更明确,我们可以手动设置线程池,给我们的模板发布类,提供异步发送的接口。我们先自定义一个线程池,一般不建议直接使用JDK的线程池。

    如何自定义线程池呢?

    我们在application.properties配置文件,定义线程池一些属性(核心线程数、最大线程数等等)

    executor.corePoolSize=50
    executor.maxPoolSize=100
    executor.queueCapacity=200
    executor.keepAliveSeconds=120
    executor.threadNamePrefix=threadPoolExecutor

    然后声明一个线程配置类TianLuoExecutorsConfig:

    /**
     * 关注公众号:捡田螺的小男孩
     * 更多实战干货
     */
    
    @Configuration
    @ConfigurationProperties("executor")//读取配置文件的线程池属性
    public class TianLuoExecutorsConfig {
    
        private int corePoolSize;
    
        private int maxPoolSize;
    
        private int queueCapacity;
    
        private int keepAliveSeconds;
    
        private String threadNamePrefix;
        
        //省略get和set的方法
    }

    通过线程配置类TianLuoExecutorsConfig,初始化线程池TianLuoExecutorPool,代码如下:

    /**
     * 关注公众号:捡田螺的小男孩
     * 更多实战干货
     */
    @Configuration
    public class TianLuoExecutorPool {
    
        @Autowired
        private TianLuoExecutorsConfig tianLuoExecutorsConfig;
    
        @Bean
        public ThreadPoolTaskExecutor eventExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(tianLuoExecutorsConfig.getCorePoolSize());
            executor.setMaxPoolSize(tianLuoExecutorsConfig.getMaxPoolSize());
            executor.setKeepAliveSeconds(tianLuoExecutorsConfig.getKeepAliveSeconds());
            executor.setQueueCapacity(tianLuoExecutorsConfig.getQueueCapacity());
            executor.setThreadNamePrefix(tianLuoExecutorsConfig.getThreadNamePrefix());
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    }

    然后回到EventPublish,把异步发布的接口加上,如下:

    /**
     * 观察者模式,事件发布通用接口
     **/
    @Component
    public class EventPublish {
    
        @Autowired
        private ApplicationContext applicationContext;
    
        @Autowired
        private TianLuoExecutorPool tianLuoExecutorPool;
     
        //同步阻塞
        public void publish(BaseEvent event) {
            applicationContext.publishEvent(event);
        }
        
        //异步发布(异步非阻塞)
        public void asyncPublish(BaseEvent event) {
            tianLuoExecutorPool.eventExecutor().execute(()->{
                publish(event);
            });
        }
    }
  • 相关阅读:
    Pjax 下动态加载插件方案
    解决GDAL 写FileGDB的中文属性字段和字段值乱码。
    vue3+ts,处理树形结构数据
    你 offo 来了!阿里限产教科书级 Elasticsearch 核心手册
    技术路线扩容,韶音的雄心不止于骨传导耳机?
    Linux:进程(概念)
    《人类简史》笔记四—— 想象构建的秩序
    Pyglet综合应用|推箱子游戏地图编辑器之图片跟随鼠标
    面试题 17.16. 按摩师
    Go-Zero从0到1实现微服务项目开发(二)
  • 原文地址:https://blog.csdn.net/JavaMonsterr/article/details/125544403