• Doris(0.15.1-rc09)使用Datax-DorisWrite导致BE固定节点streamLoad任务过多问题排查


    版本:
    Doris : 0.15.1-rc09
    
    • 1
    背景:

    Datax是一个优秀的增量和全量数据同步工具。最近项目组在使用Datax和DorisWrite插件来备份Mysql数据到Doris中,随着备份任务的增加,我们发现,通过Prometheus监控的BE节点中,某固定BE节点Streamload任务增长速率过高(相关指标:doris_be_streaming_load_requests_total)频繁报警。

    我们目前总共有10个BE节点。正常情况下,streamLoad任务应该是挑选BE负载低的节点来进行导入任务,而且备份任务是在任务的低峰期进行的,因此集群的负载不会太高,streamLoad任务也不应该频繁处在某一个固定BE节点,那么频繁告警某个BE节点streamload任务增长速率过高肯定是有异常。

    具体Prometheus告警信息如下:

    [FIRING:1] Doris be streamload request total
    Alerts Firing
    [WARN] doris be streamload requests to much
    Description: doris be 172.22.1.1:8040 streamload request increase rate to quickly, current increase rate is 5.933333333333334
    Graph: 📈
    Details:
    
    
    alertname: Doris be streamload request total
    instance: 172.22.1.1:8040
    user: doris
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    问题排查:

    1.排查datax 的配置文件:

    通过配置文件我们可以发现,本次DorisWrite采用的是直连BE的方式,并没有采用连接FE,通过FE转发的方式来进行streamLoad导入任务下发的方式。那么就不可能是FE下发导入任务时导致的Bug,只可能是DorisWrite有异常。

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [
                                "id",
                                "a",
                                "b",
                                "c",
                                "d"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://127.0.0.1:33306/test"
                                    ],
                                    "table": [
                                        "w_tunnel"
                                    ]
                                }
                            ],
                            "splitPk": "id",
                            "username": "test",
                            "password": "test",
                            "where": "d = '1'"
                        }
                    },
                    "writer": {
                        "name": "doriswriter",
                        "parameter": {
                            "beLoadUrl": [
                                "172.22.1.1:8040",
                                "172.22.1.2:8040",
                                "172.22.1.3:8040",
                                "172.22.1.4:8040",
                                "172.22.1.5:8040",
                                "172.22.1.6:8040"
                            ],
                            "jdbcUrl": "jdbc:doris://172.22.1.1:9030/test",
                            "loadProps": {
                                "strict_mode": true
                            },
                            "maxBatchRows": "30000",
                            "maxBatchByteSize": "31457280",
                            "database": "test_db",
                            "table": "test_tbl",
                            "column": [
                                "id",
                                "a",
                                "b",
                                "c",
                                "d"
                            ],
                            "username": "test",
                            "password": "test",
                            "customFlag": 1,
                            "webhook": "https://oapi.dingtalk.com/robot/send?access_token=1fb619bdbba5858a77e0a5035b62b92ac23423432c246648b9d5f47592f2be549",
                            "ipPort": "https://127.0.0.1/*",
                            "planId": 118,
                            "jobId": 21437,
                            "postSql": [],
                            "preSql": []
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    2.排查DorisWrite源码:

    com/alibaba/datax/plugin/writer/doriswriter/Key.java 该类主要是解析datax配置的Json文件中Doris配置的部分属性,主要是通过getBeLoadUrlList()方法来获取BE的列表接口。

        public List<String> getBeLoadUrlList() {
            return this.options.getList(BE_LOAD_URL, String.class);
        }
    
    • 1
    • 2
    • 3

    com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java 这个主类就是DorisWrite的核心内容,我们从下面的初始化方法可以看到,DorisWriter初始化的时候,new 了一个DorisWriterEmitter对象,而这个对象中,主要是streamLoad导入核心类。

            @Override
            public void init() {
                this.keys = new Key(super.getPluginJobConf());
                if("csv".equalsIgnoreCase(this.keys.getFormat())){
                    this.rowCodec = new DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator());
                }else{
                    this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
                }
                this.dorisWriterEmitter = new DorisWriterEmitter(keys); #重点类
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java 这个类主要就是获取连接地址,做一个具体的导入任务。通过getAvailableHost()来获取具体的BE连接HOST,通过代码我们可以看到,主要是通过hostPos的索引从targetHosts中来获取具体的HOST的。

    public class DorisWriterEmitter {
        private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
        private final Key keys;
        
        
        private int hostPos = 0;
        
        
        //此处省略部分代码
        
        
        /**
         * loop to get target host
         *
         * @return
         */
        private String getAvailableHost() {
            if (this.hostPos >= targetHosts.size()) {
                this.hostPos = 0;
            }
    
            while (this.hostPos < targetHosts.size()) {
                final String host = targetHosts.get(hostPos);
                ++this.hostPos;
                if (this.tryHttpConnection(host)) {
                    return host;
                }
            }
    
            return null;
        }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    从这块代码就能很明显看出问题在哪了。在DorisWriter初始化时,每次都会new 一个新的DorisWriterEmitter对象,所以hostPos属性值每次都是0,如果targetHostslist的顺序不发生改变,且在

    targetHosts索引为0时,能成功连接,那么无论如何都是拿到的targetHosts.get(0)处的host。datax的Json配置文件中beLoadUrl的顺序及告警信息刚好是一致的,说明问题就出在这了。

    问题解决:

    修改DorisWriterEmitter.getAvailableHost()方法,对beLoadUrl列表做一个shuffer,让它从targetHosts中随机选择一个Host即可。

        private String getAvailableHost() {
            Collections.shuffer(targetHosts);
            if (this.hostPos >= targetHosts.size()) {
                this.hostPos = 0;
            }
    
            while (this.hostPos < targetHosts.size()) {
                final String host = targetHosts.get(hostPos);
                ++this.hostPos;
                if (this.tryHttpConnection(host)) {
                    return host;
                }
            }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    总结:

    1.如果采用连接FE,通过FE转发的方式进行StreamLoad导入,可以在很大程度上避免这种情况出现,但通过转发之后,我们自己就没办法控制StreamLoad任务是落在哪个BE节点上。

    2.采用BE直连的方式,可以在某种程度上提高导入效率,但如果出现像上面问题,可能会导致某节点负载过高,甚至OOM

    其他说明:

    新版的Doris(1.1+)版本的DorisWrite代码进行了重构,现在是通过连接FE来实现的,可以避免这种情况。同时Datax最新Master分支代码,也已经合并了DorisWrite代码,后期,可以直接下载Datax来实现Doris数据导入,不用再自己编译打包DorisWrite插件了。

    新版DorisWrite部分代码展示:

    com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java

    public class DorisWriterManager {
    
          //此处省略部分代码
    
        public DorisWriterManager( Keys options) {
            this.options = options;
            this.visitor = new DorisStreamLoadObserver (options);
            flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
            this.startScheduler();
            this.startAsyncFlushing();
        }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java

    public class DorisStreamLoadObserver {
    
        private Keys options;
    
        private long pos;
    
          //此处省略部分代码
    
        private String getLoadHost() {
            List<String> hostList = options.getLoadUrlList();
            long tmp = pos + hostList.size();
            for (; pos < tmp; pos++) {
                String host = new StringBuilder("http://").append(hostList.get((int)pos % hostList.size())).toString();
                if (checkConnection(host)) {
                    return host;
                }
            }
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    附Prometheus监控规则:
    groups:
      - name: Doris Be Streamload Request Total
        rules:
          - alert: Doris be streamload request total
            expr: sum by(instance)(rate(doris_be_streaming_load_requests_total{group="be", job="pro-doris"}[1m])) > 5
            for: 1m
            labels:
              user: doris
              severity: warn
            annotations:
              summary: "doris be streamload requests to much"
              description: "doris be {{ $labels.instance }} streamload request increase rate to quickly, current increase rate is {{$value}}"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    java学习part06数组工具类
    SpringBoot+Mybatis-Plus+Thymeleaf 实现增删改查+登录/注册
    重启tomcat-Tomcat服务器怎么重启?
    word文档怎么转换成pdf?几个实用文档转换方法
    力扣452-用最少数量的箭引爆气球——贪心算法
    NAO机器人程序设计——接力赛准备
    Java中的对象构造与初始化顺序
    2023最新最全【Nacos】零基础安装教程
    5款高效率,但是名气不大的小众软件
    《论文阅读》开放域对话系统——外部信息融入对话的方法
  • 原文地址:https://blog.csdn.net/weixin_43914798/article/details/127905403