• Hudi 数据湖的插入,更新,查询,分析操作示例


    Hudi 数据湖的插入,更新,查询,分析操作示例

    作者:Grey

    原文地址:

    博客园:Hudi 数据湖的插入,更新,查询,分析操作示例

    CSDN:Hudi 数据湖的插入,更新,查询,分析操作示例

    前置工作

    首先,需要先完成

    Linux 下搭建 Kafka 环境

    Linux 下搭建 Hadoop 环境

    Linux 下搭建 HBase 环境

    Linux 下搭建 Hive 环境

    本文基于上述四个环境已经搭建完成的基础上进行 Hudi 数据湖的插入,更新,查询操作。

    开发环境

    Scala 2.11.8

    JDK 1.8

    需要熟悉 Maven 构建项目和 Scala 一些基础语法。

    操作步骤

    master 节点首先启动集群,执行:

    stop-dfs.sh && start-dfs.sh
    
    • 1

    启动 yarn,执行:

    stop-yarn.sh && start-yarn.sh
    
    • 1

    然后准备一个 Mave 项目,在 src/main/resources 目录下,将 Hadoop 的一些配置文件拷贝进来,分别是

    $HADOOP_HOME/etc/hadoop/core-site.xml 文件

    
    
    <configuration>
        <property>
            <name>fs.default.namename>
            <value>hdfs://master:9000value>
        property>
        <property>
            <name>hadoop.tmp.dirname>
            <value>/usr/local/hadoop/tmpvalue>
        property>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    注意,需要在你访问集群的机器上配置 host 文件,这样才可以识别 master 节点。

    $HADOOP_HOME/etc/hadoop/hdfs-site.xml 文件

    
    
    
    <configuration>
        <property>
            <name>dfs.replicationname>
            <value>1value>
        property>
        <property>
            <name>dfs.permissionsname>
            <value>falsevalue>
        property>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    $HADOOP_HOME/etc/hadoop/yarn-site.xml 文件,目前还没有任何配置

    
    
    <configuration>
    configuration>
    
    • 1
    • 2
    • 3
    • 4

    然后,设计实体的数据结构,

    package git.snippet.entity
    
    case class MyEntity(uid: Int,
                        uname: String,
                        dt: String
                       )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    插入数据代码如下

    package git.snippet.test
    
    
    import git.snippet.entity.MyEntity
    import git.snippet.util.JsonUtil
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object DataInsertion {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local[*]")
        val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        val ssc = sparkSession.sparkContext
        ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
        insertData(sparkSession)
      }
    
      def insertData(sparkSession: SparkSession) = {
        import org.apache.spark.sql.functions._
        import sparkSession.implicits._
        val commitTime = System.currentTimeMillis().toString //生成提交时间
        val df = sparkSession.read.text("/mydata/data1")
          .mapPartitions(partitions => {
            partitions.map(item => {
              val jsonObject = JsonUtil.getJsonData(item.getString(0))
              MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
            })
          })
        val result = df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列
          .withColumn("uuid", col("uid"))
          .withColumn("hudipart", col("dt")) //增加hudi分区列
        result.write.format("org.apache.hudi")
          .option("hoodie.insert.shuffle.parallelism", 2)
          .option("hoodie.upsert.shuffle.parallelism", 2)
          .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列
          .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列
          .option("hoodie.table.name", "myDataTable")
          .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分区列
          .mode(SaveMode.Overwrite)
          .save("/snippet/data/hudi")
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    然后,在 master 节点先准备好数据

    vi data1
    
    • 1

    输入如下数据

    {'uid':1,'uname':'grey','dt':'2022/09'}
    {'uid':2,'uname':'tony','dt':'2022/10'}
    
    • 1
    • 2

    然后创建文件目录,

    hdfs dfs -mkdir /mydata/
    
    • 1

    把 data1 放入目录下

    hdfs dfs -put data1 /mydata/
    
    • 1

    访问:http://192.168.100.130:50070/explorer.html#/mydata

    可以查到这个数据

    image

    接下来执行插入数据的 scala 代码,执行完毕后,验证一下

    访问:http://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022

    可以查看到插入的数据

    image

    准备一个 data2 文件

    cp data1 data2 && vi data2
    
    • 1

    data2 的数据更新为

    {'uid':1,'uname':'grey1','dt':'2022/11'}
    {'uid':2,'uname':'tony1','dt':'2022/12'}
    
    • 1
    • 2

    然后执行

    hdfs dfs -put data2 /mydata/
    
    • 1

    更新数据的代码,我们可以做如下调整,完整代码如下

    package git.snippet.test
    
    import git.snippet.entity.MyEntity
    import git.snippet.util.JsonUtil
    import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object DataUpdate {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local[*]")
        val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        val ssc = sparkSession.sparkContext
        ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
        updateData(sparkSession)
      }
    
      def updateData(sparkSession: SparkSession) = {
        import org.apache.spark.sql.functions._
        import sparkSession.implicits._
        val commitTime = System.currentTimeMillis().toString //生成提交时间
        val df = sparkSession.read.text("/mydata/data2")
          .mapPartitions(partitions => {
            partitions.map(item => {
              val jsonObject = JsonUtil.getJsonData(item.getString(0))
              MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
            })
          })
        val result = df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列
          .withColumn("uuid", col("uid")) //添加uuid 列
          .withColumn("hudipart", col("dt")) //增加hudi分区列
        result.write.format("org.apache.hudi")
          //      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
          .option("hoodie.insert.shuffle.parallelism", 2)
          .option("hoodie.upsert.shuffle.parallelism", 2)
          .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列
          .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列
          .option("hoodie.table.name", "myDataTable")
          .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分区列
          .mode(SaveMode.Append)
          .save("/snippet/data/hudi")
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    执行更新数据的代码。

    验证一下,访问:http://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022

    可以查看到更新的数据情况

    image

    数据查询的代码也很简单,完整代码如下

    package git.snippet.test
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object DataQuery {
    
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local[*]")
        val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        val ssc = sparkSession.sparkContext
        ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
        queryData(sparkSession)
      }
    
    
      def queryData(sparkSession: SparkSession) = {
        val df = sparkSession.read.format("org.apache.hudi")
          .load("/snippet/data/hudi/*/*")
        df.show()
        println(df.count())
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    执行,输出以下信息,验证成功。

    image

    数据查询也支持很多查询条件,比如增量查询,按时间段查询等。

    接下来是 flink 实时数据分析的服务,首先需要在 master 上启动 kafka,并创建 一个名字为 mytopic 的 topic,详见Linux 下搭建 Kafka 环境

    相关命令如下

    创建topic

    kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topic  mytopic
    
    • 1

    生产者启动配置

    kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic
    
    • 1

    消费者启动配置

    kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic
    
    • 1

    然后运行如下代码

    package git.snippet.analyzer;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.util.Properties;
    
    public class DataAnalyzer {
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
            properties.setProperty("group.id", "snippet");
            //构建FlinkKafkaConsumer
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
            //指定偏移量
            myConsumer.setStartFromLatest();
            final DataStream<String> stream = env.addSource(myConsumer);
            env.enableCheckpointing(5000);
            stream.print();
            try {
                env.execute("DataAnalyzer");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    其中

    properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
    
    • 1

    根据自己的配置调整,然后通过 kakfa 的生产者客户端输入一些数据,这边可以收到这个数据,验证完毕。

    完整代码见

    data-lake

  • 相关阅读:
    Coze+Discord:打造你的免费AI助手(教您如何免费使用GPT-4o/Gemini等最新最强的大模型/Discord如何正确连接Coze)
    spark复习
    直击“三夏”生产:丰收喜报频传 夏播紧锣密鼓
    建模助手 | 建筑界的难兄难弟?浅谈BIM与装配式的恩怨纠缠
    spice Link过程分析
    蓝桥杯第十一届电子类单片机组程序设计
    代码随想录——比较含退格的字符串
    华为面试题
    es带用户名密码验证并配置elasticsearch-head连接
    Android修行手册 - 一文全了解Kotlin几种静态变量、函数实现的那些事
  • 原文地址:https://blog.csdn.net/hotonyhui/article/details/127418093