• Flink SQL(四) 连接到外部系统Elasticsearch和HBase


    Elasticsearch

    ​ Elasticsearch 作为分布式搜索分析引擎,在大数据应用中有非常多的场景。Flink 提供的 Elasticsearch的SQL连接器只能作为TableSink,可以将表数据写入Elasticsearch的索引(index)。 Elasticsearch 连接器的使用与 JDBC 连接器非常相似,写入数据的模式同样是由创建表的 DDL 中是否有主键定义决定的。

    1. 引入依赖

    想要在 Flink 程序中使用 Elasticsearch 连接器,需要引入对应的依赖。具体的依赖与 Elasticsearch 服务器的版本有关,对于 6.x 版本引入依赖如下:

    <dependency>
     <groupId>org.apache.flinkgroupId> 
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}artifactId>
    <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    对于 Elasticsearch 7 以上的版本,引入的依赖则是:

    <dependency>
     <groupId>org.apache.flinkgroupId> 
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}artifactId>
    <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 创建连接到 Elasticsearch 的表

    创建 Elasticsearch 表的方法与 JDBC 表基本一致。下面是一个具体示例:

    -- 创建一张连接到 Elasticsearch 的 表
    CREATE TABLE MyTable (
     user_id STRING,
     user_name STRING
     uv BIGINT,
     pv BIGINT,
     PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'users'
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    这里定义了主键,所以会以更新插入(Upsert)模式向 Elasticsearch 写入数据。

    HBase

    ​ 作为高性能、可伸缩的分布式列存储数据库,HBase 在大数据分析中是一个非常重要的工 具。Flink 提供的 HBase 连接器支持面向 HBase 集群的读写操作。

    ​ 在流处理场景下,连接器作为 TableSink 向 HBase 写入数据时,采用的始终是更新插入 (Upsert)模式。也就是说,HBase 要求连接器必须通过定义的主键(primary key)来发送更新日志(changelog)。所以在创建表的 DDL 中,我们必须要定义行键(rowkey)字段,并将它声明为主键;如果没有用 PRIMARY KEY 子句声明主键,连接器会默认把 rowkey 作为主键。

    1. 引入依赖

    ​ 想要在 Flink 程序中使用 HBase 连接器,需要引入对应的依赖。目前 Flink 只对 HBase 的 1.4.x 和 2.2.x 版本提供了连接器支持,而引入的依赖也应该与具体的 HBase 版本有关。对于 1.4 版本引入依赖如下:

    <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-connector-hbase-1.4_${scala.binary.version}artifactId>
     <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    对于 HBase 2.2 版本,引入的依赖则是:

    <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-connector-hbase-2.2_${scala.binary.version}artifactId>
     <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 创建连接到 HBase 的表

    ​ 由于 HBase 并不是关系型数据库,因此转换为 Flink SQL 中的表会稍有一些麻烦。在 DDL 创建出的 HBase 表中,所有的列族(column family)都必须声明为 ROW 类型,在表中占据一 个字段;而每个 family 中的列(column qualifier)则对应着 ROW 里的嵌套字段。我们不需要将 HBase 中所有的 family 和 qualifier 都在 Flink SQL 的表中声明出来,只要把那些在查询中用到的声明出来就可以了。

    ​ 除了所有 ROW 类型的字段(对应着 HBase 中的 family),表中还应有一个原子类型的字段,它就会被识别为 HBase 的 rowkey。在表中这个字段可以任意取名,不一定非要叫 rowkey。

    ​ 下面是一个具体示例:

    -- 创建一张连接到 HBase 的 表
    CREATE TABLE MyTable (
    rowkey INT,
    family1 ROW<q1 INT>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
    );
    368
    -- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
    INSERT INTO MyTable
    SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ​ 我们将另一张 T 中的数据提取出来,并用 ROW()函数来构造出对应的 column family,最终写入 HBase 中名为 mytable 的表。

  • 相关阅读:
    (附源码)ssm跨平台教学系统 毕业设计 280843
    Ask Milvus Anything!聊聊被社区反复@的那些事儿ⅠⅠ
    Unity IL2CPP 游戏分析入门
    搭建游戏要选什么样的服务器?
    《机器学习实战》笔记
    对象的比较(下)
    浅谈一下Java当中的:封装
    抢滩“数字厨电”时代,老板电器用全新“数字人”冲阵
    游戏成元宇宙“主力军”:上半年收入占比达94%
    OpenCV之九宫格图像
  • 原文地址:https://blog.csdn.net/qq_42575907/article/details/126091244