• Dataset 的一些 Java api 操作


    一、给 RDD 添加新列

    方式一:将已有的列扩展为新列
    Dataset hehe = haha.withColumn("k_ts", functions.expr("unix_timestamp(Time, 'yyyy-MM-dd HH:mm:ss') as k_ts"));
    
    // 添加一个新的列,计算两个现有列的和
    df.withColumn("sum", functions.col("column1").plus(functions.col("column2")))
    
    // 添加一个新的列
    df.withColumn("column1", functions.col("column3"));
    
    // 添加一个新的列,使用正则和替换等操作
    Dataset hehe = haha.withColumn("city", functions.regexp_replace(functions.translate(haha.col("p"), "<>", ""),  "[[^\\w]+]", "_"))
    
    // 添加一个新的列
    Dataset hehe = haha..withColumn("itemName", functions.lit("city"))
    
    // 使用UDF添加新的列:
    // 定义一个UDF
    UDF1, String> reverseUDF = new UDF1, String>() {
        @Override
        public String call(String s) throws Exception {
            return new StringBuilder(s).reverse().toString();
        }
    };
     
    // 注册UDF
    spark.udf().register("reverse", reverseUDF, DataTypes.StringType);
     
    // 使用UDF添加新的列
    df.withColumn("reversedColumn", functions.callUDF("reverse", functions.col("column1")));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    方式二:使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列

      在应用 mapPartition 函数后创建一个新的数据框:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    public class Handler implements Serializable {
    
        public void handler(Dataset<Row> sourceData) {
            Dataset<Row> rowDataset = sourceData
                    .where("rowKey = 'abcdefg_123'")
                    .selectExpr("split(rowKey, '_')[0] as id",
                            "name",
                            "time")
                    .where("name = '小强'")
                    .orderBy(functions.col("id").asc(), functions.col("time").desc());
    
            FlatMapFunction<Iterator<Row>,Row> mapPartitonstoTime = rows->
            {
                Int count = 0; // 只能在每个分区内自增,不能保证全局自增
    			String startTime = "";
    			String endTime = "";
    			List<Row> mappedRows=new ArrayList<Row>();
                while(rows.hasNext())
                {
                    count++;
                    Row next = rows.next();
                    String id = next.getAs("id");
                    if (count == 2) {
    					startTime = next.getAs("time");
    					endTime = next.getAs("time");
                    }
                    Row mappedRow= RowFactory.create(next.getString(0), next.getString(1), next.getString(2), endTime, startTime);
                    mappedRows.add(mappedRow);
                }
                return mappedRows.iterator();
            };
    
            JavaRDD<Row> sensorDataDoubleRDD=rowDataset.toJavaRDD().mapPartitions(mapPartitonstoTime);
    
            StructType oldSchema=rowDataset.schema();
            StructType newSchema =oldSchema.add("startTime",DataTypes.StringType,false)
                    .add("endTime",DataTypes.StringType,false);
    
            System.out.println("The new schema is: ");
            newSchema.printTreeString();
    
            System.out.println("The old schema is: ");
            oldSchema.printTreeString();
    
            Dataset<Row> sensorDataDoubleDF=spark.createDataFrame(sensorDataDoubleRDD, newSchema);
            sensorDataDoubleDF.show(100, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    打印结果:

    The new schema is: 
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)
     |-- time: string (nullable = true)
    
    The old schema is: 
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)
     |-- time: string (nullable = true)
     |-- startTime: string (nullable = true)
     |-- endTime: string (nullable = true)
    
    +-----------+---------+----------+----------+----------+
    |id         |name     |time      |startTime |endTime   |
    +-----------+---------+----------+----------+----------+
    |abcdefg_123|xiaoqiang|1693462023|1693462023|1693462023|
    |abcdefg_321|xiaoliu  |1693462028|1693462028|1693462028|
    +-----------+---------+----------+----------+----------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    参考:
    java - 使用 Java API 和 JavaRDD 在 Spark SQL 中向数据帧添加新列
    java.util.Arrays$ArrayList cannot be cast to java.util.Iterator

    二、foreachPartition 遍历 Dataset

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Iterator;
    
    public class Handler implements Serializable {
    
        public void handler(Dataset<Row> sourceData) {
            JavaRDD<Row> dataRDD = rowDataset.toJavaRDD();
            dataRDD.foreachPartition(new VoidFunction<Iterator<Row>>() {
                @Override
                public void call(Iterator<Row> rowIterator) throws Exception {
                    while (rowIterator.hasNext()) {
                        Row next = rowIterator.next();
                        String id = next.getAs("id");
                        if (id.equals("123")) {
                            String startTime = next.getAs("time");
                            // 其他业务逻辑
                        }
                    }
                }
            });
    
    	    // 转换为 lambda 表达式
    	    dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {
                while (rowIterator.hasNext()) {
                    Row next = rowIterator.next();
                    String id = next.getAs("id");
                    if (id.equals("123")) {
                        String startTime = next.getAs("time");
                        // 其他业务逻辑
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    扩展:工作中观察到的总结:
    1. System.out.println 是否会打印在控制台:
        SparkStreaming:cluster 模式不会打印(但在Yarn 日志中可以看到答应信息),client 和 local 模式都可以打印。注:client 模式也是部分会打印部分不会打印。
        离线非实时程序:cluster 模式不会打印(但在Yarn 日志中可以看到答应信息),client 和 local 模式都可以打印。注:但如果在 foreachPartition 这个方法中写打印代码的话,client 模式不会在控制台打印但是 local 模式却能,很神奇。

    2. 进程是否会在 Yarn 应用程序页面显示:
        SparkStreaming:cluster 会,client 和 local 模式不会。
        离线非实时程序:client 和 cluster 都会,local 模式不会。

    3. 命令行 --name 参数和代码中的 setAppName 对比:
        SparkStreaming:cluster 模式 --name 优先级高,setAppName 好像没起作用默认展示类名,因为我代码里写的是 .setAppName("heheApp") 但是 Yarn 页面上展示的是 com.xiaoqiang.hehe
        离线非实时程序:client 模式 setAppName 优先级高,cluster 模式 --name 优先级高。

    4. 代码 spark 配置项中加入 .master("local[*]") 程序能否启动成功:
        SparkStreaming:cluster 模式不会成功,client 能成功。
        离线非实时程序:cluster 和 client 模式都不会成功。报错:

    24/01/24 10:54:01 ERROR SparkContext: Error initializing SparkContext.
    org.apache.spark.SparkException: A master URL must be set in your configuration
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:368)
            at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
            at com.xiaoqiang.app.HehedaApp.main(EssDataAnalysisApp.java:56)
    24/01/24 10:54:01 ERROR Utils: Uncaught exception in thread main
    java.lang.NullPointerException
            at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$postApplicationEnd(SparkContext.scala:2416)
            at org.apache.spark.SparkContext$$anonfun$stop$1.apply$mcV$sp(SparkContext.scala:1931)
            at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
            at org.apache.spark.SparkContext.stop(SparkContext.scala:1930)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:585)
            at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
            at com.xiaoqiang.app.HehedaApp.main(EssDataAnalysisApp.java:56)
    org.apache.spark.SparkException: A master URL must be set in your configuration
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:368)
            at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
            at com.xiaoqiang.app.HehedaApp.main(EssDataAnalysisApp.java:56)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    三、Dataset 自定义 Partitioner

    参考:spark 自定义 partitioner 分区 java 版

    import org.apache.commons.collections.CollectionUtils;
    import org.apache.spark.Partitioner;
    import org.junit.Assert;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Created by lesly.lai on 2018/7/25.
     */
    public class CuxGroupPartitioner extends Partitioner {
    
        private int partitions;
    
        /**
         * map
         * 主要为了区分不同分区
         */
        private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<>();
    
        public CuxGroupPartitioner(List<Object> groupList) {
            int size = groupList.size();
            this.partitions = size;
            initMap(partitions, groupList);
        }
    
        private void initMap(int size, List<Object> groupList) {
            Assert.assertTrue(CollectionUtils.isNotEmpty(groupList));
            for (int i=0; i<size; i++) {
                hashCodePartitionIndexMap.put(groupList.get(i), i);
            }
        }
    
        @Override
        public int numPartitions() {
            return partitions;
        }
    
        @Override
        public int getPartition(Object key) {
            return hashCodePartitionIndexMap.get(key);
        }
    
        public boolean equals(Object obj) {
            if (obj instanceof CuxGroupPartitioner) {
                return ((CuxGroupPartitioner) obj).partitions == partitions;
            }
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    查看分区分布情况工具类:
    (1)Scala:

    import org.apache.spark.sql.{Dataset, Row}
    
    /**
     * Created by lesly.lai on 2017/12FeeTask/25.
     */
    class SparkRddTaskInfo {
      def getTask(dataSet: Dataset[Row]) {
        val size = dataSet.rdd.partitions.length
        println(s"==> partition size: $size " )
        import scala.collection.Iterator
        val showElements = (it: Iterator[Row]) => {
          val ns = it.toSeq
          import org.apache.spark.TaskContext
          val pid = TaskContext.get.partitionId
          println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
        }
        dataSet.foreachPartition(showElements)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    (2)Java:

    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    public class SparkRddTaskInfo {
        public static void getTask(Dataset<Row> dataSet) {
            int size = dataSet.rdd().partitions().length;
            System.out.println("==> partition size:" + size);
    
            JavaRDD<Row> dataRDD = dataSet.toJavaRDD();
            dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {
                List<String> mappedRows = new ArrayList<String>();
                int count = 0;
                while (rowIterator.hasNext()) {
                    Row next = rowIterator.next();
                    String id = next.getAs("id");
                    String partitionKey = next.getAs("partition_key");
                    String name = next.getAs("name");
                    mappedRows.add(id + "/" + partitionKey+ "/" + name);
                }
                int pid = TaskContext.get().partitionId();
                System.out.println("[partition: " + pid + "][size: " + mappedRows.size() + "]" + mappedRows);
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    调用方式:

    import com.vip.spark.db.ConnectionInfos;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * Created by lesly.lai on 2018/7/23.
     */
    public class SparkSimpleTestPartition {
    	public static void main(String[] args) throws InterruptedException {
    	
    		SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
    		// 原始数据集
    		Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());
    		originSet
    		.selectExpr("split(rowKey, '_')[0] as id",
                "concat(split(rowKey, '_')[0],'_',split(rowKey, '_')[1]) as partition_key",
                 "split(rowKey, '_')[1] as name"
    		.createOrReplaceTempView("people");
    		// 获取分区分布情况工具类
    		SparkRddTaskInfo taskInfo = new SparkRddTaskInfo();
    		Dataset<Row> groupSet = sparkSession.sql(" select partition_key from people group by partition_key");
    		List<Object> groupList = groupSet.javaRDD().collect().stream().map(row -> row.getAs("partition_key")).collect(Collectors.toList());
    		// 创建pairRDD 目前只有pairRdd支持自定义partitioner,所以需要先转成pairRdd
    		JavaPairRDD pairRDD = originSet.javaRDD().mapToPair(row -> {
    			return new Tuple2(row.getAs("partition_key"), row);
    		});
    		// 指定自定义partitioner
    		JavaRDD javaRdd = pairRDD.partitionBy(new CuxGroupPartitioner(groupList)).map(new Function<Tuple2<String, Row>, Row>(){
    			@Override
    			public Row call(Tuple2<String, Row> v1) throws Exception {
    				return v1._2;
    			}
    		});
    		Dataset<Row> result = sparkSession.createDataFrame(javaRdd, originSet.schema());
    		// 打印分区分布情况
    		taskInfo.getTask(result);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    四、Dataset 重分区并且获取分区数

            System.out.println("1-->"+rowDataset.rdd().partitions().length);
            System.out.println("1-->"+rowDataset.rdd().getNumPartitions());
            Dataset<Row> hehe = rowDataset.coalesce(1);
            System.out.println("2-->"+hehe.rdd().partitions().length);
            System.out.println("2-->"+hehe.rdd().getNumPartitions());
    
    // 让数据按照指定的key(列名)进行分区
    rowDataset.repartition(9, new Column("name"))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    运行结果:

    1-->29
    1-->29
    2-->2
    2-->2
    
    • 1
    • 2
    • 3
    • 4

    注意:在使用 repartition() 时两次打印的结果相同:

    print(rdd.getNumPartitions())
    rdd.repartition(100)
    print(rdd.getNumPartitions())
    
    • 1
    • 2
    • 3

    产生上述问题的原因有两个:
      首先 repartition() 是惰性求值操作,需要执行一个 action 操作才可以使其执行。
      其次,repartition() 操作会返回一个新的 rdd,并且新的 rdd 的分区已经修改为新的分区数,因此必须使用返回的 rdd,否则将仍在使用旧的分区。
      修改为:rdd2 = rdd.repartition(100)

    参考:repartition() is not affecting RDD partition size

    五、去重方法 dropDuplicates

      功能:对DF的数据进行去重,如果重复数据有多条,取第一条

    # 去重API dropDuplicates,无参数是对数据进行整体去重
    df.dropDuplicates().show()
    # API 同样可以针对字段进行去重,如下传入age字段,表示只要年龄一样,就认为你是重复数据
    df.dropDuplicates(['age','job']).show()
    
    • 1
    • 2
    • 3
    • 4

    来自:大数据开发 | SparkSQL 如何去重重复值?

    六、Dataset 转换为 List

    Tuple4<String, String, String, String> mySQLInfo = getMySQLInfo(configFile);
    Properties prop = new Properties();
    prop.setProperty("user", mySQLInfo._2());
    prop.setProperty("password", mySQLInfo._3());
    prop.setProperty("driver", mySQLInfo._4());
    Dataset<Row> df = spark.read().jdbc(mySQLInfo._1(), tableName, prop);
    List<String> collectAsList = df
        .selectExpr(typeId).dropDuplicates()
        .map((MapFunction<Row, String>) row -> row.mkString(","), Encoders.STRING()).collectAsList();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    List 转换为 Dataset
    // 方式一:
    JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
    // rdd 转换为 Dataset
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    Dataset<String> hehe = spark.createDataset(lineList, Encoders.STRING());
    Dataset<Row> rowDataset = sqlContext.read().option("header", "true").csv(hehe);
    
    // 方式二:
    JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
    JavaRDD<String> jsonObjectJavaRDD = javaSparkContext.parallelize(lineList);
    // rdd转换为Dataset
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    Dataset<Row> rowDataset = sqlContext.read().json(jsonObjectJavaRDD);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    七、自定义函数 UDF

    // Dataset自定义函数:时间向上取整,半小时
    spark.udf().register("timeCeil", (String field) -> {
        String[] timeSplit = field.split(":");
        // 数字字符串前补零
        DecimalFormat g1 = new DecimalFormat("00");
        String hour = timeSplit[0];
        String standard;
        // 时间向上取整:取半小时整点
        if (Integer.parseInt(timeSplit[1]) > 30) {
            hour = g1.format(Integer.parseInt(hour) + 1);
            standard = "00";
        } else {
            standard = "30";
        }
        return hour + ":" + standard + ":00";
    }, DataTypes.StringType);
    
    Dataset<Row> rowDataset = sourceData.selectExpr("Time", "timeCeil(Time) as HalfHour");
    
    效果:
    +----------+--------+--------+
    |Date      |Time    |HalfHour|
    +----------+--------+--------+
    |2023-09-13|00:30:46|00:30:00|
    |2023-09-13|00:30:51|00:30:00|
    |2023-09-13|00:30:56|00:30:00|
    |2023-09-13|00:31:01|01:00:00|
    |2023-09-13|00:31:06|01:00:00|
    |2023-09-13|00:31:11|01:00:00|
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

      补充:

    public class FunctionHandler {
    
        public static void customFunction(SparkSession spark) {
            // Dataset自定义函数:时间向上取整,一刻钟
            spark.udf().register("quarterCeil", (String field) -> {
                String[] timeSplit = field.split(":");
                // 数字字符串前补零
                DecimalFormat g1 = new DecimalFormat("00");
                String hour = timeSplit[0];
                String standard;
                // 时间向上取整:取半小时整点
                int minutes = Integer.parseInt(timeSplit[1]);
                if (minutes > 45) {
                    hour = g1.format(Integer.parseInt(hour) + 1);
                    standard = "00";
                } else if (minutes > 30) {
                    standard = "45";
                } else if (minutes > 15) {
                    standard = "30";
                } else {
                    standard = "15";
                }
                return hour + ":" + standard + ":00";
            }, DataTypes.StringType);
    
            // Dataset自定义函数:时间向上取整,半小时
            spark.udf().register("timeCeil", (String field) -> {
                String[] timeSplit = field.split(":");
                // 数字字符串前补零
                DecimalFormat g1 = new DecimalFormat("00");
                String hour = timeSplit[0];
                String standard;
                // 时间向上取整:取半小时整点
                if (Integer.parseInt(timeSplit[1]) > 30) {
                    hour = g1.format(Integer.parseInt(hour) + 1);
                    standard = "00";
                } else {
                    standard = "30";
                }
                return hour + ":" + standard + ":00";
            }, DataTypes.StringType);
    
            // Dataset自定义函数:时间向上取整,一小时
            spark.udf().register("hourCeil", (String field) -> {
                String[] timeSplit = field.split(":");
                // 数字字符串前补零
                DecimalFormat g1 = new DecimalFormat("00");
                String hour = timeSplit[0];
                // 时间向上取整:取一小时整点
                if (Integer.parseInt(timeSplit[1]) > 0) {
                    hour = g1.format(Integer.parseInt(hour) + 1);
                }
                return hour + ":00:00";
            }, DataTypes.StringType);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    八、替换函数

    Dataset<Row> rowDataset = sourceData.selectExpr("replace(split(rowKey, '_')[0], '我爱你', '点赞加个关注呗') as studentId");
    // 等价于
    Dataset<Row> rowDataset = sourceData.selectExpr("regexp_replace(split(rowKey, '_')[0], '我爱你', '点赞加个关注呗') as studentId");
    
    • 1
    • 2
    • 3

    参考:SparkSQL中常见函数

    九、na.fill用法

      DF.na.fill("NULL") 是使用 Spark DataFrame API 中的 na 方法来填充数据中的缺失值。具体地,该代码将 DataFrame 中的所有缺失值(即 null 值)都填充为字符串 NULL

      对两个数据表如A,B取JOIN操作的时候,其结果往往会出现NULL值的出现。这种情况是非常不利于后续的分析与计算的,特别是当涉及到对这个数值列进行各种聚合函数计算的时候。

      Spark 为此提供了一个高级操作,就是:na.fill 的函数。其处理过程就是先构建一个 MAP,如下:val map = Map("列名1“ -> 指定数字, "列名2“ -> 指定数字, .....),然后执行 dataframe.na.fill(map),即可实现对 NULL 值的填充。

    参考:
    Dataframe中na.fill的用法
    scala spark DF.na.fill(“NULL”) 代表啥?为啥使用后会出现数据比原来DF行数多的情况呢?

      我工作中代码是这么使用的:

            Dataset<Row> rowDataset = sourceData
                    .selectExpr("split(rowKey, '_')[0] as studentId",
                            "Date",
                            "Time",
                            "get_json_object(Heheda,'$.点个赞关注一下呗') as Hehe")
                    .na().fill("0.0");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    十、if用法

    Dataset<Row> rowDataset4 = rowDataset3.selectExpr("studentId", "Date", "if(HalfHour='23:30:00',ts+1799,ts+1800) as tsMinusHalf")
            .orderBy("studentId");
    
    Dataset<Row> rowDataset5 = rowDataset3.join(rowDataset4, rowDataset4.col("studentId").equalTo(rowDataset3.col("studentId"))
            .and(rowDataset4.col("Date").equalTo(rowDataset3.col("Date"))), "left")
            .selectExpr("studentId", "Date", 
                    "if(min_TotalPeople is null,first_TotalPeople,min_TotalPeople) as min_TotalPeople")
            .orderBy(functions.col("studentId").asc(),
                    functions.col("Date").asc());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    十一、拼接函数

            Dataset<Row> rowDataset3 = rowDataset2
                    .groupBy("studentId", "Date", "Quarter")
                    .agg(
                            functions.max("age").alias("max_age"),
                            functions.max("money").alias("max_money"))
                    .selectExpr("essId", "concat(Date, ' ', Quarter) countTime",
                            "max_age", "max_money")
                    .orderBy(functions.col("studentId").asc(),
                            functions.col("Date").asc(),
                            functions.col("Quarter").asc());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    十二、用 spark 往 OSS 上写文件

            <dependency>
                <groupId>com.aliyun.odpsgroupId>
                <artifactId>hadoop-fs-ossartifactId>
                <version>3.3.8-publicversion>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

      整合 MinIO 的话可以参考:hadoop,spark如何集成Mino

    1. 写 csv 文件

    参考:Spark 读写CSV常用配置-实例

                SparkConf sparkConf = new SparkConf()
                        .set("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
                        .set("spark.hadoop.fs.oss.accessKeyId", accessKeyId)
                        .set("spark.hadoop.fs.oss.accessKeySecret", accessKeySecret)
                        .set("spark.hadoop.fs.oss.endpoint", accountEndPoint);
    
                SparkSession spark = SparkSession
                        .builder()
                        .master("local[*]")
                        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                        .config("spark.default.parallelism", "200")
                        .config("spark.shuffle.consolidateFiles", "true")
                        .config("spark.driver.maxResultSize", "3g")
                        .config("spark.debug.maxToStringFields", "1000")
                        .config("spark.rpc.message.maxSize", "512")
                        .appName("SparkWriteZipToOSS")
                        .config(sparkConf)
                        .getOrCreate();
    
                // 读 csv 文件
                // spark
                //         .read()
                //         .format("csv")
                //         .option("header", "true")
                //         .option("delimiter", ",")
                //         .option("quote", "'")
                //         .option("nullValue", "\\N")
                //         .option("inferSchema", "true")
                //         .load("oss://" + bucketName + "/" + ossFilePath + "/hehe.csv");
    
                dataDsJoinAllDs.printSchema();
    
                // 写 csv 文件
                dataDsJoinAllDs
                        .coalesce(1)
                        .write()
                        .mode("append")
                        .option("header", "true")
                        .option("delimiter", ",")
                        .option("quote", "\"")
                        .option("escape", "\"")
                        // Linux 路径
                        // .csv("file:///opt/xiaoqiang")
                        // hdfs 路径
                        // .csv("/xiaoqiang")
                        // oss 路径
                        .csv("oss://" + bucketName + "/" + ossFilePath);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    参数说明:

    • delimiter 分隔符,默认为逗号。
    • nullValue 指定一个字符串代表 null 值
    • quote 引号字符,默认为双引号
    • header 第一行不作为数据内容,作为标题
    • inferSchema 自动推测字段类型
    2. 读写 parquet 文件

    参考:
    spark系列17: DataFrameReader读取json/parquet等格式文件详解
    Spark(三)-- SparkSQL扩展(数据读写) – 读写 Parquet、Json 格式文件(二)
    Spark大数据处理讲课笔记4.3 Spark SQL数据源 - Parquet文件

      加载和写入 Parquet 文件时,除了可以使用 load() 方法和 save() 方法外,还可以直接使用 Spark SQL 内置的 parquet() 方法:

      方式一:

                dataDsJoinAllDs
                        .coalesce(1)
                        .write()
                        .mode("append")
                        // .mode(SaveMode.Append)
                        // 写 parquet 文件
                        .parquet("oss://" + bucketName + "/" + ossFilePath);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

      方式二:

                dataDsJoinAllDs
                        .coalesce(1)
                        .write()
                        .mode(SaveMode.Overwrite)
                        .format("parquet")
                        .save("./data/parquet"); // 这个也是 hdfs 路径
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    pyspark 加载 Parquet 文件
    [root@localhost ~]# pyspark
    Python 3.8.8 (default, Apr 13 2021, 19:58:26) 
    [GCC 7.3.0] :: Anaconda, Inc. on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    24/03/23 12:38:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
          /_/
    
    Using Python version 3.8.8 (default, Apr 13 2021 19:58:26)
    
    >>> tmp = spark.read.parquet("hdfs://192.168.199.110:8020/xiaoqiang/hehe.parquet").select('name')
    >>> tmp.show(2,false)
    Traceback (most recent call last):
      File "", line 1, in 
    NameError: name 'false' is not defined
    >>> tmp.show(2,False)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    十三、用 spark 整合 redis 和 phoenix

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    public class Test {
        public static void main(String[] args) throws IOException {
            SparkConf sparkConf = new SparkConf()
                    .set("spark.redis.host", host)
                    .set("spark.redis.port", port)
                    .set("spark.redis.auth", auth)
                    .set("spark.redis.db", database)
                    .set("spark.redis.timeout", "10000");
            
            SparkSession spark = SparkSession
                    .builder()
                    .master("local[*]")
                    .config(sparkConf)
                    .getOrCreate();
            
            Map<String, String> studentMap = RedisUtil.getStudentMap(spark, studentName, configFile);
        }
    
        public static Map<String, String> getStudentMap(SparkSession spark, String studentName, String configFile) {
            StructType schema = new StructType()
                    .add("student_code", DataTypes.StringType)
                    .add("student_id", DataTypes.StringType)
                    .add("student_map", DataTypes.StringType)
                    .add("ENABLE", DataTypes.IntegerType);
            Dataset<Row> df = spark.read().format("org.apache.spark.sql.redis")
                    .option("table", "data_student_map")
                    .option("key.column", "student_name")
                    .schema(schema)  // 提供明确的 schema
                    .load();
            String condition = String.format("student_code='%s' and ENABLE=1", studentName);
            Dataset<Row> filterDs = df.filter(condition);
            if (filterDs == null || filterDs.rdd().isEmpty()) {
                Dataset<Row> columnDefaultValue = PhoenixUtil.getStudentMapDS(spark, studentName, configFile);
                columnDefaultValue.write()
                        .format("org.apache.spark.sql.redis")
                        .option("table", "data_student_map")
                        .option("key.column", "student_address")
    //                    .option("ttl", 604800) // 一周更新一次
                        .mode(SaveMode.Append)
                        .save();
                filterDs = columnDefaultValue.filter(condition);
            }
            List<String> strings = filterDs.select("student_id", "student_map")
                    .map((MapFunction<Row, String>) row -> row.mkString(","), Encoders.STRING()).collectAsList();
            return StringUtil.getStringStringMap(strings);
        }
    
        public static Dataset<Row> getStudentMapDS(SparkSession spark, String studentName, String configFile) {
            Properties properties = PropertiesUtil.getProperties(configFile);
            String zkUrl = properties.getProperty("zkUrl");
            Dataset<Row> df1 = spark.read().format("org.apache.phoenix.spark")
                    .option("zkUrl", zkUrl)
                    .option("table", "data_student_map")
                    .load();
            String condition = String.format("student_code='%s' and ENABLE=1", studentName);
            return df1.selectExpr("concat(student_code, '_', student_id) as student_ids", "student_code", "student", "student_map", "enable").filter(condition);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    十四、show

      show 的方法有五种调用方式:他们的区别就在于参数不同。

    • numRows:即要展示的行数,默认 20 行
    • truncate:取值为 boolean 类型的时候表示一个字段是否最多展示 20 个字符,默认为 true,是 int 类型就是指定展示的字符数
    // 展示所有字段
    // 当输出字段过多或者过长时,spark会默认隐藏后面的字段,不好debug,我们只需要在 show() 括号内输入 truncate = false即可,默认为true,为 true 时则隐藏
    df.show(false)
    
    // 控制输出行数
    // spark 默认输出行数为 20 行,我们只需要在 show() 括号内输入我们所想要的行数即可。比如: 展示100行数据,可以使用:
    df.show(20)
    
    // 控制输出行数并不隐藏字段,输出100行数据并且展示所有字段
    df.show(100,false)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    十五、union

      union 方法对两个字段一致的 Dataset 进行合并,返回是组合生成的新的 Dataset。类似于 Sql 的 UNION 操作。在源码中,union 和 unionAll 是相同的,没有区分。这里还有个 unionByName 这个方法,其实我们主要用的是这个,因为 union 在合并的时候只按照列合并,不会考虑两个表的列是否相对应,而 unionByName 会根据列名一一对应的合并。注意,unionByName 是 2.3.0 才开始加入的。使用实例:

    // union并集,根据列位置合并行,列数要一致
    studentDataset.select("name","age","institute").limit(3).union(studentDataset.select("name","institute","age").limit(3)).show();
    
    // unionByName 并集,根据列名合并行,不同名报错,列数要一致
    
    // except 差集
    studentDataset.except(studentDataset.limit(15)).show();
    
    // intersect交集
    studentDataset.intersect(studentDataset.limit(3)).show();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    2022最新一线大厂Java八股文合集PDF版震撼开源,堪称史上最强
    go 使用reids分布式锁
    头歌实践平台-数据结构-二叉树及其应用
    一、react简介
    花儿朵朵-全自动视频混剪,批量剪辑批量剪视频,探店带货系统,精细化顺序混剪,故事影视解说,视频处理大全,精细化顺序混剪,多场景裂变,多视频混剪
    XLSX.utils.sheet_to_json() 数字格式转为字符串格式
    JVM学习-类加载机制
    CiscoCUCM电话注册
    VSCode 1.90版本 升级需谨慎~(Python)
    衍三的硬件笔记之如何选择MOS管
  • 原文地址:https://blog.csdn.net/m0_37739193/article/details/132662326