• Kafka/Spark-01消费topic到写出到topic


    1 Kafka的工具类

    1.1 从kafka消费数据的方法

    1. 消费者代码
      def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)
    
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
        kafkaDStream
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 注意点
    • consumerConfigs是定义的可变的map的类型的,具体如下
    private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
        // kafka集群位置
    
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),
    
        // kv反序列化器
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
        // groupId
        // offset提交  自动 手动
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
        //自动提交的时间间隔
        //ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
        // offset重置  "latest"  "earliest"
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
        // .....
      )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参

    • 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下

    在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:
    
    1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
    2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
    3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
    4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
    5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。
    
    总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    • Subscribe传参需要指定泛型,这边指定string,表示指定主题的键和值的类型,即Array(topic), consumerConfigs传参是string

    在这里插入图片描述

    • 最后方法返回一个kafkaDStream

    1.2 kafka的生产数据的方法

    1. 生产者代码
    • 创建与配置
    /**
        * 生产者对象
        */
      val producer : KafkaProducer[String,String] = createProducer()
    
      /**
        * 创建生产者对象
        */
      def createProducer():KafkaProducer[String,String] = {
        val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
        //生产者配置类 ProducerConfig
        //kafka集群位置
        //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
        //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
        producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
        //kv序列化器
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
        //acks
        producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
        //batch.size  16kb
        //linger.ms   0
        //retries
        //幂等配置
        producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")
    
        val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
        producer
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 生产方法
      /**
        * 生产(按照默认的黏性分区策略)
        */
      def send(topic : String  , msg : String ):Unit = {
        producer.send(new ProducerRecord[String,String](topic , msg ))
      }
    
      /**或者!
        * 生产(按照key进行分区)
        */
      def send(topic : String  , key : String ,  msg : String ):Unit = {
        producer.send(new ProducerRecord[String,String](topic , key ,  msg ))
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 关闭生产
    /**
        * 关闭生产者对象
        */
      def close():Unit = {
        if(producer != null ) producer.close()
      }
    
      /**
        * 刷写 ,将缓冲区的数据刷写到磁盘
        *
        */
      def flush(): Unit ={
        producer.flush()
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2 消费数据

    2.1 消费到数据

    单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口

    在这里插入图片描述

    因此需要转换成通用的jsonobject对象

    //3. 处理数据
        //3.1 转换数据结构
        val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
          consumerRecord => {
            //获取ConsumerRecord中的value,value就是日志数据
            val log: String = consumerRecord.value()
            //转换成Json对象
            val jsonObj: JSONObject = JSON.parseObject(log)
            //返回
            jsonObj
          }
        )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.2 数据分流发送到对应topic

    1. 提取错误数据并发送到对应的topic中
    jsonObjDStream.foreachRDD(
          rdd => {
    
            rdd.foreachPartition(
              jsonObjIter => {
                for (jsonObj <- jsonObjIter) {
                  //分流过程
                  //分流错误数据
                  val errObj: JSONObject = jsonObj.getJSONObject("err")
                  if(errObj != null){
                    //将错误数据发送到 DWD_ERROR_LOG_TOPIC
                    MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )
                  }else{
                      
                  }
                }
              }
            }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 将公共字段和页面数据发送到DWD_PAGE_DISPLAY_TOPIC
      在这里插入图片描述
    else{
                    // 提取公共字段
                    val commonObj: JSONObject = jsonObj.getJSONObject("common")
                    val ar: String = commonObj.getString("ar")
                    val uid: String = commonObj.getString("uid")
                    val os: String = commonObj.getString("os")
                    val ch: String = commonObj.getString("ch")
                    val isNew: String = commonObj.getString("is_new")
                    val md: String = commonObj.getString("md")
                    val mid: String = commonObj.getString("mid")
                    val vc: String = commonObj.getString("vc")
                    val ba: String = commonObj.getString("ba")
                    //提取时间戳
                    val ts: Long = jsonObj.getLong("ts")
                    // 页面数据
                    val pageObj: JSONObject = jsonObj.getJSONObject("page")
                    if(pageObj != null ){
                      //提取page字段
                      val pageId: String = pageObj.getString("page_id")
                      val pageItem: String = pageObj.getString("item")
                      val pageItemType: String = pageObj.getString("item_type")
                      val duringTime: Long = pageObj.getLong("during_time")
                      val lastPageId: String = pageObj.getString("last_page_id")
                      val sourceType: String = pageObj.getString("source_type")
    
                      //封装成PageLog,这边还写了bean实体类去接收
                      var pageLog =
                        PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
                      //发送到DWD_PAGE_LOG_TOPIC
                      MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段
                    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    1. 其他曝光、事件、启动数据如下
                      //提取曝光数据
                      val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
                      if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
                        for(i <- 0 until displaysJsonArr.size()){
                          //循环拿到每个曝光
                          val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
                          //提取曝光字段
                          val displayType: String = displayObj.getString("display_type")
                          val displayItem: String = displayObj.getString("item")
                          val displayItemType: String = displayObj.getString("item_type")
                          val posId: String = displayObj.getString("pos_id")
                          val order: String = displayObj.getString("order")
    
                          //封装成PageDisplayLog
                          val pageDisplayLog =
                            PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
                          // 写到 DWD_PAGE_DISPLAY_TOPIC
                          MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
                        }
                      }
                      //提取事件数据(课下完成)
                      val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
                      if(actionJsonArr != null && actionJsonArr.size() > 0 ){
                        for(i <- 0 until actionJsonArr.size()){
                          val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
                          //提取字段
                          val actionId: String = actionObj.getString("action_id")
                          val actionItem: String = actionObj.getString("item")
                          val actionItemType: String = actionObj.getString("item_type")
                          val actionTs: Long = actionObj.getLong("ts")
    
                          //封装PageActionLog
                          var pageActionLog =
                            PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
                          //写出到DWD_PAGE_ACTION_TOPIC
                          MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
                        }
                      }
                    }
                    // 启动数据(课下完成)
                    val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
                    if(startJsonObj != null ){
                      //提取字段
                      val entry: String = startJsonObj.getString("entry")
                      val loadingTime: Long = startJsonObj.getLong("loading_time")
                      val openAdId: String = startJsonObj.getString("open_ad_id")
                      val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
                      val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")
    
                      //封装StartLog
                      var startLog =
                        StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
                      //写出DWD_START_LOG_TOPIC
                      MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    2.3 精确一次消费

    1. 背景

    发送kafka的是自动提交,如果提交有误,会出现漏消费或者重复消费

    1. 相关语义
    • 至少一次消费:数据不会丢失,但存在数据重复
    • 最多一次消费:数据不会重复,但可能丢失数据
    • 精确一次消费:不多不少一次消费
  • 相关阅读:
    一文读懂二级分销返利模式,商城系统源码机制分享
    采集侠-免费采集侠-免费采集侠插件
    《UNIX 传奇:历史与回忆》读后感
    realEngine(UE4)实现开关门效果
    vue中的深拷贝lodash的用法及解释
    运动控制:编码器滤波
    CUDA动态并行实现快速排序
    从0到1开始运营你的开源产品
    Java 程序结构
    用go实现一个循环队列
  • 原文地址:https://blog.csdn.net/m0_46507516/article/details/132795186