• 【黑马头条】-day11热点文章实时计算-kafka-kafkaStream-Redis




    今日内容

    在这里插入图片描述

    1 实时流式计算

    在这里插入图片描述

    1.1 应用场景

    在这里插入图片描述

    1.2 技术方案选型

    在这里插入图片描述

    2 Kafka Stream

    2.1 概述

    在这里插入图片描述

    在这里插入图片描述

    2.2 KafkaStream

    在这里插入图片描述

    2.3 入门demo

    2.3.1 需求分析

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    2.3.2 实现

    在这里插入图片描述

    还是在kafka-demo的模块里实现

    2.3.2.1 添加依赖
    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-streamsartifactId>
        <exclusions>
            <exclusion>
                <artifactId>connect-jsonartifactId>
                <groupId>org.apache.kafkagroupId>
            exclusion>
            <exclusion>
                <groupId>org.apache.kafkagroupId>
                <artifactId>kafka-clientsartifactId>
            exclusion>
        exclusions>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    2.3.2.2 创建快速启动,生成kafka流
    public class KafkaStreamQuickStart {
    
        public static void main(String[] args) {
    
            //kafka的配置信息
            Properties prop = new Properties();
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
            prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
    
            //stream 构建器
            StreamsBuilder streamsBuilder = new StreamsBuilder();
    
            //流式计算
            streamProcessor(streamsBuilder);
    
            //创建kafkaStream对象
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
            //开启流式计算
            kafkaStreams.start();
        }
    
        /**
         * 流式计算
         * 消息的内容:hello kafka  hello itcast
         * @param streamsBuilder
         */
        private static void streamProcessor(StreamsBuilder streamsBuilder) {
            //创建kstream对象,同时指定从那个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                        @Override
                        public Iterable<String> apply(String value) {
                            return Arrays.asList(value.split(" "));
                        }
                    })
                    //按照value进行聚合处理
                    .groupBy((key,value)->value)
                    //时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    //统计单词的个数
                    .count()
                    //转换为kStream
                    .toStream()
                    .map((key,value)->{
                        System.out.println("key:"+key+",vlaue:"+value);
                        return new KeyValue<>(key.key().toString(),value.toString());
                    })
                    //发送消息
                    .to("itcast-topic-out");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    2.3.2.3 修改生产者

    修改com.heima.kafka.sample.ProducerQuickStart的方法

    public class ProducerQuickStart {
     
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1.kafka的配置信息
            Properties properties = new Properties();
            //kafka的连接地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.204.129:9092");
            //发送失败,失败的重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,5);
            //消息key的序列化器
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            //消息value的序列化器
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
     
            //2.生产者对象
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    
            /**
             * 第一个参数:topic 第二个参数:key 第三个参数:value
             */
            //封装发送的消息
            //ProducerRecord record = new ProducerRecord("topic-first","key-001","hello kafka");
            for(int i=0;i<5;i++){
                ProducerRecord<String,String> record = new ProducerRecord<String, String>("itcast-topic-input","hello kafka"+" "+i);
                //3.发送消息
                producer.send(record);
            }
            producer.close();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    2.3.2.4 修改消费者

    修改com.heima.kafka.sample.ConsumerQuickStart的方法

    public class ConsumerQuickStart {
     
        public static void main(String[] args) {
            //1.添加kafka的配置信息
            Properties properties = new Properties();
            //kafka的连接地址
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
            //消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            //消息的反序列化器
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //手动提交偏移量
            //properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
            //2.消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
     
            //3.订阅主题
            consumer.subscribe(Collections.singletonList("itcast-topic-out"));
    
     
            //当前线程一直处于监听状态
            while (true) {
                //4.获取消息
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                    System.out.println(consumerRecord.offset());
                    System.out.println(consumerRecord.partition());
                }
            }
     
        }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    2.3.2.5 测试

    先启动消费者,再启动kafkaStream,再启动生产者

    发送消息为"hello kafka"+" "+i,一共五次

    在这里插入图片描述

    符合我们发的

    2.4 SpringBoot集合KafkaStream

    在这里插入图片描述

    在这里插入图片描述

    2.4.1 创建自定配置参数类

    在kafka-demo中创建com.heima.kafka.config.KafkaStreamConfig类

    /**
     * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
     */
    @Getter
    @Setter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
        private String hosts;
        private String group;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
            props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.4.2 修改配置文件

    修改heima-leadnews-test/kafka-demo/src/main/resources/application.yaml

    将其放到最底下

    kafka:
      hosts: 192.168.204.129:9092
      group: ${spring.application.name}
    
    • 1
    • 2
    • 3
    server:
      port: 9991
    spring:
      application:
        name: kafka-demo
      kafka:
        bootstrap-servers: 192.168.204.129:9092
        producer:
          retries: 10
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: ${spring.application.name}-test
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    kafka:
      hosts: 192.168.204.129:9092
      group: ${spring.application.name}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.4.3 创建配置类创建KStream对象

    创建com.heima.kafka.stream.KafkaStreamHelloListener

    等于KStream放入spring容器中进行直接监听

    @Configuration
    @Slf4j
    public class KafkaStreamHelloListener {
        @Bean
        public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
            //创建kstream对象,同时指定从那个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    return Arrays.asList(value.split(" "));
                }
            })
                    //根据value进行聚合分组
                    .groupBy((key,value)->value)
                    //聚合计算时间间隔
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    //求单词的个数
                    .count()
                    .toStream()
                    //处理后的结果转换为string字符串
                    .map((key,value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(),value.toString());
                    })
                    //发送消息
                    .to("itcast-topic-out");
            return stream;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.4.4 测试

    启动kafka启动类,启动消费者和生产者

    发送消息是"hello kafka",一共五次

    在这里插入图片描述

    3 热点文章实时计算

    3.1 思路说明

    在这里插入图片描述

    3.2 实现步骤

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    3.3 具体实现

    3.3.1 为行为微服务添加kafka配置

    在heima-leadnews-behavior微服务中集成kafka生产者配置

    spring:
      application:
        name: leadnews-behavior
      kafka:
        bootstrap-servers: 192.168.204.129:9092
        producer:
          retries: 10
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.3.2 行为微服务中发送消息的消息体实体类

    定义消息发送封装类:UpdateArticleMess

    在heima-leadnews-model中创建com.heima.model.message.UpdateArticleMess实体类

    package com.heima.model.message;
     
    import lombok.Data;
     
    @Data
    public class UpdateArticleMess {
     
        /**
         * 修改文章的字段类型
          */
        private UpdateArticleType type;
        /**
         * 文章ID
         */
        private Long articleId;
        /**
         * 修改数据的增量,可为正负
         */
        private Integer add;
     
        public enum UpdateArticleType{
            COLLECTION,COMMENT,LIKES,VIEWS;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3.3.3 定义kafka流接收的topic

    在heima-leadnews-common中创建com.heima.common.constants.HotArticleConstants常量类

    package com.heima.common.constants;
    public class HotArticleConstants {
        public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    }
    
    • 1
    • 2
    • 3
    • 4

    3.3.4 修改用户行为后的逻辑(相当于生产者)

    点赞之后就要发送消息了,所以去修改用户点赞的实现类com.heima.behavior.service.impl.ApLikesBehaviorServiceImpl

    @Service
    @Transactional
    @Slf4j
    public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
    
        @Autowired
        private CacheService cacheService;
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Override
        public ResponseResult like(LikesBehaviorDto dto) {
    
            //1.检查参数
            if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
            }
    
            //2.是否登录
            ApUser user = AppThreadLocalUtil.getUser();
            if (user == null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
            }
    
            //组装发送给kafka的消息类
            UpdateArticleMess message =new UpdateArticleMess();
            message.setArticleId(dto.getArticleId());
            message.setType(UpdateArticleMess.UpdateArticleType.LIKES);
    
            //3.点赞  保存数据
            if (dto.getOperation() == 0) {
                Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
                if (obj != null) {
                    return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
                }
                // 保存当前key
                log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
                cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
                //添加行为的正负
                message.setAdd(1);
            } else {
                // 删除当前key
                log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
                cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
                //添加行为的正负
                message.setAdd(-1);
            }
    
            //4.给kafka发送消息
            kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(message));
            
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    
        }
    
        /**
         * 检查参数
         *
         * @return
         */
        private boolean checkParam(LikesBehaviorDto dto) {
            if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
                return true;
            }
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    点赞有,阅读都有,一样需要改

    @Service
    @Transactional
    @Slf4j
    public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {
    
        @Autowired
        private CacheService cacheService;
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Override
        public ResponseResult readBehavior(ReadBehaviorDto dto) {
            //1.检查参数
            if (dto == null || dto.getArticleId() == null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
            }
    
            //2.是否登录
            ApUser user = AppThreadLocalUtil.getUser();
            if (user == null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
            }
            //组装发送给kafka的消息类
            UpdateArticleMess message =new UpdateArticleMess();
            message.setArticleId(dto.getArticleId());
            message.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
    
            //更新阅读次数
            String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (StringUtils.isNotBlank(readBehaviorJson)) {
                ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
                dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
            }
            // 保存当前key
            log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            //添加行为的正负
            message.setAdd(1);
            //发送消息
            kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(message));
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.3.5 Stream聚合

    因为用户行为最后都体现在文章上面,所以kafkaStream的数据聚合应该在文章微服务中。

    3.3.5.1 创建自定配置参数类

    在heima-leadnews-article中创建com.heima.article.config.KafkaStreamConfig配置类

    @Getter
    @Setter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
        private String hosts;
        private String group;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
            props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    3.3.5.2 添加kafkaStream的配置

    在nacos中为文章微服务添加kafkaStream的配置

    kafka:
      hosts: 192.168.204.129:9092
      group: ${spring.application.name}
    
    • 1
    • 2
    • 3
    3.3.5.3 定义kafka流转发的topic

    在com.heima.common.constants.HotArticleConstants中添加HOT_ARTICLE_INCR_HANDLE_TOPIC

    public class HotArticleConstants {
        public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
        public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
    }
    
    • 1
    • 2
    • 3
    • 4
    3.3.5.4 文章微服务中的发送消息的消息体实体类

    因为聚合后的数据是COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0,不包含文章id,所以需要一个类把他们封装起来

    在heima-leadnews-model中创建com.heima.model.message.ArticleVisitStreamMess实体类

    @Data
    public class ArticleVisitStreamMess {
        /**
         * 文章id
         */
        private Long articleId;
        /**
         * 阅读
         */
        private int view;
        /**
         * 收藏
         */
        private int collect;
        /**
         * 评论
         */
        private int comment;
        /**
         * 点赞
         */
        private int like;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    3.3.5.5 创建配置类创建KStream对象

    定义stream,接收消息并聚合,创建com.heima.article.stream.HotArticleStreamHandler类

    @Configuration
    @Slf4j
    public class HotArticleStreamHandler {
    
        @Bean
        public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
            //接收消息
            KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
            //聚合流式处理
            stream.map((key,value)->{
                        UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                        //重置消息的key:1234343434   和  value: likes:1
                        return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
                    })
                    //按照文章id进行聚合
                    .groupBy((key,value)->key)
                    //时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    /**
                     * 自行的完成聚合的计算
                     */
                    .aggregate(new Initializer<String>() {
                        /**
                         * 初始方法,返回值是消息的value 初始值,也就是aggValue
                         * @return
                         */
                        @Override
                        public String apply() {
                            return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                        }
                        /**
                         * 真正的聚合操作,返回值是消息的value
                         */
                    }, new Aggregator<String, String, String>() {
                        /**
                         * key:文章id value:消息的value  aggValue:初始值
                         * @param key key:1234343434
                         * @param value value: likes:1
                         * @param aggValue 初始值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                         * @return
                         */
                        @Override
                        public String apply(String key, String value, String aggValue) {
                            if(StringUtils.isBlank(value)){
                                return aggValue;
                            }
                            String[] aggAry = aggValue.split(",");
                            int col = 0,com=0,lik=0,vie=0;
                            for (String agg : aggAry) {
                                String[] split = agg.split(":");
                                /**
                                 * 获得初始值,也是时间窗口内计算之后的值
                                 */
                                switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                    case COLLECTION:
                                        col = Integer.parseInt(split[1]);
                                        break;
                                    case COMMENT:
                                        com = Integer.parseInt(split[1]);
                                        break;
                                    case LIKES:
                                        lik = Integer.parseInt(split[1]);
                                        break;
                                    case VIEWS:
                                        vie = Integer.parseInt(split[1]);
                                        break;
                                }
                            }
                            /**
                             * 累加操作
                             */
                            String[] valAry = value.split(":");
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                                case COLLECTION:
                                    col += Integer.parseInt(valAry[1]);
                                    break;
                                case COMMENT:
                                    com += Integer.parseInt(valAry[1]);
                                    break;
                                case LIKES:
                                    lik += Integer.parseInt(valAry[1]);
                                    break;
                                case VIEWS:
                                    vie += Integer.parseInt(valAry[1]);
                                    break;
                            }
    
                            String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                            System.out.println("文章的id:"+key);
                            System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                            return formatStr;
                        }
                    }, Materialized.as("hot-atricle-stream-count-001"))
                    .toStream()
                    /**
                     * 格式化消息的key和value
                     */
                    .map((key,value)->{
                        return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                    })
                    //发送消息
                    .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
    
            return stream;
        }
    
        /**
         * 格式化消息的value数据
         * @param articleId
         * @param value
         * @return
         */
        public String formatObj(String articleId,String value){
            ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
            mess.setArticleId(Long.valueOf(articleId));
            //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
            String[] valAry = value.split(",");
            for (String val : valAry) {
                String[] split = val.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                    case COLLECTION:
                        mess.setCollect(Integer.parseInt(split[1]));
                        break;
                    case COMMENT:
                        mess.setComment(Integer.parseInt(split[1]));
                        break;
                    case LIKES:
                        mess.setLike(Integer.parseInt(split[1]));
                        break;
                    case VIEWS:
                        mess.setView(Integer.parseInt(split[1]));
                        break;
                }
            }
            log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
            return JSON.toJSONString(mess);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138

    3.3.6 创建消费者,用于监听聚合后的消息

    创建com.heima.article.listener.ArticleIncrHandleListener用于监听和处理聚合后的消息

    @Component
    @Slf4j
    public class ArticleIncrHandleListener {
     
        @Autowired
        private ApArticleService apArticleService;
     
        @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
        public void onMessage(String mess){
            if(StringUtils.isNotBlank(mess)){
                ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
                apArticleService.updateScore(articleVisitStreamMess);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.3.7 在文章微服务中更新当前分值

    在这里插入图片描述

    3.3.8 Service添加updateScore方法

    在文章微服务的service中完善功能

    接口

    void updateScore(ArticleVisitStreamMess articleVisitStreamMess);
    
    • 1

    实现:

    /**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        //1.更新文章的阅读、点赞、收藏、评论的数量
        ApArticle apArticle = updateArticle(mess);
        //2.计算文章的分值
        Integer score = computeScore(apArticle);
        score = score * 3;
    
        //3.替换当前文章对应频道的热点数据
        replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    
        //4.替换推荐对应的热点数据
        replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
    }
    
    /**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
    private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
        String articleListStr = cacheService.get(s);
        if (StringUtils.isNotBlank(articleListStr)) {
            List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
    
            boolean flag = true;
    
            //如果缓存中存在该文章,只更新分值
            for (HotArticleVo hotArticleVo : hotArticleVoList) {
                if (hotArticleVo.getId().equals(apArticle.getId())) {
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }
    
            //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
            if (flag) {
                if (hotArticleVoList.size() >= 30) {
                    hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                    if (lastHot.getScore() < score) {
                        hotArticleVoList.remove(lastHot);
                        HotArticleVo hot = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle, hot);
                        hot.setScore(score);
                        hotArticleVoList.add(hot);
                    }
    
                } else {
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }
            }
            //缓存到redis
            hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
            cacheService.set(s, JSON.toJSONString(hotArticleVoList));
        }
    }
    
    /**
     * 更新文章行为数量
     * @param mess
     */
    private ApArticle updateArticle(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
        apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
        apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
        apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
        updateById(apArticle);
        return apArticle;
    }
    
    /**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) {
        Integer score = 0;
        if(apArticle.getLikes() != null){
            score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
        }
        if(apArticle.getViews() != null){
            score += apArticle.getViews()* ArticleConstants.HOT_ARTICLE_VIEW_WEIGHT;
        }
        if(apArticle.getComment() != null){
            score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
        }
        if(apArticle.getCollection() != null){
            score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
        }
        return score;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    3.4 测试

    前端有问题,就不测试了,功能能明白就行。

  • 相关阅读:
    三端植物大战僵尸杂交版来了
    先进电机技术 —— 转子悬浮电机
    求点集中最近点对--期望线性法
    基于低代码平台开发的督办系统为企业管理赋能
    python实现维特比算法
    Windows安装MySQL8.0详细步骤
    力扣一.链表的运用
    java计算机毕业设计能源类网站平台源码+系统+数据库+lw文档+mybatis+运行部署
    C语言代码优化的艺术:深度探索与实践策略
    学习Python的经历和一些经验分享
  • 原文地址:https://blog.csdn.net/qq_45400167/article/details/137962287