`DorisStreamLoadObserver` 类是一个 Java 类,用于将数据加载到 Doris(前身为 Palo)数据库中并监控加载过程。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:
- `DorisStreamLoadObserver(Keys options)`:这是类的构造函数,用于保存加载数据所需的配置选项。
- `void streamLoad(WriterTuple data) throws Exception`:该方法是数据加载的主要方法,负责将给定的数据(`WriterTuple` 对象)加载到 Doris 数据库中。它构建用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态判断加载是否成功:如果加载失败,则抛出异常;如果响应表明该标签(label)已存在,则转而检查该标签对应加载任务的状态。
- `private void checkStreamLoadState(String host, String label) throws IOException`:该方法用于检查数据加载的状态。它会不断轮询 Doris 服务器,以获取指定加载任务的最终状态;根据状态的不同,它可能抛出异常,或在加载完成时返回。
- `private byte[] addRows(List<byte[]> rows, int totalBytes)`:该方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加相应的分隔符。
- `private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException`:该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
- `private String getBasicAuthHeader(String username, String password)`:该方法用于生成 HTTP 基本身份验证(Basic Auth)头部,以便在 HTTP 请求中进行身份验证。
- `private HttpEntity getHttpEntity(CloseableHttpResponse response)`:这是一个工具方法,用于从 HTTP 响应中提取实体内容。
- `private String getLoadHost()`:该方法从配置选项中获取用于加载数据的主机地址列表,并依次尝试连接这些主机以检查其可用性,返回第一个可用的主机地址;若所有主机均不可用,则返回 `null`。

`DorisStreamLoadObserver` 类主要用于处理数据加载任务:它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。借助这些方法,可以将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。
- import org.apache.commons.codec.binary.Base64;
- import org.apache.http.HttpEntity;
- import org.apache.http.HttpHeaders;
- import org.apache.http.client.config.RequestConfig;
- import org.apache.http.client.methods.CloseableHttpResponse;
- import org.apache.http.client.methods.HttpGet;
- import org.apache.http.client.methods.HttpPut;
- import org.apache.http.entity.ByteArrayEntity;
- import org.apache.http.impl.client.CloseableHttpClient;
- import org.apache.http.impl.client.DefaultRedirectStrategy;
- import org.apache.http.impl.client.HttpClientBuilder;
- import org.apache.http.impl.client.HttpClients;
- import org.apache.http.util.EntityUtils;
- import org.json.simple.JSONValue;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.nio.ByteBuffer;
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
-
- public class DorisStreamLoadObserver {
- private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);
-
- private Keys options;
-
- private long pos;
- private static final String RESULT_FAILED = "Fail";
- private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
- private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
- private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
- private static final String RESULT_LABEL_PREPARE = "PREPARE";
- private static final String RESULT_LABEL_ABORTED = "ABORTED";
- private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
-
- public DorisStreamLoadObserver(Keys options) {
- this.options = options;
- }
-
- // 数据写入 Doris 的主要方法
- public void streamLoad(WriterTuple data) throws Exception {
- String host = getLoadHost();
- if (host == null) {
- throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
- }
- String loadUrl = new StringBuilder(host)
- .append("/api/")
- .append(options.getDatabase())
- .append("/")
- .append(options.getTable())
- .append("/_stream_load")
- .toString();
- LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
- Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
- LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));
- final String keyStatus = "Status";
- if (null == loadResult || !loadResult.containsKey(keyStatus)) {
- throw new IOException("Unable to flush data to Doris: unknown result status.");
- }
- LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
- if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
- throw new IOException(
- new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString()
- );
- } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
- LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
- checkStreamLoadState(host, data.getLabel());
- }
- }
-
- // 检查数据加载状态的方法
- private void checkStreamLoadState(String host, String label) throws IOException {
- int idx = 0;
- while (true) {
- try {
- TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
- } catch (InterruptedException ex) {
- break;
- }
- try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
- HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
- httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
- httpGet.setHeader("Connection", "close");
-
- try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
- HttpEntity respEntity = getHttpEntity(resp);
- if (respEntity == null) {
- throw new IOException(String.format("Failed to flush data to Doris, Error " +
- "could not get the final state of label[%s].\n", label), null);
- }
- Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
- String labelState = (String) result.get("state");
- if (null == labelState) {
- throw new IOException(String.format("Failed to flush data to Doris, Error " +
- "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
- }
- LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
- switch (labelState) {
- case LAEBL_STATE_VISIBLE:
- case LAEBL_STATE_COMMITTED:
- return;
- case RESULT_LABEL_PREPARE:
- continue;
- case RESULT_LABEL_ABORTED:
- throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +
- "label[%s] state[%s]\n", label, labelState), null, true);
- case RESULT_LABEL_UNKNOWN:
- default:
- throw new IOException(String.format("Failed to flush data to Doris, Error " +
- "label[%s] state[%s]\n", label, labelState), null);
- }
- }
- }
- }
- }
-
- // 根据格式将数据行拼接成字节数组
- private byte[] addRows(List<byte[]> rows, int totalBytes) {
- if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
- Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
- byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
- for (byte[] row : rows) {
- bos.put(row);
- bos.put(lineDelimiter);
- }
- return bos.array();
- }
-
- if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
- bos.put("[".getBytes(StandardCharsets.UTF_8));
- byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
- boolean isFirstElement = true;
- for (byte[] row : rows) {
- if (!isFirstElement) {
- bos.put(jsonDelimiter);
- }
- bos.put(row);
- isFirstElement = false;
- }
- bos.put("]".getBytes(StandardCharsets.UTF_8));
- return bos.array();
- }
- throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
- }
-
- private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
- RequestConfig requestConfig = RequestConfig.custom()
- .setSocketTimeout(120 * 1000)
- .setConnectTimeout(120 * 1000)
- .setConnectionRequestTimeout(120 * 1000)
- .build();
- try (CloseableHttpClient httpclient = HttpClientBuilder.create()
- .setDefaultRequestConfig(requestConfig)
- .setRedirectStrategy(new DefaultRedirectStrategy())
- .build()) {
- HttpPut httpPut = new HttpPut(loadUrl);
- httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
- httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
- httpPut.setEntity(new ByteArrayEntity(data));
- try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
- HttpEntity respEntity = getHttpEntity(resp);
- if (respEntity == null) {
- throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");
- }
- return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
- }
- }
- }
-
- // 构造 HTTP 请求中的基本认证头部
- private String getBasicAuthHeader(String username, String password) {
- String credentials = username + ":" + password;
- byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);
- String base64Credentials = Base64.encodeBase64String(credentialsBytes);
- return "Basic " + base64Credentials;
- }
-
- // 从 HTTP 响应中获取实体内容
- private HttpEntity getHttpEntity(CloseableHttpResponse response) {
- if (response != null) {
- return response.getEntity();
- }
- return null;
- }
-
- // 获取用于加载数据的主机地址
- private String getLoadHost() {
- List<String> hosts = options.getDorisStreamLoadUrls();
- for (String host : hosts) {
- try {
- HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();
- connection.setRequestMethod("HEAD");
- int responseCode = connection.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- return host;
- }
- } catch (IOException e) {
- LOG.warn("Failed to connect to host: {}", host);
- }
- }
- return null;
- }
- }