目录
a、使用场景:测试
b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理
- val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
- val ssc = new StreamingContext(sparkconf,Seconds(3))
-
- // 创建一个队列对象,队列中存放的是RDD
- val queue = new mutable.Queue[RDD[String]]()
- // 通过队列创建DStream
- val queueDS: InputDStream[String] = ssc.queueStream(queue)
-
- queueDS.print()
-
- // 启动采集器
- ssc.start()
- //这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产
- for(i <- 1 to 5 ){
- // 循环创建rdd
- val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
- // 将RDD存放到队列中
- queue.enqueue(rdd)
- // 当前线程休眠1秒
- Thread.sleep(6000)
- }
- // 等待采集器的结束
- ssc.awaitTermination()
- }
- val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
- val ssc = new StreamingContext(sparkConf,Seconds(3))
-
- //从文件中读取数据
- val textDS: DStream[String] = ssc.textFileStream("in")
- textDS.print()
-
- // 启动采集器
- ssc.start()
-
- // 等待采集器的结束
- ssc.awaitTermination()
1. 自定义采集器
2. 什么情况下需要自定采集器呢?
比如从mysql、hbase中读取数据。
采集器的作用是从指定的地方,按照采集周期对数据进行采集。
目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
3. 自定义采集器的步骤,模仿socketTextStream
a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
b、重写onStart和onStop方法
onStart:采集器的如何启动
onStop:采集的如何停止
- val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
- val ssc = new StreamingContext(sparkConf, Seconds(3))
-
- // 获取采集的流
- val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
- ds.print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-
- // 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
- class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
-
- private var socket: Socket = _
- def receive = {
- // 获取输入流
- val reader = new BufferedReader(
- new InputStreamReader(
- socket.getInputStream,
- "UTF-8"
- )
- )
- // 设定一个间接变量
- var s: String = null
- while (true) {
- // 按行读取数据
- s = reader.readLine()
- if (s != null) {
- // 将数据进行封装
- store(s)
- }
- }
-
- }
-
- // 1. 启动采集器
- override def onStart(): Unit = {
- socket = new Socket(host, port)
- new Thread("Socket Receiver") {
- setDaemon(true)
- override def run() {
- receive
- }
- }.start()
-
-
- }
-
- // 2. 停止采集器
- override def onStop(): Unit = {
- socket.close()
- socket = null
-
-
- }
- }
-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法
- // TODO Spark环境
- // SparkStreaming使用核数最少是2个
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
- val ssc = new StreamingContext(sparkConf, Seconds(3))
-
- // TODO 使用SparkStreaming读取Kafka的数据
-
- // Kafka的配置信息
- val kafkaPara: Map[String, Object] = Map[String, Object](
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
- ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
- "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
- "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
- )
-
- val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
- KafkaUtils.createDirectStream[String, String](
- ssc,
- LocationStrategies.PreferConsistent,
- ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
- )
- // 获取数据,key是null,value是真实的数据
- val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
-
- valueDStream.flatMap(_.split(" "))
- .map((_, 1))
- .reduceByKey(_ + _)
- .print()
-
- ssc.start()
- // 等待采集器的结束
- ssc.awaitTermination()