• Mysql和Elasticsearch的数据同步


    Elasticsearch的数据来自Mysql数据库中,所以当我们的MySQL发生改变时,Elasticsearch也要跟着改变,这时候我们的es的数据就要和mysql同步了


    同步实现思路

     常见的数据同步方案有三种:

    • 同步调用

    • 异步通知

    • 监听binlog

     方案一:
          

     

    • hotel-demo对外提供接口,用来修改elasticsearch中的数据

    • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

            也就是说MySQL修改完去修改es的数据 

    • 优点:实现简单,粗暴

    • 缺点:业务耦合度高

     方案二

     

     

    • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息

    • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

    • 优点:低耦合,实现难度一般

    • 缺点:依赖mq的可靠性

    • 这个实现方式也就是使用mq进行操纵,当我们修改MySQL的服务器修改完以后会将信息发送给MQ,然后修改ES的会进行监听,当监听到了以后就进行修改es的操作

    方式三:

     

     

    • 给mysql开启binlog功能

    • mysql完成增、删、改操作都会记录在binlog中

    • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容

    • 也就是监听mysql,如果MySQL的数据有变化那么就直接去改变es的数据

    • 优点:完全解除服务间耦合

    • 缺点:开启binlog增加数据库负担、实现复杂度高

     在这里使用的是第二种实现方案:使用MQ来写


    同步案例代码
     

    用来操控ES的代码(负责监听MQ队列)

    1. /**
    2. * 监听增加和修改的队列
    3. * 因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
    4. * 所以队列过来的id不管是新增还是修改es都可以判断如果有这个数据id那么就先删除再新增,如果没有这个数据就直接新增,所以新增和修改他俩用一个方法就行了
    5. *
    6. * @param id 队列中需要进行操作的id
    7. */
    8. @RabbitListener(bindings = @QueueBinding(
    9. value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),
    10. exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
    11. key = MqConstants.HOTEL_INSERT_KEY
    12. ))
    13. public void insertAndUpdate(Long id) {
    14. if (id == null) {
    15. return;
    16. }
    17. log.info("入参:{}", id);
    18. //监听到以后拿到id去数据库查询整个数据
    19. Hotel hotel = iHotelService.getById(id);
    20. //因为查的mysql数据和es的数据有些不一样所以需要做转换
    21. HotelDoc hotelDoc = new HotelDoc(hotel);
    22. //转换为json
    23. String hotelDocJson = JSON.toJSONString(hotelDoc);
    24. System.out.println("hotelDocJson = " + hotelDocJson);
    25. //发送到ES中,因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
    26. //创建请求语义对象 添加文档数据
    27. IndexRequest request = new IndexRequest("hotel");
    28. //这个新增就是PUT在es中
    29. request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
    30. //发送请求
    31. try {
    32. IndexResponse response = client.index(request, RequestOptions.DEFAULT);
    33. RestStatus status = response.status();
    34. log.info("响应结果为:{}", status);
    35. } catch (IOException e) {
    36. e.printStackTrace();
    37. }
    38. }
    39. /**
    40. * 监听删除队列
    41. *
    42. * @param id 队列中需要进行操作的id
    43. */
    44. @RabbitListener(bindings = @QueueBinding(
    45. value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
    46. exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
    47. key = MqConstants.HOTEL_DELETE_KEY
    48. ))
    49. public void deleteByMqId(Long id) {
    50. if (id == null) {
    51. return;
    52. }
    53. log.info("入参:{}", id);
    54. //先创建语义对象,直接就可以给里面写id的字段
    55. DeleteRequest request = new DeleteRequest("hotel", id.toString());
    56. //发送请求
    57. try {
    58. DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
    59. RestStatus status = response.status();
    60. log.info("响应结果为:{}", status);
    61. } catch (IOException e) {
    62. e.printStackTrace();
    63. }

     用来操作MySQL的代码:
     

    1. @RestController
    2. @RequestMapping("hotel")
    3. public class HotelController {
    4. //注入和RabbitMQ链接
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @Autowired
    8. private IHotelService hotelService;
    9. @GetMapping("/{id}")
    10. public Hotel queryById(@PathVariable("id") Long id) {
    11. return hotelService.getById(id);
    12. }
    13. @GetMapping("/list")
    14. public PageResult hotelList(
    15. @RequestParam(value = "page", defaultValue = "1") Integer page,
    16. @RequestParam(value = "size", defaultValue = "1") Integer size
    17. ) {
    18. Page<Hotel> result = hotelService.page(new Page<>(page, size));
    19. return new PageResult(result.getTotal(), result.getRecords());
    20. }
    21. @PostMapping
    22. public void saveHotel(@RequestBody Hotel hotel) {
    23. hotelService.save(hotel);
    24. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    25. }
    26. @PutMapping()
    27. public void updateById(@RequestBody Hotel hotel) {
    28. if (hotel.getId() == null) {
    29. throw new InvalidParameterException("id不能为空");
    30. }
    31. hotelService.updateById(hotel);
    32. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    33. }
    34. @DeleteMapping("/{id}")
    35. public void deleteById(@PathVariable("id") Long id) {
    36. hotelService.removeById(id);
    37. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
    38. }
    39. }

    当然也是可以不使用注解来写,直接在配置文件中写队列绑定的交换机
    RabbitMQ_小白要变大牛的博客-CSDN博客RabbitMqhttps://blog.csdn.net/hnhroot/article/details/125463948?spm=1001.2014.3001.5502

  • 相关阅读:
    LeetCode221109_122、27. 移除元素
    CSDN,毕业生有话说!在如此疯狂的年代如何寻找自己的方向?迈向社会的第一步
    CSS - 移动端布局(二)移动端适配
    linux-windows服务设置
    TrackBar控件
    Redis高并发分布式锁详解
    .NET现代应用的产品设计 - DDD实践
    关于#rust#的问题:用rust的格式化输出println,指定带中文的字符串的位宽,无效(相关搜索:中英文字符串)
    Vue3 之 Vuex - 状态管理
    SRM : A Style-based Recalibration Module for Convolutional Neural Networks论文笔记
  • 原文地址:https://blog.csdn.net/hnhroot/article/details/125531758