• 社区系统项目复盘-5


    Kafka消息队列实现系统通知功能

    • 阻塞队列

      可以用阻塞队列来实现消息队列,阻塞队列是一个接口:BlockingQueue,可以用来解决线程通信问题,依靠两个方法,这两个方法都是阻塞的,put方法往队列里存数据,take从队列里拿数据。

      https://res.craft.do/user/full/fd148a50-4a5b-9a85-bec3-e1645571e2c7/doc/6C708C00-2D5F-456E-9347-E93ADC36F3FB/296D14C4-3C6D-45C5-906D-E3DCEC0BDF9F_2/vY6efFTgZHTVJayyxExIASo6BbgkZavx4u2WWRLPLOoz/Image.png

    生产者消费者模式:

    • 生产者:产生数据的线程
    • 消费者:使用数据的线程

    阻塞队列BlockingQueue接口的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

    代码示例:

    public class BlockingQueueTests {
    
        public static void main(String[] args) {
            BlockingQueue queue = new ArrayBlockingQueue(10);
            new Thread(new Producer(queue)).start();
            new Thread(new Consumer(queue)).start();
            new Thread(new Consumer(queue)).start();
            new Thread(new Consumer(queue)).start();
        }
    }
    
    // 生产者线程
    class Producer implements Runnable {
    
        private BlockingQueue<Integer> queue;
    
        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(20);
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    // 消费者线程
    class Consumer implements Runnable {
    
        private BlockingQueue<Integer> queue;
    
        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(new Random().nextInt(1000));
                    queue.take();
                    System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    什么是Kafka?

    • Kafka 简介
      • Kafaka是一个分布式的流处理平台
      • 应用:消息系统、日志收集、用户行为追踪、流式处理
    • Kafaka 特点
      • 高吞吐量、消息持久化、高可靠性、高扩展性
    • Kafka术语
      • Broker:Kafka的服务器,kafka集群中的每一台服务器称为一个broker
      • Zookeeper:同来管理其他的集群,kafka中内置zookeeper
      • Topic、Partition、Offset:kafka采用发布订阅模式,生产者把消息发布到的那个地方就叫topic,partition是分区的意思,一个主题分为多个分区,offset表示消息在分区内存放的索引。
      • Leader Replica、Follow Replica:Kafka通过副本的形式对数据存储多份,主副本可以处理请求,从副本只是备份,不负责做响应,当主副本挂掉时,集群会从众多从副本中选择一个作为leader。
    • 下载安装kafka
      • 从Kafka官网下载安装包到本地,解压缩就可以
      • 修改配置文件 config/zookeeeper.properties,dataDir
      • 修改配置文件 config/server.properties,log.dirs
      • 启动zookeeper:
        • cd /usr/local/kafka_2.13-3.2.1
        • sh bin/zookeeper-server-start.sh config/zookeeper.properties
      • 启动kafka:
        • cd /usr/local/kafka_2.13-3.2.1
        • sh bin/kafka-server-start.sh config/server.properties

    Spring是怎么整合Kafka的?

    • 第一步:引入依赖 spring-kafka

      <dependency>
      	<groupId>org.springframework.kafkagroupId>
      	<artifactId>spring-kafkaartifactId>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
    • 第二步:配置Kafka

      • 配置server、consumer
      # KafkaProperties
      spring.kafka.bootstrap-servers=localhost:9092
      spring.kafka.consumer.group-id=community-consumer-group
      spring.kafka.consumer.enable-auto-commit=true
      spring.kafka.consumer.auto-commit-interval=3000
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 第三步:访问Kafka

      • 生产者:kafkaTemplate.send(topic,data);
      • 消费者:
        • @KafkaListener(topics={“test”})
        • public void handleMessage(ConsumerRecord record) { }

      代码示例:

      public class KafkaTests {
          @Autowired
          private KafkaProducer kafkaProducer;
          @Test
          public void testKafka() {
              kafkaProducer.sendMessage("test", "你好");
              kafkaProducer.sendMessage("test", "在吗");
              try {
                  Thread.sleep(1000 * 10);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      
      @Component
      class KafkaProducer {
          @Autowired
          private KafkaTemplate kafkaTemplate;
          public void sendMessage(String topic, String content) {
              kafkaTemplate.send(topic, content);
          }
      }
      
      @Component
      class KafkaConsumer {
          @KafkaListener(topics = {"test"})
          public void handleMessage(ConsumerRecord record) {
              System.out.println(record.value());
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31

    发送系统通知

    功能描述:当用户收到评论、点赞和关注后,会收到来自系统的通知信息。

    • 实现:从技术上来讲,使用kafka消息队列的方式,根据不同的主题处理不同的任务。从业务的角度来讲,采用事件驱动的方式,评论、点赞、关注分别为事件。开发时,在Kafka框架的基础上,基于事件对代码逻辑进一步的封装。

      https://res.craft.do/user/full/fd148a50-4a5b-9a85-bec3-e1645571e2c7/doc/6C708C00-2D5F-456E-9347-E93ADC36F3FB/B54638E1-E998-4C07-A7CC-D87D5D3AF893_2/a7thjomjJ16dZx8NUwC6dYgi2sNZDpDw1HdYgbgWmOMz/Image.png

    • 触发事件

      • 评论后,发布通知
      • 点赞后,发布通知
      • 关注后,发布通知
    • 处理事件

      • 封装事件对象
      • 开发事件的生产者
      • 开发事件的消费者

    在这里插入图片描述

    具体实现:

    1.封装事件对象 Event,对类中的set函数都进行改写,使得构造对象时可以连续set

    private String topic;
    private int userId;
    private int entityType;
    private int entityId;
    private int entityUserId;
    private Map<String,Object> data = new HashMap<>();
    // 思考重写set方法的用处,全部都做这样的处理,以topic为例。
    // 好处:可以连续set
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.开发事件的生产者EventProducer,将事件发布到指定的主题,主要有评论、点赞、关注3个topic

    @Component
    public class EventProducer {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        // 处理事件
        public void fireEvent(Event event){
            // 将事件发布到指定的主题
            kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.开发事件的消费者EventConsumer

    @Component
    public class EventConsumer implements CommunityConstant {
        private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    
    	@Autowired
        private MessageService messageService;
    
    	@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
        public void handleCommentMessage(ConsumerRecord record){
            if(record == null || record.value() == null){
                logger.error("消息的内容为空!");
                return;
            }
    
            Event event = JSONObject.parseObject(record.value().toString(),Event.class);
            if(event == null){
                logger.error("消息格式错误!");
                return;
            }
    
            // 发送站内通知
            Message message = new Message();
            message.setFromId(SYSTEM_USER_ID);
            message.setToId(event.getEntityUserId());
            message.setConversationId(event.getTopic());
            message.setCreateTime(new Date());
    
            Map<String,Object> content = new HashMap<>();
            content.put("userId",event.getUserId());
            content.put("entityType",event.getEntityType());
            content.put("entityId",event.getEntityId());
            if(!event.getData().isEmpty()){
                for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                    content.put(entry.getKey(),entry.getValue());
                }
            }
            message.setContent(JSONObject.toJSONString(content));
    
            messageService.addMessage(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    4.触发事件,有3个地方会触发该事件

    • 发布评论后,触发事件,系统向被评论用户发送通知
    // 触发评论事件
    Event event = new Event()
            .setTopic(TOPIC_COMMENT)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(comment.getEntityType())
            .setEntityId(comment.getEntityId())
            .setData("postId",discussPostId);
    if(comment.getEntityType() == ENTITY_TYPE_POST){
        DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
        event.setEntityUserId(target.getUserId());
    }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
        Comment target = commentService.findCommentById(comment.getEntityId());
        event.setEntityUserId(target.getUserId());
    }
    eventProducer.fireEvent(event);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 点赞后,触发事件,系统向被赞用户发送通知。⚠️ 需要判断点赞状态。
    // 触发点赞事件
    if(likeStatus == 1){
        Event event = new Event()
                .setTopic(TOPIC_LIKE)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityUserId)
                .setData("postId",postId);
        eventProducer.fireEvent(event);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 关注后,触发事件,系统向被关注用户发送通知
    // 触发关注事件
    Event event = new Event()
            .setTopic(TOPIC_FOLLOW)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(entityType)
            .setEntityId(entityId)
            .setEntityUserId(entityId);
    eventProducer.fireEvent(event);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    **小结:**首先需要创建事件生产者类,将事件发布到指定主题,主题包括评论、点赞、关注这3种,然后创建事件消费者类,定义消费该类事件的方法,使用@KafkaListener注解,属性topics中是消费的时间类型,然后在相应的controller类中加入触发事件的代码块,发布评论时、点赞后、关注后都需要触发事件。

    显示系统通知

    功能描述:

    • 通知列表:显示评论、点赞、关注三种类型的通知
    • 通知详情:分页显示某一类主题所包含的通知
    • 未读消息:在页面头部显示所有的未读消息数量;使用拦截器实现,拦截器首先获取当前用户,如果用户不为null,说明已经登录了,那么就查询未读私信数量和未读通知数量,总数显示在页面上

    在这里插入图片描述

  • 相关阅读:
    5.9图书管理案例
    基于Linux的Web小型服务器HTTP项目的自主实现
    如何通俗理解逻辑回归(Logistic Regression)
    许可分析 license分析 第十七章
    【算能】sail的python-pcie的编译时候,报错:
    KVM管理平台选型与开源企业级虚拟化平台oVirt详解
    下一代英伟达H100 GPU发布时,国产芯片能追上吗?
    考研概率论与数理统计(知识点梳理)
    [NCTF 2018]flask真香
    RabbitMQ入门与进阶实战
  • 原文地址:https://blog.csdn.net/wangws_sb/article/details/128101031