• 大数据——Spark Streaming


    是什么

    Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。
    之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个实时的数据大屏,显示实时订单。
    在这里插入图片描述
    实时计算框架对比

    框架类别框架类型数据单位其他吞吐量延迟
    Storm流式计算框架record的处理数据单位支持micro-batch方式一般更低
    Spark批处理计算框架RDD处理数据单位支持micro-batch流式处理数据更强一般

    Spark Streaming组件

    • Streaming Context
      • 一个Context启动,则不能有新的DStream建立或者添加;
      • 一个Context停止,不能重新启动;
      • 在JVM中,只能有一个Streaming Context活跃;一个Spark Context会创建一个Streaming Context;
      • Streaming Context上调用stop方法,SparkContext也会关闭,如果只想关闭Streaming Context,可以设置stop()方法里的false参数;
      • 一个SparkContext对象可以重复创建多个Streaming Context对象,但每次只能运行一个,即需要关闭一个再开下一个。
    • DStream
      • 表示一个连续的数据流;
      • DStream内部是由一系列的RDD组成;
      • DStream中的每个RDD都有确定时间间隔内的数据;
      • 对DStream的操作都转换成对DStream隐含的RDD操作;
      • 数据源:
    数据源类型
    基本源TCP/IP or FileSystem
    高级源Kafka or Flume

    Spark Streaming编码步骤

    import os
    # 配置spark driver和pyspark运⾏时,所使⽤的python解释器路径
    PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
    JAVA_HOME='/root/bigdata/jdk'
    SPARK_HOME = "/root/bigdata/spark"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    os.environ['JAVA_HOME']=JAVA_HOME
    os.environ["SPARK_HOME"] = SPARK_HOME
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
    	sc = SparkContext("local[2]",appName="NetworkWordCount")
    	#参数2:指定执⾏计算的时间间隔
    	ssc = StreamingContext(sc, 1)
    	#监听ip,端⼝上的上的数据
    	lines = ssc.socketTextStream('localhost',9999)
    	#将数据按空格进⾏拆分为多个单词
    	words = lines.flatMap(lambda line: line.split(" "))
    	#将单词转换为(单词,1)的形式
    	pairs = words.map(lambda word:(word,1))
    	#统计单词个数
    	wordCounts = pairs.reduceByKey(lambda x,y:x+y)
    	#打印结果信息,会使得前⾯的transformation操作执⾏
    	wordCounts.pprint()
    	#启动StreamingContext
    	ssc.start()
    	#等待计算结束
    	ssc.awaitTermination()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    Spark Streaming状态操作

    Spark Streaming存在两种状态操作:UpdateStateByKey和Window操作。

    • updateStateByKey
      如果没有updateStateByKey,我们需要将每一秒的数据计算好放入mysql中,再用mysql进行计算,而updateStateByKey将每隔一段数据进行打包,封装成RDD,这样每个时间片段的数据之间是没有关联的。一般为以下步骤:
    1. ⾸先,要定义⼀个state,可以是任意的数据类型
    2. 其次,要定义state更新函数–指定⼀个函数如何使⽤之前的state和新值来更新state
    3. 对于每个batch,Spark都会为每个之前已经存在的key去应⽤⼀次state更新函数,⽆论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
    4. 对于每个新出现的key,也会执⾏state更新函数
    • Window
      在这里插入图片描述
      Window操作是基于窗⼝⻓度和滑动间隔来⼯作的;窗⼝的⻓度控制考虑前⼏批次数据量;默认为批处理的滑动间隔来确定计算结果的频率。
      窗口长度L是运算的数据量;
      滑动间隔G是控制每隔多长时间做一次运算。
  • 相关阅读:
    STM32:串口发送/接收HEX数据包代码篇(内含:实物图接线图+代码部分+个人笔记)
    gitlab
    Java-内部类详解
    AIGC ChatGPT 4 将数据接口文件使用Python进行入库Mysql
    【基础】JVM(JVM优化、垃圾回收、内存泄漏)面试题
    试玩ESP32S3 BOX Lite
    V4L2里的FourCC编码
    手把手教你springboot集成mybatis
    Opencv3.4版本+ffmpeg联合编译
    vue项目中使用svg
  • 原文地址:https://blog.csdn.net/gjinc/article/details/133671618