• Pitfalls of reading and writing Elasticsearch with Spark


    // Code for writing to Elasticsearch
    ds.write
          .format("org.elasticsearch.spark.sql")
          .option("es.nodes.wan.only", "true")
          .option("es.mapping.id", "_id")
          .option("es.mapping.exclude", "_id")
          .option("es.nodes", host)
          .option("es.port", port)
          .option("es.update.script.lang","painless")
          .option("es.update.script.inline",script)  // es.update.script.inline for versions 6.0 and later
          .option("es.update.script.params", params)
          .option("es.write.operation", "upsert")
          .option("es.batch.write.retry.count", 3)
          .option("es.update.retry.on.conflict", 3)
          .option("es.batch.write.refresh", "false") // whether to refresh after every bulk operation; defaults to true
          .option("es.batch.size.bytes", "1mb") // maximum data size per bulk request (default)
          .option("es.batch.size.entries", "1000") // maximum number of documents per bulk request
          .mode("append")
          .save(index_name)
    
    // Configuration for reading from Elasticsearch
    val ds = spark.read
          .format("org.elasticsearch.spark.sql")
          .option("es.read.metadata", "true") // also read document metadata
          .option("es.read.metadata.field", "_metadata")
          .option("es.nodes.wan.only","true") // required when connecting over the public internet
          .option("pushdown", "true")
          .option("es.port",port)
          .option("es.net.ssl","false")
          .option("es.nodes", host)
          .option("query", query) // the query DSL to run
          .option("es.read.field.include", includeField) // restrict which fields are read
          .option("es.read.field.as.array.include", arrIncludeField) // required for array fields, otherwise they are not recognized
          .option("es.mapping.date.rich", "false")
          .option("es.scroll.size", "10000") // maximum number of documents per scroll request (default 50)
          .option("es.input.max.docs.per.partition", "100000") // maximum number of documents per partition (default 100000)
          .load(index_name)
    

    1. es.nodes.wan.only

    If the Spark job runs in the same network segment as Elasticsearch, you can leave this out. But if they are not on the same network, for example when Elasticsearch is reached over the public internet, requests will fail without it.

    2. es.update.retry.on.conflict

    With concurrent updates, writes that hit the same document will fail with a version conflict, so this parameter needs to be set.

    3. es.update.script.inline

    Before version 6.0, the Elasticsearch Spark connector used the parameter es.update.script. Since I was on 5.x I used es.update.script, but updating nested data structures with it fails with an error about not being able to convert to scala.Tuple2. Switching to es.update.script.inline solves that, but even with this parameter the same error still shows up when updating ES 7.0: Upsert nested fields with Spark - Elasticsearch - Discuss the Elastic Stack

    The Spark on Elasticsearch configuration options are documented here: Configuration | Elasticsearch for Apache Hadoop [7.16] | Elastic
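    For reference, here is a minimal sketch of the script-based upsert discussed above. The index, id column, array field, and script body are all hypothetical; on a 5.x-era connector, swap es.update.script.inline back to es.update.script.

    // Sketch: upsert that appends a value to an array field with a painless script.
    // Assumes a DataFrame column "tag" and an array field "tags" in the target documents.
    val script = "ctx._source.tags.add(params.tag)"
    val params = "tag:tag"   // es-hadoop param syntax: script param name -> DataFrame column
    ds.write
          .format("org.elasticsearch.spark.sql")
          .option("es.write.operation", "upsert")
          .option("es.mapping.id", "doc_id")          // hypothetical id column
          .option("es.update.script.lang", "painless")
          .option("es.update.script.inline", script)  // use "es.update.script" on connectors before 6.0
          .option("es.update.script.params", params)
          .mode("append")
          .save("my_index/doc")                       // hypothetical index/type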

    4. Accessing a broadcast variable inside a Spark SQL UDF

    I hit a problem similar to https://segmentfault.com/q/1010000008010132: calling .value on a broadcast variable inside a UDF kept throwing a NullPointerException. The fix was to make the broadcast variable a member of the class that defines the UDF. Since the UDF class file is shipped to every executor, a broadcast handle held as a member of an already-constructed instance can be resolved on each executor as well.
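    A minimal sketch of that workaround (class, column, and dictionary names are made up):

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    // Hold the broadcast handle as a constructor member of the class that builds the UDF;
    // the handle is serialized with the closure and .value is resolved on each executor.
    class DictUdfs(dictBc: Broadcast[Map[String, String]]) extends Serializable {
      val dictLookup = udf { key: String => dictBc.value.getOrElse(key, "unknown") }
    }

    val spark = SparkSession.builder().appName("broadcast-in-udf").getOrCreate()
    val dictBc = spark.sparkContext.broadcast(Map("a" -> "A"))
    val udfs = new DictUdfs(dictBc)
    spark.udf.register("dict_lookup", udfs.dictLookup) // usable from Spark SQL as dict_lookup(col)
    // or directly on a DataFrame: df.withColumn("label", udfs.dictLookup($"key"))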

    Update 2022-01-10

    Problem: a metric-computation job was upserting a little over 2 million documents into Elasticsearch. The task hung for more than 6 hours without finishing, and the Elasticsearch CPU spiked to around 100%.

    After asking around online, the problem appears to come from Elasticsearch's default 1s refresh policy. The fix is to set the index's refresh_interval to -1 for the duration of the write, and once all the data is written, force a single refresh (refresh=true).

    In the Spark on Elasticsearch 5.6 connector the option es.batch.write.refresh defaults to true, which forces a refresh after every bulk write completes, and that is likely what drove the CPU up.

    Configuration reference: Configuration | Elasticsearch for Apache Hadoop [5.6] | Elastic (https://www.elastic.co/guide/en/elasticsearch/hadoop/5.6/configuration.html)
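    For completeness, a rough sketch of that approach with the REST high-level client (the index name and the restored 1s interval are assumptions; the Spark writer itself should also keep es.batch.write.refresh set to false):

    import org.elasticsearch.action.admin.indices.refresh.RefreshRequest
    import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
    import org.elasticsearch.client.{RequestOptions, RestHighLevelClient}
    import org.elasticsearch.common.settings.Settings

    def withRefreshDisabled(client: RestHighLevelClient, index: String)(bulkJob: => Unit): Unit = {
      // disable periodic refresh before the heavy upsert job
      val disable = new UpdateSettingsRequest(index)
        .settings(Settings.builder().put("index.refresh_interval", "-1").build())
      client.indices().putSettings(disable, RequestOptions.DEFAULT)
      try {
        bulkJob // run the Spark upsert here
      } finally {
        // restore the interval and force a single refresh once all data is written
        val restore = new UpdateSettingsRequest(index)
          .settings(Settings.builder().put("index.refresh_interval", "1s").build())
        client.indices().putSettings(restore, RequestOptions.DEFAULT)
        client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT)
      }
    }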

    Update 2022-03-29:

    Recently I had to migrate data from an Elasticsearch 5.x cluster to Elasticsearch 7.10. Both clusters are on Alibaba Cloud, and neither Logstash nor remote reindex was supported, so the only option was to write code to do the synchronization.

    I used the Spark RDD API (elasticsearch-spark-20_2.11) to read from ES, then bulk-wrote into the target cluster with elasticsearch-rest-high-level-client; a minimal sketch of this combination follows the configuration snippet below.

    The parameters used when Spark reads from ES were as follows:

    .set("es.scroll.size", 5000)
    .set("es.input.max.docs.per.partition", 5000000)
    .set("es.input.use.sliced.partitions", false)
    .set("es.scroll.keepalive", "10m")
    
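    Putting the two pieces together, here is a minimal sketch of the migration job (hosts, index names, and batch size are hypothetical):

    import org.apache.http.HttpHost
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.action.bulk.BulkRequest
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
    import org.elasticsearch.common.xcontent.XContentType
    import org.elasticsearch.spark.rdd.EsSpark

    val conf = new SparkConf()
      .setAppName("es5-to-es7-migration")
      .set("es.nodes", "es5-host")        // source 5.x cluster (hypothetical)
      .set("es.port", "9200")
      .set("es.nodes.wan.only", "true")
      .set("es.scroll.size", "5000")
      .set("es.input.max.docs.per.partition", "5000000")
      .set("es.input.use.sliced.partitions", "false")
      .set("es.scroll.keepalive", "10m")
    val sc = new SparkContext(conf)

    // esJsonRDD yields (documentId, sourceJson) pairs from the source index
    EsSpark.esJsonRDD(sc, "source_index/doc").foreachPartition { docs =>
      // one client per partition, writing to the target 7.10 cluster
      val client = new RestHighLevelClient(RestClient.builder(new HttpHost("es7-host", 9200)))
      try {
        docs.grouped(1000).foreach { batch =>
          val bulk = new BulkRequest()
          batch.foreach { case (id, json) =>
            bulk.add(new IndexRequest("target_index").id(id).source(json, XContentType.JSON))
          }
          client.bulk(bulk, RequestOptions.DEFAULT)
        }
      } finally client.close()
    }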

    Here es.input.max.docs.per.partition determines how many partitions (and therefore tasks) Spark will create, while es.scroll.size sets how many documents each scroll request fetches. As a hypothetical example, with 5 primary shards holding about 4 million documents each and es.input.max.docs.per.partition = 5,000,000, each shard yields max(1, 4,000,000 / 5,000,000) = 1 partition, so the job runs with 5 tasks.

    RestService.findPartitions builds the list of partitions the job will use:

    public static List<PartitionDefinition> findPartitions(Settings settings, Log log) {
        Version.logVersion();
    
        InitializationUtils.validateSettings(settings);
        InitializationUtils.validateSettingsForReading(settings);
    
        EsMajorVersion version = InitializationUtils.discoverEsVersion(settings, log);
        List<NodeInfo> nodes = InitializationUtils.discoverNodesIfNeeded(settings, log);
        InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
        InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
        InitializationUtils.filterNonIngestNodesIfNeeded(settings, log);
    
        RestRepository client = new RestRepository(settings);
        try {
            boolean indexExists = client.indexExists(true);
    
            List<List<Map<String, Object>>> shards = null;
    
            if (!indexExists) {
                if (settings.getIndexReadMissingAsEmpty()) {
                    log.info(String.format("Index [%s] missing - treating it as empty", settings.getResourceRead()));
                    shards = Collections.emptyList();
                } else {
                    throw new EsHadoopIllegalArgumentException(
                            String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), ConfigurationOptions.ES_INDEX_READ_MISSING_AS_EMPTY));
                }
            } else {
                shards = client.getReadTargetShards();
                if (log.isTraceEnabled()) {
                    log.trace("Creating splits for shards " + shards);
                }
            }
    
            log.info(String.format("Reading from [%s]", settings.getResourceRead()));
    
            MappingSet mapping = null;
            if (!shards.isEmpty()) {
                mapping = client.getMappings();
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Discovered resolved mapping {%s} for [%s]", mapping.getResolvedView(), settings.getResourceRead()));
                }
                // validate if possible
                FieldPresenceValidation validation = settings.getReadFieldExistanceValidation();
                if (validation.isRequired()) {
                    MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping.getResolvedView(), validation, log);
                }
            }
            final Map<String, NodeInfo> nodesMap = new HashMap<String, NodeInfo>();
            if (nodes != null) {
                for (NodeInfo node : nodes) {
                    nodesMap.put(node.getId(), node);
                }
            }
            final List<PartitionDefinition> partitions;
            // On ES 5.x and later, each shard is split into (shard doc count / es.input.max.docs.per.partition) partitions
            if (version.onOrAfter(EsMajorVersion.V_5_X)) {
                partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
            } else {
                // Before 5.x, one partition is created per shard
                partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
            }
            Collections.shuffle(partitions);
            return partitions;
        } finally {
            client.close();
        }
    }
    
    /**
     * Create one {@link PartitionDefinition} per shard for each requested index
     * (i.e., one partition per shard).
     */
    static List<PartitionDefinition> findShardPartitions(Settings settings, MappingSet mappingSet, Map<String, NodeInfo> nodes,
                                                         List<List<Map<String, Object>>> shards, Log log) {
        Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
        List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
        for (List<Map<String, Object>> group : shards) {
            String index = null;
            int shardId = -1;
            List<String> locationList = new ArrayList<String>();
            for (Map<String, Object> replica : group) {
                ShardInfo shard = new ShardInfo(replica);
                index = shard.getIndex();
                shardId = shard.getName();
                if (nodes.containsKey(shard.getNode())) {
                    locationList.add(nodes.get(shard.getNode()).getPublishAddress());
                }
            }
            if (index == null) {
                // Could not find shards for this partition. Continue anyway?
                if (settings.getIndexReadAllowRedStatus()) {
                    log.warn("Shard information is missing from an index and will not be reached during job execution. " +
                            "Assuming shard is unavailable and cluster is red! Continuing with read operation by " +
                            "skipping this shard! This may result in incomplete data retrieval!");
                } else {
                    throw new IllegalStateException("Could not locate shard information for one of the read indices. " +
                            "Check your cluster status to see if it is unstable!");
                }
            } else {
                PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
                        locationList.toArray(new String[0]));
                partitions.add(partition);
            }
        }
        return partitions;
    }
    
    /**
     * Partitions the query based on the max number of documents allowed per partition {@link Settings#getMaxDocsPerPartition()}.
     */
    static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings settings, MappingSet mappingSet,
                                                         Map<String, NodeInfo> nodes, List<List<Map<String, Object>>> shards, Log log) {
        QueryBuilder query = QueryUtils.parseQueryAndFilters(settings);
        int maxDocsPerPartition = settings.getMaxDocsPerPartition();
        String types = new Resource(settings, true).type();
        Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();

        List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
        for (List<Map<String, Object>> group : shards) {
            String index = null;
            int shardId = -1;
            List<String> locationList = new ArrayList<String>();
            for (Map<String, Object> replica : group) {
                ShardInfo shard = new ShardInfo(replica);
                index = shard.getIndex();
                shardId = shard.getName();
                if (nodes.containsKey(shard.getNode())) {
                    locationList.add(nodes.get(shard.getNode()).getPublishAddress());
                }
            }
            String[] locations = locationList.toArray(new String[0]);
            if (index == null) {
                // Could not find shards for this partition. Continue anyway?
                if (settings.getIndexReadAllowRedStatus()) {
                    log.warn("Shard information is missing from an index and will not be reached during job execution. " +
                            "Assuming shard is unavailable and cluster is red! Continuing with read operation by " +
                            "skipping this shard! This may result in incomplete data retrieval!");
                } else {
                    throw new IllegalStateException("Could not locate shard information for one of the read indices. " +
                            "Check your cluster status to see if it is unstable!");
                }
            } else {
                StringBuilder indexAndType = new StringBuilder(index);
                if (StringUtils.hasLength(types)) {
                    indexAndType.append("/");
                    indexAndType.append(types);
                }
                // TODO applyAliasMetaData should be called in order to ensure that the count are exact (alias filters and routing may change the number of documents)
                // First get the document count of each shard of the index
                long numDocs = client.count(indexAndType.toString(), Integer.toString(shardId), query);
                // Then (docs in shard / es.input.max.docs.per.partition) gives the number of partitions for this shard
                int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
                // Create the computed number of partitions (slices) for this shard
                for (int i = 0; i < numPartitions; i++) {
                    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
                    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
                }
            }
        }
        return partitions;
    }
    
  • Original article: https://blog.csdn.net/m0_67401746/article/details/126496578