在Google的云平台中,我创建了一个Cloud SQL的Postgresql实例,在上面保存了一些业务数据。现在需要定期把这些数据同步到Bigquery数据仓库中,这样我们就能在Bigquery上进行数据的后续分析处理,生成数据报表。
Google提供了一个Datastream的服务,通过CDC(Capture data change)的方式,把Cloudsql数据库的改动,例如增删更新等操作,同步到Bigquery的数据集。以下将介绍如何设置Datastream来完成。
因为我的Cloudsql实例没有暴露公网IP,因此我们需要设置VPC peering的方式,把Datastream的VPC和我的GCP项目的VPC网络连接起来。另外Cloudsql是在一个单独的Service network中,我们还需要通过反向代理的方式来连接到cloudsql
在Datastream的Private Connectivity里面,新建一个连接profile。在profile里面我们需要设置VPC network,这个就是我们项目当前所在的VPC网络。然后需要分配一个IP地址段给Datastream来起一个子网。这个IP地址段不能是已分配的IP地址段,并且至少要具备/29的地址。
在VPC网络的防火墙设置里面,新增两条规则,分别对应ingress和egress。其中target需要输入我们刚才分配的地址段,然后开放TCP:5432端口。
在VPC网络里面设置一台VM,然后运行以下代码的脚本,设置一个反向代理
- #! /bin/bash
-
- export DB_ADDR=[IP]
- export DB_PORT=[PORT]
-
- export ETH_NAME=$(ip -o link show | awk -F': ' '{print $2}' | grep -v lo)
-
- export LOCAL_IP_ADDR=$(ip -4 addr show $ETH_NAME | grep -Po 'inet \K[\d.]+')
-
- echo 1 > /proc/sys/net/ipv4/ip_forward
- iptables -t nat -A PREROUTING -p tcp -m tcp --dport $DB_PORT -j DNAT \
- --to-destination $DB_ADDR:$DB_PORT
- iptables -t nat -A POSTROUTING -j SNAT --to-source $LOCAL_IP_ADDR
这里的DB_ADDR,DB_PORT填写CloudSQL的PG数据库的地址和端口
连接到PG数据库,创建一个publication和replication slot
以下命令将赋予用户创建replication的role
ALTER USER USER_NAME WITH REPLICATION;
创建一个publication,这里假设我们要复制public这个schma的test表,那么把以下的SCHEMA1替换为public,把TABLE1替换为test
CREATE PUBLICATION PUBLICATION_NAME FOR TABLE SCHEMA1.TABLE1, SCHEMA2.TABLE2;
创建一个replication slot
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('REPLICATION_SLOT_NAME', 'pgoutput');
最后就可以设置一个stream了,这里连接数据库的地址需要输入刚才我们设置的反向代理的VM的地址和端口,以及我们之前配置的PG的publication和replication slot的name。最后就可以成功运行了。我们可以测试一下,在PG数据表里面做相应的改动,然后在Bigquery的数据表里面等待一会儿就可以看到数据能同步过去了。