• kafkaStream实时流式计算


    2 实时流式计算

    2.1 概念

    一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

    流式计算就相当于扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

    2.2 应用场景

    • 日志分析

      网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

    • 大屏看板统计

      可以实时的查看网站注册数量,订单数量,购买数量,金额等。

    • 公交实时数据

      可以随时更新公交车方位,计算多久到达站牌等

    • 实时文章分值计算

      头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

    2.3 技术方案选型

    • Hadoop

    • Apche Storm

      Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

    • Kafka Stream

      可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

    3 Kafka Stream

    3.1 概述

    Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

    Kafka Stream的特点如下:

    • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
    • 除了Kafka外,无任何外部依赖
    • 充分利用Kafka分区机制实现水平扩展和顺序性保证
    • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
    • 支持正好一次处理语义
    • 提供记录级的处理能力,从而实现毫秒级的低延迟
    • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
    • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

    在这里插入图片描述

    3.2 Kafka Streams的关键概念

    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
    • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

    在这里插入图片描述

    3.3 KStream

    (1)数据结构类似于map,key-value键值对

    (2)KStream

    在这里插入图片描述

    KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
    数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

    KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

    为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

    (“ alice”,1)->(“” alice“,3)

    如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

    3.4 Kafka Stream入门案例编写

    (1)需求分析,求单词个数(word count)

    在这里插入图片描述

    (2)引入依赖

    在之前的kafka-demo工程的pom文件中引入

    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-streamsartifactId>
        <exclusions>
            <exclusion>
                <artifactId>connect-jsonartifactId>
                <groupId>org.apache.kafkagroupId>
            exclusion>
            <exclusion>
                <groupId>org.apache.kafkagroupId>
                <artifactId>kafka-clientsartifactId>
            exclusion>
        exclusions>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    (3)创建原生的kafka staream入门案例

    package com.heima.kafka.sample;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.ValueMapper;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * 流式处理
     */
    public class KafkaStreamQuickStart {
    
        public static void main(String[] args) {
    
            //kafka的配置信心
            Properties prop = new Properties();
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
            prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
    
            //stream 构建器
            StreamsBuilder streamsBuilder = new StreamsBuilder();
    
            //流式计算
            streamProcessor(streamsBuilder);
    
    
            //创建kafkaStream对象
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
            //开启流式计算
            kafkaStreams.start();
        }
    
        /**
         * 流式计算
         * 消息的内容:hello kafka  hello itcast
         * @param streamsBuilder
         */
        private static void streamProcessor(StreamsBuilder streamsBuilder) {
            //创建kstream对象,同时指定从那个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    return Arrays.asList(value.split(" "));
                }
            })
                    //按照value进行聚合处理
                    .groupBy((key,value)->value)
                    //时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    //统计单词的个数
                    .count()
                    //转换为kStream
                    .toStream()
                    .map((key,value)->{
                        System.out.println("key:"+key+",vlaue:"+value);
                        return new KeyValue<>(key.key().toString(),value.toString());
                    })
                    //发送消息
                    .to("itcast-topic-out");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    (4)测试准备

    • 使用生产者在topic为:itcast_topic_input中发送多条消息

    • 使用消费者接收topic为:itcast_topic_out

    结果:

    • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

    3.5 SpringBoot集成Kafka Stream

    (1)自定配置参数

    package com.heima.kafka.config;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.config.KafkaStreamsConfiguration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
     */
    
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
        private String hosts;
        private String group;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
            props.put(StreamsConfig.RETRIES_CONFIG, 10);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    修改application.yml文件,在最下方添加自定义配置

    kafka:
      hosts: 192.168.200.130:9092
      group: ${spring.application.name}
    
    • 1
    • 2
    • 3

    (2)新增配置类,创建KStream对象,进行聚合

    package com.heima.kafka.stream;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.ValueMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.time.Duration;
    import java.util.Arrays;
    
    @Configuration
    @Slf4j
    public class KafkaStreamHelloListener {
    
        @Bean
        public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
            //创建kstream对象,同时指定从那个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    return Arrays.asList(value.split(" "));
                }
            })
                    //根据value进行聚合分组
                    .groupBy((key,value)->value)
                    //聚合计算时间间隔
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    //求单词的个数
                    .count()
                    .toStream()
                    //处理后的结果转换为string字符串
                    .map((key,value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(),value.toString());
                    })
                    //发送消息
                    .to("itcast-topic-out");
            return stream;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    测试:

    ​ 启动微服务,正常发送消息,可以正常接收到消息

  • 相关阅读:
    [量化投资-学习笔记016]Python+TDengine从零开始搭建量化分析平台-日志输出
    求最小生成树
    DefaultListableBeanFactory
    Spring Security(五)密码加密与更新密码
    特殊类的设计
    TypeScript 4.8 beta 发布:正在路上的装饰器、类型收窄增强、模板字符串类型中的 infer
    带你深入学习k8s--(四) 控制器(k8s核心)
    Marin说PCB之国产电源芯片方案 ---STC2620Q
    【软件测试】PDM、PTM、IPD介绍(捣鼓一晚上的血泪知识)
    进程与线程
  • 原文地址:https://blog.csdn.net/qq_47949604/article/details/132892633