• SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)


    SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

    在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
    使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

    • 启动MySQL环境,并开启binlog
    • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
    • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
    • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

    预先在model实体中准备

    短信实体

    @Data
    @ApiModel(description = "短信实体")
    public class MsmVo{
    	
    	@ApiModelProperty(value="phone")
    	private String phone;
    
    	@ApiModelProperty(value = "短信模板code")
    	private  String templateCode;
    
    	@ApiModelProperty(value="短信模板参数")
    	private Map<String,Object> param;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    排班实体

    @Data
    @ApiModel(description = "OrderMqVo")
    public class OrderMqVo{
    	
    	@ApiModelProperty(value="可预约数")
    	private Integer reserverdNumber
    
    	@ApiModelProperty(value = "剩余预约数")
    	private Integer availableNumber;
    
    	@ApiModelProperty(value = "排班id")
    	private String scheduleId;
    
    	@ApiModelProperty(value = "短信实体")
    	private MsmVo msmVo;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    一、安装RabbitMQ

    docker pull rabbitmq:nanagemnet
    docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement
    
    • 1
    • 2

    访问:http://IP:15672
    在这里插入图片描述

    二、rabbit-util模块封装

    <dependency>
    	<groupId>org.springframework.bootgroupId>
    	<artifactId>spring-boot-starter-actuatorartifactId>
    dependency>
    <dependency>
    	<groupId>org.springframework.cloudgroupId>
    	<artifactId>spring-cloud-starter-bus-amqpartifactId>
    dependency>
    <dependency>
    	<groupId>com.alibabagroupId>
    	<artifactId>fastjsonartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    创建一个RabbitService用来发送消息

    @Service
    public class RabbitService{
    	
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    
    	//发送消息
    	public boolean sendMessage(String exchange,String routingKey,Object message){
    		rabbitTemplate.convertAndSend(exchange,routingKey,message);
    		return true;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    创建mq消息转化器

    @Configuration
    public class MQConfig{
    	
    	@Bean
    	public MessageConverter messageConverter(){
    		return new Jackson2JsonMessageConverter();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    添加常量配置类

    public class MqConst{
    	
    	//预约下单
    	public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
    	public static final String ROUTING_ORDER = "order";
    	//队列
    	public static final String QUEUE_ORDER = "queue.order";
    
    	//短信
    	public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
    	public static final String ROUTING_MSM_ITEM = "msm.item";
    	pulib static final String Queue_MSM_item = "queue.msm.item";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    三、短信模块service-sms

    将二中的模块依赖引入

    <dependency>
    	<groupId>com.michaelgroupId>
    	<artifactId>rabbit_utilartifactId>
    	<version>xxxversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置文件application.properties

    spring.rabbitmq.host=192.168.44.168
    spring.rabbitmq.port=5672
    spring.rabbit.uername=guest
    spring.rabbitmq.password=guest
    
    • 1
    • 2
    • 3
    • 4

    Service发送消息

    public interface MsmService{
    	//发送手机验证码
    	boolean send(String phone,String code);
    	
    	//MQ使用发送短信的接口
    	boolean send(MsmVo msmVo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    @Service
    public class MsmServiceImpl implements MsmService{
    
    	@Override
    	public boolean send(String phone,String code){
    		//判断手机号是否为空
    		if(StringUtils.isEmpty(phone)){
    			return false;
    		}
    
    		//整合阿里云相关参数,短信服务
    		DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
    			ConstantPropertiesUtils.ACCESS_KEY_ID,
    			ConstantPropertiesUtils.SECRET
    		);
    		IAcsClient client = new DefaultAcsClient(profile);
    		CommonRequest request = new CommonRequest();
    
    		request.setMethod(MethodType.POST);
    		request.setDomain("dysmsapi.aliyuncs.com");
    		request.setVersion("2018-08-08");
    		request.setAction("SendSms");
    
    		//手机号
    		request.putQueryParameter("PhoneNumbers",phone);
    		//签名名称
    		request.putQueryParameter("SignName","我的网站");
    		//模板
    		request.putQueryParameter("TemplateCode","SMS_180051135");
    		//验证码使用json格式{"code":"123456"}
    		Map<String,Object> param = new HashMap();
    		param.put("code",code);
    		request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
    		
    		//调用方法进行短信发送
    		try{
    			CommonResponse response = client.getCommonResponse(request);
    			System.out.println(response.getData());
    			return response.getHttpResponse().isSuccess();
    		}catch(ServerException e){
    			e.printStackTrace();
    		}catch(ClientException e){
    			e.printStackTrace();
    		}
    		return false;
    	}
    	
    	@Override
    	public boolean send(MsmVo msmVo){
    		
    		if(!StringUtils.isEmpty(msmVO.getPhone())){
    			String code = (String)msmVo.getParam().get("code");
    			boolean isSend = this.send(msmVo.getPhone(),code);
    			return isSend;
    		}
    		return false;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    创建mq监控器

    @Component
    public class MsmReceiver{
    	
    	@Autowired
    	private MsmService msmService;
    
    	//监听
    	@RabbitListener(bindings = @QueueBinding(
    		value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
    		exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
    		key = {MqConst.ROUTING_MSM_ITEM}
    	))
    	public void send(MsmVo msmVo,Message message,Channel channel){
    		msmService.ssend(msmVo);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    四、业务类

    生成订单之后,发送短信并更新数量

    ①、业务模块中引入依赖

    rabbit-util

    ②、添加配置

    spring.rabbitmq.host=192.168.44.165
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    • 1
    • 2
    • 3
    • 4

    ③、service接口以及实现类

    @Override
    public void update(Schedule schedule){
    	schedule.setUpdata(new Date());
    	scheduleRepository.save(schedule);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ④、receiver包中创建MQ监听器

    @Component
    public class HospitalReceiver{
    	
    	@Autowired
    	private ScheduleService scheduleService;
    
    	@Autowired
    	private RabbitService rabbitService;
    
    	//监听
    	@RabbitListener(
    		bindings = @QueueBinding(
    			value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
    			exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
    			key = {MqConst.ROUTING_ORDER}
    		)
    	)
    	public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{
    		//下单成功,更新数据
    		Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
    		schedule.setReservedNumber(orderMqVo.getReservedNumber());
    		schedule.setAvailableNumber(orderMqVo.getAvailableNumber);
    		scheduleService.update(schedule);
    
    		//发送短信
    		MsmVo msmVo = orderMqVo.getMsmVo();
    		if(null != msmVo){
    			rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    Go语言学习教程(十五)
    Nanoframework 操作单片机蓝牙配置WIFI的案例
    java中转义字符的源码数据格式,内存存储数据格式和转换json后的数据格式
    vulnhub靶机hacksudoLPE中Challenge-1
    [附源码]java毕业设计企业物资信息管理系统
    编译基于wanyland的 EFL
    基于神经网络彩色图像插值研究-附Matlab程序
    OSPF——DR和BDR讲解
    警钟:SBP持有的MogaFX外汇储备暴跌9.56亿美元,达到79.6亿美元
    Pandas
  • 原文地址:https://blog.csdn.net/usa_washington/article/details/133617303