安装flink 1.16.2
将以下jar包放到/data/cmpt/flink-1.16.2/lib 目录下
- antlr-runtime-3.5.2.jar
- flink-connector-hive_2.12-1.16.2.jar
- flink-connector-jdbc-1.16.2.jar
- mysql-connector-java-6.0.6.jar
- hive-exec-3.1.3.jar
- libfb303-0.9.3.jar
将flink-table-planner_2.12-1.16.2.jar 移动到/data/cmpt/flink-1.16.2/lib目录下
mv /data/cmpt/flink-1.16.2/opt/flink-table-planner_2.12-1.16.2.jar /data/cmpt/flink-1.16.2/lib/
将/data/cmpt/flink-1.16.2/lib目录下的flink-table-planner-loader-1.16.2.jar 删除(或移出lib目录;它不能与flink-table-planner_2.12-1.16.2.jar同时存在于classpath中)
修改flink配置(/data/cmpt/flink-1.16.2/conf/flink-conf.yaml)
- jobmanager.rpc.address: localhost
- jobmanager.rpc.port: 6123
- jobmanager.bind-host: localhost
- jobmanager.memory.process.size: 6g
- jobmanager.execution.failover-strategy: region
-
- taskmanager.bind-host: localhost
- taskmanager.host: localhost
- taskmanager.memory.process.size: 7g
- taskmanager.numberOfTaskSlots: 50
- parallelism.default: 1
-
- rest.port: 8080
- rest.address: 0.0.0.0
- rest.bind-port: 8080-8090
- rest.bind-address: 0.0.0.0
-
- web.submit.enable: false
- web.cancel.enable: false
启动flink(在/data/cmpt/flink-1.16.2/bin 目录下执行)
- ./start-cluster.sh
- ./sql-client.sh
mysql中建表
- CREATE TABLE `mysql_user` (
- `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
- `age` int(11) NULL DEFAULT NULL,
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
flink sql 建立mysql外部表
- CREATE TABLE t_user (
- `id` varchar(32),
- `name` varchar(255),
- `age` int
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://10.16.60.31:3306/datasets?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
- 'table-name' = 'mysql_user', -- 需要手动到数据库中创建表
- 'username' = 'root',
- 'password' = '123456'
- );
flink sql插入数据
- Flink SQL> insert into t_user values('A1', 'name_1', 10);
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: 5d0ea93304314d3ca20b240b150bd0ad
mysql中查看数据
- mysql> select * from mysql_user;
- +----+--------+------+
- | id | name | age |
- +----+--------+------+
- | A1 | name_1 | 10 |
- +----+--------+------+
- 1 row in set (0.00 sec)
创建mysql_catalog
- Flink SQL> CREATE CATALOG mysql_catalog WITH(
- > 'type' = 'jdbc',
- > 'default-database' = 'datasets',
- > 'username' = 'root',
- > 'password' = '123456',
- > 'base-url' = 'jdbc:mysql://10.16.60.31:3306'
- > );
- Flink SQL> use catalog mysql_catalog;
- Flink SQL> show databases;
- +---------------+
- | database name |
- +---------------+
- | datasets |
- | dataxweb |
- | dinky |
- | hivemeta |
- +---------------+
- 4 rows in set
- Flink SQL> use datasets;
- Flink SQL> show tables;
- +---------------------+
- | table name |
- +---------------------+
- | uk_price_paid |
- | user_behavior |
- | user_behavior_mysql |
- +---------------------+
- 3 rows in set
- Flink SQL> select * from uk_price_paid limit 10;

默认输出结果模式是表模式table。
设置输出结果模式
- SET 'sql-client.execution.result-mode' = 'table';
- SET 'sql-client.execution.result-mode' = 'changelog';
- SET 'sql-client.execution.result-mode' = 'tableau';
- Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
- Flink SQL> select * from uk_price_paid limit 10;

hive中建表
- hive (datasets)> CREATE TABLE `hive_user` (
- > `id` varchar(32),
- > `name` varchar(255),
- > `age` int
- > );
hive中插入数据(hive不支持记录级别的数据插入和更新,插入一条记录需要几秒到几十秒,生产环境不这样使用,这里只是为了演示)
- hive (datasets)> insert into hive_user values('A1', 'name_1', 10);
hive中查询
- hive (datasets)> select * from hive_user;
- OK
- hive_user.id hive_user.name hive_user.age
- A1 name_1 10
- Time taken: 0.205 seconds, Fetched: 1 row(s)
flinksql 中创建catalog
- Flink SQL> CREATE CATALOG hive_catalog WITH (
- > 'type' = 'hive',
- > 'default-database' = 'datasets',
- > 'hive-conf-dir' = '/data/olap/hive-3.1.3/conf'
- > );
- Flink SQL> show catalogs;
- +-----------------+
- | catalog name |
- +-----------------+
- | default_catalog |
- | hive_catalog |
- +-----------------+
- 2 rows in set
-
- Flink SQL> use catalog hive_catalog;
- Flink SQL> select * from hive_user;

flinksql中插入数据
- Flink SQL> insert into hive_user values('A2', 'name_2', 20);
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: 01cd773a57997afe9e43cd91fccad99a
- Flink SQL> select * from hive_user;

hive中查看flink写入后的数据
- hive (datasets)> select * from hive_user;
- OK
- hive_user.id hive_user.name hive_user.age
- A1 name_1 10
- A2 name_2 20
- Time taken: 0.113 seconds, Fetched: 2 row(s)