• elasticsearch实现mysql数据同步


    当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。

    常见的数据同步方案有三种:

    • 同步调用

    • 异步通知

    • 监听binlog

    以下使用异步通知同步elasticsearch的数据 

    引入依赖

    1. org.springframework.boot
    2. spring-boot-starter-amqp
    3. org.elasticsearch.client
    4. elasticsearch-rest-high-level-client
    5. com.alibaba
    6. fastjson
    7. 1.2.66

    配置elasticsearch

    1. import com.baomidou.mybatisplus.annotation.DbType;
    2. import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
    3. import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
    4. import org.apache.http.HttpHost;
    5. import org.elasticsearch.client.RestClient;
    6. import org.elasticsearch.client.RestHighLevelClient;
    7. import org.mybatis.spring.annotation.MapperScan;
    8. import org.springframework.boot.SpringApplication;
    9. import org.springframework.boot.autoconfigure.SpringBootApplication;
    10. import org.springframework.context.annotation.Bean;
    11. @MapperScan(basePackages = "cn.itcast.hotel.mapper")
    12. @SpringBootApplication
    13. public class HotelAdminApplication {
    14. public static void main(String[] args) {
    15. SpringApplication.run(HotelAdminApplication.class, args);
    16. }
    17. // 最新版
    18. @Bean
    19. public MybatisPlusInterceptor mybatisPlusInterceptor() {
    20. MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    21. interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
    22. return interceptor;
    23. }
    24. @Bean
    25. public RestHighLevelClient client(){
    26. return new RestHighLevelClient(RestClient.builder(
    27. HttpHost.create("http://43.139.59.28:9200")
    28. ));
    29. }
    30. }

    配置交换机、队列

    声明交换机、队列名称 

    constatnts包下新建一个类MqConstants,存储交换机和队列的名称

    1. public class MqConstants {
    2. /**
    3. * 交换机
    4. */
    5. public final static String HOTEL_EXCHANGE = "hotel.topic";
    6. /**
    7. * 监听新增和修改的队列
    8. */
    9. public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    10. /**
    11. * 监听删除的队列
    12. */
    13. public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    14. /**
    15. * 新增或修改的RoutingKey
    16. */
    17. public final static String HOTEL_INSERT_KEY = "hotel.insert";
    18. /**
    19. * 删除的RoutingKey
    20. */
    21. public final static String HOTEL_DELETE_KEY = "hotel.delete";
    22. }

    声明交换机、队列

    1. import org.springframework.amqp.core.Binding;
    2. import org.springframework.amqp.core.BindingBuilder;
    3. import org.springframework.amqp.core.Queue;
    4. import org.springframework.amqp.core.TopicExchange;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. @Configuration
    8. public class MqConfig {
    9. @Bean
    10. public TopicExchange topicExchange(){
    11. return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    12. }
    13. @Bean
    14. public Queue insertQueue(){
    15. return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    16. }
    17. @Bean
    18. public Queue deleteQueue(){
    19. return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    20. }
    21. @Bean
    22. public Binding insertQueueBinding(){
    23. return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    24. }
    25. @Bean
    26. public Binding deleteQueueBinding(){
    27. return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    28. }
    29. }

    发送MQ消息  

    在增、删、改业务中分别发送MQ消息

    1. @Resource
    2. private RabbitTemplate rabbitTemplate;
    3. @PostMapping
    4. public void saveHotel(@RequestBody Hotel hotel){
    5. hotelService.save(hotel);
    6. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    7. }
    8. @PutMapping()
    9. public void updateById(@RequestBody Hotel hotel){
    10. if (hotel.getId() == null) {
    11. throw new InvalidParameterException("id不能为空");
    12. }
    13. hotelService.updateById(hotel);
    14. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    15. }
    16. @DeleteMapping("/{id}")
    17. public void deleteById(@PathVariable("id") Long id) {
    18. hotelService.removeById(id);
    19. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    20. }

    接收MQ消息

    接收到MQ消息要做的事情包括:

    • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库

    • 删除消息:根据传递的hotel的id删除索引库中的一条数据

    文档类

    获取es传来的值

    1. import lombok.Data;
    2. import lombok.NoArgsConstructor;
    3. @Data
    4. @NoArgsConstructor
    5. public class HotelDoc {
    6. private Long id;
    7. private String name;
    8. private String address;
    9. private Integer price;
    10. private Integer score;
    11. private String brand;
    12. private String city;
    13. private String starName;
    14. private String business;
    15. private String location;
    16. private String pic;
    17. // 排序时的 距离值
    18. private Object distance;
    19. public HotelDoc(Hotel hotel) {
    20. this.id = hotel.getId();
    21. this.name = hotel.getName();
    22. this.address = hotel.getAddress();
    23. this.price = hotel.getPrice();
    24. this.score = hotel.getScore();
    25. this.brand = hotel.getBrand();
    26. this.city = hotel.getCity();
    27. this.starName = hotel.getStarName();
    28. this.business = hotel.getBusiness();
    29. this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
    30. this.pic = hotel.getPic();
    31. }
    32. }
    model类 
    1. import com.baomidou.mybatisplus.annotation.IdType;
    2. import com.baomidou.mybatisplus.annotation.TableId;
    3. import com.baomidou.mybatisplus.annotation.TableName;
    4. import lombok.Data;
    5. @Data
    6. @TableName("tb_hotel")
    7. public class Hotel {
    8. @TableId(type = IdType.INPUT)
    9. private Long id;
    10. private String name;
    11. private String address;
    12. private Integer price;
    13. private Integer score;
    14. private String brand;
    15. private String city;
    16. private String starName;
    17. private String business;
    18. private String longitude;
    19. private String latitude;
    20. private String pic;
    21. }

     service类
    1. import cn.itcast.hotel.mapper.HotelMapper;
    2. import cn.itcast.hotel.pojo.Hotel;
    3. import cn.itcast.hotel.pojo.HotelDoc;
    4. import cn.itcast.hotel.service.IHotelService;
    5. import com.alibaba.fastjson.JSON;
    6. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
    7. import org.elasticsearch.action.delete.DeleteRequest;
    8. import org.elasticsearch.action.index.IndexRequest;
    9. import org.elasticsearch.client.RequestOptions;
    10. import org.elasticsearch.client.RestHighLevelClient;
    11. import org.elasticsearch.common.xcontent.XContentType;
    12. import org.springframework.stereotype.Service;
    13. import javax.annotation.Resource;
    14. import java.io.IOException;
    15. @Service
    16. public class HotelService extends ServiceImpl implements IHotelService {
    17. @Resource
    18. private RestHighLevelClient client;
    19. @Override
    20. public void deleteById(Long id) {
    21. try {
    22. // 1.准备Request
    23. DeleteRequest request = new DeleteRequest("hotel", id.toString());
    24. // 2.发送请求
    25. client.delete(request, RequestOptions.DEFAULT);
    26. } catch (IOException e) {
    27. throw new RuntimeException(e);
    28. }
    29. }
    30. @Override
    31. public void insertById(Long id) {
    32. try {
    33. // 0.根据id查询酒店数据
    34. Hotel hotel = getById(id);
    35. // 转换为文档类型
    36. HotelDoc hotelDoc = new HotelDoc(hotel);
    37. // 1.准备Request对象
    38. IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
    39. // 2.准备Json文档
    40. request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
    41. // 3.发送请求
    42. client.index(request, RequestOptions.DEFAULT);
    43. } catch (IOException e) {
    44. throw new RuntimeException(e);
    45. }
    46. }
    47. }
    监听器
    1. import cn.itcast.hotel.constatnts.MqConstants;
    2. import cn.itcast.hotel.service.IHotelService;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Component;
    6. @Component
    7. public class HotelListener {
    8. @Autowired
    9. private IHotelService hotelService;
    10. /**
    11. * 监听酒店新增或修改的业务
    12. * @param id 酒店id
    13. */
    14. @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    15. public void listenHotelInsertOrUpdate(Long id){
    16. hotelService.insertById(id);
    17. }
    18. /**
    19. * 监听酒店删除的业务
    20. * @param id 酒店id
    21. */
    22. @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    23. public void listenHotelDelete(Long id){
    24. hotelService.deleteById(id);
    25. }
    26. }

  • 相关阅读:
    工件SSMwar exploded 部署工件时出错。请参阅服务器日志了解详细信息
    公共经济学(开卷)期末复习题
    Robot Arm 机械臂源码解析
    Kafka使用Java管理主题和分区
    【Verilog基础】【计算机体系结构】深入理解Cache一致性(针对多核SoC系统)
    基于Vue+SpringBoot的超市账单管理系统 开源项目
    2579. 统计染色格子数(javascript)
    在 MongoDB 的 CRUD 操作中使用日期
    中高级Java程序员,你不得不掌握的基本功,挑战20k+
    软件架构模式+系统架构
  • 原文地址:https://blog.csdn.net/qq_63431773/article/details/132694158