• SparkStreaming 案例实操 完整使用 (第十七章)


    一、环境准备

    1、pom 文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Spark</artifactId>
            <groupId>com.spark</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>spark-code</artifactId>
    
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.10.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.27</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-yarn_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
    
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.10</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <!-- 声明绑定到 maven 的 compile 阶段 -->
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    2、生产数据

    package com.spack.bigdata.xm
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    
    import java.util.Properties
    import scala.collection.mutable.ListBuffer
    import scala.util.Random
    
    /**
     * TODO 项目
     * TODO 环境和数据准备---生产数据
     */
    object SparkStreaming10_MockData {
      def main(args: Array[String]): Unit = {
        //生成模拟数据
        //格式:timestamp  area  city  userId  adId
        //含义  时间戳      区域   城市    用户    广告
    
        //Application => Kafka => SparkStreaming =>Analysis
    
        //生产消息
        val producer = producerKafka()
    
        while (true) {
          mockData().foreach(
            data => {
              //向kafka中生成数据
              val record = new ProducerRecord[String, String]("atguiguNew", data)
              println(data)
              producer.send(record)
            }
          )
          Thread.sleep(2000)
        }
    
    
      }
    
    
      /**
       * TODO 生产数据
       *
       * @return
       */
      def mockData() = {
        val list = ListBuffer[String]()
        val areaList: ListBuffer[String] = ListBuffer[String]("华北", "华东", "华南")
        val cityList: ListBuffer[String] = ListBuffer[String]("北京", "上海", "深圳")
    
        for (i <- 1 to new Random().nextInt(50)) {
          //    for (i <- 1 to 30) {
          val area = areaList(new Random().nextInt(3))
          val city = cityList(new Random().nextInt(3))
    
          val userId = new Random().nextInt(6) + 1
          val adId = new Random().nextInt(6) + 1
          list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}")
    
        }
        list
      }
    
    
      /**
       * TODO 发送消息配置
       *
       * @return
       */
      def producerKafka() = {
        // 创建配置对象
        val prop = new Properties()
        // 添加配置
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    
        // 根据配置创建 Kafka 生产者
        val value: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
        value
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    二、需求一:广告黑名单

    实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
    注:黑名单保存到 MySQL 中

    1) 思路分析

    1)读取 Kafka 数据之后,并对 MySQL 中存储的黑名单数据做校验;
    2)校验通过则对给用户点击广告次数累加一并存入 MySQL;
    3)在存入 MySQL 之后对数据做校验,如果单日超过 100 次则将该用户加入黑名单。

    2) 存放黑名单用户的表

    CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);
    
    • 1

    3) 存放单日各用户点击每个广告的次数

    CREATE TABLE user_ad_count (
    dt varchar(255),
    userid CHAR (1),
    adid CHAR (1),
    count BIGINT,
    PRIMARY KEY (dt, userid, adid)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4) 代码实现

    package com.spack.bigdata.xm
    
    import com.spack.bigdata.util.JDBCUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import java.text.SimpleDateFormat
    import java.util.Date
    import scala.collection.mutable.ListBuffer
    
    /**
     * TODO ===================================正式项目消费环境--演示数据获取
     * TODO 环境和数据准备
     */
    object SparkStreaming11_Req1_BlackList {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        //StreamingContext 创建时、需要传递两个参数
        //第一个参数表示环境变量
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    
        //第二个参数表示批量处理的周期(采集周期)---Seconds以秒为单位
        val scc = new StreamingContext(sparkConf, Seconds(3))
    
        //设置Kafka参数
        val kaConfin: Map[String, Object] = setKafKaConfin
    
        //    println("kaConfin " + kaConfin)
    
        //读取KafKa数据创建DStream、采集的数据传过来的K,V 是字符串
        val KafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          scc, //上下文的环境对象
          LocationStrategies.PreferConsistent, //位置的策略、采集节点、和计算该如何做匹配、TODO 自动选择、由框架来匹配
          ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kaConfin) //消费者策略
        )
    
    
        val adClickData = KafkaDataDS.map(
          kafkaData => {
            val data = kafkaData.value()
            val datas = data.split(" ")
    
            /**
             * 拿到数据
             * 1658972865679 华北 深圳 4 3
             * 1658972865679 华东 深圳 1 2
             * 进行样例类转换
             */
            AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
          }
        )
    
    
        //transform -可直接拿到RDD --而且是周期性的拿到RDD
        val ds = adClickData.transform(
          rdd => {
            //TODO 通过JDBC周期性获取黑名单数据
            val blackList = ListBuffer[String]()
    
            //查询数据库
            val connection = JDBCUtils.getConnection
            val pstat = connection.prepareStatement("select userid from black_list")
            val rs = pstat.executeQuery()
    
            while (rs.next()) {
              //取出第一个字段、添加到 blackList 集合中
              blackList.append(rs.getString(1))
            }
            rs.close()
            pstat.close()
            connection.close()
    
            //TODO 判断点击用户是否存在黑名单中
            val filterRDD = rdd.filter(
              data => {
                //不在里面要保留 所以加!
                !blackList.contains(data.user)
              }
            )
    
            //TODO 1、 如果用户不在黑名单中、那么进行统计数量(每个采集周期)
            //每天,每个广告的用户
            filterRDD.map(
              data => {
                val sdf = new SimpleDateFormat("yyyy-MM-dd")
                val day = sdf.format(new Date(data.ts.toLong))
                val user = data.user
                val ad = data.ad
    
                ((day, user, ad), 1) // (word ,count)
              }
            ).reduceByKey(_ + _)
    
          }
        )
    
    
        ds.foreachRDD(
          //对每条数据做操作
          rdd => {
            rdd.foreach {
              //每条数据都应该判断有没有超过阈值
              //模式匹配会更好
              case ((day, user, ad), count) => {
                println(s"${day} ${user} ${count}")
                if (count >= 30) {
                  //TODO 如果统计数量超过点击阈值(30)那吗将用户拉入到黑名单
                  val connection = JDBCUtils.getConnection
    
                  //DUPLICATE KEY 重复key的时候--发现有重复的 更新
                  val pstat = connection.prepareStatement(
                    """
                      | insert into black_list (userid)  values (?)
                      | on DUPLICATE KEY
                      | UPDATE userid = ?
                      |""".stripMargin)
                  pstat.setString(1, user)
                  pstat.setString(2, user)
                  //              println("TODO 如果统计数量超过点击阈值(30)那吗将用户拉入到黑名单")
                  pstat.executeUpdate()
                  pstat.close()
                  connection.close()
    
    
                } else {
                  //TODO 如果没有超过阈值,那吗将当天的广告点击量进行更新
                  val connection = JDBCUtils.getConnection
                  val pstat = connection.prepareStatement(
                    """
                      | select
                      |    *
                      | from user_ad_count
                      | where dt = ? and userid = ? and adid = ?
                      |""".stripMargin)
    
                  pstat.setString(1, day)
                  pstat.setString(2, user)
                  pstat.setString(3, ad)
                  val rs = pstat.executeQuery()
                  //              println("TODO 如果没有超过阈值,那吗将当天的广告点击量进行更新")
                  //查询统计表数据
                  if (rs.next()) {
                    //TODO 如果存在数据、那吗更新---更新完是否超过阈值
                    val pstat1 = connection.prepareStatement(
                      """
                        | update user_ad_count
                        | set count = count + ?
                        | where dt = ? and userid = ? and adid = ?
                        |""".stripMargin)
                    pstat1.setInt(1, count)
                    pstat1.setString(2, day)
                    pstat1.setString(3, user)
                    pstat1.setString(4, ad)
                    //                println("如果存在数据、那吗更新---更新完是否超过阈值")
                    pstat1.executeUpdate()
                    pstat1.close()
    
                    //TODO 判断更新后的点击数据是否超过阅值,如果超过,那么将用户拉入到黑名单。
                    val pstat2 = connection.prepareStatement(
                      """
                        | select *
                        | from user_ad_count
                        | where dt =? and userid = ? and adid = ? and count >= 30
                        |""".stripMargin)
    
                    pstat2.setString(1, day)
                    pstat2.setString(2, user)
                    pstat2.setString(3, ad)
    
                    val rs2 = pstat2.executeQuery()
                    //                println("TODO 判断更新后的点击数据是否超过阅值,如果超过,那么将用户拉入到黑名单。")
                    if (rs2.next()) {
                      val pstat3 = connection.prepareStatement(
                        """
                          | insert into black_list (userid) values (?)
                          | on DUPLICATE KEY
                          | UPDATE userid = ?
                          |""".stripMargin)
                      pstat3.setString(1, user)
                      pstat3.setString(2, user)
                      pstat3.executeUpdate()
    
                      //                  println("TODO2 判断更新后的点击数据是否超过阅值,如果超过,那么将用户拉入到黑名单。")
                      pstat3.close()
    
                    }
    
                    rs2.close()
                    pstat2.close()
    
    
                  } else {
                    //如果不存在数据、那吗新增
                    val pstat1 = connection.prepareStatement(
                      """
                        | insert into user_ad_count (dt,userid,adid,count) values(?,?,?,?)
                        |""".stripMargin)
    
                    pstat1.setString(1, day)
                    pstat1.setString(2, user)
                    pstat1.setString(3, ad)
                    pstat1.setInt(4, count)
                    //                println("如果不存在数据、那吗新增")
                    pstat1.executeUpdate()
                    pstat1.close()
    
                  }
    
                  rs.close()
                  pstat.close()
                  connection.close()
                }
              }
            }
    
    
          }
        )
    
    
        //    print(1 >= 2)
    
        //开启后台启动
        scc.start()
        scc.awaitTermination()
      }
    
    
      case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
    
    
      /**
       * Kafka配置
       *
       * @return
       */
      def setKafKaConfin(): Map[String, Object] = {
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        kafkaPara
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251

    5) 优化

    package com.spack.bigdata.xm
    
    import com.spack.bigdata.util.JDBCUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import java.text.SimpleDateFormat
    import java.util.Date
    import scala.collection.mutable.ListBuffer
    
    /**
     * TODO ===================================正式项目消费环境--演示数据获取
     * TODO 环境和数据准备
     */
    object SparkStreaming11_Req1_BlackList1 {
      def main(args: Array[String]): Unit = {
        // 创建环境对象
        // StreamingContext 创建时、需要传递两个参数
        // 第一个参数表示环境变量
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    
        //第二个参数表示批量处理的周期(采集周期)---Seconds以秒为单位
        val scc = new StreamingContext(sparkConf, Seconds(3))
    
        // 设置Kafka参数
        val kaConfin: Map[String, Object] = setKafKaConfin
    
        //    println("kaConfin " + kaConfin)
    
        // 读取KafKa数据创建DStream、采集的数据传过来的K,V 是字符串
        val KafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          scc, //上下文的环境对象
          LocationStrategies.PreferConsistent, //位置的策略、采集节点、和计算该如何做匹配、TODO 自动选择、由框架来匹配
          ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kaConfin) //消费者策略
        )
    
    
        val adClickData = KafkaDataDS.map(
          kafkaData => {
            val data = kafkaData.value()
            val datas = data.split(" ")
    
            /**
             * 拿到数据
             * 1658972865679 华北 深圳 4 3
             * 1658972865679 华东 深圳 1 2
             * 进行样例类转换
             */
            AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
          }
        )
    
        //transform -可直接拿到RDD --而且是周期性的拿到RDD
        val ds = adClickData.transform(
          rdd => {
            //TODO 通过JDBC周期性获取黑名单数据
            val blackList = ListBuffer[String]()
    
            //查询数据库
            val connection = JDBCUtils.getConnection
            val pstat = connection.prepareStatement("select userid from black_list")
            val rs = pstat.executeQuery()
    
            while (rs.next()) {
              //取出第一个字段、添加到 blackList 集合中
              blackList.append(rs.getString(1))
            }
            rs.close()
            pstat.close()
            connection.close()
    
            //TODO 判断点击用户是否存在黑名单中
            val filterRDD = rdd.filter(
              data => {
                //不在里面要保留 所以加!
                !blackList.contains(data.user)
              }
            )
    
            //TODO 1、 如果用户不在黑名单中、那么进行统计数量(每个采集周期)
            //每天,每个广告的用户
            filterRDD.map(
              data => {
                val sdf = new SimpleDateFormat("yyyy-MM-dd")
                val day = sdf.format(new Date(data.ts.toLong))
                val user = data.user
                val ad = data.ad
    
                ((day, user, ad), 1) // (word ,count)
              }
            ).reduceByKey(_ + _)
    
          }
        )
    
    
        //从集合中获取人员来判断黑名单是否存在
        ds.foreachRDD(
          rdd => {
            //TODO  RDD 提供了一个算子可以有效提示效率:foreachPartition
            // 可以一个分区创建一个连接对象,这样可以大幅度减少连接对象的数量,提升效率
            //TODO 这样的方式不是一条数据创建一个链接、而是一个Partition创建一个链接
            // 这样可以大幅度减少连接对象的数量、提升效率
            //通过算子来简化JDBC的操作
            //        rdd.foreachPartition(
            //          iter =>{
            //
            //            val connection = JDBCUtils.getConnection
            //            iter.foreach{
            //              //每条数据都应该判断有没有超过阈值
            //              //模式匹配会更好
            //              case ((day, user, ad), count) => {
            //                println(s"${day} ${user} ${count}")
            //                if (count >= 30) {
            //                  //TODO 如果统计数量超过点击阈值(30)那吗将用户拉入到黑名单
            //
            //
            //                  //DUPLICATE KEY 重复key的时候--发现有重复的 更新
            //                  val sql =
            //                    """
            //                      | insert into black_list (userid)  values (?)
            //                      | on DUPLICATE KEY
            //                      | UPDATE userid = ?
            //                      |""".stripMargin
            //                  JDBCUtils.executeUpdate(connection, sql, Array(user, user))
            //                  connection.close()
            //
            //
            //                } else {
            //                  //TODO 如果没有超过阈值,那吗将当天的广告点击量进行更新
            //                  val connection = JDBCUtils.getConnection
            //                  val sql =
            //                    """
            //                      | select
            //                      |    *
            //                      | from user_ad_count
            //                      | where dt = ? and userid = ? and adid = ?
            //                      |""".stripMargin;
            //                  val flg = JDBCUtils.isExist(connection, sql, Array(day, user, ad))
            //
            //                  //查询统计表数据
            //                  if (flg) {
            //
            //                    val sql =
            //                      """
            //                        | update user_ad_count
            //                        | set count = count + ?
            //                        | where dt = ? and userid = ? and adid = ?
            //                        |""".stripMargin
            //
            //                    JDBCUtils.executeUpdate(connection, sql, Array(count, day, user, ad))
            //                    //TODO 如果存在数据、那吗更新---更新完是否超过阈值
            //
            //
            //                    val sql2 =
            //                      """
            //                        | select *
            //                        | from user_ad_count
            //                        | where dt =? and userid = ? and adid = ? and count >= 30
            //                        |""".stripMargin
            //
            //                    val flg1 = JDBCUtils.isExist(connection, sql2, Array(count, day, user, ad))
            //
            //                    //TODO 判断更新后的点击数据是否超过阅值,如果超过,那么将用户拉入到黑名单。
            //
            //                    if (flg1) {
            //                      val sql3 =
            //                        """
            //                          | insert into black_list (userid)  values (?)
            //                          | on DUPLICATE KEY
            //                          | UPDATE userid = ?
            //                          |""".stripMargin
            //
            //                      JDBCUtils.executeUpdate(connection, sql3, Array(user, user))
            //                    }
            //
            //                  } else {
            //
            //                    val sql4 =
            //                      """
            //                        | insert into user_ad_count (dt,userid,adid,count) values(?,?,?,?)
            //                        |""".stripMargin
            //
            //                    JDBCUtils.executeUpdate(connection, sql4, Array(day,user,ad,count))
            //                    //如果不存在数据、那吗新增
            //
            //
            //                  }
            //
            //                  connection.close()
            //                }
            //              }
            //            }
            //          }
            //        )
    
    
            //TODO 对每条数据做操作
            // rdd.foreach 方法会每一条数据创建链接
            // foreach 方法是RDD的算子、算子之外的代码是Driver端执行,算子内的代码是在Ececutor端执行
            // 这样就会涉及闭包操作,Driver端的数据就需要传递到Executor端,需要将数据进行序列化
            // 数据库的链接对象是不能序列化的
            rdd.foreach {
              //每条数据都应该判断有没有超过阈值
              //模式匹配会更好
              case ((day, user, ad), count) => {
                println(s"${day} ${user} ${count}")
                if (count >= 30) {
                  //TODO 如果统计数量超过点击阈值(30)那吗将用户拉入到黑名单
                  val connection = JDBCUtils.getConnection
    
                  //DUPLICATE KEY 重复key的时候--发现有重复的 更新
                  val sql =
                    """
                      | insert into black_list (userid)  values (?)
                      | on DUPLICATE KEY
                      | UPDATE userid = ?
                      |""".stripMargin
                  JDBCUtils.executeUpdate(connection, sql, Array(user, user))
                  connection.close()
    
    
                } else {
                  //TODO 如果没有超过阈值,那吗将当天的广告点击量进行更新
                  val connection = JDBCUtils.getConnection
                  val sql =
                    """
                      | select
                      |    *
                      | from user_ad_count
                      | where dt = ? and userid = ? and adid = ?
                      |""".stripMargin;
                  val flg = JDBCUtils.isExist(connection, sql, Array(day, user, ad))
    
                  //查询统计表数据
                  if (flg) {
    
                    val sql =
                      """
                        | update user_ad_count
                        | set count = count + ?
                        | where dt = ? and userid = ? and adid = ?
                        |""".stripMargin
    
                    JDBCUtils.executeUpdate(connection, sql, Array(count, day, user, ad))
                    //TODO 如果存在数据、那吗更新---更新完是否超过阈值
    
    
                    val sql2 =
                      """
                        | select *
                        | from user_ad_count
                        | where dt =? and userid = ? and adid = ? and count >= 30
                        |""".stripMargin
    
                    val flg1 = JDBCUtils.isExist(connection, sql2, Array(count, day, user, ad))
    
                    //TODO 判断更新后的点击数据是否超过阅值,如果超过,那么将用户拉入到黑名单。
    
                    if (flg1) {
                      val sql3 =
                        """
                          | insert into black_list (userid)  values (?)
                          | on DUPLICATE KEY
                          | UPDATE userid = ?
                          |""".stripMargin
    
                      JDBCUtils.executeUpdate(connection, sql3, Array(user, user))
                    }
    
                  } else {
    
                    val sql4 =
                      """
                        | insert into user_ad_count (dt,userid,adid,count) values(?,?,?,?)
                        |""".stripMargin
    
                    JDBCUtils.executeUpdate(connection, sql4, Array(day, user, ad, count))
                    //如果不存在数据、那吗新增
    
    
                  }
    
                  connection.close()
                }
              }
            }
    
    
          }
        )
    
    
        //    print(1 >= 2)
    
        //开启后台启动
        scc.start()
        scc.awaitTermination()
      }
    
    
      case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
    
    
      /**
       * Kafka配置
       *
       * @return
       */
      def setKafKaConfin(): Map[String, Object] = {
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        kafkaPara
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324

    三、需求二:广告点击量实时统计

    描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。

    1、思路分析

    1)单个批次内对数据进行按照天维度的聚合统计;
    2)结合 MySQL 数据跟当前批次数据更新原有的数据。

    2、MySQL 建表

    CREATE TABLE area_city_ad_count (
    dt VARCHAR(255),
    area VARCHAR(255),
    city VARCHAR(255),
    adid VARCHAR(255),
     count BIGINT,
    PRIMARY KEY (dt,area,city,adid)
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    代码实现

    package com.spack.bigdata.xm.xm2
    
    import com.spack.bigdata.util.JDBCUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    /**
     * 需求二:广告点击量实时统计
         描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。
          7.4.1 思路分析
                 1)单个批次内对数据进行按照天维度的聚合统计;
                 2)结合 MySQL 数据跟当前批次数据更新原有的数据。
     */
    object SparkStreaming12_Req2 {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        //StreamingContext 创建时、需要传递两个参数
        //第一个参数表示环境变量
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    
        //第二个参数表示批量处理的周期(采集周期)---Seconds以秒为单位
        val scc = new StreamingContext(sparkConf, Seconds(3))
    
        //设置Kafka参数
        val kaConfin = setKafKaConfin
    
        //读取KafKa数据创建DStream、采集的数据传过来的K,V 是字符串
        val KafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          scc, //上下文的环境对象
          LocationStrategies.PreferConsistent, //位置的策略、采集节点、和计算该如何做匹配、TODO 自动选择、由框架来匹配
          ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kaConfin) //消费者策略
        )
    
    
        val adClickData = KafkaDataDS.map(
          kafkaData => {
            val data = kafkaData.value()
            val datas = data.split(" ")
            AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
          }
        )
    
        val reduceDS = adClickData.map(
          data => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day = sdf.format(new Date(data.ts.toLong))
            val area = data.area
            val city = data.city
            val ad = data.ad
    
            ((day, area, city, ad), 1)
          }
        ).reduceByKey(_ + _)
    
    
        reduceDS.foreachRDD(
          rdd => {
            rdd.foreachPartition(
              iter => {
                val connection = JDBCUtils.getConnection
                val pstat = connection.prepareStatement(
                  """
                    | insert into area_city_ad_count (dt, area, city, adid, count)
                    | values (?,?,?,?,?)
                    | on DUPLICATE KEY
                    | UPDATE count = count + ?
                    |""".stripMargin)
                iter.foreach {
                  case ((day, area, city, ad), sum) => {
                    pstat.setString(1, day)
                    pstat.setString(2, area)
                    pstat.setString(3, city)
                    pstat.setString(4, ad)
                    pstat.setInt(5, sum)
                    pstat.setInt(6, sum)
                    pstat.executeUpdate()
                  }
                }
                pstat.close()
                connection.close()
              }
    
            )
          }
        )
    
    
    
        //开启后台启动
        scc.start()
        scc.awaitTermination()
      }
    
    
      case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
    
    
      /**
       * Kafka配置
       *
       * @return
       */
      def setKafKaConfin(): Map[String, Object] = {
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        kafkaPara
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    四、需求三:最近一小时广告点击量

    结果展示:
    1:List [15:50->10,15:51->25,15:52->30]
    2:List [15:50->10,15:51->25,15:52->30]
    3:List [15:50->10,15:51->25,15:52->30]

    1、 思路分析

    1)开窗确定时间范围;
    2)在窗口内将数据转换数据结构为((adid,hm),count);
    3)按照广告 id 进行分组处理,组内按照时分排序。

    2、代码实现

    package com.spack.bigdata.xm.xm3
    
    import com.spack.bigdata.util.JDBCUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    /**
     * 需求二:广告点击量实时统计
     * 描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。
     * 7.4.1 思路分析
     * 1)单个批次内对数据进行按照天维度的聚合统计;
     * 2)结合 MySQL 数据跟当前批次数据更新原有的数据。
     */
    object SparkStreaming13_Req3 {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        //StreamingContext 创建时、需要传递两个参数
        //第一个参数表示环境变量
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    
        //第二个参数表示批量处理的周期(采集周期)---Seconds以秒为单位
        val scc = new StreamingContext(sparkConf, Seconds(3))
    
        //设置Kafka参数
        val kaConfin = setKafKaConfin
    
        //读取KafKa数据创建DStream、采集的数据传过来的K,V 是字符串
        val KafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          scc, //上下文的环境对象
          LocationStrategies.PreferConsistent, //位置的策略、采集节点、和计算该如何做匹配、TODO 自动选择、由框架来匹配
          ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kaConfin) //消费者策略
        )
    
    
        val adClickData = KafkaDataDS.map(
          kafkaData => {
            val data = kafkaData.value()
            val datas = data.split(" ")
            AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
          }
        )
    
        adClickData.map(
          data => {
            val ts = data.ts.toLong
            val newTS = ts / 10000 * 10000
            (newTS, 1)
          }
        ).reduceByKeyAndWindow((x: Int, y: Int) => {
          x + y
        }, Seconds(60), Seconds(10))
    
    
        //开启后台启动
        scc.start()
        scc.awaitTermination()
      }
    
    
      case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
    
    
      /**
       * Kafka配置
       *
       * @return
       */
      def setKafKaConfin(): Map[String, Object] = {
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        kafkaPara
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    优化页面展示

    package com.spack.bigdata.xm.xm3
    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import java.io.{File, FileWriter, PrintWriter}
    import java.text.SimpleDateFormat
    import java.util.Date
    import scala.collection.mutable.ListBuffer
    
    /**
     * 需求二:广告点击量实时统计
     * 描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入 MySQL。
     * 7.4.1 思路分析
     * 1)单个批次内对数据进行按照天维度的聚合统计;
     * 2)结合 MySQL 数据跟当前批次数据更新原有的数据。
     */
    object SparkStreaming13_Req31 {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        //StreamingContext 创建时、需要传递两个参数
        //第一个参数表示环境变量
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    
        //第二个参数表示批量处理的周期(采集周期)---Seconds以秒为单位
        val scc = new StreamingContext(sparkConf, Seconds(5))
    
        //设置Kafka参数
        val kaConfin = setKafKaConfin
    
        //读取KafKa数据创建DStream、采集的数据传过来的K,V 是字符串
        val KafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          scc, //上下文的环境对象
          LocationStrategies.PreferConsistent, //位置的策略、采集节点、和计算该如何做匹配、TODO 自动选择、由框架来匹配
          ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kaConfin) //消费者策略
        )
    
    
        val adClickData = KafkaDataDS.map(
          kafkaData => {
            val data = kafkaData.value()
            val datas = data.split(" ")
            AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
          }
        )
    
        val reduceDS = adClickData.map(
          data => {
            val ts = data.ts.toLong
            val newTS = ts / 10000 * 10000
            (newTS, 1)
          }
        ).reduceByKeyAndWindow((x: Int, y: Int) => {
          x + y
        }, Seconds(60), Seconds(10))
    
    
        reduceDS.foreachRDD(
          rdd => {
            val list = ListBuffer[String]()
            val datas: Array[(Long, Int)] = rdd.sortByKey(true).collect()
    
            datas.foreach {
              case (time, cnt) => {
                val str = new SimpleDateFormat("mm:ss").format(new Date(time.toLong))
                list.append(s"""{"xtime":"${str}","yval":"${cnt}"}""")
              }
            }
    
            //输出文件
            val out = new PrintWriter(new FileWriter(new File("D:\\utils\\development\\project\\Spark\\datas\\adclick\\adclick.json")))
            out.println("[" + list.mkString(",") + "]")
            out.flush()
            out.close()
          }
        )
    
    
        //开启后台启动
        scc.start()
        scc.awaitTermination()
      }
    
    
      case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
    
    
      /**
       * Kafka配置
       *
       * @return
       */
      def setKafKaConfin(): Map[String, Object] = {
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        kafkaPara
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
  • 相关阅读:
    信息学奥赛一本通 1339:【例3-4】求后序遍历 | 洛谷 P1827 [USACO3.4] 美国血统 American Heritage
    关于GPU显卡的介绍
    集群离线搭建k8s
    pytest多进程/多线程执行测试用例
    git在merge时做了些什么
    使用vue自定义实现Tree组件和拖拽功能
    识别评估项目风险常用6大方法
    《Python进阶系列》二十八:mmap模块(处理大文本)
    Cortex-M架构系统定时器阻塞和非阻塞延时
    如何写论文
  • 原文地址:https://blog.csdn.net/qq_42082701/article/details/126126428