• 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据


    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。

    社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何使用 Flink Doris Connector 如何将 bitmap 数据写入 Doris 中。

    前置准备
    Doris2.0.1的环境

    Flink1.16,同时将 Doris Flink Connector的Jar包放在/lib 下面。

    创建Doris表

    1. CREATE TABLE `page_view_bitmap` (
    2. `dt` int,
    3. `page` varchar(256),
    4. `user_id` bitmap bitmap_union
    5. )
    6. AGGREGATE KEY(`dt`, page)
    7. DISTRIBUTED BY HASH(`dt`) BUCKETS 1
    8. PROPERTIES (
    9. "replication_num" = "1"
    10. )

    写入Bitmap数据
    这里模拟Flink读取MySQL数据写入Doris,同时将user_id存储到bitmap中。

    模拟数据

    创建MySQL表

    1. CREATE TABLE `page_view` (
    2. `id` int NOT NULL,
    3. `dt` int,
    4. `page` varchar(256),
    5. `user_id` int,
    6. PRIMARY KEY (`id`)
    7. );
    8. #模拟数据
    9. INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
    10. INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
    11. INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
    12. INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
    13. INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
    14. FlinkSQL写入Bitmap
    15. #使用JDBC读取mysql数据
    16. CREATE TABLE page_view (
    17. `dt` int,
    18. `page` string,
    19. `user_id` int
    20. ) WITH (
    21. 'connector' = 'jdbc',
    22. 'url' = 'jdbc:mysql://127.0.0.1:3306/test',
    23. 'table-name' = 'page_view',
    24. 'username' = 'root',
    25. 'password' = '123456'
    26. );

    doris connector写入数据

    1. CREATE TABLE page_view_bitmap (
    2. dt int,
    3. page string,
    4. user_id int
    5. )
    6. WITH (
    7. 'connector' = 'doris',
    8. 'fenodes' = '127.0.0.1:8030',
    9. 'table.identifier' = 'test.page_view_bitmap',
    10. 'username' = 'root',
    11. 'password' = '',
    12. 'sink.label-prefix' = 'doris_label1',
    13. 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    14. );

    insert into page_view_bitmap select * from page_view
    我们知道 Doris Flink Connector Sink 底层是基于 Doris Stream Load 来实现的,同样 Stream load 在 Connector 里也是一样适用,我们将这个参数封装在了 :sink.properties 参数里,
    这里我们可以看到上面这个例子里我们在是 With 属性里加入了我们 Columns 参数,这里我们配置了列的转换操作,将 user_id 通过 to_bitmap 函数进行转换,并导入到 Doris 表里。
    查询结果

    1. mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
    2. +----------+--------+---------------------------+
    3. | dt | page | bitmap_to_string(user_id) |
    4. +----------+--------+---------------------------+
    5. | 20230921 | home | 1001,1002 |
    6. | 20230921 | search | 1003 |
    7. | 20230922 | home | 1002 |
    8. | 20230922 | mine | 1001 |
    9. +----------+--------+---------------------------+
    10. 4 rows in set (0.00 sec)

    Flink DataStream
    使用 DataStream API 模拟数据写入刚才的表中。

    DataStream API 对 Bitmap 的操作也是和上面 SQL 操作的方式一样。

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. env.setParallelism(1);
    4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    5. DorisSink.Builder<String> builder = DorisSink.builder();
    6. final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
    7. Properties properties = new Properties();
    8. properties.setProperty("column_separator", ",");
    9. properties.setProperty("format", "csv");
    10. properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");
    11. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    12. dorisBuilder.setFenodes("127.0.0.1:8030")
    13. .setTableIdentifier("test.page_view_bitmap")
    14. .setUsername("root")
    15. .setPassword("");
    16. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    17. executionBuilder.setLabelPrefix("doris_label")
    18. .setStreamLoadProp(properties)
    19. .setDeletable(false);
    20. builder.setDorisReadOptions(readOptionBuilder.build())
    21. .setDorisExecutionOptions(executionBuilder.build())
    22. .setSerializer(new SimpleStringSerializer())
    23. .setDorisOptions(dorisBuilder.build());
    24. //mock data
    25. DataStreamSource<String> stringDataStreamSource = env.fromCollection(
    26. Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
    27. stringDataStreamSource.sinkTo(builder.build());
    28. env.execute("doris bitmap write");
    29. }

    查询结果

    1. mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
    2. +----------+--------+---------------------------+
    3. | dt | page | bitmap_to_string(user_id) |
    4. +----------+--------+---------------------------+
    5. | 20230921 | home | 1001,1002,1003 |
    6. | 20230921 | search | 1001,1003 |
    7. | 20230922 | home | 1002 |
    8. | 20230922 | mine | 1001 |
    9. | 20230923 | home | 1001 |
    10. +----------+--------+---------------------------+
    11. 5 rows in set (0.00 sec)
  • 相关阅读:
    记一次Spark 提交任务执行缓慢之问题解决了
    由浅入深学习nginx
    Python基础(八):循环深入讲解
    Docker数据卷
    3Blue1Brown 安装教程
    一种朴素的消失点计算方法
    MYSQL语句
    自动化运维工具Ansible教程(一)【入门篇】
    [前端框架]-VUE(下篇)
    【那些年那些题】轮转数组
  • 原文地址:https://blog.csdn.net/hf200012/article/details/133346413