问题场景:1,使用BulkRequest写入索引数据不生效,接口正常返回,索引没有数据。
先看原始代码:
- @Override
- public ResultMsg
saveRealtimeMsg(IMRealTimeMessageDTO imRealTImeMessageDTO) { -
- try {
- //1.创建批量导入数据
- BulkRequest bulkRequest = new BulkRequest();
- String index = elasticSearchConfig.getImRealtimeIndex();
- //2.将数据批量添加
- for(IMRealTimeMessageEntity entity : imRealTImeMessageDTO.getResult()){
- entity.setId(RandomUtil.randomString(elasticSearchConfig.getUuIdStr(), 18));
- if (StringUtils.isEmpty(entity.getSessionId())) {
- String sessionId = (String) JSONPath.read(entity.getCloudCustomData(), "sessionID");
- if (StringUtils.isNotBlank(sessionId)) {
- entity.setSessionId(sessionId);
- }
- }
-
- bulkRequest.add(
- new IndexRequest(index)
- //不填id时将会生成随机id
- .id(UUID.randomUUID().toString())
- .source(JSON.toJSONString(entity), XContentType.JSON).type("_doc")
- );
- }
- //3.执行请求
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- if(bulkResponse.status() != null && bulkResponse.status().getStatus()== HttpServletResponse.SC_OK){
- log.info("实时聊天消息es写入成功");
- return ResultMsg.ok("es数据同步!!!");
- }
- return ResultMsg.failed("es数据同步失败",bulkResponse.buildFailureMessage());
-
- } catch (Exception e) {
- e.printStackTrace();
- log.info("es data 同步失败:{}",e.getMessage());
- return ResultMsg.failed("es数据同步失败:{}",e.getMessage());
- }
- }
从代码层面来看写入数据是一点问题都没有的,我们接口也是200


数据也是没有的执行了操作也是没有反应。那怎么解决这个问题?
GET 索引/_mapping

如果上面没有问题我们就看代码的配置。
经过断点调试查看代码:
我们的日期格式在es是yyyymmdd hhmmss格式但是这里是个时间蹉格式,肯定就不对了,但是你会说我jsonfamart也不行吗?
这样也不行,我序列化的时候tostring不行吗,答案是不行的,只有在序列化的时候转换一下。
-
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- if(bulkResponse.status() != null && bulkResponse.status().getStatus()== HttpServletResponse.SC_OK){
- log.info("实时聊天消息es写入成功");
- return ResultMsg.ok("es数据同步!!!");
- }
- return ResultMsg.failed("es数据同步失败",bulkResponse.buildFailureMessage());
正确的方式:
-
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- if(bulkResponse.hasFailures()){
- log.info("数据写入失败:{}",bulkResponse.buildFailureMessage());
- return ResultMsg.failed("es数据同步失败",bulkResponse.buildFailureMessage());
- }else {
- log.info("实时聊天消息es写入成功");
- return ResultMsg.ok("es数据同步成功!!!");
- }
错误日志:

{"index":"im_realtime_message","type":"_doc","id":"eebeb54a-00fd-4998-9783-0c42eea3e492","cause":{"type":"exception","reason":"Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [insertTime] of type [date] in document with id 'eebeb54a-00fd-4998-9783-0c42eea3e492'. Preview of field's value: '1660835342572']","caused_by":{"type":"exception","reason":"Elasticsearch exception [type=illegal_argument_exception, reason=failed to parse date field [1660835342572] with format [yyyy-MM-dd HH:mm:ss]]","caused_by":{"type":"exception","reason":"Elasticsearch exception [type=date_time_parse_exception, reason=date_time_parse_exception: Text '1660835342572' could not be parsed at index 0]"}}},"status":400}
这个错就很明显了我们的时间格式不对对不对,咋解决呢?

我们在写入时间的时候格式化一下就搞定了
bulkRequest.add(
new IndexRequest(index)
//不填id时将会生成随机id
.id(UUID.randomUUID().toString())
.source(
JSONObject.toJSONStringWithDateFormat(entity, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteDateUseDateFormat)
// JSON.toJSONString(entity)
, XContentType.JSON).type("_doc")
);