Reference blogs:
"Greenplum-Spark Connector 介绍" (Greenplum Chinese Community, CSDN blog)
"比pgload更快更方便写入大数据量至Greenplum的Greenplum-Spark Connector" (秣码一盏, CSDN blog)
The official documentation recommends several ways to load external data into Greenplum: JDBC, pgcopy, gpfdist, and the Pivotal Greenplum-Spark Connector (GSC). The example below uses GSC to read a Greenplum table into a Spark DataFrame and write it back out to another table:
```scala
package com.greenplum.spark.gsc

import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Description: Read from and write to Greenplum with the Greenplum-Spark Connector
 * @Author: chenweifeng
 * @Date: 2022-08-16 16:00
 **/
object GreenplumSparkTest {
  // Global logger
  val LOGGER = Logger.getLogger(this.getClass)

  def main(args: Array[String]) {
    // Note: setMaster("local") is for local/IDE runs; a master set in code overrides
    // the --master flag of spark-submit, so drop it when submitting to a cluster.
    val conf = new SparkConf().setAppName("gsc-greenplum-test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("INFO")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    println("spark-version:" + spark.version)

    // Read from Greenplum with Spark
    val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark"
    )

    val gpdf: DataFrame = spark.read.format("greenplum")
      .options(gscReadOptionMap)
      .load()

    gpdf.show()

    // Write to Greenplum with Spark
    val gscWriteOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark_w"
    )

    gpdf.write.format("greenplum")
      .mode(SaveMode.Append)
      .options(gscWriteOptionMap)
      .save()

    sc.stop()
  }
}
```
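For comparison, the plain JDBC path mentioned above can be driven entirely through Spark's built-in JDBC data source and the PostgreSQL driver already declared in the pom. The following is only a minimal sketch, reusing the placeholder connection values and table names from the GSC example; because every row is funnelled through JDBC connections to the Greenplum master, it is usually far slower for large volumes than GSC, which moves data in parallel through the segments.

```scala
// Minimal sketch of the plain-JDBC alternative; all connection values are
// placeholders copied from the GSC example above.
import org.apache.spark.sql.{SaveMode, SparkSession}

object GreenplumJdbcTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-greenplum-test")
      .master("local")
      .getOrCreate()

    val jdbcUrl = "jdbc:postgresql://10.***.**.3:54432/pgbenchdb"

    // Read a Greenplum table through the master node via plain JDBC
    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "public.test_datax_gp_spark")
      .option("user", "gpadmin")
      .option("password", "******")
      .option("driver", "org.postgresql.Driver")
      .load()

    // Write it back via JDBC; each partition writes over its own connection,
    // all of which go through the Greenplum master
    df.write.format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "public.test_datax_gp_spark_w")
      .option("user", "gpadmin")
      .option("password", "******")
      .option("driver", "org.postgresql.Driver")
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}
```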
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>com.greenplum.sparkgroupId>
- <artifactId>gsc-scala-testartifactId>
- <version>1.0-SNAPSHOTversion>
-
- <properties>
- <spark.version>2.4.5spark.version>
- <scala.version>2.12scala.version>
- properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.sparkgroupId>
- <artifactId>spark-core_${scala.version}artifactId>
- <version>${spark.version}version>
- dependency>
- <dependency>
- <groupId>org.apache.sparkgroupId>
- <artifactId>spark-streaming_${scala.version}artifactId>
- <version>${spark.version}version>
- dependency>
- <dependency>
- <groupId>org.apache.sparkgroupId>
- <artifactId>spark-sql_${scala.version}artifactId>
- <version>${spark.version}version>
- dependency>
- <dependency>
- <groupId>io.pivotal.greenplum.sparkgroupId>
- <artifactId>greenplum-spark_${scala.version}artifactId>
- <version>2.1.0version>
- dependency>
- <dependency>
- <groupId>com.pivotalgroupId>
- <artifactId>greenplum-jdbcartifactId>
- <version>5.1.4version>
- dependency>
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connector-javaartifactId>
- <version>8.0.27version>
- dependency>
- <dependency>
- <groupId>org.postgresqlgroupId>
- <artifactId>postgresqlartifactId>
- <version>9.3-1102-jdbc4version>
- dependency>
- dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.scala-toolsgroupId>
- <artifactId>maven-scala-pluginartifactId>
- <version>${scala.version}version>
- <executions>
- <execution>
- <goals>
- <goal>compilegoal>
- <goal>testCompilegoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
- project>
Build the project with Maven:

```bash
mvn clean package
```
Then upload the two jars, greenplum-spark_2.12-2.1.0.jar and gsc-scala-test-1.0-SNAPSHOT.jar, to Spark's bin directory and submit the job:
```bash
spark-submit \
  --class com.greenplum.spark.gsc.GreenplumSparkTest \
  --master spark://localhost:7078 \
  --jars greenplum-spark_2.12-2.1.0.jar \
  gsc-scala-test-1.0-SNAPSHOT.jar
```
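After the job finishes, a quick sanity check is to read both tables back through GSC and compare row counts. This is only a sketch, assuming the same placeholder connection options as above; it can be run, for example, from a spark-shell started with the same --jars:

```scala
// Sanity check, e.g. from spark-shell --jars greenplum-spark_2.12-2.1.0.jar.
// Connection values are the same placeholders used throughout this post.
val gscOptions = Map(
  "url"      -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
  "user"     -> "gpadmin",
  "password" -> "******",
  "dbschema" -> "public"
)

val sourceCount = spark.read.format("greenplum")
  .options(gscOptions + ("dbtable" -> "test_datax_gp_spark"))
  .load()
  .count()

val targetCount = spark.read.format("greenplum")
  .options(gscOptions + ("dbtable" -> "test_datax_gp_spark_w"))
  .load()
  .count()

// With SaveMode.Append the target should contain at least one full copy of the source
println(s"source=$sourceCount target=$targetCount")
```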
1. Version compatibility between the Greenplum-Spark Connector and Spark
At the moment greenplum-spark_2.12-2.1.2.jar only supports Spark 2.x environments; running it on Spark 3.x fails with an incompatibility error. Whether a newer driver release will add Spark 3.x support remains to be seen.
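Before choosing a connector jar it is worth confirming which Spark and Scala versions the cluster actually runs, since the _2.12 suffix in greenplum-spark_2.12-2.1.x.jar must match the cluster's Scala build. A minimal check (a sketch, runnable from spark-shell or inside the job above):

```scala
// Print the Spark and Scala versions the job is actually running on, so the
// connector jar (e.g. greenplum-spark_2.12-2.1.x.jar) can be matched to them.
println("Spark version: " + spark.version)                              // expect a 2.x release for GSC 2.1.x (per the note above)
println("Scala version: " + scala.util.Properties.versionNumberString)  // expect 2.12.x for the _2.12 jar
```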