• Google Data Fusion构建数据ETL任务


    Google云平台提供了一个Data Fusion的产品,是基于开源的CDAP做的一个图形化的编辑工具,可以很方便的来完成数据处理的任务,而无需编写代码。假设我们现在要构建一个ETL的任务,从Kafka中消费一些数据,经过处理之后把数据存放到Bigquery中。

    首先我们要准备一些测试数据发送到Kafka。这里我是在GKE的环境中起了一个Kafka的pod,然后往test topic发送了一些简单的JSON格式的消息。

    创建Data Fusion Instance

    在GCP的console页面中打开Data Fusion,选择Create an instance,在配置页面中,版本我没有选择最新的6.9.2版本,因为发现这个版本在解析JSON格式会有问题,我选择的是6.8.3。然后在Advanced Option里面,我选择了Enable Private IP,因为我的Kafka是没有暴露对外的公网IP,因此Data Fusion Instance也只能用Private IP来和Kafka进行通讯。在Associating Network里面,选择需要在哪个VPC里面来分配这个Private IP。之后点击Create就可以创建一个Instance了。

    VPC Network Peering

    Instance的创建需要等待一段时间,等完成之后,点击Instance的名字,我们可以看到相关的信息,拷贝Instance的Tenant project ID。然后去到VPC Network里面,建立一个VPC network peering。因为Data Fusion需要把任务安排在Dataproc集群上运行,这个集群是运行在单独的网络中,如果要和我们的Kafka通讯,需要和Kafka所在的VPC网络建立一个peering。选择create peering connection,在里面的Your VPC network里面,输入Kafka所在的VPC network,在peered network里面选择in another project,输入刚才拷贝的Tenant project ID,勾选Exchange IPv4 custom routes里面的Export custom routes,然后点击create进行创建。

    防火墙规则设置

    因为我的VPC network里面的防火墙规则,出于安全考虑,把default-allow-internal这条规则给删掉了。但是Dataproc的集群里面的VM需要互相之间能进行通讯,因此我们需要加上一条规则。在VPC network的Firewall rule里面,添加一条规则,其中Direction选择ingress,Sources和Targets里面我们都设置为Tag,tag的名称是Dataproc,这个tag将在稍后Data Fusion的compute profile里面设置。

    设置ETL Pipeline

    在Data Fusion的Instances列表里面,选择View instance打开刚才我们创建的Instance,然后点击Wrangler,在里面选择Add connection,然后选择Kafka Connection,在Kafka Brokers里面输入地址,例如10.0.0.100:9094,然后点击Test connection。如果之前的网络设置都完成的话,应该是能正确连接的。之后Wrangler就会打开这个Kafka连接,然后可以看到这个Kafka里面的所有的topic。点击我们要测试的那个topic,就可以看到里面已有的数据,然后我们可以点击左上角Message的小箭头,在下拉菜单中选择Parse as JSON,这样子就可以直接把JSON里面的字段解析为对应的字段了,在右边的界面列出了解析出的对应字段,这里我需要修改一下字段的名称,使得字段名和我之后创建Bigquery的表里面的字段名对应。

    数据解析没有问题之后,可以在直接点击右上角的create pipeline来创建一个任务了。

    在新打开的Pipeline的编辑窗口中,我们可以看到目前有两个步骤存在,一个是Kafka connection,另一个是Wrangler。我们需要再增加一个步骤,把Wrangler解析的数据保存到Bigquery里面。在左边菜单的Sink里面,点击Bigquery,然后把Wrangler框的箭头拖动连接到这个新加的Bigquery。点击Bigquery的properties,选择Use connection,然后选择之前已经创建好的bigquery的数据集和数据表即可。主要Wrangler输出的字段名需要和数据表的字段名匹配。

    Pipeline设置好之后,我们就可以选择Deploy来部署了。

    设置Compute Profile

    点击Data Fusion右上角的System admin,然后在Configuration里面的System compute profiles里面,我们可以新建一个profile。

    在Profile里面我们可以设置要部署的Dataproc集群的机器的配置。在General setting的subnet里面,我需要设置正确的subnet,因为在我的VPC网络里面有多个subnet,不同的subnet的策略是不同的。我需要设置kafka所在的那个subnet。在Cluster metadata的network tag里面,输入我们在配置防火墙规则是设定的tag Dataproc。这样就可以让我们的cluster能应用到那条规则。

    把新建的profile设置为default之后,我们就可以运行Pipeline了。点击界面上的Run即可。打开Log,我们可以看到整个Pipeline运行的情况。整个Pipeline成功运行完毕之后,我们可以打开Bigquery的对应的数据表,可以看到数据能成功的从kafka消费之后写入Bigquery的表中。

  • 相关阅读:
    AndroidAuto PCTS A118解决杂音问题
    Morris遍历
    快速搭建Springboot项目(一)
    STM32硬件调试器不一定准确,proteus不一定准确
    Excel大量表格选择,快速定位表格
    LeNet网络复现
    【文件IO】文件系统的操作 流对象 字节流(Reader/Writer)和字符流 (InputStream/OutputStream)的用法
    CMake Tutorial 巡礼(8)_为测试白板添加支持
    Docker系列第05部分:实战部署应用全流程
    计算机图形学(八)-纹理映射、计算重心坐标、UV插值、双线性插值、MipMap
  • 原文地址:https://blog.csdn.net/gzroy/article/details/132858770