• Google云平台构建数据ETL任务的最佳实践


    数据处理中,我们经常需要构建ETL的任务,对数据进行加载,转换处理后再写入到数据存储中。Google的云平台提供了多种方案来构建ETL任务,我也研究了一下这些方案,比较方案之间的优缺点,从而找到一个最适合我业务场景的方案。

    假设我们的业务场景需要定期从Kafka中获取数据,经过一些数据清洗,数据关联,数据Enrich操作之后,把数据写入到Bigquery数据仓库,从而方便以后生成统计分析报表。

    Google云平台提供了几个方案来完成这个任务:

    1. Datafusion,通过在UI界面设计ETL pipeline,然后把Pipeline转换为Spark应用,部署在Dataproc上运行。

    2. 编写Spark应用代码,然后在Dataproc上运行或者在K8S集群上通过Spark operator来调度执行。

    3. 编写Apache Beam代码,通过Dataflow runner在VM上执行任务。

    方案一的优点是基本不需要编写代码,在图形界面上即可完成Pipeline的设计。缺点是如果有一些额外的需求可能不太方便实现,另外最主要是太贵。Datafusion需要单独部署在一个Instance上24小时运行,这个Instance企业版的收费大概一小时要几美元。另外Pipeline运行的时候会调度Dataproc的Instance,这里会产生额外的费用。

    方案二的优点是可以灵活的通过Spark代码来完成各种需求。缺点也是比较贵,因为Dataproc是基于Hadoop集群的,需要有Zookeeper, driver和executor这几个VM。如果采用K8S集群,则Spark operator也是需要单独24小时运行在一个pod上,另外还有额外的driver, executor的Pod需要调度执行。

    方案三是综合考虑最优的方案,因为Beam的代码是提供了一个通用的流批处理框架,可以运行在Spark,Flink,Dataflow等引擎上,而Dataflow是Google提供的一个优秀的引擎,在运行任务时,Dataflow按需调度VM来运行,只收取运行时的费用。

    因此,对于我的这个业务场景,使用方案三是最合适的。下面我将介绍一下整个实现的过程。

    Beam批处理任务的实现

    在Dataflow的官方Template里面,有一个消费Kafka数据写入到Bigquery的例子,但是这个是流处理方式实现的,对于我的业务场景来说,并不需要这么实时的处理数据,只需要定期消费即可,因此用批处理的方式更合适,这样也能大幅节约费用。

    Beam的Kafka I/O connector是默认处理的数据是无边界的,即流式数据。要以批处理的方式来处理,需要调用withStartReadTime和withStopReadTime两个方法获取要读取的Kafka topic的start和end offset,这样就可以把数据转换为有边界数据。调用这两个方法需要注意的是,如果Kafka没有任何一条消息的时间戳是大于等于这个时间戳的话,那么会报错,因此我们需要确定一下具体的时间戳。

    以下的代码是检查Kafka消息的所有分区是否存在消息的时间戳是大于我们指定的时间戳,如果不存在的话,那么我们需要找出这些分区里面的最晚时间戳里面的最早的一个。例如Topic有3个分区,要指定的时间戳是1697289783000,但是3个分区里面的所有消息都小于这个时间戳,因此我们需要分别找出每个分区里面的消息的最晚的时间戳,然后取这3个分区的最晚时间戳里面最早的那个,作为我们的指定时间戳。

    1. public class CheckKafkaMsgTimestamp {
    2. private static final Logger LOG = LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);
    3. public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {
    4. long max_timestamp = stopTimestamp;
    5. long max_records = 5L;
    6. Properties props = new Properties();
    7. props.setProperty("bootstrap.servers", bootstrapServer);
    8. props.setProperty("group.id", "test");
    9. props.setProperty("enable.auto.commit", "true");
    10. props.setProperty("auto.commit.interval.ms", "1000");
    11. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    12. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    13. KafkaConsumer consumer = new KafkaConsumer<>(props);
    14. // Get all the partitions of the topic
    15. int partition_num = consumer.partitionsFor(topic).size();
    16. HashMap search_map = new HashMap<>();
    17. ArrayList tp = new ArrayList<>();
    18. for (int i=0;i
    19. search_map.put(new TopicPartition(topic, i), stopTimestamp);
    20. tp.add(new TopicPartition(topic, i));
    21. }
    22. // Check if message exist with timestamp greater than search timestamp
    23. Boolean flag = true;
    24. ArrayList selected_tp = new ArrayList<>();
    25. //LOG.info("Start to check the timestamp {}", stopTimestamp);
    26. Map results = consumer.offsetsForTimes(search_map);
    27. for (Map.Entry entry : results.entrySet()) {
    28. OffsetAndTimestamp value = entry.getValue();
    29. if (value==null) { //there is at least one partition don't have timestamp greater or equal to the stopTime
    30. flag = false;
    31. break;
    32. }
    33. }
    34. // Get the latest timestamp of all partitions if the above check result is false
    35. // Note the timestamp is the earliest of all the partitions.
    36. if (!flag) {
    37. max_timestamp = 0L;
    38. consumer.assign(tp);
    39. Map endoffsets = consumer.endOffsets(tp);
    40. for (Map.Entry entry : endoffsets.entrySet()) {
    41. Long temp_timestamp = 0L;
    42. int record_count = 0;
    43. TopicPartition t = entry.getKey();
    44. long offset = entry.getValue();
    45. if (offset < 1) {
    46. LOG.warn("Can not get max_timestamp as partition has no record!");
    47. continue;
    48. }
    49. consumer.assign(Arrays.asList(t));
    50. consumer.seek(t, offset>max_records?offset-5:0);
    51. Iterator> records = consumer.poll(Duration.ofSeconds(2)).iterator();
    52. while (records.hasNext()) {
    53. record_count++;
    54. ConsumerRecord record = records.next();
    55. LOG.info("Topic: {}, Record Timestamp: {}, recordcount: {}", t, record.timestamp(), record_count);
    56. if (temp_timestamp == 0L || record.timestamp() > temp_timestamp) {
    57. temp_timestamp = record.timestamp();
    58. }
    59. }
    60. //LOG.info("Record count: {}", record_count);
    61. if (temp_timestamp > 0L && temp_timestamp > startTimestamp) {
    62. if (max_timestamp == 0L || max_timestamp > temp_timestamp) {
    63. max_timestamp = temp_timestamp;
    64. }
    65. selected_tp.add(t);
    66. LOG.info("Temp_timestamp {}", temp_timestamp);
    67. LOG.info("Selected topic partition {}", t);
    68. LOG.info("Partition offset {}", consumer.position(t));
    69. //consumer.seek(t, -1L);
    70. }
    71. }
    72. } else {
    73. selected_tp = tp;
    74. }
    75. consumer.close();
    76. LOG.info("Max Timestamp: {}", max_timestamp);
    77. return new KafkaResult(max_timestamp, selected_tp);
    78. }
    79. }

    调用以上代码,我们可以获取要选择的分区以及对应的时间戳。利用这两个信息,我们就可以把指定时间范围内的Kafka数据转换为有边界数据了。以下是Beam建立Pipeline并处理数据,然后写入到Bigquery的代码:

    1. KafkaResult checkResult = CheckKafkaMsgTimestamp.getTimestamp(options.getBootstrapServer(), options.getInputTopic(), start_read_time, stop_read_time);
    2. stop_read_time = checkResult.max_timestamp;
    3. ArrayList selected_tp = checkResult.selected_tp;
    4. PCollection input = pipeline
    5. .apply("Read messages from Kafka",
    6. KafkaIO.read()
    7. .withBootstrapServers(options.getBootstrapServer())
    8. .withKeyDeserializer(StringDeserializer.class)
    9. .withValueDeserializer(StringDeserializer.class)
    10. .withConsumerConfigUpdates(ImmutableMap.of("group.id", "telematics_statistic.app", "enable.auto.commit", true))
    11. .withStartReadTime(Instant.ofEpochMilli(start_read_time))
    12. .withStopReadTime(Instant.ofEpochMilli(stop_read_time))
    13. .withTopicPartitions(selected_tp)
    14. .withoutMetadata())
    15. .apply("Get message contents", Values.create());
    16. PCollectionTuple msgTuple = input
    17. .apply("Filter message", ParDo.of(new DoFn() {
    18. @ProcessElement
    19. public void processElement(@Element String element, MultiOutputReceiver out) {
    20. TelematicsStatisticsMsg msg = GSON.fromJson(element, TelematicsStatisticsMsg.class);
    21. if (msg.timestamp==0 || msg.vin==null) {
    22. out.get(otherMsgTag).output(element);
    23. } else {
    24. if (msg.timestamp=stop_process_time) {
    25. out.get(otherMsgTag).output(element);
    26. } else {
    27. out.get(statisticsMsgTag).output(msg);
    28. }
    29. }
    30. }
    31. })
    32. .withOutputTags(statisticsMsgTag, TupleTagList.of(otherMsgTag)));
    33. // Get the filter out msg
    34. PCollection statisticsMsg = msgTuple.get(statisticsMsgTag);
    35. // Save the raw records to Bigquery
    36. statisticsMsg
    37. .apply("Convert raw records to BigQuery TableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
    38. .via(TelematicsStatisticsMsg -> new TableRow()
    39. .set("timestamp", Instant.ofEpochMilli(TelematicsStatisticsMsg.timestamp).toString())
    40. .set("vin", TelematicsStatisticsMsg.vin)
    41. .set("service", TelematicsStatisticsMsg.service)
    42. .set("type", TelematicsStatisticsMsg.messageType)))
    43. .apply("Save raw records to BigQuery", BigQueryIO.writeTableRows()
    44. .to(options.getStatisticsOutputTable())
    45. .withSchema(new TableSchema().setFields(Arrays.asList(
    46. new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
    47. new TableFieldSchema().setName("vin").setType("STRING"),
    48. new TableFieldSchema().setName("service").setType("STRING"),
    49. new TableFieldSchema().setName("type").setType("STRING"))))
    50. .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    51. .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    52. PipelineResult result = pipeline.run();
    53. try {
    54. result.getState();
    55. result.waitUntilFinish();
    56. } catch (UnsupportedOperationException e) {
    57. // do nothing
    58. } catch (Exception e) {
    59. e.printStackTrace();
    60. }

    需要注意的是,每次处理任务完成后,我们需要把当前的stopReadTime记录下来,下次任务运行的时候把这个时间戳作为startReadTime,这样可以避免某些情况下的数据缺失读取的问题。这个时间戳我们可以把其记录在GCS的bucket里面。这里略过这部分代码。

    提交Dataflow任务

    之后我们就可以调用Google的Cloud Build功能来把代码打包为Flex Template

    首先在Java项目中运行mvn clean package,打包jar文件

    然后在命令行中设置以下环境变量:

    1. export TEMPLATE_PATH="gs://[your project ID]/dataflow/templates/telematics-pipeline.json"
    2. export TEMPLATE_IMAGE="gcr.io/[your project ID]/telematics-pipeline:latest"
    3. export REGION="us-west1"

    之后运行gcloud build的命令来构建镜像:

    gcloud dataflow flex-template build $TEMPLATE_PATH --image-gcr-path "$TEMPLATE_IMAGE" --sdk-language "JAVA" --flex-template-base-image "gcr.io/dataflow-templates-base/java17-template-launcher-base:20230308_RC00" --jar "target/telematics-pipeline-1.0-SNAPSHOT.jar" --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.example.TelematicsBatch"

    最后就可以调用命令来提交任务执行了:

    gcloud dataflow flex-template run "analytics-pipeline-`date +%Y%m%d-%H%M%S`" --template-file-gcs-location "$TEMPLATE_PATH" --region "us-west1" --parameters ^~^bootstrapServer="kafka-1:9094,kafka-2:9094"~statisticsOutputTable="youprojectid:dataset.tablename"~serviceAccount="xxx@projectid.iam.gserviceaccount.com"~region="us-west1"~usePublicIps=false~runner=DataflowRunner~subnetwork="XXXX"~tempLocation=gs://bucketname/temp/~startTime=1693530000000~stopTime=1697216400000~processStartTime=1693530000000~processStopTime=1697216400000

    如果我们需要任务自动定期执行,还可以在dataflow里面import一个Pipeline,用之前指定的Template_path来导入。然后设置任务的定期周期和启动时间即可,非常方便。

  • 相关阅读:
    React--props细节知识
    【小白专用 已验证24.5.30】ThinkPHP6 视图
    一篇博客带你轻松应对java面试中的多线程与高并发
    Java笔试题_输入日期后输出日历的程序
    企业纷纷选择数字化转型,数字化转型给企业带来了哪些提升?
    【数据结构】笔试面试中常用的数据结构操作
    Redis性能管理及主从复制、哨兵的配置与部署
    SwiftUI 导航设置
    计算机网络-数据链路层(流量控制与可靠传输机制(停止等待协议、滑动窗口协议(GBN,SR)))
    Python 实训教学,更便捷的学生邀请及内容分发|ModelWhale 版本更新
  • 原文地址:https://blog.csdn.net/gzroy/article/details/133827729