Elasticsearch 作为分布式搜索分析引擎,在大数据应用中有非常多的场景。Flink 提供的 Elasticsearch的SQL连接器只能作为TableSink,可以将表数据写入Elasticsearch的索引(index)。 Elasticsearch 连接器的使用与 JDBC 连接器非常相似,写入数据的模式同样是由创建表的 DDL 中是否有主键定义决定的。
想要在 Flink 程序中使用 Elasticsearch 连接器,需要引入对应的依赖。具体的依赖与 Elasticsearch 服务器的版本有关,对于 6.x 版本引入依赖如下:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
对于 Elasticsearch 7 以上的版本,引入的依赖则是:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
创建 Elasticsearch 表的方法与 JDBC 表基本一致。下面是一个具体示例:
-- 创建一张连接到 Elasticsearch 的 表
CREATE TABLE MyTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
这里定义了主键,所以会以更新插入(Upsert)模式向 Elasticsearch 写入数据。
作为高性能、可伸缩的分布式列存储数据库,HBase 在大数据分析中是一个非常重要的工 具。Flink 提供的 HBase 连接器支持面向 HBase 集群的读写操作。
在流处理场景下,连接器作为 TableSink 向 HBase 写入数据时,采用的始终是更新插入 (Upsert)模式。也就是说,HBase 要求连接器必须通过定义的主键(primary key)来发送更新日志(changelog)。所以在创建表的 DDL 中,我们必须要定义行键(rowkey)字段,并将它声明为主键;如果没有用 PRIMARY KEY 子句声明主键,连接器会默认把 rowkey 作为主键。
想要在 Flink 程序中使用 HBase 连接器,需要引入对应的依赖。目前 Flink 只对 HBase 的 1.4.x 和 2.2.x 版本提供了连接器支持,而引入的依赖也应该与具体的 HBase 版本有关。对于 1.4 版本引入依赖如下:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-hbase-1.4_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
对于 HBase 2.2 版本,引入的依赖则是:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-hbase-2.2_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
由于 HBase 并不是关系型数据库,因此转换为 Flink SQL 中的表会稍有一些麻烦。在 DDL 创建出的 HBase 表中,所有的列族(column family)都必须声明为 ROW 类型,在表中占据一 个字段;而每个 family 中的列(column qualifier)则对应着 ROW 里的嵌套字段。我们不需要将 HBase 中所有的 family 和 qualifier 都在 Flink SQL 的表中声明出来,只要把那些在查询中用到的声明出来就可以了。
除了所有 ROW 类型的字段(对应着 HBase 中的 family),表中还应有一个原子类型的字段,它就会被识别为 HBase 的 rowkey。在表中这个字段可以任意取名,不一定非要叫 rowkey。
下面是一个具体示例:
-- 创建一张连接到 HBase 的 表
CREATE TABLE MyTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);
368
-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
我们将另一张 T 中的数据提取出来,并用 ROW()函数来构造出对应的 column family,最终写入 HBase 中名为 mytable 的表。