kafkaStream
实时计算
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
可以随时更新公交车方位,计算多久到达站牌等
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回4
了alice
。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
(1)需求分析,求单词个数(word count)
(2)引入依赖
在之前的kafka-demo工程的pom文件中引入
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-streams</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>connect-json</artifactId>
- <groupId>org.apache.kafka</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
(3)创建原生的kafka staream入门案例
- package com.heima.kafka.sample;
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Properties;
-
- /**
- * 流式处理
- */
- public class KafkaStreamQuickStart {
-
- public static void main(String[] args) {
-
- //kafka的配置信息
- Properties prop = new Properties();
- prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
- prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
-
- //stream 构建器
- StreamsBuilder streamsBuilder = new StreamsBuilder();
-
- //流式计算
- streamProcessor(streamsBuilder);
-
- //创建kafkaStream对象
- KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
- //开启流式计算
- kafkaStreams.start();
- }
-
- /**
- * 流式计算
- * 消息的内容:hello kafka hello itcast
- * @param streamsBuilder
- */
- private static void streamProcessor(StreamsBuilder streamsBuilder) {
- //创建kstream对象,同时指定从那个topic中接收消息
- KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
- /**
- * 处理消息的value
- */
- stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.split(" "));
- }
- })
- //按照value进行聚合处理
- .groupBy((key,value)->value)
- //时间窗口
- .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
- //统计单词的个数
- .count()
- //转换为kStream
- .toStream()
- .map((key,value)->{
- System.out.println("key:"+key+",vlaue:"+value);
- return new KeyValue<>(key.key().toString(),value.toString());
- })
- //发送消息
- .to("itcast-topic-out");
- }
- }
(4)测试准备
结果:
(1)自定配置参数
- package com.heima.kafka.config;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
- */
- @Setter
- @Getter
- @Configuration
- @EnableKafkaStreams
- @ConfigurationProperties(prefix="kafka")
- public class KafkaStreamConfig {
- private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
- private String hosts;
- private String group;
- @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
- public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
- Map<String, Object> props = new HashMap<>();
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//连接信息
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//组
- props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//应用名称
- props.put(StreamsConfig.RETRIES_CONFIG, 10);//重试次数
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- return new KafkaStreamsConfiguration(props);
- }
- }
修改application.yml文件,在最下方添加自定义配置
- kafka:
- hosts: 192.168.200.130:9092
- group: ${spring.application.name}
(2)新增配置类,创建KStream对象,进行聚合
- package com.heima.kafka.stream;
- import java.time.Duration;
- import java.util.Arrays;
-
- @Configuration
- @Slf4j
- public class KafkaStreamHelloListener {
-
- @Bean
- public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
- //创建kstream对象,同时指定从那个topic中接收消息
- KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
- stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.split(" "));
- }
- })
- //根据value进行聚合分组
- .groupBy((key,value)->value)
- //聚合计算时间间隔
- .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
- //求单词的个数
- .count()
- .toStream()
- //处理后的结果转换为string字符串
- .map((key,value)->{
- System.out.println("key:"+key+",value:"+value);
- return new KeyValue<>(key.key().toString(),value.toString());
- })
- //发送消息
- .to("itcast-topic-out");
- return stream;
- }
- }
测试:
启动微服务,正常发送消息,可以正常接收到消息
①在heima-leadnews-behavior微服务中集成kafka生产者配置
修改nacos,新增内容
- spring:
- application:
- name: leadnews-behavior
- kafka:
- bootstrap-servers: 192.168.200.130:9092
- producer:
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
②修改ApLikesBehaviorServiceImpl新增发送消息
定义消息发送封装类:UpdateArticleMess
- package com.heima.model.mess;
-
- import lombok.Data;
-
- @Data
- public class UpdateArticleMess {
-
- /**
- * 修改文章的字段类型
- */
- private UpdateArticleType type;
- /**
- * 文章ID
- */
- private Long articleId;
- /**
- * 修改数据的增量,可为正负
- */
- private Integer add;
-
- public enum UpdateArticleType{
- COLLECTION,COMMENT,LIKES,VIEWS;
- }
- }
topic常量类:
- package com.heima.common.constants;
- public class HotArticleConstants {
- public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
- }
完整代码如下:
- package com.heima.behavior.service.impl;
-
- import org.springframework.transaction.annotation.Transactional;
-
- @Service
- @Transactional
- @Slf4j
- public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
-
- @Autowired
- private CacheService cacheService;
-
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
-
- @Override
- public ResponseResult like(LikesBehaviorDto dto) {
-
- //1.检查参数
- if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
- }
-
- //2.是否登录
- ApUser user = AppThreadLocalUtil.getUser();
- if (user == null) {
- return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
- }
-
- UpdateArticleMess mess = new UpdateArticleMess();
- mess.setArticleId(dto.getArticleId());
- mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
-
- //3.点赞 保存数据
- if (dto.getOperation() == 0) {
- Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
- if (obj != null) {
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
- }
- // 保存当前key
- log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
- cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
- mess.setAdd(1);
- } else {
- // 删除当前key
- log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
- cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
- mess.setAdd(-1);
- }
-
- //发送消息,数据聚合
- kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
-
- return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
- }
- /**
- * 检查参数
- *
- * @return
- */
- private boolean checkParam(LikesBehaviorDto dto) {
- if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
- return true;
- }
- return false;
- }
- }
③修改阅读行为的类ApReadBehaviorServiceImpl发送消息
完整代码:
- package com.heima.behavior.service.impl;
- import org.springframework.transaction.annotation.Transactional;
-
- @Service
- @Transactional
- @Slf4j
- public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {
-
- @Autowired
- private CacheService cacheService;
-
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
-
- @Override
- public ResponseResult readBehavior(ReadBehaviorDto dto) {
- //1.检查参数
- if (dto == null || dto.getArticleId() == null) {
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
- }
-
- //2.是否登录
- ApUser user = AppThreadLocalUtil.getUser();
- if (user == null) {
- return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
- }
- //更新阅读次数
- String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
- if (StringUtils.isNotBlank(readBehaviorJson)) {
- ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
- dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
- }
- // 保存当前key
- log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
- cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
-
- //发送消息,数据聚合
- UpdateArticleMess mess = new UpdateArticleMess();
- mess.setArticleId(dto.getArticleId());
- mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
- mess.setAdd(1);
- kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
-
- return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
- }
- }
①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)
②定义实体类,用于聚合之后的分值封装
- package com.heima.model.article.mess;
- import lombok.Data;
-
- @Data
- public class ArticleVisitStreamMess {
- /**
- * 文章id
- */
- private Long articleId;
- /**
- * 阅读
- */
- private int view;
- /**
- * 收藏
- */
- private int collect;
- /**
- * 评论
- */
- private int comment;
- /**
- * 点赞
- */
- private int like;
- }
修改常量类:增加常量
- package com.heima.common.constans;
- public class HotArticleConstants {
- public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
- public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
- }
③ 定义stream,接收消息并聚合
- package com.heima.article.stream;
- import java.time.Duration;
-
- @Configuration
- @Slf4j
- public class HotArticleStreamHandler {
-
- @Bean
- public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
- //接收消息
- KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
- //聚合流式处理
- stream.map((key,value)->{
- UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
- //重置消息的key:1234343434 和 value: likes:1
- return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
- })
- //按照文章id进行聚合
- .groupBy((key,value)->key)
- //时间窗口
- .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
- /**
- * 自行的完成聚合的计算
- */
- .aggregate(new Initializer<String>() {
- /**
- * 初始方法,返回值是消息的value
- * @return
- */
- @Override
- public String apply() {
- return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
- }
- /**
- * 真正的聚合操作,返回值是消息的value
- */
- }, new Aggregator<String, String, String>() {
- @Override
- public String apply(String key, String value, String aggValue) {
- if(StringUtils.isBlank(value)){
- return aggValue;
- }
- String[] aggAry = aggValue.split(",");
- int col = 0,com=0,lik=0,vie=0;
- for (String agg : aggAry) {
- String[] split = agg.split(":");
- /**
- * 获得初始值,也是时间窗口内计算之后的值
- */
- switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
- case COLLECTION:
- col = Integer.parseInt(split[1]);
- break;
- case COMMENT:
- com = Integer.parseInt(split[1]);
- break;
- case LIKES:
- lik = Integer.parseInt(split[1]);
- break;
- case VIEWS:
- vie = Integer.parseInt(split[1]);
- break;
- }
- }
- /**
- * 累加操作
- */
- String[] valAry = value.split(":");
- switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
- case COLLECTION:
- col += Integer.parseInt(valAry[1]);
- break;
- case COMMENT:
- com += Integer.parseInt(valAry[1]);
- break;
- case LIKES:
- lik += Integer.parseInt(valAry[1]);
- break;
- case VIEWS:
- vie += Integer.parseInt(valAry[1]);
- break;
- }
-
- String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
- System.out.println("文章的id:"+key);
- System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
- return formatStr;
- }
- }, Materialized.as("hot-atricle-stream-count-001"))
- .toStream()
- .map((key,value)->{
- return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
- })
- //发送消息
- .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
-
- return stream;
- }
-
- /**
- * 格式化消息的value数据
- * @param articleId
- * @param value
- * @return
- */
- public String formatObj(String articleId,String value){
- ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
- mess.setArticleId(Long.valueOf(articleId));
- //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
- String[] valAry = value.split(",");
- for (String val : valAry) {
- String[] split = val.split(":");
- switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
- case COLLECTION:
- mess.setCollect(Integer.parseInt(split[1]));
- break;
- case COMMENT:
- mess.setComment(Integer.parseInt(split[1]));
- break;
- case LIKES:
- mess.setLike(Integer.parseInt(split[1]));
- break;
- case VIEWS:
- mess.setView(Integer.parseInt(split[1]));
- break;
- }
- }
- log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
- return JSON.toJSONString(mess);
- }
- }
①在ApArticleService添加方法,用于更新数据库中的文章分值
- /**
- * 更新文章的分值 同时更新缓存中的热点文章数据
- * @param mess
- */
- public void updateScore(ArticleVisitStreamMess mess);
实现类方法
- /**
- * 更新文章的分值 同时更新缓存中的热点文章数据
- * @param mess
- */
- @Override
- public void updateScore(ArticleVisitStreamMess mess) {
- //1.更新文章的阅读、点赞、收藏、评论的数量
- ApArticle apArticle = updateArticle(mess);
- //2.计算文章的分值
- Integer score = computeScore(apArticle);
- score = score * 3;
-
- //3.替换当前文章对应频道的热点数据
- replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
-
- //4.替换推荐对应的热点数据
- replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
- }
-
- /**
- * 替换数据并且存入到redis
- * @param apArticle
- * @param score
- * @param s
- */
- private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
- String articleListStr = cacheService.get(s);
- if (StringUtils.isNotBlank(articleListStr)) {
- List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
-
- boolean flag = true;
-
- //如果缓存中存在该文章,只更新分值
- for (HotArticleVo hotArticleVo : hotArticleVoList) {
- if (hotArticleVo.getId().equals(apArticle.getId())) {
- hotArticleVo.setScore(score);
- flag = false;
- break;
- }
- }
-
- //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
- if (flag) {
- if (hotArticleVoList.size() >= 30) {
- hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
- HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
- if (lastHot.getScore() < score) {
- hotArticleVoList.remove(lastHot);
- HotArticleVo hot = new HotArticleVo();
- BeanUtils.copyProperties(apArticle, hot);
- hot.setScore(score);
- hotArticleVoList.add(hot);
- }
-
- } else {
- HotArticleVo hot = new HotArticleVo();
- BeanUtils.copyProperties(apArticle, hot);
- hot.setScore(score);
- hotArticleVoList.add(hot);
- }
- }
- //缓存到redis
- hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
- cacheService.set(s, JSON.toJSONString(hotArticleVoList));
- }
- }
-
- /**
- * 更新文章行为数量
- * @param mess
- */
- private ApArticle updateArticle(ArticleVisitStreamMess mess) {
- ApArticle apArticle = getById(mess.getArticleId());
- apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
- apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
- apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
- apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
- updateById(apArticle);
- return apArticle;
- }
-
- /**
- * 计算文章的具体分值
- * @param apArticle
- * @return
- */
- private Integer computeScore(ApArticle apArticle) {
- Integer score = 0;
- if(apArticle.getLikes() != null){
- score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
- }
- if(apArticle.getViews() != null){
- score += apArticle.getViews();
- }
- if(apArticle.getComment() != null){
- score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
- }
- if(apArticle.getCollection() != null){
- score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
- }
- return score;
- }