ElasticSearch 官方提供了各种不同语言的客户端,用来操作 ES。这些客户端的本质就是组装 DSL 语句,通过 HTTP 请求发送给 ES 服务器。
官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html。
在本文中,我们将着重介绍 ElasticSearch Java 客户端中的 RestClient,并演示如何使用它实现对索引库和文档的各种操作。
RestClient 是 ElasticSearch 提供的用于与 ElasticSearch 集群进行通信的 Java 客户端。它提供了一种简单而灵活的方式来执行 REST 请求,并处理响应。通过 RestClient,我们可以轻松地在 Java 中操作 ElasticSearch。
在 ES 官方文档中看到 Java 的 RestClient 分为 High level REST client
和 Low level REST client
:
它们之间的区别:
High Level REST Client:
Low Level REST Client:
RestHighLevelClient 是 Elasticsearch Java 客户端中的高级客户端,提供了更加方便和抽象的操作方式,适用于大多数的 Elasticsearch 操作。在使用 RestHighLevelClient 之前,需要创建一个 RestClient 实例,并将其包装在 RestHighLevelClient 中。
主要功能和特点:
面向对象的操作:RestHighLevelClient 提供了更加面向对象的 API,使得 Elasticsearch 操作更加符合 Java 开发的习惯,易于理解和使用。
内置序列化和反序列化:RestHighLevelClient 内置了 Jackson 库,可以自动序列化和反序列化 Elasticsearch 的请求和响应,无需手动处理 JSON 数据。
复杂查询支持:支持复杂的 Elasticsearch 查询操作,如布尔查询、范围查询、聚合查询等。
错误处理:提供了异常处理机制,能够更好地捕获和处理 Elasticsearch 操作中的错误。
并发性:RestHighLevelClient 可以处理多个并发请求,是多线程安全的。
常用操作和方法:
以下是 RestHighLevelClient 类的一些常用操作和方法,通过这些方法可以实现对 Elasticsearch 的索引库和文档的各种操作:
操作 | 方法 | 描述 |
---|---|---|
索引文档 | IndexResponse index(IndexRequest request, RequestOptions options) | 向指定索引插入文档 |
获取文档 | GetResponse get(GetRequest request, RequestOptions options) | 根据文档 ID 获取文档 |
删除文档 | DeleteResponse delete(DeleteRequest request, RequestOptions options) | 根据文档 ID 删除文档 |
更新文档 | UpdateResponse update(UpdateRequest request, RequestOptions options) | 根据文档 ID 更新文档 |
批量操作 | BulkResponse bulk(BulkRequest request, RequestOptions options) | 批量执行操作 |
查询 | SearchResponse search(SearchRequest request, RequestOptions options) | 执行搜索查询 |
聚合查询 | SearchResponse search(SearchRequest request, RequestOptions options) | 执行聚合查询 |
清理滚动 | ClearScrollResponse clearScroll(ClearScrollRequest request, RequestOptions options) | 清理滚动上下文 |
以上只是 RestHighLevelClient 类的一部分方法,更多详细的操作和方法请参考 官方文档。这些方法提供了丰富的功能,可以满足各种 Elasticsearch 操作的需求。
当需要将已有的数据库数据导入到 Elasticsearch 索引中时,首先需要定义好 Elasticsearch 索引的 mapping 结构,这样 Elasticsearch 才能正确解析和存储数据。
在这个例子中,我们有一个名为 hotel
的数据库表,它有各种不同类型的字段,包括文本、数字、地理坐标等。让我们逐步解释如何根据数据库表的结构编写创建 Elasticsearch 索引的 DSL(Domain Specific Language)语句。
- 数据库表结构分析
首先,让我们来看一下 hotel
表的结构:
+-----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| id | bigint | NO | PRI | NULL | |
| name | varchar(255) | NO | | NULL | |
| address | varchar(255) | NO | | NULL | |
| price | int | NO | | NULL | |
| score | int | NO | | NULL | |
| brand | varchar(32) | NO | | NULL | |
| city | varchar(32) | NO | | NULL | |
| star_name | varchar(16) | YES | | NULL | |
| business | varchar(255) | YES | | NULL | |
| latitude | varchar(32) | NO | | NULL | |
| longitude | varchar(32) | NO | | NULL | |
| pic | varchar(255) | YES | | NULL | |
+-----------+--------------+------+-----+---------+-------+
这个表包含了以下字段:
id
:长整型(bigint
)name
:文本字符串(varchar
),用于存储酒店名称address
:文本字符串,用于存储酒店地址price
:整数(int
),表示酒店价格score
:整数,表示酒店评分brand
:文本字符串,用于存储酒店品牌city
:文本字符串,用于存储城市名称star_name
:文本字符串,用于存储星级名称(可为空)business
:文本字符串,用于存储营业信息(可为空)latitude
:文本字符串,用于存储纬度坐标longitude
:文本字符串,用于存储经度坐标pic
:文本字符串,用于存储图片路径(可为空)
- 创建 Elasticsearch 索引的 DSL
现在,让我们将上述数据库表的结构映射到 Elasticsearch 索引的 DSL 中:
PUT /hotel
{
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"copy_to": "all"
},
"address": {
"type": "keyword",
"index": false
},
"price": {
"type": "integer"
},
"score": {
"type": "integer"
},
"brand": {
"type": "keyword",
"copy_to": "all"
},
"city": {
"type": "keyword"
},
"starName": {
"type": "keyword"
},
"business": {
"type": "keyword",
"copy_to": "all"
},
"location": {
"type": "geo_point"
},
"pic": {
"type": "keyword",
"index": false
},
"all": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
重要说明:
location
) 字段:在 MySQL 数据库表中,地理坐标是使用 latitude
和 longitude
两个字段表示的。但在 Elasticsearch 中,我们使用 geo_point
类型来表示地理坐标。
geo_point
:由纬度(latitude
)和经度(longitude
)确定的一个点。例如:“32.8752345, 120.2981576”。补充:ES 中支持两种地理坐标数据类型
geo_point
:由纬度(latitude
)和经度(longitude
)确定的一个点。例如:"32.8752345, 120.2981576"
。geo_shape
:有多个geo_point
组成的复杂几何图形。例如一条直线:"LINESTRING (-77.03653 38.897676, -77.009051 38.889939)"
。字段拷贝的目的是在搜索时,同时匹配多个字段。我们使用 copy_to
属性将当前字段拷贝到指定字段。这样,在搜索时,可以同时匹配 name
、brand
和 business
字段。
示例:
"all": {
"type": "text",
"analyzer": "ik_max_word"
},
"brand": {
"type": "keyword",
"copy_to": "all"
}
- 映射规则总结
Elasticsearch 对不同类型的字段有不同的映射规则,以下是常见类型的映射规则:
text
和 keyword
): text
用于全文搜索,支持分词;keyword
用于精确匹配,不分词。integer
): 用于存储整数。long
): 用于存储长整数。float
): 用于存储浮点数。geo_point
): 用于存储地理坐标。date
): 用于存储日期时间。通过正确定义索引的映射规则,我们可以更有效地利用 Elasticsearch 的搜索和分析功能。
RestHighLevelClient
的依赖首先在 pom.xml
中引入依赖
<dependency>
<groupId>org.elasticsearch.clientgroupId>
<artifactId>elasticsearch-rest-high-level-clientartifactId>
<version>7.12.1version>
dependency>
此次引入的版本是 7.12.1
版本的,目的是与ElasticSearch 服务器的版本相同。刷新 Maven 之后,发现 elasticsearch-rest-high-level-client
依赖中某些组件的版本并不是7.12.1
的:
其原因是 Spring Boot 已经自动为我们管理了一些依赖,其中就包括了elasticsearch
,其版本就是 7.6.2
的。
因此我们需要做的就是在 pom.xml
覆盖这个配置,即在 properties
中指定版本为 7.12.1
:
再次刷新 Maven,就能够发现所有组件的版本都是 7.12.1
了:
@SpringBootTest
class HotelIndexTests {
private RestHighLevelClient client;
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.248.128:9200")
));
}
@AfterEach
void tearDown() throws IOException {
this.client.close();
}
/**
* 测试初始化客户端
*/
@Test
void testInit() {
System.out.println(client);
}
}
对上述代码的说明:
RestClient.builder
构建了 RestHighLevelClient
实例,并指定了 ElasticSearch 服务器的地址为 http://192.168.248.128:9200
。这是一个简单的单元测试,用于验证客户端的初始化是否成功。@BeforeEach
注解的方法中,我们创建了 RestHighLevelClient
的实例,而在 @AfterEach
注解的方法中,我们关闭了客户端。这是为了保证测试用例执行前后,客户端都能够正确地被初始化和关闭。testInit
中,我们简单地打印了客户端对象,以验证其初始化是否成功。在接下来的内容中,我们将继续使用编写单元测试方法,执行一系列对 ElasticSearch 索引库和文档的操作。
创建 hotel
索引库会使用到前文根据 hotle
表结构编写的 DSL mapping 映射,在Java代码中,我们需要将其封装成一个全局常量,例如,将其保存到名为 MAPPING_TEMPLATE
的常量字符串中:
public class HotelConstants {
public static final String MAPPING_TEMPLATE = "{\n" +
" \"mappings\": {\n" +
" \"properties\": {\n" +
" \"id\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"address\": {\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"price\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"score\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"brand\": {\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"city\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"starName\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"business\": {\n" +
" \"type\": \"keyword\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"location\": {\n" +
" \"type\": \"geo_point\"\n" +
" },\n" +
" \"pic\": {\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"all\": {\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_max_word\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
}
下面是创建索引库的单元测试方:
/**
* 创建索引库
*
* @throws IOException 抛出异常
*/
@Test
void testCreateHotelIndex() throws IOException {
// 1. 创建 Request 对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
// 2. 准备请求参数:DSL 语句
request.source(MAPPING_TEMPLATE, XContentType.JSON);
// 3. 发起请求
client.indices().create(request, RequestOptions.DEFAULT);
}
对上述代码的说明:
CreateIndexRequest
对象,指定索引库的名称为 "hotel"
。MAPPING_TEMPLATE
常量。client.indices().create(request, RequestOptions.DEFAULT)
执行。这样,我们就完成了通过 Java RestClient 创建 ElasticSearch 索引库的操作。在实际应用中,创建索引库是一个初始化工作,通常在应用启动时执行一次即可。
以下是使用 Java RestClient 删除名为 “hotel” 的索引库的单元测试方法:
/**
* 删除索引库
* @throws IOException 抛出异常
*/
@Test
void testDeleteHotelIndex() throws IOException {
// 1. 创建 Request 对象
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
// 2. 发起请求
client.indices().delete(request, RequestOptions.DEFAULT);
}
对上述代码的说明:
DeleteIndexRequest
对象,指定要删除的索引库名称为 “hotel”。client.indices().delete(request, RequestOptions.DEFAULT)
执行。这个方法主要用于清理测试环境或者在应用退出时执行,以确保数据的整洁和安全。
删除索引库的操作需要谨慎执行,因为它会将整个索引库以及其中的所有文档都删除,且无法恢复。在实际应用中,通常会设置一些安全机制来避免误操作。
在 Elasticsearch 中,我们可以通过 Java RestClient 来判断指定的索引库是否存在。以下是一个示例代码:
@Test
void testExistsHotelIndex() throws IOException {
// 1. 创建 GetIndexRequest 对象,指定要判断是否存在的索引库名称为 "hotel"
GetIndexRequest request = new GetIndexRequest("hotel");
// 2. 发起请求,执行判断索引库是否存在的操作
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
// 3. 打印判断结果
System.out.println(exists ? "索引库存在" : "索引库不存在");
}
在这个方法中,首先创建了一个 GetIndexRequest
对象,指定了要判断是否存在的索引库名称为 “hotel”。然后,通过 client.indices().exists(request, RequestOptions.DEFAULT)
发起请求,执行判断索引库是否存在的操作。最后,根据返回的布尔值,输出相应的提示信息。
这个方法通常用于在进行其他操作之前,先判断索引库是否存在,以确保我们不会对不存在的索引库执行其他操作。
在新增文档之前,首先需要从数据库中去查询一条记录,然后再将查询到的记录保存到 ES 文档中。例如,现在有一条 id=61083
的酒店数据,我们需要把它查询出来,然后添加到文档中:
首先同样需要创建一个测试类HotelDocumentTests
,并完成 RestHighLevelClient
的初始化。然后新增文档的测试代码如下:
@Test
void testAddDocument() throws IOException {
// 根据id查询酒店
Hotel hotel = hotelService.getById(61083L);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1. 准备 Request 对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2. 准备 JSON 文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3. 发送请求
client.index(request, RequestOptions.DEFAULT);
}
对上述代码的说明:
根据 ID 查询酒店数据: 使用 hotelService.getById(61083L)
方法从数据库中根据酒店 ID(这里是 61083L
)查询酒店数据并封装到 Hotel
对象中。
转换为文档类型: 将查询到的 Hotel
类型转换为HotelDoc
文档类型,因为数据库和文档中表示经纬度的方式不同。
准备 IndexRequest
对象: 创建 IndexRequest
对象,指定索引库名称为 “hotel”,并设置文档 ID 为酒店的 ID(使用 hotel.getId().toString()
获取 ID 的字符串表示)。
准备 JSON 文档: 将 HotelDoc
对象转换为 JSON 格式的字符串,使用 JSON.toJSONString(hotelDoc)
实现转换。
发送请求: 使用 client.index(request, RequestOptions.DEFAULT)
发送请求,将准备好的文档添加到索引库中。
这个测试方法演示了如何通过 Java RestClient 向 Elasticsearch 索引库中新增文档。
获取指定文档的测试方法的代码如下:
@Test
void testGetDocument() throws IOException {
// 1. 创建 Request 对象
GetRequest request = new GetRequest("hotel", "61083");
// 2. 发送请求,获取结果
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3. 解析结果
String json = response.getSourceAsString();
// 4. 将字符串解析为 HotelDoc 对象
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
System.out.println(hotelDoc);
}
对该方法的详细说明:
创建 GetRequest
对象: 使用 GetRequest
对象指定索引库名称为 “hotel”,文档 ID 为 “61083”。
发送请求,获取结果: 使用 client.get(request, RequestOptions.DEFAULT)
发送请求,获取包含文档信息的 GetResponse
对象。
解析结果: 通过 response.getSourceAsString()
获取文档内容的 JSON 字符串表示。
将字符串解析为 HotelDoc
对象: 使用 JSON.parseObject(json, HotelDoc.class)
将获取的 JSON 字符串解析为 HotelDoc
对象。
这个测试方法演示了如何通过 Java RestClient 获取 Elasticsearch 索引库中指定文档的信息。
更新文档的测试方法的代码如下:
@Test
void testUpdateDocument() throws IOException {
// 1. 获取 Request 对象
UpdateRequest request = new UpdateRequest("hotel", "61083");
// 2. 准备参数
request.doc(
"price", 1000,
"score", 50
);
// 3. 发起请求
client.update(request, RequestOptions.DEFAULT);
}
对该方法的详细说明:
获取 UpdateRequest
对象: 使用 UpdateRequest
对象指定索引库名称为 “hotel”,文档 ID 为 “61083”。
准备参数: 使用 request.doc(...)
方法准备需要更新的字段及其对应的新值。在这个例子中,更新了 “price” 字段为 1000,“score” 字段为 50。
发起请求: 使用 client.update(request, RequestOptions.DEFAULT)
发送更新请求。
这个测试方法演示了如何通过 Java RestClient 更新 Elasticsearch 索引库中的指定文档。
删除指定文档的单元测试方法的代码如下:
@Test
void testDeleteDocument() throws IOException {
// 1. 获取 Request 对象
DeleteRequest request = new DeleteRequest("hotel", "61083");
// 2. 发起请求
client.delete(request, RequestOptions.DEFAULT);
}
对该方法的详细说明:
获取 DeleteRequest
对象: 使用 DeleteRequest
对象指定索引库名称为 “hotel”,文档 ID 为 “61083”。
发起请求: 使用 client.delete(request, RequestOptions.DEFAULT)
发送删除请求。
这个测试方法演示了如何通过 Java RestClient 删除 Elasticsearch 索引库中的指定文档。
在实际开发中,我们不可能像上面那样一条数据一条数据的导入到文档中,而是需要批量的查询数据库,然后将结果集批量的导入到文档中,导入批量数据到文档的测试方法如下:
@Test
void testBulkRequest() throws IOException {
// 批量查询酒店数据
List<Hotel> hotels = hotelService.list();
// 1. 创建 BulkRequest
BulkRequest request = new BulkRequest();
// 转换为文档类型 HotelDoc
for (Hotel hotel : hotels) {
HotelDoc hotelDoc = new HotelDoc(hotel);
// 2. 准备参数,添加多个新增的 Request
request.add(new IndexRequest("hotel")
.id(hotel.getId().toString())
.source(JSON.toJSONString(hotelDoc), XContentType.JSON));
}
// 3. 发起请求
client.bulk(request, RequestOptions.DEFAULT);
}
对上述代码的说明:
批量查询酒店数据: 使用 hotelService.list()
批量获取酒店数据。
创建 BulkRequest
对象: 使用 BulkRequest
对象准备批量请求。
循环添加请求: 遍历酒店数据列表,将每个酒店数据转换为 HotelDoc
类型,并添加到 BulkRequest
中。
发起请求: 使用 client.bulk(request, RequestOptions.DEFAULT)
发送批量请求。
这个测试方法演示了如何通过 Java RestClient 批量导入 Elasticsearch 索引库中的文档。批量导入通常能够提高效率,特别是在处理大量数据时。