// 写elasticsearch的代码
ds.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only", "true")
.option("es.mapping.id", "_id")
.option("es.mapping.exclude", "_id")
.option("es.nodes", host)
.option("es.port", port)
.option("es.update.script.lang","painless")
.option("es.update.script.inline",script) // es.update.script.inline 6.0以及之后的版本
.option("es.update.script.params", params)
.option("es.write.operation", "upsert")
.option("es.batch.write.retry.count", 3)
.option("es.update.retry.on.conflict", 3)
.option("es.mapping.exclude", "_id")
.option("es.batch.write.refresh", "false") // 在每次bulk操作之后执行refresh操作, 默认为true
.option("es.batch.size.bytes", "1mb") // 默认每次bulk操作的数据量大小
.option("es.batch.size.entries", "1000") // bulk的操作数据条数
.mode("append")
.save(index_name)
// 读取elasticsearch的代码配置
val ds = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.read.metadata", "true") // 读取元数据信息
.option("es.read.metadata.field", "_metadata")
.option("es.nodes.wan.only","true") // 公网的时候必传
.option("pushdown", "true")
.option("es.port",port)
.option("es.net.ssl","false")
.option("es.nodes", host)
.option("query", query) // 传入查询的dsl语句
.option("es.read.field.include", includeField) // 读取数据的时候可以在这个进行字段筛选
.option("es.read.field.as.array.include", arrIncludeField) // 在读取数组的时候需要加这个参数,否则无法识别
.option("es.mapping.date.rich", "false")
.option("es.scroll.size", "10000") // es滚动读取的时候每次请求的数据最大条数, 默认50
.option("es.input.max.docs.per.partition", "100000") // 每个分区处理的最大条数, 默认100000
.load(index_name)
1、es.nodes.wan.only
如果运行的spark程序和你的elasitcsearch是在同一个网段的时候,不加这个是没有问题的。但是如果不在同一个网段比如是在公网上面的话,则会报请求失败的错误。
2、es.update.retry.on.conflict
如果是并发更新的时候,如果是更新到同一条记录上的时候则会报冲突,所以要设置这个参数
3、es.update.script.inline
elasticsear的spark插件在6.0版本之前是es.update.script这个参数,由于我自己用的是5.x的版本所以用的是es.update.script参数,但是这里在更新嵌套类型的数据结构的时候会报无法转成scala.tuple2的错误,只需要使用es.update.script.inline这个参数就能解决,但是使用这个参数在更新es7.0的时候还是会报这个错误:Upsert nested fields with Spark - Elasticsearch - Discuss the Elastic Stack
附上spark on elasticsearch的配置项链接:Configuration | Elasticsearch for Apache Hadoop [7.16] | Elastic
4、spark-sql的udf里面获取广播变量的问题
今天碰到一个类似于:https://segmentfault.com/q/1010000008010132这个的问题,在udf里面调用广播变量的value获取值一直报空指针,后来在udf的类里面 将广播变量作为成员变量获取到了。由于udf的函数的类文件会分发到各个excutor节点上进行调用所以构造好的广播变量的成员变量在各个executor上也能顺利获取到。
问题:集群出现以上的情况,当时有一个计算指标的任务,以upsert的方式往elasticsearch写入200多w条数据,然后卡顿了6个多小时都没写完,任务表现为hang住,es的cpu飙升到100%左右。
这个问题后来询问网友,问题应该是出现在elasticsearch的默认1s的refresh策略导致的,具体优化策略为修改es的refesh interval为-1,当数据写完之后,然后refresh=true,强制刷新一下。
由于spark on ealsticsearch5.6版本配置项es.batch.write.refresh = true。默认会在每次bulk写完成之后强制刷新一次,所以这可能会导致cpu飙升。
最近由于需要进行elasticsearch5.x的集群数据迁移到elasticsearch7.10上去,由于集群都在阿里云上,并且也不支持logstash和reindex的方式,所以只能写代码进行同步。
使用spark rdd 【elasticsearch-spark-20_2.11】方式来读取es,然后使用【elasticsearch-rest-high-level-client】批量写入到集群中去。
其中在spark读取es的时候的参数配置如下:
.set("es.scroll.size", 5000)
.set("es.input.max.docs.per.partition", 5000000)
.set("es.input.use.sliced.partitions", false)
.set("es.scroll.keepalive", "10m")
其中es.input.max.docs.per.partition 决定spark会生成多少个partition对应执行的task,同时es.scroll.size 指定了每次滚动查询获取数据的条数。
在RestService.findPartitions获取job需要生成的partitions列表:
public static List findPartitions(Settings settings, Log log) {
Version.logVersion();
InitializationUtils.validateSettings(settings);
InitializationUtils.validateSettingsForReading(settings);
EsMajorVersion version = InitializationUtils.discoverEsVersion(settings, log);
List nodes = InitializationUtils.discoverNodesIfNeeded(settings, log);
InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
InitializationUtils.filterNonIngestNodesIfNeeded(settings, log);
RestRepository client = new RestRepository(settings);
try {
boolean indexExists = client.indexExists(true);
List>> shards = null;
if (!indexExists) {
if (settings.getIndexReadMissingAsEmpty()) {
log.info(String.format("Index [%s] missing - treating it as empty", settings.getResourceRead()));
shards = Collections.emptyList();
} else {
throw new EsHadoopIllegalArgumentException(
String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), ConfigurationOptions.ES_INDEX_READ_MISSING_AS_EMPTY));
}
} else {
shards = client.getReadTargetShards();
if (log.isTraceEnabled()) {
log.trace("Creating splits for shards " + shards);
}
}
log.info(String.format("Reading from [%s]", settings.getResourceRead()));
MappingSet mapping = null;
if (!shards.isEmpty()) {
mapping = client.getMappings();
if (log.isDebugEnabled()) {
log.debug(String.format("Discovered resolved mapping {%s} for [%s]", mapping.getResolvedView(), settings.getResourceRead()));
}
// validate if possible
FieldPresenceValidation validation = settings.getReadFieldExistanceValidation();
if (validation.isRequired()) {
MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping.getResolvedView(), validation, log);
}
}
final Map nodesMap = new HashMap();
if (nodes != null) {
for (NodeInfo node : nodes) {
nodesMap.put(node.getId(), node);
}
}
final List partitions;
// 判断es的版本是否是5.x或之后的版本,则为每个shard生成总文档数 / es.input.max.docs.per.partition 个partition
if (version.onOrAfter(EsMajorVersion.V_5_X)) {
partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
} else {
// 如果是5.x之前的版本,则根据有多少个shard生成多少个partition
partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
}
Collections.shuffle(partitions);
return partitions;
} finally {
client.close();
}
}
/**
* Create one {@link PartitionDefinition} per shard for each requested index.
* 则一个shard生成一个partition
*/
static List findShardPartitions(Settings settings, MappingSet mappingSet, Map nodes,
List>> shards, Log log) {
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
List partitions = new ArrayList(shards.size());
for (List