• SparkStreaming入门


    概述

    实时/离线

    • 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
    • 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。

    SparkStreaming简介

    1. 支持的输入源:Kafka, Flume, HDFS等
    2. 数据输入后,可以用RDD处理数据
    3. 结果可以保存在很多地方,比如HDFS,数据库等

    SparkStreaming架构

    DStream

    SparkCore的基本单位RDD
    SparkSQL的基本单位是DataFreme, DataSet
    Spark Streaming的基本单位是Dstream

    每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。

    架构图

    在这里插入图片描述
    由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。

    DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。

    Hello World案例

    1. 添加依赖
    <dependencies>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-streaming_2.12artifactId>
            <version>3.3.1version>
        dependency>
    
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-core_2.12artifactId>
            <version>3.3.1version>
    dependency>
    
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-streaming-kafka-0-10_2.12artifactId>
        <version>3.3.1version>
     dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    1. 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    
    
    public class Test01_HelloWorld {
        public static void main(String[] args) throws InterruptedException {
            // 创建流环境
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));
    
            // 创建配置参数
            HashMap<String, Object> map = new HashMap<>();
            map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
            map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");
            map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    
            // 需要消费的主题
            ArrayList<String> strings = new ArrayList<>();
            strings.add("topic_db");
    
            JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));
    
            JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
                @Override
                public Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                    String[] words = consumerRecord.value().split(" ");
                    return Arrays.stream(words).iterator();
                }
            });
    		
            flatMap .print();
            // 执行流的任务
            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();//线程阻塞
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    window算子窗口操作

    由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。

    窗口时长:计算内容的时间范围
    滑动步长:隔多久触发一次计算

    //4 添加窗口 窗口大小12s 滑动步长6s
            JavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));
    
            //5 对加过窗口的数据流进行计算
            JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    JAVA毕业设计科普网站计算机源码+lw文档+系统+调试部署+数据库
    就业班 第三阶段(nginx) 2401--4.25 day4 nginx4 流量控制+访问
    Java 异常中 e.getMessage() 和 e.toString() e.printStackTrace()的区别
    2 蓝桥杯打题记录
    SVG的常用元素和动画运用
    双十一数码产品开箱体验当贝NEW D3X,颜值高功能多,不愧是三千元性价比最高
    编程实例:多人同时计时计费管理系统软件,可适用于钓场计时等管理
    人生苦短,我用Python。
    ASUS华硕天选2锐龙版笔记本电脑FA506ICB/FA706IC原装出厂Windows11系统,预装OEM系统恢复安装开箱状态
    学生台灯用led灯好还是荧光灯好?推荐几款高品质的LED灯
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/133957404