• 黑马头条 热点文章实时计算、kafkaStream


      热点文章-实时计算

    1 今日内容

    1.1 定时计算与实时计算

    1.2 今日内容

    kafkaStream

    • 什么是流式计算
    • kafkaStream概述
    • kafkaStream入门案例
    • Springboot集成kafkaStream

    实时计算

    • 用户行为发送消息
    • kafkaStream聚合处理消息
    • 更新文章行为数量
    • 替换热点文章数据

    2 实时流式计算

    2.1 概念

    一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

    流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

    2.2 应用场景

    • 日志分析

    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等实时的进行数据分析,帮助企业进行决策

    • 大屏看板统计

    可以实时的查看网站注册数量,订单数量,购买数量,金额等。

    • 公交实时数据

    可以随时更新公交车方位,计算多久到达站牌

    • 实时文章分值计算

    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

    2.3 技术方案选型

    • Hadoop

    • Apche Storm

    Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

    • Kafka Stream

    可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

    3 Kafka Stream

    3.1 概述

    Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

    Kafka Stream的特点如下:

    • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
    • 除了Kafka外,无任何外部依赖
    • 充分利用Kafka分区机制实现水平扩展和顺序性保证
    • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
    • 支持正好一次处理语义
    • 提供记录级的处理能力,从而实现毫秒级的低延迟
    • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
    • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

    3.2 Kafka Streams的关键概念

    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
    • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

    3.3 KStream

    (1)数据结构类似于map,如下图,key-value键值对

    (2)KStream

    KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

    数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

    KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

    为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

    (“ alice”,1)->(“” alice“,3)

    如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

    3.4 Kafka Stream入门案例编写

    (1)需求分析,求单词个数(word count)

    (2)引入依赖

    在之前的kafka-demo工程的pom文件中引入

    1. <dependency>
    2. <groupId>org.apache.kafka</groupId>
    3. <artifactId>kafka-streams</artifactId>
    4. <exclusions>
    5. <exclusion>
    6. <artifactId>connect-json</artifactId>
    7. <groupId>org.apache.kafka</groupId>
    8. </exclusion>
    9. <exclusion>
    10. <groupId>org.apache.kafka</groupId>
    11. <artifactId>kafka-clients</artifactId>
    12. </exclusion>
    13. </exclusions>
    14. </dependency>

    (3)创建原生的kafka staream入门案例

    1. package com.heima.kafka.sample;
    2. import java.time.Duration;
    3. import java.util.Arrays;
    4. import java.util.Properties;
    5. /**
    6. * 流式处理
    7. */
    8. public class KafkaStreamQuickStart {
    9. public static void main(String[] args) {
    10. //kafka的配置信息
    11. Properties prop = new Properties();
    12. prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
    13. prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    14. prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    15. prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
    16. //stream 构建器
    17. StreamsBuilder streamsBuilder = new StreamsBuilder();
    18. //流式计算
    19. streamProcessor(streamsBuilder);
    20. //创建kafkaStream对象
    21. KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
    22. //开启流式计算
    23. kafkaStreams.start();
    24. }
    25. /**
    26. * 流式计算
    27. * 消息的内容:hello kafka hello itcast
    28. * @param streamsBuilder
    29. */
    30. private static void streamProcessor(StreamsBuilder streamsBuilder) {
    31. //创建kstream对象,同时指定从那个topic中接收消息
    32. KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
    33. /**
    34. * 处理消息的value
    35. */
    36. stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    37. @Override
    38. public Iterable<String> apply(String value) {
    39. return Arrays.asList(value.split(" "));
    40. }
    41. })
    42. //按照value进行聚合处理
    43. .groupBy((key,value)->value)
    44. //时间窗口
    45. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    46. //统计单词的个数
    47. .count()
    48. //转换为kStream
    49. .toStream()
    50. .map((key,value)->{
    51. System.out.println("key:"+key+",vlaue:"+value);
    52. return new KeyValue<>(key.key().toString(),value.toString());
    53. })
    54. //发送消息
    55. .to("itcast-topic-out");
    56. }
    57. }

    (4)测试准备

    • 使用生产者在topic为:itcast_topic_input中发送多条消息
    • 使用消费者接收topic为:itcast_topic_out

    结果:

    • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

    3.5 SpringBoot集成Kafka Stream

    (1)自定配置参数

    1. package com.heima.kafka.config;
    2. import java.util.HashMap;
    3. import java.util.Map;
    4. /**
    5. * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
    6. */
    7. @Setter
    8. @Getter
    9. @Configuration
    10. @EnableKafkaStreams
    11. @ConfigurationProperties(prefix="kafka")
    12. public class KafkaStreamConfig {
    13. private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    14. private String hosts;
    15. private String group;
    16. @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    17. public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
    18. Map<String, Object> props = new HashMap<>();
    19. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
    20. props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//
    21. props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
    22. props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
    23. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
    24. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    25. return new KafkaStreamsConfiguration(props);
    26. }
    27. }

    修改application.yml文件,在最下方添加自定义配置

    1. kafka:
    2. hosts: 192.168.200.130:9092
    3. group: ${spring.application.name}

    (2)新增配置类,创建KStream对象,进行聚合

    1. package com.heima.kafka.stream;
    2. import java.time.Duration;
    3. import java.util.Arrays;
    4. @Configuration
    5. @Slf4j
    6. public class KafkaStreamHelloListener {
    7. @Bean
    8. public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
    9. //创建kstream对象,同时指定从那个topic中接收消息
    10. KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
    11. stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    12. @Override
    13. public Iterable<String> apply(String value) {
    14. return Arrays.asList(value.split(" "));
    15. }
    16. })
    17. //根据value进行聚合分组
    18. .groupBy((key,value)->value)
    19. //聚合计算时间间隔
    20. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    21. //求单词的个数
    22. .count()
    23. .toStream()
    24. //处理后的结果转换为string字符串
    25. .map((key,value)->{
    26. System.out.println("key:"+key+",value:"+value);
    27. return new KeyValue<>(key.key().toString(),value.toString());
    28. })
    29. //发送消息
    30. .to("itcast-topic-out");
    31. return stream;
    32. }
    33. }

    测试:

    启动微服务,正常发送消息,可以正常接收到消息

    3 app端热点文章计算

    3.1 思路说明

    3.2 功能实现

    3.2.1 用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例

    ①在heima-leadnews-behavior微服务中集成kafka生产者配置

    修改nacos,新增内容

    1. spring:
    2. application:
    3. name: leadnews-behavior
    4. kafka:
    5. bootstrap-servers: 192.168.200.130:9092
    6. producer:
    7. retries: 10
    8. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    9. value-serializer: org.apache.kafka.common.serialization.StringSerializer

    ②修改ApLikesBehaviorServiceImpl新增发送消息

    定义消息发送封装类:UpdateArticleMess

    1. package com.heima.model.mess;
    2. import lombok.Data;
    3. @Data
    4. public class UpdateArticleMess {
    5. /**
    6. * 修改文章的字段类型
    7. */
    8. private UpdateArticleType type;
    9. /**
    10. * 文章ID
    11. */
    12. private Long articleId;
    13. /**
    14. * 修改数据的增量,可为正负
    15. */
    16. private Integer add;
    17. public enum UpdateArticleType{
    18. COLLECTION,COMMENT,LIKES,VIEWS;
    19. }
    20. }

    topic常量类:

    1. package com.heima.common.constants;
    2. public class HotArticleConstants {
    3. public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    4. }

    完整代码如下:

    1. package com.heima.behavior.service.impl;
    2. import org.springframework.transaction.annotation.Transactional;
    3. @Service
    4. @Transactional
    5. @Slf4j
    6. public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
    7. @Autowired
    8. private CacheService cacheService;
    9. @Autowired
    10. private KafkaTemplate<String,String> kafkaTemplate;
    11. @Override
    12. public ResponseResult like(LikesBehaviorDto dto) {
    13. //1.检查参数
    14. if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
    15. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    16. }
    17. //2.是否登录
    18. ApUser user = AppThreadLocalUtil.getUser();
    19. if (user == null) {
    20. return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
    21. }
    22. UpdateArticleMess mess = new UpdateArticleMess();
    23. mess.setArticleId(dto.getArticleId());
    24. mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
    25. //3.点赞 保存数据
    26. if (dto.getOperation() == 0) {
    27. Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
    28. if (obj != null) {
    29. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
    30. }
    31. // 保存当前key
    32. log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
    33. cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
    34. mess.setAdd(1);
    35. } else {
    36. // 删除当前key
    37. log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
    38. cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
    39. mess.setAdd(-1);
    40. }
    41. //发送消息,数据聚合
    42. kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
    43. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    44. }
    45. /**
    46. * 检查参数
    47. *
    48. * @return
    49. */
    50. private boolean checkParam(LikesBehaviorDto dto) {
    51. if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
    52. return true;
    53. }
    54. return false;
    55. }
    56. }

    ③修改阅读行为的类ApReadBehaviorServiceImpl发送消息

    完整代码:

    1. package com.heima.behavior.service.impl;
    2. import org.springframework.transaction.annotation.Transactional;
    3. @Service
    4. @Transactional
    5. @Slf4j
    6. public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {
    7. @Autowired
    8. private CacheService cacheService;
    9. @Autowired
    10. private KafkaTemplate<String,String> kafkaTemplate;
    11. @Override
    12. public ResponseResult readBehavior(ReadBehaviorDto dto) {
    13. //1.检查参数
    14. if (dto == null || dto.getArticleId() == null) {
    15. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    16. }
    17. //2.是否登录
    18. ApUser user = AppThreadLocalUtil.getUser();
    19. if (user == null) {
    20. return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
    21. }
    22. //更新阅读次数
    23. String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
    24. if (StringUtils.isNotBlank(readBehaviorJson)) {
    25. ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
    26. dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
    27. }
    28. // 保存当前key
    29. log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
    30. cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
    31. //发送消息,数据聚合
    32. UpdateArticleMess mess = new UpdateArticleMess();
    33. mess.setArticleId(dto.getArticleId());
    34. mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
    35. mess.setAdd(1);
    36. kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
    37. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    38. }
    39. }
    3.2.2 使用kafkaStream实时接收消息,聚合内容

    ①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)

    ②定义实体类,用于聚合之后的分值封装

    1. package com.heima.model.article.mess;
    2. import lombok.Data;
    3. @Data
    4. public class ArticleVisitStreamMess {
    5. /**
    6. * 文章id
    7. */
    8. private Long articleId;
    9. /**
    10. * 阅读
    11. */
    12. private int view;
    13. /**
    14. * 收藏
    15. */
    16. private int collect;
    17. /**
    18. * 评论
    19. */
    20. private int comment;
    21. /**
    22. * 点赞
    23. */
    24. private int like;
    25. }

    修改常量类:增加常量

    1. package com.heima.common.constans;
    2. public class HotArticleConstants {
    3. public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    4. public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
    5. }

    ③ 定义stream,接收消息并聚合

    1. package com.heima.article.stream;
    2. import java.time.Duration;
    3. @Configuration
    4. @Slf4j
    5. public class HotArticleStreamHandler {
    6. @Bean
    7. public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
    8. //接收消息
    9. KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
    10. //聚合流式处理
    11. stream.map((key,value)->{
    12. UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
    13. //重置消息的key:1234343434value: likes:1
    14. return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
    15. })
    16. //按照文章id进行聚合
    17. .groupBy((key,value)->key)
    18. //时间窗口
    19. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    20. /**
    21. * 自行的完成聚合的计算
    22. */
    23. .aggregate(new Initializer<String>() {
    24. /**
    25. * 初始方法,返回值是消息的value
    26. * @return
    27. */
    28. @Override
    29. public String apply() {
    30. return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
    31. }
    32. /**
    33. * 真正的聚合操作,返回值是消息的value
    34. */
    35. }, new Aggregator<String, String, String>() {
    36. @Override
    37. public String apply(String key, String value, String aggValue) {
    38. if(StringUtils.isBlank(value)){
    39. return aggValue;
    40. }
    41. String[] aggAry = aggValue.split(",");
    42. int col = 0,com=0,lik=0,vie=0;
    43. for (String agg : aggAry) {
    44. String[] split = agg.split(":");
    45. /**
    46. * 获得初始值,也是时间窗口内计算之后的值
    47. */
    48. switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
    49. case COLLECTION:
    50. col = Integer.parseInt(split[1]);
    51. break;
    52. case COMMENT:
    53. com = Integer.parseInt(split[1]);
    54. break;
    55. case LIKES:
    56. lik = Integer.parseInt(split[1]);
    57. break;
    58. case VIEWS:
    59. vie = Integer.parseInt(split[1]);
    60. break;
    61. }
    62. }
    63. /**
    64. * 累加操作
    65. */
    66. String[] valAry = value.split(":");
    67. switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
    68. case COLLECTION:
    69. col += Integer.parseInt(valAry[1]);
    70. break;
    71. case COMMENT:
    72. com += Integer.parseInt(valAry[1]);
    73. break;
    74. case LIKES:
    75. lik += Integer.parseInt(valAry[1]);
    76. break;
    77. case VIEWS:
    78. vie += Integer.parseInt(valAry[1]);
    79. break;
    80. }
    81. String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
    82. System.out.println("文章的id:"+key);
    83. System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
    84. return formatStr;
    85. }
    86. }, Materialized.as("hot-atricle-stream-count-001"))
    87. .toStream()
    88. .map((key,value)->{
    89. return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
    90. })
    91. //发送消息
    92. .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
    93. return stream;
    94. }
    95. /**
    96. * 格式化消息的value数据
    97. * @param articleId
    98. * @param value
    99. * @return
    100. */
    101. public String formatObj(String articleId,String value){
    102. ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
    103. mess.setArticleId(Long.valueOf(articleId));
    104. //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
    105. String[] valAry = value.split(",");
    106. for (String val : valAry) {
    107. String[] split = val.split(":");
    108. switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
    109. case COLLECTION:
    110. mess.setCollect(Integer.parseInt(split[1]));
    111. break;
    112. case COMMENT:
    113. mess.setComment(Integer.parseInt(split[1]));
    114. break;
    115. case LIKES:
    116. mess.setLike(Integer.parseInt(split[1]));
    117. break;
    118. case VIEWS:
    119. mess.setView(Integer.parseInt(split[1]));
    120. break;
    121. }
    122. }
    123. log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
    124. return JSON.toJSONString(mess);
    125. }
    126. }
    3.2.3 重新计算文章的分值,更新到数据库和缓存中

    ①在ApArticleService添加方法,用于更新数据库中的文章分值

    1. /**
    2. * 更新文章的分值 同时更新缓存中的热点文章数据
    3. * @param mess
    4. */
    5. public void updateScore(ArticleVisitStreamMess mess);

    实现类方法

    1. /**
    2. * 更新文章的分值 同时更新缓存中的热点文章数据
    3. * @param mess
    4. */
    5. @Override
    6. public void updateScore(ArticleVisitStreamMess mess) {
    7. //1.更新文章的阅读、点赞、收藏、评论的数量
    8. ApArticle apArticle = updateArticle(mess);
    9. //2.计算文章的分值
    10. Integer score = computeScore(apArticle);
    11. score = score * 3;
    12. //3.替换当前文章对应频道的热点数据
    13. replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    14. //4.替换推荐对应的热点数据
    15. replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
    16. }
    17. /**
    18. * 替换数据并且存入到redis
    19. * @param apArticle
    20. * @param score
    21. * @param s
    22. */
    23. private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    24. String articleListStr = cacheService.get(s);
    25. if (StringUtils.isNotBlank(articleListStr)) {
    26. List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
    27. boolean flag = true;
    28. //如果缓存中存在该文章,只更新分值
    29. for (HotArticleVo hotArticleVo : hotArticleVoList) {
    30. if (hotArticleVo.getId().equals(apArticle.getId())) {
    31. hotArticleVo.setScore(score);
    32. flag = false;
    33. break;
    34. }
    35. }
    36. //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
    37. if (flag) {
    38. if (hotArticleVoList.size() >= 30) {
    39. hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
    40. HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
    41. if (lastHot.getScore() < score) {
    42. hotArticleVoList.remove(lastHot);
    43. HotArticleVo hot = new HotArticleVo();
    44. BeanUtils.copyProperties(apArticle, hot);
    45. hot.setScore(score);
    46. hotArticleVoList.add(hot);
    47. }
    48. } else {
    49. HotArticleVo hot = new HotArticleVo();
    50. BeanUtils.copyProperties(apArticle, hot);
    51. hot.setScore(score);
    52. hotArticleVoList.add(hot);
    53. }
    54. }
    55. //缓存到redis
    56. hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
    57. cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    58. }
    59. }
    60. /**
    61. * 更新文章行为数量
    62. * @param mess
    63. */
    64. private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    65. ApArticle apArticle = getById(mess.getArticleId());
    66. apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
    67. apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
    68. apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
    69. apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    70. updateById(apArticle);
    71. return apArticle;
    72. }
    73. /**
    74. * 计算文章的具体分值
    75. * @param apArticle
    76. * @return
    77. */
    78. private Integer computeScore(ApArticle apArticle) {
    79. Integer score = 0;
    80. if(apArticle.getLikes() != null){
    81. score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    82. }
    83. if(apArticle.getViews() != null){
    84. score += apArticle.getViews();
    85. }
    86. if(apArticle.getComment() != null){
    87. score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    88. }
    89. if(apArticle.getCollection() != null){
    90. score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    91. }
    92. return score;
    93. }

  • 相关阅读:
    C规范编辑笔记(一)
    分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
    Docker容器+DPDK+virtio网卡的使用与测试
    JS学习笔记
    javascript之防抖(debounce)和节流(throttle)
    2023/10/05 部分汇编指令
    【Qt之JSON文件】QJsonDocument、QJsonObject、QJsonArray等类介绍及使用
    Docker快速构建HaProxy集群,并配置好rabbitmq的负载均衡
    TensorFlow(1):深度学习的介绍
    mp3stego(mp3隐写工具)使用手册
  • 原文地址:https://blog.csdn.net/m0_67184231/article/details/132877209