当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
常见的数据同步方案有三种:
同步调用
异步通知
监听binlog
以下使用异步通知同步elasticsearch的数据
-
org.springframework.boot -
spring-boot-starter-amqp -
org.elasticsearch.client -
elasticsearch-rest-high-level-client -
-
com.alibaba -
fastjson -
1.2.66
- import com.baomidou.mybatisplus.annotation.DbType;
- import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
- import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
-
- @MapperScan(basePackages = "cn.itcast.hotel.mapper")
- @SpringBootApplication
- public class HotelAdminApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(HotelAdminApplication.class, args);
- }
-
- // 最新版
- @Bean
- public MybatisPlusInterceptor mybatisPlusInterceptor() {
- MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
- interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
- return interceptor;
- }
-
- @Bean
- public RestHighLevelClient client(){
- return new RestHighLevelClient(RestClient.builder(
- HttpHost.create("http://43.139.59.28:9200")
- ));
- }
-
- }
constatnts包下新建一个类MqConstants,存储交换机和队列的名称
- public class MqConstants {
- /**
- * 交换机
- */
- public final static String HOTEL_EXCHANGE = "hotel.topic";
- /**
- * 监听新增和修改的队列
- */
- public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
- /**
- * 监听删除的队列
- */
- public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
- /**
- * 新增或修改的RoutingKey
- */
- public final static String HOTEL_INSERT_KEY = "hotel.insert";
- /**
- * 删除的RoutingKey
- */
- public final static String HOTEL_DELETE_KEY = "hotel.delete";
- }
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MqConfig {
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
- }
-
- @Bean
- public Queue insertQueue(){
- return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
- }
-
- @Bean
- public Queue deleteQueue(){
- return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
- }
-
- @Bean
- public Binding insertQueueBinding(){
- return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
- }
-
- @Bean
- public Binding deleteQueueBinding(){
- return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
- }
- }
在增、删、改业务中分别发送MQ消息
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @PostMapping
- public void saveHotel(@RequestBody Hotel hotel){
- hotelService.save(hotel);
- rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
- }
-
- @PutMapping()
- public void updateById(@RequestBody Hotel hotel){
- if (hotel.getId() == null) {
- throw new InvalidParameterException("id不能为空");
- }
- hotelService.updateById(hotel);
- rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
- }
-
- @DeleteMapping("/{id}")
- public void deleteById(@PathVariable("id") Long id) {
- hotelService.removeById(id);
- rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
- }
接收到MQ消息要做的事情包括:
新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
删除消息:根据传递的hotel的id删除索引库中的一条数据
获取es传来的值
-
-
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @NoArgsConstructor
- public class HotelDoc {
- private Long id;
- private String name;
- private String address;
- private Integer price;
- private Integer score;
- private String brand;
- private String city;
- private String starName;
- private String business;
- private String location;
- private String pic;
- // 排序时的 距离值
- private Object distance;
- public HotelDoc(Hotel hotel) {
- this.id = hotel.getId();
- this.name = hotel.getName();
- this.address = hotel.getAddress();
- this.price = hotel.getPrice();
- this.score = hotel.getScore();
- this.brand = hotel.getBrand();
- this.city = hotel.getCity();
- this.starName = hotel.getStarName();
- this.business = hotel.getBusiness();
- this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
- this.pic = hotel.getPic();
- }
- }
- import com.baomidou.mybatisplus.annotation.IdType;
- import com.baomidou.mybatisplus.annotation.TableId;
- import com.baomidou.mybatisplus.annotation.TableName;
- import lombok.Data;
-
- @Data
- @TableName("tb_hotel")
- public class Hotel {
- @TableId(type = IdType.INPUT)
- private Long id;
- private String name;
- private String address;
- private Integer price;
- private Integer score;
- private String brand;
- private String city;
- private String starName;
- private String business;
- private String longitude;
- private String latitude;
- private String pic;
- }
- import cn.itcast.hotel.mapper.HotelMapper;
- import cn.itcast.hotel.pojo.Hotel;
- import cn.itcast.hotel.pojo.HotelDoc;
- import cn.itcast.hotel.service.IHotelService;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
- import java.io.IOException;
-
- @Service
- public class HotelService extends ServiceImpl
implements IHotelService { -
- @Resource
- private RestHighLevelClient client;
-
-
- @Override
- public void deleteById(Long id) {
- try {
- // 1.准备Request
- DeleteRequest request = new DeleteRequest("hotel", id.toString());
- // 2.发送请求
- client.delete(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void insertById(Long id) {
- try {
- // 0.根据id查询酒店数据
- Hotel hotel = getById(id);
- // 转换为文档类型
- HotelDoc hotelDoc = new HotelDoc(hotel);
-
- // 1.准备Request对象
- IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
- // 2.准备Json文档
- request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
- // 3.发送请求
- client.index(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- import cn.itcast.hotel.constatnts.MqConstants;
- import cn.itcast.hotel.service.IHotelService;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Component
- public class HotelListener {
-
- @Autowired
- private IHotelService hotelService;
-
- /**
- * 监听酒店新增或修改的业务
- * @param id 酒店id
- */
- @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
- public void listenHotelInsertOrUpdate(Long id){
- hotelService.insertById(id);
- }
-
- /**
- * 监听酒店删除的业务
- * @param id 酒店id
- */
- @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
- public void listenHotelDelete(Long id){
- hotelService.deleteById(id);
- }
- }