目录
用到的全部依赖:
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.27</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>1.2.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
- <version>2.4.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.10.1</version>
- </dependency>
-
- </dependencies>
- <build>
- <plugins>
- <!--该插件用于把Scala代码编译成为class文件-->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- <executions>
- <execution>
- <!--声明绑定到maven的compile阶段-->
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD都会作为一个DStream处理。
循环创建几个RDD并放入队列。通过SparkStream创建Dstream计算WordCount。
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import scala.collection.mutable
-
- object RDD {
- def main(args: Array[String]): Unit = {
- //TODO 创建配置环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val ssc = new StreamingContext(sparkConf,Seconds(5))
-
- //TODO 操作
- //创建RDD队列
- val Rdd = new mutable.Queue[RDD[Int]]()
- //创建queueStream
- val imputStream = ssc.queueStream(Rdd, oneAtATime = false)
- //处理RDD数据
- val mapRDD = imputStream.map((_, 1))
- val reduceRDD = mapRDD.reduceByKey(_ + _)
- //打印
- reduceRDD.print()
- //启动任务
- ssc.start()
- //循环创建RDD
- for (i <- 1 to 50) {
- Rdd += ssc.sparkContext.makeRDD(1 to 100,5)
- Thread.sleep(2000)
- }
-
- ssc.awaitTermination()
- }
-
- }
结果:
继承Receiver,实现onStart、onStop方法来自定义数据源采集。
案例1代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.receiver.Receiver
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import scala.util.Random
-
- object UserDefined_DataSource {
- def main(args: Array[String]): Unit = {
- //TODO 创建配置环境
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
- val ssc = new StreamingContext(sparkConf,Seconds(5))
- //采集数据
- val messageDS = ssc.receiverStream(new MyReceiver())
- messageDS.print()
- //开始
- ssc.start()
- ssc.awaitTermination()
- }
- /*
- 自定义数据采集器
- 1.继承Receiver,定义泛型,传递参数
- 2.重写方法
- */
- class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
- private var fig = true
- //最初启动,读数据
- override def onStart(): Unit = {
- new Thread(new Runnable {
- override def run(): Unit = {
- while (true) {
- val message = "采集的数据为:" + new Random().nextInt(10).toString
- store(message)
- Thread.sleep(500)
- }
- }
- }).start()
- }
- //停止
- override def onStop(): Unit = {
- fig = false
- }
- }
- }
结果:
案列2代码:
- import java.io.{BufferedReader, InputStreamReader}
- import java.net.Socket
- import java.nio.charset.StandardCharsets
- import org.apache.spark.SparkConf
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.receiver.Receiver
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object UserDefined_DataSource02 {
- def main(args: Array[String]): Unit = {
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(10))
- //创建自定义数据接收器
- val lineStream = ssc.receiverStream(new MyReceiver("hadoop01", 9999))
- //数据切分
- val word = lineStream.flatMap(_.split("\t"))
- //映射为(word,1)
- val word2 = word.map((_, 1))
- //统计
- val wordCount = word2.reduceByKey(_ + _)
- //7.打印
- wordCount.print()
- //启动
- ssc.start()
- ssc.awaitTermination()
- }
- /*
- 自定义数据采集器
- 1.继承Receiver,定义泛型,传递参数
- 2.重写方法
- */
- class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
- //最初启动,调用方法
- override def onStart(): Unit = {
- new Thread("Socket Receiver") {
- override def run() {
- receive()
- }
- }.start()
- }
- //读数据并发送给Spark
- def receive(): Unit = {
- //创建Socket
- var socket: Socket = new Socket(host, port)
- //定义变量,接收数据
- var input: String = null
- //创建BufferedReader,读取数据
- val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
- //读数据
- input = reader.readLine()
- //如果receiver没有关闭且输入数据不为空
- //循环发送数据给Spark
- while (!isStopped() && input != null) {
- store(input)
- input = reader.readLine()
- }
- //跳出循环,关闭资源
- reader.close()
- socket.close()
- //重启
- restart("restart")
- }
- override def onStop(): Unit = {}
- }
- }
结果:
ReceiverAPI:需要一个专门的Executor接收数据,然后发送给其他的Executor计算。存在数据的节点内存溢出问题。
DirectAPI:由计算的Executor主动消费Kafka的数据,速度由自身控制。
添加依赖
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.10.1</version>
- </dependency>
代码:
- import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.dstream.InputDStream
- import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object UserDefined_DataSource03_kafka {
- def main(args: Array[String]): Unit = {
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Kafka")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(15))
- //定义kafka参数
- val kafkaPara: Map[String, Object] = Map[String, Object](
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092",
- ConsumerConfig.GROUP_ID_CONFIG -> "testkafka",
- "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
- "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
- )
- //读取Kafka数据创建DStream
- val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
- KafkaUtils.createDirectStream[String, String](
- ssc,
- LocationStrategies.PreferConsistent,
- ConsumerStrategies.Subscribe[String, String](Set("testkafka"), kafkaPara))
- //取出KV对
- val value = kafkaDStream.map(record => record.value())
- //计算
- value.flatMap(_.split(" "))
- .map((_,1))
- .reduceByKey(_+_)
- .print()
- //启动
- ssc.start()
- ssc.awaitTermination()
- }
- }
测试:
1)开启zookeeper
2)开启kafka
3)创建主题
进入kafka目录,运行:
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic testkafka
4)打开生产者
进入kafka目录,运行:
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic testkafka
5)打开消费者
打开另一个窗口,进入kafka目录,运行:
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic testkafka
6)运行程序
结果
DStream的操作和RDD类似,分为Transformations(转换)和Output Operations(输出)两种,转换操作中还有一些比较特殊的原语:updateStateByKey()、transform()以及各种Window相关的原语。
无状态转化和有状态转化:是否保存一个采集周期的数据,保存就是有状态,不保存就是无状态。
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个 RDD。
注意:1)针对键值对的DStream转化操作要添加import StreamingContext._才能在Scala上使用。
2)每个DStream在内部是由许多RDD组成,且无状态转化操作是分别应用到每个RDD上,例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
join
两个流之间的join需要两个流的批次大小一致才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join使与两个RDD的join效果相同。
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object Join {
- def main(args: Array[String]): Unit = {
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Join")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(15) )
- //从端口获取数据创建流
- val line01 = ssc.socketTextStream("hadoop01", 9999)
- val line02 = ssc.socketTextStream("hadoop01", 8888)
- //转换为KV类型
- val word01 = line01.flatMap(_.split(" ")).map((_,1))
- val word02 = line02.flatMap(_.split(" "))map((_,"YES"))
- //JOIN
- val join = word01.join(word02)
- //打印
- join.print()
- //启动任务
- ssc.start()
- ssc.awaitTermination()
- }
- }
测试结果:
transform
transform允许DStream上执行任意的RDD-to-RDD函数;该函数并没有在DStream的API中暴露出来方便扩展Spark API;该函数每一批次调度一次;其实也就是对DStream中的RDD应用转换。
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object Transform {
- def main(args: Array[String]): Unit = {
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Traneform")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(15) )
- //创建Dstream
- val line = ssc.socketTextStream("hadoop01", 9999)
- //转化为RDD
- val DStreamToRDD = line.transform(
- rdd => {
- val word = rdd.flatMap(_.split(" "))
- val wordmap = word.map((_, 1))
- val value = wordmap.reduceByKey(_ + _)
- value
- }
- )
- //打印
- DStreamToRDD.print()
- //启动任务
- ssc.start()
- ssc.awaitTermination()
- }
- }
测试结果:
UpdateStateByKey
UpdateStateByKey原语用于记录历史记录。
有时我们需要在DStream中跨批次维护状态(如:流计算中累加wordcount);updateStateByKey()提供对一个状态变量的访问,给一个由(键,事件)对构成的DStream,传递一个函数,该函数指定根据新的事件更新每个键对应的状态并构建出一个新的DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object UpdateStateByKey {
- def main(args: Array[String]): Unit = {
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Count")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(3))
- //使用有状态操作时需要设置检查点路径
- ssc.checkpoint("/ck")
- //创建DStream
- val lines = ssc.socketTextStream("hadoop01", 9999)
- val wordMap = lines.map((_,1))
- //updateStateByKey:根据key进行数据状态更新
- //第一个值:相同key的value数据
- //第二个值:缓冲区相同key的value数据
- val wordCount = wordMap.updateStateByKey(
- (seq: Seq[Int], buff: Option[Int]) => {
- val newCount = buff.getOrElse(0) + seq.sum
- Option(newCount)
- }
- )
- wordCount.print()
- //启动任务
- ssc.start()
- ssc.awaitTermination()
- }
-
- }
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态;所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动周期。
窗口时长:计算内容的时间范围;
滑动周期:隔多久触发一次计算;
注:这两者都必须为采集周期大小的整数倍。
例1:采集周期为3秒,窗口时长为12秒,滑动周期为6秒。
解:3秒采集一次数据,6秒计算一次,计算的时间范围为12秒也就是4次采集的数据,部分数据可能会重复计算
例2:采集周期为3秒,窗口时长为6秒,滑动周期为6秒。
(3秒采集一次数据,6秒计算一次,计算的时间范围为6秒也就是2次采集的数据,数据不会重复计算,因为计算时间和滑动时间一样)
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object WindowOperations {
- def main(args: Array[String]): Unit = {
- //TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(3) )
- //创建Dstream
- val line = ssc.socketTextStream("hadoop01", 9999)
- //window(窗口时长,滑动周期)
- //窗口的范围应该是采集数据周期的整数倍
- //默认以一个采集周期进行滑动
- //为了避免重复数据可以改变滑动的幅度,窗口时长=滑动周期时避免重复计算
- val word = line.map((_, 1))
- //窗口时长为:12秒 滑动周期为:6秒
- val windowDS = word.window(Seconds(12),Seconds(6))
- val Count = windowDS.reduceByKey(_ + _)
- Count.print()
- //启动任务
- ssc.start()
- ssc.awaitTermination()
- }
- }
测试结果:
输入13个a
结果有21个a,数据重复计算了。
Window操作的其它方法:
1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的 Dstream;
2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数会返回一个新的(K,V)对的DStream,通过对滑动窗口中的批次数据使用reduce函数来整合每个key的value值;
5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算;通过reduce进入到滑动窗口数据并反向reduce离开窗口的旧数据来实现这个操作;
一个例子是随着窗口滑动对keys的“加”“减”计数。
当窗口时长大,滑动周期小时,通过增加数据和删除数据避免重复计算;
例1:采集周期为3秒,窗口时长为6秒,滑动周期为3秒。
输入数据(个)(3秒内):0,6,4,3,1,0,0
输出(3秒):
0,6(0+6),10(6-0+4),7(10-6+3),4(7-4+1),1(4-3+0),0(1-1+0)
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object WindowOperations2 {
- def main(args: Array[String]): Unit = {
- //TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(3) )
- ssc.checkpoint("ck")
- //创建Dstream
- val line = ssc.socketTextStream("hadoop01", 9999)
- val word = line.map((_, 1))
- //窗口时长为:12秒 滑动周期为:6秒
- val windowDS = word.reduceByKeyAndWindow(
- (x:Int,y:Int) => {x + y},
- (x:Int,y:Int) => {x - y},
- Seconds(6),
- Seconds(3)
- )
- windowDS.print()
- //启动任务
- ssc.start()
- ssc.awaitTermination()
- }
- }
DStream输出操作:流数据经转化操作得到的数据所要执行的操作(保存到外部数据库或输出到屏幕)。
与RDD中的惰性求值类似,一个DStream及其派生DStream都没有被执行输出操作,则这些DStream就不会被求值;如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作:
(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。
(2)foreachRDD(func):这是最通用的输出操作,将函数func用于产生于Stream的每一个RDD;其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统(如将RDD存入文件或者通过网络将其写入数据库)。
(3)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容;每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
(4)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".。
(5)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为Hadoop files,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算;这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。
注: 1) 连接不能写在driver层面(序列化);
2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
3) 增加foreachPartition,在分区创建(获取)。
优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭。
流式任务需要7*24小时执行,有时升级代码需要主动停止程序,分布式程序没办法做到一个个进程的杀死。 应该使用外部文件系统来控制内部程序关闭。
MonitorStop
代码:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
-
- object MonitorStop2 {
- def main(args: Array[String]): Unit = {
- /*
- 线程关闭
- val thread = new Thread()
- thread.start()//线程开启
- thread.stop()//强制关闭
- */
-
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(3) )
- //创建Dstream
- val line = ssc.socketTextStream("hadoop01", 9999)
- val word = line.map((_, 1))
- word.print()
- //启动任务
- ssc.start()
- //创建新的线程用作关闭线程
- //而且需要在第三方程序中增加关闭程序
- new Thread(
- new Runnable {
- override def run(): Unit = {
- //优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭
- //Mysql:Table(stopSpark) => Row => data
- //Redis:Data(K-V)
- //ZK:/stopSpark
- //HDFS:/stopSpark
- Thread.sleep(5000)
- //获取SparkStreaming状态
- val state = ssc.getState()
- if (state == StreamingContextState.ACTIVE) {
- ssc.stop(true,true)
- }
- //退出线程
- System.exit(0)
- }
- }
- ).start()
- ssc.awaitTermination() //block阻塞main线程
- }
- }
HDFS
代码:
- import java.net.URI
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileSystem, Path}
- import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
-
- class MonitorStop(ssc: StreamingContext) extends Runnable {
- override def run(): Unit = {
- val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000"), new
- Configuration(), "spark")
- while (true) {
- try
- Thread.sleep(5000)
- catch {
- case e: InterruptedException =>
- e.printStackTrace()
- }
- val state: StreamingContextState = ssc.getState
- val bool: Boolean = fs.exists(new Path("hdfs://hadoop01:9000/stopSpark"))
- if (bool) {
- if (state == StreamingContextState.ACTIVE) {
- ssc.stop(stopSparkContext = true, stopGracefully = true)
- System.exit(0)
- }
- }
- }
- }
- }
SparkTest
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- object SparkTest {
- def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
- val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
- Option[Int]) => {
- //当前批次内容的计算
- val sum: Int = values.sum
- //取出状态信息中上一次状态
- val lastStatu: Int = status.getOrElse(0)
- Some(sum + lastStatu)
- }
- val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkTest")
- //设置优雅的关闭
- sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- ssc.checkpoint("./ck")
- val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
- val word: DStream[String] = line.flatMap(_.split(" "))
- val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
- val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
- wordAndCount.print()
- ssc
- }
- def main(args: Array[String]): Unit = {
- val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
- new Thread(new MonitorStop(ssc)).start()
- ssc.start()
- ssc.awaitTermination()
- }
- }
恢复数据
设置checkpoint用于恢复数据。
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- object RecoverData {
- def main(args: Array[String]): Unit = {
- //TODO 恢复数据
- val ssc = StreamingContext.getActiveOrCreate("ck", () => {
- //初始化Spark配置信息
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
- //初始化SparkStreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(3))
- //创建Dstream
- val line = ssc.socketTextStream("hadoop01", 9999)
- val word = line.map((_, 1))
- word.print()
- ssc
- })
- ssc.checkpoint("ck")
- //启动任务
- ssc.start()
- ssc.awaitTermination() //block阻塞main线程
- }
- }
本文仅仅是学习笔记的记录!!!