• DataX DorisWriter 插件DorisStreamLoadObserver类详细解读


    DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

    1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
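    2. public void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要入口。它先通过 getLoadHost() 选出可用主机并拼接 Stream Load 地址(/api/{db}/{table}/_stream_load),再将给定的数据(WriterTuple 对象)以 HTTP PUT 请求发送到 Doris,并根据响应中的 Status 字段判断加载是否成功:状态为 "Fail" 或响应中缺少 Status 时抛出异常;状态为 "Label Already Exists" 时,调用 checkStreamLoadState 轮询该标签的最终状态。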
    3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它通过 /api/{db}/get_load_state 接口不断轮询 Doris 服务器(每次轮询前依次休眠 1、2、3… 秒,最长 5 秒),以获取特定标签对应加载任务的最终状态:状态为 VISIBLE 或 COMMITTED 时返回,PREPARE 时继续轮询,ABORTED、UNKNOWN 或其他状态时抛出异常。
    4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符:CSV 格式在每行之后追加行分隔符(默认为 "\n"),JSON 格式则用逗号连接各行并整体包裹在 "[...]" 中。
    5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并将响应体解析为 JSON 以获取加载结果。
    6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
    7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
    8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并依次向这些主机发送 HEAD 请求以检查其可用性。它返回第一个响应 HTTP 200 的主机地址;如果所有主机都不可用,则返回 null。

    DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

    import org.apache.commons.codec.binary.Base64;
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpHeaders;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpPut;
    import org.apache.http.entity.ByteArrayEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.DefaultRedirectStrategy;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;
    import org.json.simple.JSONValue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    27. public class DorisStreamLoadObserver {
    28. private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);
    29. private Keys options;
    30. private long pos;
    31. private static final String RESULT_FAILED = "Fail";
    32. private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    33. private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
    34. private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
    35. private static final String RESULT_LABEL_PREPARE = "PREPARE";
    36. private static final String RESULT_LABEL_ABORTED = "ABORTED";
    37. private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
    38. public DorisStreamLoadObserver(Keys options) {
    39. this.options = options;
    40. }
    41. // 数据写入 Doris 的主要方法
    42. public void streamLoad(WriterTuple data) throws Exception {
    43. String host = getLoadHost();
    44. if (host == null) {
    45. throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
    46. }
    47. String loadUrl = new StringBuilder(host)
    48. .append("/api/")
    49. .append(options.getDatabase())
    50. .append("/")
    51. .append(options.getTable())
    52. .append("/_stream_load")
    53. .toString();
    54. LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
    55. Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
    56. LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));
    57. final String keyStatus = "Status";
    58. if (null == loadResult || !loadResult.containsKey(keyStatus)) {
    59. throw new IOException("Unable to flush data to Doris: unknown result status.");
    60. }
    61. LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
    62. if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
    63. throw new IOException(
    64. new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString()
    65. );
    66. } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
    67. LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
    68. checkStreamLoadState(host, data.getLabel());
    69. }
    70. }
    71. // 检查数据加载状态的方法
    72. private void checkStreamLoadState(String host, String label) throws IOException {
    73. int idx = 0;
    74. while (true) {
    75. try {
    76. TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
    77. } catch (InterruptedException ex) {
    78. break;
    79. }
    80. try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
    81. HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
    82. httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
    83. httpGet.setHeader("Connection", "close");
    84. try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
    85. HttpEntity respEntity = getHttpEntity(resp);
    86. if (respEntity == null) {
    87. throw new IOException(String.format("Failed to flush data to Doris, Error " +
    88. "could not get the final state of label[%s].\n", label), null);
    89. }
    90. Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
    91. String labelState = (String) result.get("state");
    92. if (null == labelState) {
    93. throw new IOException(String.format("Failed to flush data to Doris, Error " +
    94. "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
    95. }
    96. LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
    97. switch (labelState) {
    98. case LAEBL_STATE_VISIBLE:
    99. case LAEBL_STATE_COMMITTED:
    100. return;
    101. case RESULT_LABEL_PREPARE:
    102. continue;
    103. case RESULT_LABEL_ABORTED:
    104. throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +
    105. "label[%s] state[%s]\n", label, labelState), null, true);
    106. case RESULT_LABEL_UNKNOWN:
    107. default:
    108. throw new IOException(String.format("Failed to flush data to Doris, Error " +
    109. "label[%s] state[%s]\n", label, labelState), null);
    110. }
    111. }
    112. }
    113. }
    114. }
    115. // 根据格式将数据行拼接成字节数组
    116. private byte[] addRows(List<byte[]> rows, int totalBytes) {
    117. if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
    118. Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
    119. byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
    120. ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
    121. for (byte[] row : rows) {
    122. bos.put(row);
    123. bos.put(lineDelimiter);
    124. }
    125. return bos.array();
    126. }
    127. if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
    128. ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
    129. bos.put("[".getBytes(StandardCharsets.UTF_8));
    130. byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
    131. boolean isFirstElement = true;
    132. for (byte[] row : rows) {
    133. if (!isFirstElement) {
    134. bos.put(jsonDelimiter);
    135. }
    136. bos.put(row);
    137. isFirstElement = false;
    138. }
    139. bos.put("]".getBytes(StandardCharsets.UTF_8));
    140. return bos.array();
    141. }
    142. throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
    143. }
    144. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
    145. RequestConfig requestConfig = RequestConfig.custom()
    146. .setSocketTimeout(120 * 1000)
    147. .setConnectTimeout(120 * 1000)
    148. .setConnectionRequestTimeout(120 * 1000)
    149. .build();
    150. try (CloseableHttpClient httpclient = HttpClientBuilder.create()
    151. .setDefaultRequestConfig(requestConfig)
    152. .setRedirectStrategy(new DefaultRedirectStrategy())
    153. .build()) {
    154. HttpPut httpPut = new HttpPut(loadUrl);
    155. httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
    156. httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
    157. httpPut.setEntity(new ByteArrayEntity(data));
    158. try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
    159. HttpEntity respEntity = getHttpEntity(resp);
    160. if (respEntity == null) {
    161. throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");
    162. }
    163. return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
    164. }
    165. }
    166. }
    167. // 构造 HTTP 请求中的基本认证头部
    168. private String getBasicAuthHeader(String username, String password) {
    169. String credentials = username + ":" + password;
    170. byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);
    171. String base64Credentials = Base64.encodeBase64String(credentialsBytes);
    172. return "Basic " + base64Credentials;
    173. }
    174. // 从 HTTP 响应中获取实体内容
    175. private HttpEntity getHttpEntity(CloseableHttpResponse response) {
    176. if (response != null) {
    177. return response.getEntity();
    178. }
    179. return null;
    180. }
    181. // 获取用于加载数据的主机地址
    182. private String getLoadHost() {
    183. List<String> hosts = options.getDorisStreamLoadUrls();
    184. for (String host : hosts) {
    185. try {
    186. HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();
    187. connection.setRequestMethod("HEAD");
    188. int responseCode = connection.getResponseCode();
    189. if (responseCode == HttpURLConnection.HTTP_OK) {
    190. return host;
    191. }
    192. } catch (IOException e) {
    193. LOG.warn("Failed to connect to host: {}", host);
    194. }
    195. }
    196. return null;
    197. }
    198. }
  • 相关阅读:
    基于nodejs+vue百鸟全科赏析网站
    911. 在线选举
    NFT 智能合约实战-快速开始(1)NFT发展历史 | NFT合约标准(ERC-721、ERC-1155和ERC-998)介绍
    C++面试八股文:C++中,函数的参数应该传值还是传引用?
    腾讯云Redis全面升级,性能提升400%,可用性高达5个9
    Android逆向学习(番外一)smali2java部分文件无法反编译的bug与修复方法
    Mysql事务
    软考高级软件架构师学习笔记一
    HDFS、YARN、MapReduce概述及三者之间的关系
    关于static修饰的成员方法成员成员变量的相关讨论
  • 原文地址:https://blog.csdn.net/linweidong/article/details/132634622