Developing a data synchronization tool based on the Greenplum-Spark Connector, and the pitfalls encountered


    Reference blogs:

    • "Greenplum-Spark Connector 介绍", Greenplum中文社区's blog on CSDN

    • "比pgload更快更方便写入大数据量至Greenplum的Greenplum-Spark Connector", 秣码一盏's blog on CSDN

    1. Background

            The official documentation recommends several ways to write external data into Greenplum: jdbc, pgcopy, gpfdist, and the Pivotal Greenplum-Spark Connector. According to the docs:

    • jdbc is slow for large data volumes and is the least recommended option for bulk writes;
    • pgcopy is faster than jdbc, but consumes resources on the master node;
    • gpfdist does not burden the master: it writes directly to the segments and can write in parallel, but it requires installing client-side tooling, including gpfdist and its dependencies;
    • Greenplum-Spark Connector: built on Spark's parallel processing, it writes to Greenplum in parallel and also provides a parallel read interface. The write tests below are based on this component; it can be obtained from the "Download VMware Tanzu™ Greenplum" page on VMware Tanzu Network. For contrast with the connector-based approach, a plain Spark JDBC write is sketched right after this list.
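    For reference, this is roughly what the plain JDBC path looks like in Spark — a minimal sketch, assuming an existing SparkSession named spark, the PostgreSQL driver on the classpath, and the table names used later in this article. Every row is funneled through the Greenplum master, which is why this is the slowest option:

    import org.apache.spark.sql.SaveMode

    // Plain JDBC read/write: all rows pass through the Greenplum master node.
    val jdbcUrl = "jdbc:postgresql://10.***.**.3:54432/pgbenchdb"

    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "public.test_datax_gp_spark")
      .option("user", "gpadmin")
      .option("password", "******")
      .load()

    df.write.format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "public.test_datax_gp_spark_w")
      .option("user", "gpadmin")
      .option("password", "******")
      .mode(SaveMode.Append)
      .save()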

    2. Test code

    2.1 Core test class

    package com.greenplum.spark.gsc

    import org.apache.log4j.Logger
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * @Description: Greenplum-Spark Connector read/write test
     * @Author: chenweifeng
     * @Date: August 16, 2022, 4:00 PM
     **/
    object GreenplumSparkTest {
      // Global logger
      val LOGGER = Logger.getLogger(this.getClass)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("gsc-greenplum-test").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("INFO")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        println("spark-version:" + spark.version)

        // Read from Greenplum with Spark
        val gscReadOptionMap = Map(
          "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
          "user" -> "gpadmin",
          "password" -> "******",
          "dbschema" -> "public",
          "dbtable" -> "test_datax_gp_spark"
        )
        val gpdf: DataFrame = spark.read.format("greenplum")
          .options(gscReadOptionMap)
          .load()
        gpdf.show()

        // Write to Greenplum with Spark
        val gscWriteOptionMap = Map(
          "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
          "user" -> "gpadmin",
          "password" -> "******",
          "dbschema" -> "public",
          "dbtable" -> "test_datax_gp_spark_w"
        )
        gpdf.write.format("greenplum")
          .mode(SaveMode.Append)
          .options(gscWriteOptionMap)
          .save()

        sc.stop()
      }
    }
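    A note on read parallelism: in the test above the connector decides the partitioning itself. The 2.x connector documentation also describes optional read options for controlling this; the option names below are taken from those docs but should be verified against your connector version, and the id column is an assumption about the test table:

    // Hedged sketch: control read parallelism via connector options.
    val gscParallelReadOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark",
      "partitionColumn" -> "id", // assumed integer-typed column in the test table
      "partitions" -> "4"        // number of Spark partitions used for the read
    )
    val parallelGpdf: DataFrame = spark.read.format("greenplum")
      .options(gscParallelReadOptionMap)
      .load()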

    2.2 pom.xml

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0modelVersion>
    6. <groupId>com.greenplum.sparkgroupId>
    7. <artifactId>gsc-scala-testartifactId>
    8. <version>1.0-SNAPSHOTversion>
    9. <properties>
    10. <spark.version>2.4.5spark.version>
    11. <scala.version>2.12scala.version>
    12. properties>
    13. <dependencies>
    14. <dependency>
    15. <groupId>org.apache.sparkgroupId>
    16. <artifactId>spark-core_${scala.version}artifactId>
    17. <version>${spark.version}version>
    18. dependency>
    19. <dependency>
    20. <groupId>org.apache.sparkgroupId>
    21. <artifactId>spark-streaming_${scala.version}artifactId>
    22. <version>${spark.version}version>
    23. dependency>
    24. <dependency>
    25. <groupId>org.apache.sparkgroupId>
    26. <artifactId>spark-sql_${scala.version}artifactId>
    27. <version>${spark.version}version>
    28. dependency>
    29. <dependency>
    30. <groupId>io.pivotal.greenplum.sparkgroupId>
    31. <artifactId>greenplum-spark_${scala.version}artifactId>
    32. <version>2.1.0version>
    33. dependency>
    34. <dependency>
    35. <groupId>com.pivotalgroupId>
    36. <artifactId>greenplum-jdbcartifactId>
    37. <version>5.1.4version>
    38. dependency>
    39. <dependency>
    40. <groupId>mysqlgroupId>
    41. <artifactId>mysql-connector-javaartifactId>
    42. <version>8.0.27version>
    43. dependency>
    44. <dependency>
    45. <groupId>org.postgresqlgroupId>
    46. <artifactId>postgresqlartifactId>
    47. <version>9.3-1102-jdbc4version>
    48. dependency>
    49. dependencies>
    50. <build>
    51. <plugins>
    52. <plugin>
    53. <groupId>org.scala-toolsgroupId>
    54. <artifactId>maven-scala-pluginartifactId>
    55. <version>${scala.version}version>
    56. <executions>
    57. <execution>
    58. <goals>
    59. <goal>compilegoal>
    60. <goal>testCompilegoal>
    61. goals>
    62. execution>
    63. executions>
    64. plugin>
    65. plugins>
    66. build>
    67. project>
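    One build note: the greenplum-spark_2.12 artifact is downloaded from VMware Tanzu Network rather than Maven Central, so the dependency above will most likely not resolve until the jar is installed into your local Maven repository. A sketch, with coordinates mirroring the pom (adjust the -Dfile path to wherever you downloaded the jar):

    mvn install:install-file \
      -Dfile=greenplum-spark_2.12-2.1.0.jar \
      -DgroupId=io.pivotal.greenplum.spark \
      -DartifactId=greenplum-spark_2.12 \
      -Dversion=2.1.0 \
      -Dpackaging=jar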

    3. Packaging and deployment

    3.1 Build the package

    mvn clean package

    3.2 Upload the jars

    Upload both greenplum-spark_2.12-2.1.0.jar and gsc-scala-test-1.0-SNAPSHOT.jar to the bin directory of your Spark installation.

    3.3 Submit the Spark job

    spark-submit \
      --class com.greenplum.spark.gsc.GreenplumSparkTest \
      --master spark://localhost:7078 \
      --jars greenplum-spark_2.12-2.1.0.jar \
      gsc-scala-test-1.0-SNAPSHOT.jar
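    The --jars flag is what ships the connector jar to the driver and executors. When submitting to a shared cluster rather than the single local master above, the same command typically gains a few standard resource flags; a hedged variant (the memory values are illustrative placeholders, not from the original article):

    spark-submit \
      --class com.greenplum.spark.gsc.GreenplumSparkTest \
      --master spark://localhost:7078 \
      --deploy-mode client \
      --driver-memory 2g \
      --executor-memory 4g \
      --jars greenplum-spark_2.12-2.1.0.jar \
      gsc-scala-test-1.0-SNAPSHOT.jar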

    Pitfalls encountered

    1. Version compatibility between the Greenplum-Spark Connector driver and Spark

    At present greenplum-spark_2.12-2.1.2.jar only supports Spark 2.x environments; running against Spark 3.x fails with incompatibility errors. Whether a new driver supporting Spark 3.x will be released remains to be seen.
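    Until a Spark 3.x-compatible release appears, a cheap mitigation is to fail fast when the job is accidentally run on Spark 3.x. A minimal sketch (not from the original article), reusing the spark session from the test class above:

    // greenplum-spark 2.1.x only supports Spark 2.x, so abort early otherwise.
    val sparkMajorVersion = spark.version.split("\\.").head.toInt
    require(sparkMajorVersion == 2,
      s"greenplum-spark 2.1.x requires Spark 2.x, but found Spark ${spark.version}")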

    Original article: https://blog.csdn.net/Carson073/article/details/126500498