Doris version: 0.15.1-rc09
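DataX is an excellent tool for both incremental and full data synchronization. Recently our team has been using DataX with the DorisWriter plugin to back up MySQL data into Doris. As the number of backup jobs grew, Prometheus monitoring of the BE nodes kept firing alerts because one particular BE node's Stream Load request count was growing too fast (metric: doris_be_streaming_load_requests_total).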
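We currently have 10 BE nodes. Normally, Stream Load jobs should go to BE nodes with low load. The backup jobs also run during off-peak hours, so cluster load should not be high, and Stream Load jobs should not keep landing on one fixed BE node. Frequent alerts about a single BE node's Stream Load growth rate therefore point to an anomaly.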
The Prometheus alert message was as follows:
[FIRING:1] Doris be streamload request total
Alerts Firing
[WARN] doris be streamload requests to much
Description: doris be 172.22.1.1:8040 streamload request increase rate to quickly, current increase rate is 5.933333333333334
Graph: 📈
Details:
alertname: Doris be streamload request total
instance: 172.22.1.1:8040
user: doris
1. Checking the DataX configuration file:
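The configuration file shows that this DorisWriter job connects directly to the BE nodes. It does not connect to the FE and let the FE forward the Stream Load requests. So the problem cannot be a bug in how the FE dispatches load jobs; it can only be an issue in DorisWriter itself.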
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "a",
              "b",
              "c",
              "d"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:33306/test"
                ],
                "table": [
                  "w_tunnel"
                ]
              }
            ],
            "splitPk": "id",
            "username": "test",
            "password": "test",
            "where": "d = '1'"
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "beLoadUrl": [
              "172.22.1.1:8040",
              "172.22.1.2:8040",
              "172.22.1.3:8040",
              "172.22.1.4:8040",
              "172.22.1.5:8040",
              "172.22.1.6:8040"
            ],
            "jdbcUrl": "jdbc:doris://172.22.1.1:9030/test",
            "loadProps": {
              "strict_mode": true
            },
            "maxBatchRows": "30000",
            "maxBatchByteSize": "31457280",
            "database": "test_db",
            "table": "test_tbl",
            "column": [
              "id",
              "a",
              "b",
              "c",
              "d"
            ],
            "username": "test",
            "password": "test",
            "customFlag": 1,
            "webhook": "https://oapi.dingtalk.com/robot/send?access_token=1fb619bdbba5858a77e0a5035b62b92ac23423432c246648b9d5f47592f2be549",
            "ipPort": "https://127.0.0.1/*",
            "planId": 118,
            "jobId": 21437,
            "postSql": [],
            "preSql": []
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
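To make the difference between the two modes concrete: in direct-BE mode every beLoadUrl entry is a BE HTTP address (port 8040 here) and the writer sends each Stream Load request straight to the BE it selects, while in FE mode the request is sent to the FE, which redirects it to a BE of its own choosing. The sketch below only builds the request URLs. It assumes Doris's standard Stream Load endpoint /api/{db}/{table}/_stream_load and the default FE HTTP port 8030; the class StreamLoadUrls is hypothetical, not DorisWriter code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of DorisWriter: builds Stream Load request URLs.
final class StreamLoadUrls {
    private StreamLoadUrls() {}

    // Doris Stream Load endpoint: http://host:port/api/{db}/{table}/_stream_load
    static String streamLoadUrl(String hostPort, String db, String table) {
        return "http://" + hostPort + "/api/" + db + "/" + table + "/_stream_load";
    }

    // Direct-BE mode: one candidate URL per beLoadUrl entry; the writer itself must pick one.
    static List<String> directBeUrls(List<String> beLoadUrls, String db, String table) {
        List<String> urls = new ArrayList<>();
        for (String be : beLoadUrls) {
            urls.add(streamLoadUrl(be, db, table));
        }
        return urls;
    }

    public static void main(String[] args) {
        List<String> bes = List.of("172.22.1.1:8040", "172.22.1.2:8040");
        // Direct-BE: the request goes straight to whichever BE the writer chose.
        System.out.println(directBeUrls(bes, "test_db", "test_tbl").get(0));
        // FE mode (assumed default FE HTTP port 8030): the FE redirects the request to a BE.
        System.out.println(streamLoadUrl("172.22.1.1:8030", "test_db", "test_tbl"));
    }
}
```

In direct-BE mode the choice of BE is entirely the writer's responsibility, which is why the host-selection code examined next matters.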
2. Checking the DorisWriter source code:
com/alibaba/datax/plugin/writer/doriswriter/Key.java
This class parses some of the Doris-related properties from the DataX JSON job configuration. The BE list is obtained through the getBeLoadUrlList() method.
public List<String> getBeLoadUrlList() {
    return this.options.getList(BE_LOAD_URL, String.class);
}
com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
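This main class is the core of DorisWriter. As the init() method below shows, DorisWriter creates a new DorisWriterEmitter object during initialization, and that object is the core class that performs the Stream Load import.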
@Override
public void init() {
    this.keys = new Key(super.getPluginJobConf());
    if ("csv".equalsIgnoreCase(this.keys.getFormat())) {
        this.rowCodec = new DorisCsvCodec(this.keys.getColumns(), this.keys.getColumnSeparator());
    } else {
        this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
    }
    // Key class: a new emitter is created on every init()
    this.dorisWriterEmitter = new DorisWriterEmitter(keys);
}
com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
This class obtains the connection address and carries out the actual load. getAvailableHost() returns the BE host to connect to; as the code shows, the host is taken from targetHosts using the index hostPos.
public class DorisWriterEmitter {
    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
    private final Key keys;
    private int hostPos = 0;
    // ... some code omitted ...

    /**
     * loop to get target host
     *
     * @return the first reachable host starting from hostPos, or null if none is reachable
     */
    private String getAvailableHost() {
        if (this.hostPos >= targetHosts.size()) {
            this.hostPos = 0;
        }
        while (this.hostPos < targetHosts.size()) {
            final String host = targetHosts.get(hostPos);
            ++this.hostPos;
            if (this.tryHttpConnection(host)) {
                return host;
            }
        }
        return null;
    }
}
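This code makes the problem obvious. Every time DorisWriter is initialized it creates a new DorisWriterEmitter object, so hostPos always starts at 0. As long as the order of the targetHosts list does not change and the host at index 0 is reachable, the writer always ends up with targetHosts.get(0). In the DataX JSON configuration the first beLoadUrl entry is 172.22.1.1:8040, exactly the instance named in the alert, which confirms that this is the cause.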
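The behaviour can be reproduced without a cluster. Below is a simplified model of the old selection logic, assuming every BE is reachable (tryHttpConnection is stubbed to return true) and that each writer instance performs one load; OldEmitter and simulate are hypothetical names. A single long-lived emitter would rotate through the list, but because every writer builds a fresh emitter with hostPos = 0, all loads land on the first host.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the old DorisWriterEmitter host selection (connection check stubbed out).
final class OldEmitter {
    private final List<String> targetHosts;
    private int hostPos = 0; // starts at 0 for every new instance

    OldEmitter(List<String> targetHosts) {
        this.targetHosts = targetHosts;
    }

    // Stub standing in for tryHttpConnection(): assume every BE is reachable.
    private boolean tryHttpConnection(String host) {
        return true;
    }

    String getAvailableHost() {
        if (hostPos >= targetHosts.size()) {
            hostPos = 0;
        }
        while (hostPos < targetHosts.size()) {
            String host = targetHosts.get(hostPos);
            ++hostPos;
            if (tryHttpConnection(host)) {
                return host;
            }
        }
        return null;
    }

    // Simulates `writers` writer instances, each creating a fresh emitter and loading once.
    static Map<String, Integer> simulate(List<String> hosts, int writers) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < writers; i++) {
            String host = new OldEmitter(hosts).getAvailableHost();
            counts.merge(host, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("172.22.1.1:8040", "172.22.1.2:8040", "172.22.1.3:8040");
        System.out.println(simulate(hosts, 100)); // prints {172.22.1.1:8040=100}
    }
}
```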
The fix: modify DorisWriterEmitter.getAvailableHost() to shuffle the beLoadUrl list (targetHosts) so that a host is picked at random from targetHosts.
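private String getAvailableHost() {
    // Shuffle so that a freshly created emitter (hostPos == 0) no longer always picks the first beLoadUrl.
    // Requires import java.util.Collections; shuffles targetHosts in place, so the list must be mutable.
    Collections.shuffle(targetHosts);
    if (this.hostPos >= targetHosts.size()) {
        this.hostPos = 0;
    }
    while (this.hostPos < targetHosts.size()) {
        final String host = targetHosts.get(hostPos);
        ++this.hostPos;
        if (this.tryHttpConnection(host)) {
            return host;
        }
    }
    return null;
}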
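A quick check that the patch spreads the load, using the same simplified model (all hosts reachable, one load per writer; ShuffledEmitter and simulate are hypothetical). Two details deliberately differ from the patch as written: a seeded Random is passed to Collections.shuffle so the run is reproducible, and the host list is copied into an ArrayList first, because Collections.shuffle throws UnsupportedOperationException on an unmodifiable list and the excerpt does not show which list type Key.getBeLoadUrlList() returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified model of the patched getAvailableHost(): shuffle, then scan from hostPos.
final class ShuffledEmitter {
    private final List<String> targetHosts;
    private final Random random;
    private int hostPos = 0;

    ShuffledEmitter(List<String> hosts, Random random) {
        // Defensive mutable copy: Collections.shuffle cannot reorder an unmodifiable list.
        this.targetHosts = new ArrayList<>(hosts);
        this.random = random;
    }

    // Stub standing in for tryHttpConnection(): assume every BE is reachable.
    private boolean tryHttpConnection(String host) {
        return true;
    }

    String getAvailableHost() {
        Collections.shuffle(targetHosts, random);
        if (hostPos >= targetHosts.size()) {
            hostPos = 0;
        }
        while (hostPos < targetHosts.size()) {
            String host = targetHosts.get(hostPos);
            ++hostPos;
            if (tryHttpConnection(host)) {
                return host;
            }
        }
        return null;
    }

    // Same experiment as before: many fresh emitters, one load each.
    static Map<String, Integer> simulate(List<String> hosts, int writers, long seed) {
        Random random = new Random(seed);
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < writers; i++) {
            String host = new ShuffledEmitter(hosts, random).getAvailableHost();
            counts.merge(host, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("172.22.1.1:8040", "172.22.1.2:8040", "172.22.1.3:8040");
        System.out.println(simulate(hosts, 3000, 42L));
    }
}
```

Each of the three hosts should receive roughly a third of the 3000 loads; the exact counts depend on the seed.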
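Summary:
1. Connecting to the FE and letting it forward the Stream Load requests largely avoids this situation, but after forwarding we can no longer control which BE node a Stream Load job lands on.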
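2. Connecting directly to the BE nodes can improve load efficiency to some extent, but a problem like the one above can overload a single node or even cause an OOM.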
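For the direct-BE approach in point 2, shuffling is not the only option. An alternative, not what the patch above does, is a round-robin cursor kept in a static field, so every emitter created in the same JVM continues the rotation instead of restarting at index 0. This is a hypothetical sketch (SharedRoundRobin, nextHost and the reachable predicate are illustrative names standing in for tryHttpConnection); it only balances loads within one DataX process, since separate processes each have their own counter.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

// Hypothetical alternative to shuffling: one round-robin cursor shared by all emitters in the JVM.
final class SharedRoundRobin {
    // static: a newly created emitter continues the rotation instead of restarting at 0
    private static final AtomicLong CURSOR = new AtomicLong();

    private SharedRoundRobin() {}

    // Tries each host at most once, starting from the shared cursor; null if none is reachable.
    static String nextHost(List<String> hosts, Predicate<String> reachable) {
        for (int attempt = 0; attempt < hosts.size(); attempt++) {
            long n = CURSOR.getAndIncrement();
            // floorMod keeps the index non-negative even if the counter ever wraps around
            String host = hosts.get((int) Math.floorMod(n, (long) hosts.size()));
            if (reachable.test(host)) {
                return host;
            }
        }
        return null;
    }

    // For tests and demos only: restart the rotation.
    static void reset() {
        CURSOR.set(0);
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("172.22.1.1:8040", "172.22.1.2:8040", "172.22.1.3:8040");
        for (int i = 0; i < 4; i++) {
            // Each iteration stands for a brand-new emitter; the rotation still advances.
            System.out.println(nextHost(hosts, h -> true));
        }
    }
}
```

Unlike random shuffling, this gives an even spread even for a small number of writers, at the cost of shared mutable state.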
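In newer Doris versions (1.1+), the DorisWriter code has been refactored and now works by connecting to the FE, which avoids this problem. In addition, the latest DataX master branch has merged the DorisWriter code, so going forward you can use DataX directly to load data into Doris without compiling and packaging the DorisWriter plugin yourself.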
Part of the new DorisWriter code:
com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
public class DorisWriterManager {
    // ... some code omitted ...
    public DorisWriterManager(Keys options) {
        this.options = options;
        this.visitor = new DorisStreamLoadObserver(options);
        flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
        this.startScheduler();
        this.startAsyncFlushing();
    }
}
com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
public class DorisStreamLoadObserver {
    private Keys options;
    private long pos;
    // ... some code omitted ...
    private String getLoadHost() {
        List<String> hostList = options.getLoadUrlList();
        long tmp = pos + hostList.size();
        for (; pos < tmp; pos++) {
            String host = new StringBuilder("http://").append(hostList.get((int) pos % hostList.size())).toString();
            if (checkConnection(host)) {
                return host;
            }
        }
        return null;
    }
}
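Reading the new getLoadHost() loop closely: pos is a field, so it survives between calls, but the method returns before pos++ runs, so the next call starts again at the host that just succeeded. The loop therefore acts as sticky failover (keep using one reachable address, move on only when checkConnection fails) rather than per-request round-robin; according to the article, spreading across BE nodes in the new version comes from going through the FE. The standalone model below reproduces that loop with checkConnection replaced by a Predicate; LoadHostPicker and the fe1/fe2 addresses are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Standalone model of the new DorisStreamLoadObserver.getLoadHost() loop (hypothetical class).
final class LoadHostPicker {
    private final List<String> hostList;
    private final Predicate<String> checkConnection; // stands in for checkConnection(host)
    private long pos; // like the original field: survives between calls

    LoadHostPicker(List<String> hostList, Predicate<String> checkConnection) {
        this.hostList = hostList;
        this.checkConnection = checkConnection;
    }

    String getLoadHost() {
        long tmp = pos + hostList.size();
        for (; pos < tmp; pos++) {
            String host = "http://" + hostList.get((int) (pos % hostList.size()));
            if (checkConnection.test(host)) {
                // Returning here skips pos++, so the next call starts at this same host.
                return host;
            }
        }
        // Every host failed; pos has advanced by one full lap.
        return null;
    }

    public static void main(String[] args) {
        List<String> fes = List.of("fe1:8030", "fe2:8030");
        LoadHostPicker picker = new LoadHostPicker(fes, h -> true);
        System.out.println(picker.getLoadHost()); // http://fe1:8030
        System.out.println(picker.getLoadHost()); // http://fe1:8030 again: pos was not advanced
    }
}
```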
The Prometheus alert rule behind the alert above:
groups:
- name: Doris Be Streamload Request Total
  rules:
  - alert: Doris be streamload request total
    expr: sum by(instance)(rate(doris_be_streaming_load_requests_total{group="be", job="pro-doris"}[1m])) > 5
    for: 1m
    labels:
      user: doris
      severity: warn
    annotations:
      summary: "doris be streamload requests too many"
      description: "doris be {{ $labels.instance }} streamload request increase rate too fast, current increase rate is {{$value}}"
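The rule fires when sum by(instance)(rate(...[1m])) stays above 5 requests per second for one minute (for: 1m). As a rough illustration of the arithmetic, a per-second rate is the counter increase divided by the window length, so the alerted value of about 5.93 corresponds to roughly 356 Stream Load requests in one minute. This sketch is a simplification, not Prometheus's actual rate() implementation, which also extrapolates to the window edges; StreamLoadRateCheck and its methods are hypothetical.

```java
// Simplified model of the alert condition; NOT Prometheus's exact rate() (no extrapolation).
final class StreamLoadRateCheck {
    private StreamLoadRateCheck() {}

    /** Per-second increase between two samples of a monotonically increasing counter. */
    static double perSecondRate(double startValue, double endValue, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        // A drop means the counter was reset (e.g. a BE restart); count from zero.
        double increase = endValue >= startValue ? endValue - startValue : endValue;
        return increase / seconds;
    }

    /** Mirrors the `rate(...) > 5` comparison in the rule; the `for: 1m` hold is not modelled. */
    static boolean shouldFire(double ratePerSecond, double threshold) {
        return ratePerSecond > threshold;
    }

    public static void main(String[] args) {
        // Illustrative numbers: 356 Stream Load requests within 60 s on one BE.
        double r = perSecondRate(10_000, 10_356, 60);
        System.out.println(r + " fire=" + shouldFire(r, 5));
    }
}
```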