• azkaban二次开发


    springboot封装azkaban的api,提供可调用azkaban任务流的接口

    流程如下:
    springboot接口->azkaban api->azkaban project(flow tasks)->shell脚本->spark tasks

    Api测试

    curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://192.168.33.162:8443
    {
      "session.id" : "2b0a77ee-ee93-4d31-a188-8bcc89bacdb2",
      "status" : "success"
    }
    
    
    
    
    curl -k -X POST --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=executeFlow&project=iot&flow=iot&flowOverride[buiz]=iot_ads_use_x_hour&flowOverride[projects]=120100lae&flowOverride[meterKind]=1&flowOverride[meterCode]='xxx'&flowOverride[dt]=2021-11-16&flowOverride[archive_suffix]=''" https://192.168.33.162:8443/executor
    {
      "project" : "iot",
      "message" : "Execution queued successfully with exec id 7975",
      "flow" : "iot",
      "execid" : 7975
    }
    
    
    curl -k --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=fetchexecflow&execid=5559" https://192.168.33.162:8443/executor
    {
      "project" : "iot",
      "updateTime" : 1637132885051,
      "type" : null,
      "attempt" : 0,
      "execid" : 5559,
      "submitTime" : 1637132859558,
      "nodes" : [ {
        "nestedId" : "iot_main",
        "startTime" : 1637132859661,
        "updateTime" : 1637132884986,
        "id" : "iot_main",
        "endTime" : 1637132884914,
        "type" : "command",
        "attempt" : 0,
        "status" : "FAILED"
      } ],
      "nestedId" : "iot",
      "submitUser" : "azkaban",
      "startTime" : 1637132859655,
      "id" : "iot",
      "endTime" : 1637132885046,
      "projectId" : 17,
      "flowId" : "iot",
      "flow" : "iot",
      "status" : "FAILED"
    }
    
    curl -k --data "session.id=2b0a77ee-ee93-4d31-a188-8bcc89bacdb2&ajax=fetchExecJobLogs&execid=5559&jobId=iot_main&offset=0&length=10000" https://192.168.33.162:8443/executor
    {
      "length" : 0,
      "offset" : 0,
      "data" : ""
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    FlowJobProcess

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import okhttp3.*;
    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Action1;
    import rx.functions.Func1;
    
    import javax.net.ssl.*;
    import java.io.IOException;
    import java.security.cert.CertificateException;
    
    /**
     * Demo client that logs in to an Azkaban web server and then submits the {@code iot} flow through the
     * Azkaban AJAX API, chaining the two blocking HTTP calls with RxJava 1.x.
     *
     * <p>The equivalent curl calls are:
     *
     * curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://192.168.0.162:8443
     * curl -k -X POST --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=executeFlow&project=iot&flow=iot&flowOverride[buiz]=iot_ads_use_x_hour&flowOverride[projects]=120100lae&flowOverride[meterKind]=1&flowOverride[meterCode]='xxx'&flowOverride[dt]=2021-11-16&flowOverride[archive_suffix]=''" https://192.168.0.162:8443/executor
     * curl -k --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=fetchexecflow&execid=5559" https://192.168.0.162:8443/executor
     * curl -k --data "session.id=78d1374e-3b0e-445b-9a71-302cffa05f98&ajax=fetchExecJobLogs&execid=5559&jobId=iot_main&offset=0&length=10000" https://192.168.0.162:8443/executor
     */
    public class FlowJobProcess {

        /** Base URL of the Azkaban web server; the login form is posted here. */
        private static final String AZKABAN_URL = "https://192.168.0.162:8443";

        /** Endpoint of the executor AJAX API. */
        private static final String EXECUTOR_URL = AZKABAN_URL + "/executor";

        /**
         * Logs in, submits the flow and prints the session id together with the submission result.
         * Any failure in either step is reported through the error callback instead of being swallowed.
         *
         * @param args unused
         */
        public static void main(String[] args) {

            // One client is enough for both calls; it owns the connection pool.
            final OkHttpClient client = getUnsafeOkHttpClient();

            // fromCallable emits the value, completes the stream and routes exceptions to onError.
            Observable.fromCallable(() -> login(client))
                    .flatMap(sessionId -> Observable
                            .fromCallable(() -> executeFlow(client, sessionId))
                            // Carry both values as a pair rather than joining them with a separator
                            // that may also occur inside the JSON result.
                            .map(result -> new String[]{sessionId, result}))
                    .subscribe(
                            pair -> {
                                /*
                                 * flow exec result:{
                                 *   "project" : "iot",
                                 *   "message" : "Execution queued successfully with exec id 5626",
                                 *   "flow" : "iot",
                                 *   "execid" : 5626
                                 * }
                                 */
                                System.out.println("sessionId is" + pair[0] + " and flow exec result:" + pair[1]);
                            },
                            error -> {
                                System.err.println("Azkaban flow submission failed: " + error);
                                error.printStackTrace();
                            });
        }

        /**
         * Posts the login form and extracts the session id from the JSON answer.
         *
         * @param client HTTP client used for the call
         * @return the value of the {@code session.id} field
         * @throws IOException if the HTTP call fails
         * @throws IllegalStateException if the answer carries no {@code session.id}
         */
        private static String login(OkHttpClient client) throws IOException {
            RequestBody form = new FormBody.Builder()
                    .add("action", "login")
                    .add("username", "azkaban")
                    .add("password", "azkaban")
                    .build();

            String body = post(client, AZKABAN_URL, form);
            JSONObject json = JSON.parseObject(body);
            String sessionId = json == null ? null : json.getString("session.id");
            if (sessionId == null || sessionId.isEmpty()) {
                throw new IllegalStateException("Azkaban login returned no session.id: " + body);
            }

            System.out.println("授权成功:" + body);
            System.out.println("sessionId:" + sessionId);
            return sessionId;
        }

        /**
         * Submits the {@code iot} flow of project {@code iot} with the flow-parameter overrides of this demo.
         *
         * @param client    HTTP client used for the call
         * @param sessionId session id obtained from {@link #login(OkHttpClient)}
         * @return the raw JSON answer of the executor API
         * @throws IOException if the HTTP call fails
         */
        private static String executeFlow(OkHttpClient client, String sessionId) throws IOException {
            RequestBody form = new FormBody.Builder()
                    .add("session.id", sessionId)
                    .add("ajax", "executeFlow")
                    .add("project", "iot")
                    .add("flow", "iot")
                    .add("flowOverride[buiz]", "iot_ads_use_x_hour")
                    .add("flowOverride[projects]", "120100lae")
                    .add("flowOverride[meterKind]", "1")
                    .add("flowOverride[meterCode]", "xxx")
                    .add("flowOverride[dt]", "2021-12-02")
                    .add("flowOverride[archive_suffix]", "")
                    .build();

            String result = post(client, EXECUTOR_URL, form);
            System.out.println(result);
            return result;
        }

        /**
         * Sends a form POST and returns the response body as text.
         *
         * @throws IOException if the call fails or the server answers with a non-2xx status
         */
        private static String post(OkHttpClient client, String url, RequestBody form) throws IOException {
            Request request = new Request.Builder()
                    .url(url)
                    .post(form)
                    .build();

            // try-with-resources releases the connection even when reading the body fails.
            try (Response response = client.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new IOException("POST " + url + " failed with HTTP " + response.code());
                }
                ResponseBody body = response.body();
                return body == null ? "" : body.string();
            }
        }

        /**
         * Builds an {@link OkHttpClient} that accepts every server certificate and every host name
         * (the programmatic counterpart of {@code curl -k}).
         *
         * <p>NOTE(review): this disables TLS verification entirely and must only be used against
         * trusted test servers.
         *
         * @return a client with certificate and host-name verification disabled
         * @throws IllegalStateException if the TLS context cannot be initialised
         */
        public static OkHttpClient getUnsafeOkHttpClient() {
            // Trust manager that does not validate certificate chains.
            final X509TrustManager trustAll = new X509TrustManager() {
                @Override
                public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {
                    // intentionally accepts everything
                }

                @Override
                public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {
                    // intentionally accepts everything
                }

                @Override
                public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                    return new java.security.cert.X509Certificate[]{};
                }
            };

            try {
                final SSLContext sslContext = SSLContext.getInstance("TLS");
                sslContext.init(null, new TrustManager[]{trustAll}, new java.security.SecureRandom());

                // Pass the trust manager explicitly: the single-argument overload is deprecated and
                // has to extract the trust manager from the socket factory via reflection.
                return new OkHttpClient.Builder()
                        .sslSocketFactory(sslContext.getSocketFactory(), trustAll)
                        .hostnameVerifier((hostname, session) -> true)
                        .build();
            } catch (java.security.GeneralSecurityException e) {
                throw new IllegalStateException("Unable to initialise the trust-all TLS context", e);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179

    下面的web项目只是spark任务启动前的前置任务:利用接口删除数据

    es数据Dao

    import com.mysql.jdbc.StringUtils;
    import lombok.SneakyThrows;
    import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.GetAliasesResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.cluster.metadata.AliasMetaData;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryRequest;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Repository;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;
    
    /**
     * Elasticsearch access for the IoT ADS indices: lists the known indices and searches or deletes the
     * documents of one day ({@code dt}) in the index selected by the meter kind, filtered by device code
     * ({@code equipmentNo}).
     */
    @Repository
    public class IndexHandler {

        private static final String ELECTRICITY_INDEX = "ads_iot_electricity_index2";
        private static final String WATER_INDEX = "ads_iot_water_index";
        private static final String DT_FIELD = "dt";
        private static final String METER_CODE_FIELD = "equipmentNo";

        @Autowired
        RestHighLevelClient esClient;

        /**
         * Prints the name of every index returned by the get-aliases API to standard output.
         *
         * @throws IOException if the Elasticsearch request fails
         */
        public void getAllIndexes() throws IOException {
            GetAliasesResponse response = esClient.indices().getAlias(new GetAliasesRequest(), RequestOptions.DEFAULT);
            response.getAliases().keySet().forEach(name -> System.out.println("index:" + name));
        }

        /**
         * Searches the documents of one day, optionally narrowed to a single device.
         *
         * @param dt        day to match against the {@code dt} field
         * @param meterKind {@code "0"} for the electricity index, {@code "1"} for the water index
         * @param meterCode device code to match against {@code equipmentNo}; ignored when null or empty
         * @return the raw search response
         * @throws IOException              if the Elasticsearch request fails
         * @throws IllegalArgumentException if {@code meterKind} maps to no index
         */
        public SearchResponse search(String dt, String meterKind, String meterCode) throws IOException {
            String index = resolveIndex(meterKind);
            System.out.println("index:" + index);

            BoolQueryBuilder query = dayQuery(dt);
            if (!isNullOrEmpty(meterCode)) {
                query.must(QueryBuilders.matchQuery(METER_CODE_FIELD, meterCode));
            }
            System.out.println("must:" + query.toString());

            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(index);
            searchRequest.source(new SearchSourceBuilder().query(query));

            return esClient.search(searchRequest, RequestOptions.DEFAULT);
        }

        /**
         * Deletes the documents of one device for one day.
         *
         * @param dt        day to match against the {@code dt} field
         * @param meterKind {@code "0"} for the electricity index, {@code "1"} for the water index
         * @param meterCode device code to match against {@code equipmentNo}; required
         * @return the delete-by-query response
         * @throws IllegalArgumentException if {@code meterKind} maps to no index or {@code meterCode} is
         *                                  null or empty
         */
        @SneakyThrows
        public BulkByScrollResponse deleteByQuery(String dt, String meterKind, String meterCode) {
            String index = resolveIndex(meterKind);

            // A delete must always be scoped to one device; fail with a clear message instead of
            // handing a null value to the query builder.
            if (isNullOrEmpty(meterCode)) {
                throw new IllegalArgumentException("meterCode is required for deleteByQuery");
            }

            // Note: the old index has no meterKind field, so meterKind is not part of the query.
            BoolQueryBuilder query = dayQuery(dt)
                    .must(QueryBuilders.matchQuery(METER_CODE_FIELD, meterCode));
            System.out.println("must:" + query.toString());

            DeleteByQueryRequest request = new DeleteByQueryRequest();
            request.indices(index);
            request.setQuery(query);
            // Version conflicts are counted instead of aborting the whole delete.
            request.setConflicts("proceed");
            request.setTimeout(String.valueOf(TimeValue.timeValueMinutes(5)));
            request.setRefresh(true);

            return esClient.deleteByQuery(request, RequestOptions.DEFAULT);
        }

        /** Maps a meter kind to its index name, rejecting null and unknown kinds. */
        private static String resolveIndex(String meterKind) {
            if ("0".equals(meterKind)) {
                return ELECTRICITY_INDEX;
            }
            if ("1".equals(meterKind)) {
                return WATER_INDEX;
            }
            throw new IllegalArgumentException("es index is not matched!!!");
        }

        /** Builds the bool query that matches one day on the {@code dt} field. */
        private static BoolQueryBuilder dayQuery(String dt) {
            return QueryBuilders.boolQuery().must(QueryBuilders.matchQuery(DT_FIELD, dt));
        }

        /** Returns true when the value is null or has no characters. */
        private static boolean isNullOrEmpty(String value) {
            return value == null || value.isEmpty();
        }

    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141

    IndexController

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.lang.Nullable;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.*;
    
    import java.io.IOException;
    
    @Controller
    @ResponseBody
    public class IndexController {
    
        @Autowired
        IndexHandler indexHandler;
    
    
        @GetMapping("/search/{dt}/{meterKind}")
        public R search(@PathVariable("dt") String dt, @PathVariable("meterKind") String meterKind) throws IOException {
    
            return search(dt, meterKind, null);
        }
    
    
        @GetMapping("/search/{dt}/{meterKind}/{meterCode}")
        public R search(
                @PathVariable("dt") String dt
                , @PathVariable("meterKind") String meterKind
                , @Nullable @PathVariable("meterCode") String meterCode
        ) throws IOException {
    
            if ("yyyy-MM-dd".length() != dt.length()) {
                throw new IllegalArgumentException("dt格式必须为yyyy-MM-dd");
            }
    
            System.out.println("dt:" + dt);
    
            SearchResponse response = indexHandler.search(dt, meterKind, meterCode);
    
            RestStatus status = response.status();
            System.out.println("status:" + status);
    
            if (status == RestStatus.OK) {
    
                SearchHits hits = response.getHits();
    
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {
                    // do something with the SearchHit
                    String sourceAsString = hit.getSourceAsString();
                    System.out.println("hits:" + sourceAsString);
                }
    
                return R.ok().put("data", searchHits);
    
            } else {
    
                return R.error(-1, "not ok").put("data", response.status());
            }
    
        }
    
    
        /**
         * Resolved [org.springframework.web.HttpRequestMethodNotSupportedException: Request method 'GET' not supported]
         */
        //@DeleteMapping("/delete/{dt}/{meterKind}/{meterCode}")
        @RequestMapping(value = "/delete/{dt}/{meterKind}/{meterCode}", method = RequestMethod.GET)
        public R deleteByQuery(@PathVariable("dt") String dt
                , @PathVariable("meterKind") String meterKind
                , @PathVariable("meterCode") String meterCode
        ) {
    
            System.out.println("dt:" + dt);
    
            if ("yyyy-MM-dd".length() != dt.length()) {
                throw new IllegalArgumentException("dt格式必须为yyyy-MM-dd");
            }
    
            BulkByScrollResponse response = indexHandler.deleteByQuery(dt, meterKind, meterCode);
    
            System.out.println("delete response:" + response);
    
            return R.ok().put("data", response);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    EsConfig

    import lombok.Data;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Binds the {@code es.*} configuration properties and exposes the Elasticsearch high-level REST
     * client as a bean.
     */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "es")
    public class EsConfig {

        /**
         * Elasticsearch host ({@code es.host}). When it is left unset, client creation used to fail with
         * "Factory method 'esClient' threw exception; nested exception is
         * java.lang.IllegalArgumentException: Host name may not be null".
         */
        private String host;

        /** Elasticsearch HTTP port ({@code es.port}). */
        private int port;

        /** URI scheme ({@code es.scheme}); defaults to {@code http} when not configured. */
        private String scheme = "http";


        /**
         * Creates the client for the configured host, port and scheme.
         *
         * @return a client connected to the single configured node
         * @throws IllegalStateException if {@code es.host} is not configured
         */
        @Bean
        public RestHighLevelClient esClient() {

            // Fail with the property name so a missing setting is obvious from the startup error.
            if (host == null || host.isEmpty()) {
                throw new IllegalStateException("Property 'es.host' must be set to create the Elasticsearch client");
            }

            return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, scheme)));
        }

    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    R

    import org.apache.http.HttpStatus;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Response envelope returned by the controllers. It is a map that always carries {@code code} and
     * {@code msg}; the payload lives under the {@code data} key so that it is part of the map contents
     * and therefore part of whatever is produced when the map is serialized.
     *
     * @param <T> type of the payload stored under {@code data}
     */
    public class R<T> extends HashMap<String, Object> {
        private static final long serialVersionUID = 1L;

        private static final int SUCCESS_CODE = 200;
        /** Same value as the HTTP 500 status code. */
        private static final int INTERNAL_SERVER_ERROR = 500;
        private static final String DATA_KEY = "data";

        /** Creates a success envelope with {@code code=200} and {@code msg=success}. */
        public R() {
            // super.put avoids calling an overridable method from the constructor.
            super.put("code", SUCCESS_CODE);
            super.put("msg", "success");
        }

        /**
         * Returns the payload stored under {@code data}, or null when none was set.
         * The cast is unchecked: a value stored through {@link #put(String, Object)} with a different
         * type surfaces as a {@link ClassCastException} at the call site.
         */
        @SuppressWarnings("unchecked")
        public T getData() {
            return (T) get(DATA_KEY);
        }

        /** Stores the payload under {@code data}, replacing any previous value. */
        public void setData(T data) {
            super.put(DATA_KEY, data);
        }

        /** Creates an error envelope with code 500 and a generic message. */
        public static <T> R<T> error() {
            return error(INTERNAL_SERVER_ERROR, "未知异常,请联系管理员");
        }

        /** Creates an error envelope with code 500 and the given message. */
        public static <T> R<T> error(String msg) {
            return error(INTERNAL_SERVER_ERROR, msg);
        }

        /** Creates an error envelope with the given code and message. */
        public static <T> R<T> error(int code, String msg) {
            R<T> r = new R<>();
            r.put("code", code);
            r.put("msg", msg);
            return r;
        }

        /** Creates a success envelope with the given message. */
        public static <T> R<T> ok(String msg) {
            R<T> r = new R<>();
            r.put("msg", msg);
            return r;
        }

        /** Creates a success envelope and copies every entry of {@code map} into it. */
        public static <T> R<T> ok(Map<String, Object> map) {
            R<T> r = new R<>();
            r.putAll(map);
            return r;
        }

        /** Creates a plain success envelope. */
        public static <T> R<T> ok() {
            return new R<>();
        }

        /**
         * Stores an entry and returns this envelope so that calls can be chained.
         *
         * @return this instance (not the previous value, unlike {@link HashMap#put})
         */
        @Override
        public R<T> put(String key, Object value) {
            super.put(key, value);
            return this;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    Main

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
    
    /**
     * Spring Boot entry point. Components are scanned from {@code com.xx.yy}, and
     * {@link DataSourceAutoConfiguration} is excluded from auto-configuration.
     */
    @SpringBootApplication(scanBasePackages = "com.xx.yy", exclude = DataSourceAutoConfiguration.class)
    public class Main {

        /**
         * Starts the application with the given command-line arguments.
         *
         * @param args command-line arguments handed to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(Main.class);
            application.run(args);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    TestData

    import com.mz.iot.dao.IndexHandler;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.jdbc.core.JdbcTemplate;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Spring Boot integration checks for {@link IndexHandler}. Each test only prints what it receives;
     * none of them asserts on the result.
     */
    @SpringBootTest
    public class TestData {

    //    @Autowired
    //    JdbcTemplate template;

        @Autowired
        IndexHandler indexHandler;

        @Autowired
        RestHighLevelClient esClient;

        /** Lists every index through the handler. */
        @Test
        void testGetAllIndexes() throws IOException {
            indexHandler.getAllIndexes();
        }

        /** Prints the injected client to show that the bean was wired. */
        @Test
        void testEsClient() {
            System.out.println("esClient:" + esClient);
        }

        /** Deletes one device's documents for one day and prints the response. */
        @Test
        void testDelete() {
            BulkByScrollResponse deleted = indexHandler.deleteByQuery("2021-12-23", "1", "CACVHNM61U62");
            System.out.println("test delete response:" + deleted);
        }

        /** Searches one device's documents for one day and prints the response and every hit. */
        @Test
        void testSearch() throws IOException {
            SearchResponse found = indexHandler.search("2021-12-23", "1", "CACVHNM61U62");
            System.out.println("test search response:" + found);

            SearchHit[] results = found.getHits().getHits();
            for (int i = 0; i < results.length; i++) {
                System.out.println("hits:" + results[i].getSourceAsString());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    application.properties

    server.port=8888
    server.tomcat.accesslog.encoding=utf-8
    debug=false
    my.car.brand=byd
    my.car.price=9999.98
    #Consider the following:
    #If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
    #If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).
    spring.profiles.active=test
    spring.mvc.hiddenmethod.filter.enabled=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    application-test.yml

    spring:
      datasource:
        url: jdbc:mysql://192.168.33.169:3306/paascloud_wfs
        username: mzadmin
        password: Mz@123456
        driver-class-name: com.mysql.jdbc.Driver
      jdbc:
        template:
          query-timeout: 30
      data:
        elasticsearch:
          client:
            reactive:
              endpoints: 192.168.33.163:9200
      application:
        name: cupon
    es:
      host: 192.168.33.163
      port: 9200
    my:
      car:
        brand: bwm
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.4.RELEASE</version>
            <relativePath/>
        </parent>
    
        <artifactId>iot_web</artifactId>
        <modelVersion>4.0.0</modelVersion>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <!--覆盖父工程spring-boot-starter-parent中定义的依赖版本-->
            <elasticsearch.version>6.5.4</elasticsearch.version>
            <lombok.version>1.18.18</lombok.version>
            <mysql.version>5.1.48</mysql.version>
    
        </properties>
    
    
        <dependencies>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jdbc</artifactId>
            </dependency>
    
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>com.squareup.okhttp3</groupId>
                <artifactId>okhttp</artifactId>
                <version>4.9.1</version>
            </dependency>
    
            <dependency>
                <groupId>io.reactivex</groupId>
                <artifactId>rxjava</artifactId>
                <version>1.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.75</version>
            </dependency>
    
            <!-- Test libraries are only needed on the test classpath; test scope keeps them out of the packaged application. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
    
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-configuration-processor</artifactId>
                            </exclude>
                        </excludes>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
  • 相关阅读:
    【分割链表】
    理解Go语言中的GOPATH
    基于类似Kiva的移动机器人的路径规划(Matlab代码实现)
    十年老程序员的职场教训,很受用
    c# 4.8 实现Windows 定时任务计划(Task Scheduler)
    html文件使用postcss-pxtorem适配移动端 && 使用tailwindcss库
    windows下redis设置redis开机自启动方法
    华为数字化转型之道 方法篇 第三章 数字化转型框架
    线上 udp 客户端请求服务端客户端句柄泄漏问题
    密码学奇妙之旅、03 HMAC单向散列消息认证码、Golang代码
  • 原文地址:https://blog.csdn.net/u013727054/article/details/134515659