• Halo 开源项目学习(六):事件监听机制


    基本介绍

    Halo 项目中,当用户或博主执行某些操作时,服务器会发布相应的事件,例如博主登录管理员后台时发布 "日志记录" 事件,用户浏览文章时发布 "访问文章" 事件。事件发布后,负责监听的 Bean 会做出相应的处理,这种设计称为事件监听机制,其作用是可以实现业务逻辑之间的解耦,提高程序的扩展性和可维护性。

    ApplicationEvent 和 Listener

    Halo 使用 ApplicationEvent 和 Listener 来实现事件的发布与监听,二者由 Spring 提供,其中 ApplicationEvent 是需要发布的事件,Listener 则是监听器。用户可在监听器中自定义事件的处理逻辑,当事件发生时,只需要将事件发布,监听器会根据用户定义的逻辑自动处理该事件。

    定义事件

    事件需要继承 ApplicationEvent 类,且需要重载构造方法,以 LogEvent 为例:

    public class LogEvent extends ApplicationEvent {
    
        private final LogParam logParam;
    
        /**
         * Create a new ApplicationEvent.
         *
         * @param source the object on which the event initially occurred (never {@code null})
         * @param logParam login param
         */
        public LogEvent(Object source, LogParam logParam) {
            super(source);
    
            // Validate the log param
            ValidationUtils.validate(logParam);
    
            // Set ip address
            logParam.setIpAddress(ServletUtils.getRequestIp());
    
            this.logParam = logParam;
        }
    
        public LogEvent(Object source, String logKey, LogType logType, String content) {
            this(source, new LogParam(logKey, logType, content));
        }
    
        public LogParam getLogParam() {
            return logParam;
        }
    }
    

    构造方法中的 source 指的是触发事件的 Bean,也称为事件源,通常用 this 关键字代替,其它参数可由用户任意指定。

    发布事件

    ApplicationContext 接口的 publishEvent 方法可用于发布事件,例如博客初始化完成后发布 LogEvent 事件(InstallConroller 中的 installBlog 方法):

    public BaseResponse<String> installBlog(@RequestBody InstallParam installParam) {
        // 省略部分代码
    
        eventPublisher.publishEvent(
            new LogEvent(this, user.getId().toString(), LogType.BLOG_INITIALIZED, "博客已成功初始化")
        );
    
        return BaseResponse.ok("安装完成!");
    }
    

    监听器

    监听器的创建方式有多种,例如实现 ApplicationListener 接口、SmartApplicationListener 接口,或者添加 @EventListener 注解。项目中使用注解来定义监听器,如 LogEventListener:

    @Component
    public class LogEventListener {
    
        private final LogService logService;
    
        public LogEventListener(LogService logService) {
            this.logService = logService;
        }
    
        @EventListener
        @Async
        public void onApplicationEvent(LogEvent event) {
            // Convert to log
            Log logToCreate = event.getLogParam().convertTo();
    
            // Create log
            logService.create(logToCreate);
        }
    }
    

    用户可在 @EventListener 注解修饰的方法中定义事件的处理逻辑,方法接收的参数为监听的事件类型。@Async 注解的作用是实现异步监听,以上文中的 installBlog 方法为例,如果不添加该注解,那么程序需要等待 onApplicationEvent 方法执行结束后才能返回 "安装完成!"。加上 @Async 注解后,onApplicationEvent 方法会在新的线程中执行,installBlog 方法可以立即返回。若要使 @Async 注解生效,还需要在启动类或配置类上添加 @EnableAsync 注解。

    事件处理

    接下来我们分析一下 Halo 项目中不同事件的处理过程:

    日志记录事件

    日志记录事件 LogEvent 由 LogEventListener 中的 onApplicationEvent 方法处理,该方法的处理逻辑非常简单,就是在 logs 表中插入一条系统日志,插入的记录用于在管理员界面展示:

    需要注意的是,不同类型日志的 logKey、logType 以及 content 会有所区别,例如用户登录时,logKey 为用户的 userName,logType 为 LogType.LOGGED_IN,content 为用户的 nickName:

    eventPublisher.publishEvent(
            new LogEvent(this, user.getUsername(), LogType.LOGGED_IN, user.getNickname()));
    

    发布文章时,logKey 为文章的 id,logType 为 LogType.POST_PUBLISHED,content 为文章的 title:

    LogEvent logEvent = new LogEvent(this, createdPost.getId().toString(),
        LogType.POST_PUBLISHED, createdPost.getTitle());
    eventPublisher.publishEvent(logEvent);
    

    文章访问事件

    文章访问事件 PostVisitEvent 由 AbstractVisitEventListener 中的 handleVisitEvent 方法处理,该方法的处理的逻辑是将当前文章的访问量加一:

    protected void handleVisitEvent(@NonNull AbstractVisitEvent event) throws InterruptedException {
        Assert.notNull(event, "Visit event must not be null");
        // 获取文章 id
        // Get post id
        Integer id = event.getId();
    
        log.debug("Received a visit event, post id: [{}]", id);
    
        // 如果当前 postId 具有对应的 BlockingQueue, 那么直接返回该 BlockingQueue, 否则为当前 postId 创建一个新的 BlockingQueue
        // Get post visit queue
        BlockingQueue<Integer> postVisitQueue =
            visitQueueMap.computeIfAbsent(id, this::createEmptyQueue);
        // 如果当前 postId 具有对应的 PostVisitTask, 不做任何处理, 否则为当前 postId 创建一个新的 PostVisitTask 任务
        visitTaskMap.computeIfAbsent(id, this::createPostVisitTask);
        // 将当前 postId 存入到对应的 BlockingQueue
        // Put a visit for the post
        postVisitQueue.put(id);
    }
    

    上述方法首先获取当前被访问文章的 postId,然后查询 visitQueueMap 中是否存在 postId 对应的阻塞队列(实际类型为 LinkedBlockingQueue),如果存在那么直接返回该队列, 否则为当前 postId 创建一个新的阻塞队列并存入到 visitQueueMap。接着查询 visitTaskMap 中是否存在 postId 对应的 PostVisitTask 任务(任务的作用是将文章的访问量加一),如果没有,那么就为 postId 创建一个新的 PostVisitTask 任务,并将该任务交给线程池 ThreadPoolExecutor(Executors.newCachedThreadPool())执行。之后将 postId 添加到对应的阻塞队列,这一步的目的是管理 PostVisitTask 任务的执行次数。

    visitQueueMap 和 visitTaskMap 都是 ConcurrentHashMap 类型的对象,使用 ConcurrentHashMap 是为了保证线程安全,因为监听器的事件处理方法被 @Async 注解修饰。默认情况下,@Async 注解修饰的方法会由 Spring 创建的线程池 ThreadPoolTaskExecutor 中的线程执行,因此当某一篇文章被多个用户同时浏览时,ThreadPoolTaskExecutor 中的多个线程可能会同时在 visitQueueMap 中创建阻塞队列,或在 visitTaskMap 中创建 PostVisitTask 任务。

    下面看一下 PostVisitTask 任务中 run 方法的处理逻辑:

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                BlockingQueue<Integer> postVisitQueue = visitQueueMap.get(id);
                Integer postId = postVisitQueue.take();
    
                log.debug("Took a new visit for post id: [{}]", postId);
    
                // Increase the visit
                basePostService.increaseVisit(postId);
    
                log.debug("Increased visits for post id: [{}]", postId);
            } catch (InterruptedException e) {
                log.debug(
                    "Post visit task: " + Thread.currentThread().getName() + " was interrupted",
                    e);
                // Ignore this exception
            }
        }
    
        log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
    }
    
    

    线程池 ThreadPoolExecutor 中的一个线程处理该任务:

    1. 从 visitQueueMap 获取 postId 对应的阻塞队列(这里的 id 其实就是 postId),并取出队首元素。

    2. 将 postId 对应的文章的点赞量加一。

    3. 只要线程不被中断,就一直重复步骤 1 和步骤 2,如果队列为空,那么线程进入阻塞。

    综上,文章访问事件的处理流程总结如下:

    当 id 为 postId 的文章被访问时,系统会为其创建一个 LinkedBlockingQueue 类型的阻塞队列和一个负责将文章点赞量加一的 PostVisitTask 任务。然后 postId 入队,线程池 ThreadPoolExecutor 分配一个线程执行 PostVisitTask 任务,阻塞队列有多少个 postId 该任务就执行多少次。

    结语

    事件监听机制是一个非常重要的知识点,实际开发中,如果某些业务处理起来比较耗时,且与主要业务的关联性并不是很强,那么可以考虑做任务拆分,利用事件监听机制将串行执行异步化,改为并行执行(当然也可以使用消息队列)。Halo 中还有新增评论、主题更新等事件,这些事件的的处理思路与文章访问事件相似,所以本文就不再过多陈述了 ( ⊙‿⊙)。

    作者:John同学
    欢迎任何形式的转载,但请务必注明出处。
    文章和代码如有不当之处,欢迎批评指正!

  • 相关阅读:
    两数乘积:输出1~100整数乱序列表中两数乘积是目标整数的最小下标对
    基于CycleGAN的山水风格画迁移
    向下沟通(上):无权无势,他们不听你的怎么办?
    国庆作业 day1
    算法----数组常见知识点
    RocketMQ 消费者分类与分组
    学习C++第二十二课--类模版概念与函数模版的定义、调用笔记
    浏览器绑定快捷键KeyboardEvent
    【软件测试】资深测试聊一聊,测试架构师是怎么样的,做一名成功的测试工程师......
    一个数据库版本兼容问题
  • 原文地址:https://www.cnblogs.com/johnlearning/p/16201473.html