玩过Elasticsearch(下面统称ES)的小伙伴都知道ES是一个十分强悍的搜索引擎,但是在之前学习的过程中一直都是通过手敲DSL语句把数据导入进去,这多少有点不优雅。那么到底能不能做到在我们数据库发生变更时,ES中的数据也随之改变,即做到两者数据同步呢?答案理所应当的可以!
常见的数据同步方案主要有以下三种:
而在这里选择的是通过MQ异步调用方案解决不同服务间ES和MySQL的数据同步问题,MQ则是采用了RabbitMQ。
我们的目标其实是很明确的,就是我们在进行增删改操作之后,能够即可在搜索端更新并得以体现。这里使用的是发布订阅模型中的Topic Exchange,当然也可以使用Direct Exchange,但是就不推荐使用Fanout Exchange了,当然自己喜欢最重要。
需要注意的是,这里除了数据同步的具体实现外,其余的都只是大致提及需要自己进行编写,比如增删改中的接口,搜索的接口等。
步骤大致可分为以下部分:
第一步和第二步这里就直接省略了,大家伙们随便准备一个简单的demo就行。这里除了基本的依赖之外,值得注意的只要有三个依赖,分别是数据库依赖、ES依赖和MQ依赖,如果需要用的druid的则需要自己进行修改。(其中ES依赖只有搜索服务需要)
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>org.elasticsearch.clientgroupId>
<artifactId>elasticsearch-rest-high-level-clientartifactId>
dependency>
配置文件主要是对MySQL和RabbitMQ进行配置。这一部分两个服务不出意外的话是一样的,除非是用了不同的数据库或者RabbitMQ。
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/heima?useSSL=false
username: root
password: 123456
rabbitmq:
host: 192.168.150.100 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: xbaozi # 用户名
password: 123456 # 密码
对交换机和队列以及Topic Exchange所需要的RoutingKey名称,提高整体代码的复用性。在两个服务中同时新建一个constant包,将该包下新建下面类:
public class MqConstants {
/**
* 交换机
*/
public final static String MY_EXCHANGE = "my.topic";
/**
* 监听新增和修改的队列
*/
public final static String MY_INSERT_QUEUE = "my.insert.queue";
/**
* 监听删除的队列
*/
public final static String MY_DELETE_QUEUE = "my.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String MY_INSERT_KEY = "my.insert";
/**
* 删除的RoutingKey
*/
public final static String MY_DELETE_KEY = "my.delete";
}
因为是两边服务都需要,且为了避免只在一个服务声明另一个服务先运行产生异常,因此这里使用的是配置类注入的形式进行声明,当然也可以使用注解的形式,只是相对来说对这里不太方便而已。在两个服务中新建config包,并在该包下新建配置类:
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.MY_EXCHANGE, true, false);
}
@Bean
public Queue insertQueue() {
return new Queue(MqConstants.MY_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue() {
return new Queue(MqConstants.MY_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding() {
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.MY_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding() {
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.MY_DELETE_KEY);
}
}
发送消息相对简单,只需要在增删改服务中的增、删、改操作中调用convertAndSend方法即可。(记得注入RabbitTemplate)
@PostMapping
public void save(@RequestBody User user){
userService.save(user);
// 发送保存消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_INSERT_KEY, user.getId());
}
@PutMapping()
public void updateById(@RequestBody User user){
if (user.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
userService.updateById(user);
// 发送更新消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_INSERT_KEY, user.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
userService.removeById(id);
// 发送删除消息
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE, MqConstants.MY_DELETE_KEY, id);
}
对于接受消息需要准备好删除和更新的业务逻辑,其中新增和修改统称为了更新业务。除此之外,还需要准备好一个监听器,用于监听MQ发送过来的消息。
@Override
public void deleteById(Long id) {
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void updateById(Long id) {
try {
// 0.根据id查询酒店数据
User user = getById(id);
// 转换为文档类型
UserDoc userDoc = new UserDoc(user);
// 1.准备Request对象
IndexRequest request = new IndexRequest("user").id(user.getId().toString());
// 2.准备Json文档
request.source(JSON.toJSONString(userDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 监听用户新增或修改的业务
* @param id 用户id
*/
@RabbitListener(queues = MqConstants.MY_INSERT_QUEUE)
public void listenUserInsertOrUpdate(Long id){
userService.insertById(id);
}
/**
* 监听用户删除的业务
* @param id 用户id
*/
@RabbitListener(queues = MqConstants.MY_DELETE_QUEUE)
public void listenUserDelete(Long id){
userService.deleteById(id);
}
到这里的话就可以对结果进行演示了,因为在写这玩意的时候是已经写完了demo且已经是关掉了的,因此这里就不放演示截图,直接告诉大家伙的验收方式:
这里对可能失败的原因进行一个推测,如果真的失败了可以尝试一下以下方法: