• SpringBoot项目实现发布订阅模式,真的很简单


    大家好,我是老三,在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……

    这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

    下单之后的逻辑

    所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。

    观察者模式

    一开始,我们都是自己去写观察者模式。

    自己实现观察者模式

    观察者简图

    观察者

    • 观察者定义接口
    /**
     * @Author: fighter3
     * @Description: 观察者接口
     * @Date: 2022/11/7 11:40 下午
     */
    public interface OrderObserver {
    
        void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 具体观察者

      • 监控埋点观察者
      @Slf4j
      public class OrderMetricsObserver implements OrderObserver {
      
          @Override
          public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
              log.info("[afterPlaceOrder] metrics");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 日志记录观察者
      @Slf4j
      public class OrderLogObserver implements OrderObserver{
      
          @Override
          public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
              log.info("[afterPlaceOrder] log.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 业务通知观察者
      @Slf4j
      public class OrderNotifyObserver implements OrderObserver{
      
          @Override
          public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
              log.info("[afterPlaceOrder] notify.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

    被观察者

    • 消息实体定义
    @Data
    public class PlaceOrderMessage implements Serializable {
        /**
         * 订单号
         */
        private String orderId;
        /**
         * 订单状态
         */
        private Integer orderStatus;
        /**
         * 下单用户ID
         */
        private String userId;
        //……
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 被观察者抽象类
    public abstract class OrderSubject {
        //定义一个观察者列表
        private List<OrderObserver> orderObserverList = new ArrayList<>();
        //定义一个线程池,这里参数随便写的
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
    
        //增加一个观察者
        public void addObserver(OrderObserver o) {
            this.orderObserverList.add(o);
        }
    
        //删除一个观察者
        public void delObserver(OrderObserver o) {
            this.orderObserverList.remove(o);
        }
    
        //通知所有观察者
        public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
            for (OrderObserver orderObserver : orderObserverList) {
                //利用多线程异步执行
                threadPoolExecutor.execute(() -> {
                    orderObserver.afterPlaceOrder(placeOrderMessage);
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这里利用了多线程,来异步执行观察者。

    • 被观察者实现类
    /**
     * @Author: fighter3
     * @Description: 订单实现类-被观察者实现类
     * @Date: 2022/11/7 11:52 下午
     */
    @Service
    @Slf4j
    public class OrderServiceImpl extends OrderSubject implements OrderService {
    
        /**
         * 下单
         */
        @Override
        public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
            PlaceOrderResVO resVO = new PlaceOrderResVO();
            //添加观察者
            this.addObserver(new OrderMetricsObserver());
            this.addObserver(new OrderLogObserver());
            this.addObserver(new OrderNotifyObserver());
            //通知观察者
            this.notifyObservers(new PlaceOrderMessage());
            log.info("[placeOrder] end.");
            return resVO;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    测试

        @Test
        @DisplayName("下单")
        void placeOrder() {
            PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
            orderService.placeOrder(placeOrderReqVO);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 测试执行结果
    2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver        : [afterPlaceOrder] metrics
    2022-11-08 00:11:13.618  INFO 20235 --- [           main] cn.fighter3.obverser.OrderServiceImpl    : [placeOrder] end.
    2022-11-08 00:11:13.618  INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver  : [afterPlaceOrder] notify.
    2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver    : [afterPlaceOrder] log.
    
    • 1
    • 2
    • 3
    • 4

    可以看到,观察者是异步执行的。

    利用Spring精简

    可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。

    Spring精简观察者模式

    观察者实现类:定义成Bean

    • OrderLogObserver

      @Slf4j
      @Service
      public class OrderLogObserver implements OrderObserver {
      
          @Override
          public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
              log.info("[afterPlaceOrder] log.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • OrderMetricsObserver

    @Slf4j
    @Service
    public class OrderMetricsObserver implements OrderObserver {
    
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] metrics");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • OrderNotifyObserver
    @Slf4j
    @Service
    public class OrderNotifyObserver implements OrderObserver {
    
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] notify.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    被观察者:自动注入Bean

    • OrderSubject

      public abstract class OrderSubject {
          /**
           * 利用Spring的特性直接注入观察者
           */
          @Autowired
          protected List<OrderObserver> orderObserverList;
      
          //定义一个线程池,这里参数随便写的
          ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
      
          //通知所有观察者
          public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
              for (OrderObserver orderObserver : orderObserverList) {
                  //利用多线程异步执行
                  threadPoolExecutor.execute(() -> {
                      orderObserver.afterPlaceOrder(placeOrderMessage);
                  });
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • OrderServiceImpl

    @Service
    @Slf4j
    public class OrderServiceImpl extends OrderSubject implements OrderService {
    
        /**
         * 实现类里也要注入一下
         */
        @Autowired
        private List<OrderObserver> orderObserverList;
    
        /**
         * 下单
         */
        @Override
        public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
            PlaceOrderResVO resVO = new PlaceOrderResVO();
            //通知观察者
            this.notifyObservers(new PlaceOrderMessage());
            log.info("[placeOrder] end.");
            return resVO;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。

    Spring Event实现发布/订阅模式

    Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。

    自定义事件

    • PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。
    public class PlaceOrderEvent extends ApplicationEvent {
    
        public PlaceOrderEvent(PlaceOrderEventMessage source) {
            super(source);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • PlaceOrderEventMessage:事件消息,定义了事件的消息体。
    @Data
    public class PlaceOrderEventMessage implements Serializable {
        /**
         * 订单号
         */
        private String orderId;
        /**
         * 订单状态
         */
        private Integer orderStatus;
        /**
         * 下单用户ID
         */
        private String userId;
        //……
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    事件监听者

    事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。

    事件监听者实现

    实现ApplicationListener接口

    实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。

    • OrderLogListener
    @Slf4j
    @Service
    public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {
    
        @Override
        public void onApplicationEvent(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • OrderMetricsListener
    @Slf4j
    @Service
    public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {
    
        @Override
        public void onApplicationEvent(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] metrics");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • OrderNotifyListener
    @Slf4j
    @Service
    public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {
    
        @Override
        public void onApplicationEvent(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] notify.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    使用@EventListener注解

    使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。

    • OrderLogListener

      @Slf4j
      @Service
      public class OrderLogListener  {
      
          @EventListener
          public void orderLog(PlaceOrderEvent event) {
              log.info("[afterPlaceOrder] log.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • OrderMetricsListener

      @Slf4j
      @Service
      public class OrderMetricsListener {
      
          @EventListener
          public void metrics(PlaceOrderEvent event) {
              log.info("[afterPlaceOrder] metrics");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • OrderNotifyListener

      @Slf4j
      @Service
      public class OrderNotifyListener{
      
          @EventListener
          public void notify(PlaceOrderEvent event) {
              log.info("[afterPlaceOrder] notify.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    异步和自定义线程池

    异步执行

    异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:

    • OrderLogListener
    @Slf4j
    @Service
    public class OrderLogListener  {
    
        @EventListener
        @Async
        public void orderLog(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。

    @SpringBootApplication
    @EnableAsync
    public class DailyApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DairlyLearnApplication.class, args);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    自定义线程池

    使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

    自定义线程池有三种方式:

    @Async自定义线程池

    • 实现接口AsyncConfigurer
    • 继承AsyncConfigurerSupport
    • 配置由自定义的TaskExecutor替代内置的任务执行器

    我们来看看三种写法:

    • 实现接口AsyncConfigurer
    @Configuration
    @Slf4j
    public class AsyncConfiguration implements AsyncConfigurer {
    
        @Bean("fighter3AsyncExecutor")
        public ThreadPoolTaskExecutor executor() {
            //Spring封装的一个线程池
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //随便写的一些配置
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(50);
            executor.setQueueCapacity(30);
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setThreadNamePrefix("fighter3AsyncExecutor-");
            executor.initialize();
            return executor;
        }
    
        @Override
        public Executor getAsyncExecutor() {
            return executor();
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 继承AsyncConfigurerSupport
    @Configuration
    @Slf4j
    public class SpringAsyncConfigurer extends AsyncConfigurerSupport {
    
        @Bean
        public ThreadPoolTaskExecutor asyncExecutor() {
            ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
            //随便写的一些配置
            threadPool.setCorePoolSize(10);
            threadPool.setMaxPoolSize(30);
            threadPool.setWaitForTasksToCompleteOnShutdown(true);
            threadPool.setAwaitTerminationSeconds(60 * 15);
            return threadPool;
        }
    
        @Override
        public Executor getAsyncExecutor() {
            return asyncExecutor();
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 配置自定义的TaskExecutor

      • 配置线程池

        @Configuration
        public class TaskPoolConfig {
        
            @Bean(name = "asyncExecutor")
            public Executor taskExecutor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                //随便写的一些配置
                executor.setCorePoolSize(10);
                executor.setMaxPoolSize(20);
                executor.setQueueCapacity(200);
                executor.setKeepAliveSeconds(60);
                executor.setThreadNamePrefix("asyncExecutor-");
                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                return executor;
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
      • 使用@Async注解的时候,指定线程池,推荐使用这种方式,因为在项目里,尽量做到线程池隔离,不同的任务使用不同的线程池

      @Slf4j
      @Service
      public class OrderLogListener  {
      
          @EventListener
          @Async("asyncExecutor")
          public void orderLog(PlaceOrderEvent event) {
              log.info("[afterPlaceOrder] log.");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

    异步和自定义线程池这一部分只是一些扩展,稍微占了一些篇幅,大家可不要觉得Spring Event用起来很繁琐。

    嗨

    发布事件

    发布事件也非常简单,只需要使用Spring 提供的ApplicationEventPublisher来发布自定义事件。

    • OrderServiceImpl

      @Service
      @Slf4j
      public class OrderServiceImpl implements OrderService {
          @Autowired
          private ApplicationEventPublisher applicationEventPublisher;
      
          /**
           * 下单
           */
          @Override
          public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
              log.info("[placeOrder] start.");
              PlaceOrderResVO resVO = new PlaceOrderResVO();
              //消息
              PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
              //发布事件
              applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
              log.info("[placeOrder] end.");
              return resVO;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21

    在Idea里查看事件的监听者也比较方便,点击下面图中的图标,就可以查看监听者。

    查看监听者

    监听者

    测试

    最后,我们还是测试一下。

        @Test
        void placeOrder() {
            PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
            orderService.placeOrder(placeOrderReqVO);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 执行结果
    2022-11-08 10:05:14.415  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] start.
    2022-11-08 10:05:14.424  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] end.
    2022-11-08 10:05:14.434  INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener    : [afterPlaceOrder] notify.
    2022-11-08 10:05:14.435  INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener   : [afterPlaceOrder] metrics
    2022-11-08 10:05:14.436  INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener       : [afterPlaceOrder] log.
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以看到,异步执行,而且用到了我们自定义的线程池。

    小结

    这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?




    参考:

    [1]. 《设计模式之禅》

  • 相关阅读:
    java文件查看大小,压缩,文件下载工具类
    mysql 数据备份与恢复使用详解
    颜值在线的 Keychron K8 实际上手体验
    【单链表经典习题讲解】
    机械臂速成小指南(七):机械臂位姿的描述方法
    HTTP学习——协议与术语、HTTP、缓存、Cookie
    【Mysql专题】视图介绍及其基本操作
    已解决json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    3dMax2024中MAXScript的新增功能
    第17章 标准库特殊设施【C++】
  • 原文地址:https://blog.csdn.net/sinat_40770656/article/details/127936952