- # 1、将依赖包上传到flink的lib目录下
- flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar
-
- # 2、重启flink集群
- yarn application -list
- yarn application -kill application_1699579932721_0003
- yarn-session.sh -d
-
- # 3、重新进入sql命令行
- sql-client.sh
catalog(元数据) ---> database ---> table ---> 数据 --- > 列
- -- 1、开启hive的元数据服务
- nohup hive --service metastore &
-
- -- 2、创建hive catalog
- CREATE CATALOG myhive WITH (
- 'type' = 'hive',
- 'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf'
- );
-
- -- 查看所有的catalog
- -- default_catalog: 默认的元数据,将元数据保存在内存中
- show catalogs;
-
- --3、切换catalog
- use catalog myhive;
-
- --4、在flink中就可以使用hive中已经创建好的表
- select * from student;
- -- 可以从catalog开始定位一张表
- select * from myhive.`default`.student;
-
-
- -- 将flink的表结构保存到hive catalog中
- -- hive中可以看到flink创建的流表,但是在hive中不能查询flink的流表
- create database flink;
- use flink;
- -- 创建flink动态表
- CREATE TABLE students_kafka (
- `offset` BIGINT METADATA VIRTUAL, -- 偏移量
- `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
- sid STRING,
- name STRING,
- age INT,
- sex STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students', -- 数据的topic
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
- 'properties.group.id' = 'testGroup', -- 消费者组
- 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
- 'format' = 'csv' -- 读取数据的格式
- );
在Flink中的使用hive中的函数:
- -- 加载hive函数
- LOAD MODULE hive WITH ('hive-version' = '3.1.2');
-
- -- 使用hive的函数
- select split('java,spark',',');