Role | IP | Configuration | Notes |
---|---|---|---|
FE | 192.168.110.170; 192.168.110.171; 192.168.110.172 | 3 followers | a single FE is enough for testing |
BE | 192.168.110.170; 192.168.110.171; 192.168.110.172 | 3 BEs | a single BE is enough for testing |
broker | 192.168.110.170; 192.168.110.171; 192.168.110.172 | 3 brokers | brokers are not required for testing |
Installation mode
Docker deployment
# Pull the image
docker pull postgres
# Start PostgreSQL
docker run --name mypostgres -d -p 5432:5432 -e POSTGRES_PASSWORD=123456 postgres
Configuration changes
# Enter the container
docker exec -it mypostgres /bin/bash
# Install vim (not included in the image)
apt-get update
apt-get install vim
# Locate the configuration file
find / -name postgresql.conf
# Edit the configuration file
vi /var/lib/postgresql/data/postgresql.conf
Change the following settings:
# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20 # max number of replication slots
# Raise the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20 # max number of walsender processes
# Terminate replication connections that have been inactive longer than this; a larger value is safer (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables
Only wal_level must be changed; the other parameters are optional.
After editing postgresql.conf, restart the PostgreSQL service for the changes to take effect.
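If you would rather not edit the file by hand, the same parameters can also be set from psql with ALTER SYSTEM (a sketch, assuming you are connected as the postgres superuser; wal_level still only takes effect after a restart):
-- Writes the values to postgresql.auto.conf instead of postgresql.conf
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 20;
ALTER SYSTEM SET max_wal_senders = 20;
ALTER SYSTEM SET wal_sender_timeout = '180s';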
Restart the service
# After exiting the container
docker restart mypostgres
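After the restart you can confirm from psql that the values actually took effect (an optional sanity check):
-- Each SHOW should return the value configured above
SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;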
Create a PostgreSQL user and grant it replication privileges
-- Create a new user
CREATE USER pgcdc WITH PASSWORD '123456';
-- Grant the user replication privileges
ALTER ROLE pgcdc replication;
-- Create the database
create database test;
-- Grant the user permission to connect to the database
grant CONNECT ON DATABASE test to pgcdc;
-- Grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO pgcdc;
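To double-check the new role, you can query the pg_roles catalog (optional; both flags should be t):
-- Verify the pgcdc role has replication and login privileges
SELECT rolname, rolreplication, rolcanlogin FROM pg_roles WHERE rolname = 'pgcdc';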
Publish the tables
-- Create the table
CREATE TABLE testcdc(
ID INT PRIMARY KEY NOT NULL,
DEPT CHAR(50) NOT NULL,
EMP_ID INT NOT NULL
);
-- Mark existing publications as covering all tables (this only has an effect if a publication already exists)
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
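You can also confirm the publication itself was created and covers all tables (an optional check against the pg_publication catalog):
-- puballtables should be t for dbz_publication
SELECT pubname, puballtables FROM pg_publication;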
Change the table's replica identity so that update and delete events carry the old row values
-- Set the replica identity to FULL so before-images of updated and deleted rows are emitted
ALTER TABLE testcdc REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL, i.e. the setting took effect)
select relreplident from pg_class where relname='testcdc';
At this point the PostgreSQL setup is complete; all of the steps above are required.
Next comes the Flink job. Maven dependencies (pom.xml):
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.binary.version>2.11</scala.binary.version>
    <debezium.version>1.5.4.Final</debezium.version>
    <flink.version>1.13.6</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <version>1.2.2_flink-1.13_2.11</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PgsqlToStarRocksTest {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        exeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        exeEnv.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Local filesystem state backend (adjust the path for your environment)
        FsStateBackend stateBackend = new FsStateBackend("file:///D:/fsdata");
        exeEnv.setStateBackend(stateBackend);
        // exeEnv.setDefaultSavepointDirectory();
        exeEnv.setParallelism(2);
        // Table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                " id int,\n" +
                " dept STRING,\n" +
                " emp_id int\n" +
                ") WITH (\n" +
                " 'connector' = 'postgres-cdc',\n" +
                " 'hostname' = '192.168.110.13',\n" +
                " 'port' = '5432',\n" +
                " 'username' = 'pgcdc',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'test',\n" +
                " 'schema-name' = 'public',\n" +
                " 'debezium.snapshot.mode' = 'never',\n" +
                " 'decoding.plugin.name' = 'pgoutput',\n" +
                " 'debezium.slot.name' = 'testcdc',\n" +
                " 'table-name' = 'testcdc'\n" +
                ")";
        String sinkDDL =
                "CREATE TABLE sr_sink (\n" +
                "id int ," +
                "dept string," +
                "emp_id int," +
                "PRIMARY KEY (id) " +
                "NOT ENFORCED" +
                ") WITH ( " +
                "'connector' = 'starrocks'," +
                "'jdbc-url'='jdbc:mysql://192.168.110.170:9036,192.168.110.171:9036,192.168.110.172:9036'," +
                "'load-url'='192.168.110.170:8036;192.168.110.171:8036;192.168.110.172:8036'," +
                "'database-name' = 'test'," +
                "'table-name' = 'testcdc'," +
                "'username' = 'root'," +
                "'password' = ''," +
                "'sink.properties.column_separator' = '\\x01'," +
                "'sink.properties.row_delimiter' = '\\x02'" +
                ")";
        String transformSQL =
                "INSERT INTO sr_sink select * from pgsql_source";
        // Execute the source table DDL
        tableEnv.executeSql(sourceDDL);
        // String selectSQL = "select * from pgsql_source";
        // Table table = tableEnv.sqlQuery(selectSQL);
        // Execute the sink table DDL
        tableEnv.executeSql(sinkDDL);
        // Submit the INSERT INTO job and print the submission result
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
    }
}
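Once the job is running, you can check on the PostgreSQL side that the connector created the replication slot configured through debezium.slot.name (a sanity check; the slot only appears after the job has started):
-- active should be t while the Flink job is running
SELECT slot_name, plugin, active FROM pg_replication_slots WHERE slot_name = 'testcdc';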
StarRocks table DDL:
CREATE TABLE `testcdc` (
`id` int(11) NOT NULL DEFAULT "-1" COMMENT "",
`dept` varchar(65533) NULL COMMENT "",
`emp_id` int(11) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false"
);
PostgreSQL table DDL:
CREATE TABLE testcdc(
ID INT PRIMARY KEY NOT NULL,
DEPT CHAR(50) NOT NULL,
EMP_ID INT NOT NULL
);
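As a quick end-to-end test (a sketch; the row values are arbitrary and the column names follow the DDL above), insert a row on the PostgreSQL side and query it back from StarRocks through any MySQL-protocol client connected to an FE:
-- On PostgreSQL: insert a test row into the published table
INSERT INTO testcdc (id, dept, emp_id) VALUES (1, 'sales', 100);
-- On StarRocks: the row should show up shortly after the next sink flush
SELECT * FROM test.testcdc;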