• Apache Doris 系列: 基础篇-Flink DataStream 读写Doris


    简介

    本文介绍 Flink 如何流式写入 Apache Doris,分为一下几个部分:

    • Flink Doris connector
    • Doris FE 节点配置
    • Flink DataStream 写 Doris

    Flink Doris connector

    Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。
    支持二阶段提交,可实现Exatly Once的写入。

    Doris FE 节点配置

    1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置:

    enable_http_server_v2 = true
    
    • 1
    1. 重启 FE 节点
    apache-doris/fe/bin/stop_fe.sh
    apache-doris/fe/bin/start_fe.sh --daemon
    
    
    • 1
    • 2
    • 3

    Flink DataStream 读写 Doris

    1. Flink DataStream 读取 Doris
            //Doris Source
            DorisOptions.Builder sourceBuilder =
                    DorisOptions.builder()
                            .setFenodes("192.168.56.104:8030")  //FE节点IP和端口
                            .setTableIdentifier("test.order_info_example")
                            .setUsername("test")
                            .setPassword("password123");
    
    
            DorisSource> dorisSource = DorisSourceBuilder.>builder()
                    .setDorisOptions(sourceBuilder.build())
                    .setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDeserializer(new SimpleListDeserializationSchema())
                    .build();
    
            DataStreamSource> source = env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. Flink DataStream 写入 Doris
            //Doris Sink
            DorisSink.Builder sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("192.168.56.104:8030")
                    .setTableIdentifier("test.order_info_output")
                    .setUsername("test")
                    .setPassword("password123");
    
            Properties properties = new Properties();
            properties.setProperty("column_separator", ",");
            DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix("label-doris-20") //streamload label prefix
                    .setStreamLoadProp(properties);
            sinkBuilder.setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDorisExecutionOptions(executionBuilder.build())
                    .setDorisOptions(dorisBuilder.build())
                    .setSerializer(new SimpleStringSerializer()) //serialize according to string
                     ;
    
    
            DataStream transform = source.flatMap(new FlatMapFunction, String>() {
                @Override
                public void flatMap(List element, Collector collector) throws Exception {
    
                    //collector.collect();
                    StringBuffer stringBuffer = new StringBuffer();
                    stringBuffer.append(element.get(0))
                            .append(",")
                            .append(element.get(1))
                            .append(",")
                            .append(element.get(2))
                            .append(",")
                            .append(element.get(3))
                            .append(",")
                            .append(element.get(4))
                            .append(",")
                            .append(element.get(5))
                            ;
                        collector.collect(stringBuffer.toString());
    
                }
            });
    
    
            transform.print();
            transform.sinkTo(sinkBuilder.build());
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    Github完整代码

  • 相关阅读:
    SCI投稿:投稿附言cover letter的写法和模板!
    【蓝桥杯物联网赛项学习日志】Day4 关于USART/UART
    SPDK NVMe-oF target多路功能介绍
    全面揭秘!微信传输助手的用处有哪些!
    DC综合学习
    SCI一区级 | Matlab实现GJO-CNN-LSTM-Multihead-Attention多变量时间序列预测
    计算机视觉——飞桨深度学习实战-起始篇
    SmobilerService 推送实现
    L11.linux命令每日一练 -- 第二章 文件和目录操作命令 -- rename和basename命令
    并发锁机制
  • 原文地址:https://blog.csdn.net/weixin_47298890/article/details/126921569