• SpringBoot-RabbitMQ


    RabbitMQ 是一个开源的消息中间件,它实现了 AMQP(Advanced Message Queuing Protocol)协议,并提供了可靠的消息传递机制。
    Spring Boot 中使用 RabbitMQ 实现异步消息的发送和接收。
    使用 Spring Boot 提供的 AmqpTemplate 和 @RabbitListener 注解进行消息的发送和接收。

    可以应用于各种场景,如日志处理、实时数据传输、系统解耦等
    在这里插入图片描述
    比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定

    • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”
    • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
    • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

    步骤一:安装 RabbitMQ

    首先,你需要在本地环境中安装 RabbitMQ。RabbitMQ 的官方网站(https://www.rabbitmq.com/)下载适用于你的操作系统的安装包,并按照官方文档进行安装和配置。
    在这里插入图片描述
    Routing Key

    生产者将消息发给交换器的时候,一般会指定一个 RoutingKey,用来指定这个消息的路由规则
    通过指定 RoutingKey 来决定消息流向哪里
    在这里插入图片描述

    BindingKey

    通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

    多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理
    在这里插入图片描述
    生产者将消息发送到Exchange,由交换器将消息路由到一个或者多个队列中
    在这里插入图片描述
    交换机会涉及如下四种类型(不同的类型有着不同的路由策略):

    环境

    <dependency>
    	<groupId>org.springframework.bootgroupId>
    	<artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    rabbitmq:
    	host: 127.0.0.1
    	port: 5672
    	username: guest
    	password: guest
    	vritual-host: /
    	listener:
    		simple:
    			concurrency: 10
    			max-concurrency: 10
    			prefetch: 1 #从队列每次取一个
    			auto-startup: true
    			default-requeue-rejected: true #失败后重试
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • Direct Exchange

    行为是"先匹配, 再投送". 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去.是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

    MQConfig配置类

    public static final String QUEUE = "queue";
    
    @Bean
    public Queue queue(){
    	return new Queue(QUEUE,true);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    发送服务

    @Service
    @Slf4j
    public class MQSender{
    	
    	@Autowired
    	AmqpTemplate amqpTemplate;
    
    	public void send(Object message){
    		String msg = (String) message;
    		log.info("send msg"+message);
    		amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    接收服务

    @Service
    @Slf4j
    public class MQReceiver{
    	
    	//监听的queue
    	@RabbitListener(queues = MQConfig.QUEUE)
    	public void receive(String msg){
    		log.info("receive msg"+msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    测试

    @Autowired
    private MQSender sender;
    sender.send("hello direct Exchange");
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    • Fanout Exchange

    转发消息到所有绑定队列,消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

    配置类

    public static final String FANOUT_EXCHANGE = "fanoutExchange";
    
    @Bean
    public FanoutExchange fanoutExchange(){
    	return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding fanoutBinding1(){
    	return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2(){
    	return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    发送

    public void sendFanout(Object message){
    	String msg = (String)message;
    	log.info("send fanout message:" + msg);
    	amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    测试

    @Autowired
    private MQSender sender;
    sender.sendFanout("hello fanout Exchange");
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    • 主题Topic

    按规则转发消息(最灵活) 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
    路由键必须是一串字符,用句号(.) 隔开
    路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词, 井号(#)就表示相当于一个或者多个单词

    配置类

    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    
    public static final String ROUTING_KEY1 = "topic.key1";
    public static final String ROUTING_KEY2 = "topic.#";
    
    @Bean
    public Queue topicQueue1(){
    	return new Queue(TOPIC_QUEUE1,true);
    }
    @Bean
    public Queue topicQueue2(){
    	return new Queue(TOPIC_QUEUE2,true);
    }
    @Bean
    public TopicExchange topicExchange(){
    	return new TopicExchange(TOPIC_EXCHANGE);
    }
    
    @Bean
    public Binding topicBinding1(){
    	return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
    }
    @Bean
    public Binding topicBinding2(){
    	return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    发送类

    public void sendTopic(Object message){
    	String msg = (String)message;
    	log.info("send topic message"+msg);
    
    	amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1");
    	amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接收类

    @RabbitListener(queues = MQConfig.TOPIC_QUEUE)
    public void receiveTopic1(String msg){
    	log.info("receive topic1 msg " + msg);
    }
    
    • 1
    • 2
    • 3
    • 4

    测试

    @Autowired
    private MQSender sender;
    sender.sendTopic("hello topic Exchange");
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    • Headers Exchange

    设置header attribute参数类型的交换机,相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列

    配置类

    public static final String HEADER_EXCHANGE = "headerExchange";
    
    @Bean
    public HeadersExchange headersExchange(){
    	return new HeadersExchange(HEADER_EXCHANGE);
    }
    @Bean
    public Queue headerQueue(){
    	return new Queue(HEADER_QUEUE2,true);
    }
    
    //绑定需要指定header,如果不匹配,则不能使用
    @Bean
    public Binding headerBinding(){
    	Map<String,Object> map = new HashMap();
    	map.put("header1","value1");
    	map.put("header2","value2");
    	return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    发送

    public void sendHeader(Object message){
    
    	String msg = (String)message;
    	log.info("send fanout message:" + msg);
    
    	messageProperties properties = new MessageProperties();
    	properties.setHeader("header1","value1");
    	properties.setHeader("header2","value2");
    	Message obj = new Message(msg.getBytes(),properties);
    	amqplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    用MessageProperties来添加Header信息,然后与接收者的header比对。我都设置的是"header1",“value1”;“header2”,“value2”

    //监听header模式的queue
    @RabbitListener(queues = MQConfig.HEADER_QUEUE2)
    public void receiveHeader(byte[] message){//因为发送的是byte类型,接收的也是该类型
    	log.info("header queue message "+ new String(message));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    测试

    @Autowired
    private MQSender sender;
    sender.sendHeader("hello header exchange");
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    步骤二:创建 Spring Boot 项目

    使用 Spring Initializr(https://start.spring.io/)创建一个新的 Spring Boot 项目。选择适当的项目元数据(如项目名称、包名等)并选择适当的依赖项,包括 RabbitMQ 的依赖项。

    <dependency>
    	 <groupId>org.springframework.bootgroupId>
    	 <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    配置文件application.yml

    spring:
    	rabbitmq:
    		host: localhost
    		port: 5672
    		username: guest
    		password: guest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    **创建消息发送者:**生成消息,通过信道(Channel),把消息发送给交换机(Exchange)

    • 消息体(payload):一般是一个带有业务逻辑结构的数据,比如:一个 JSON 字符串;
    • 标签(Label):用来表述这条消息,比如:一个交换器的名称和一个路由键;
    @Component
    public class MessageSender{
    	
    	private final AmqpTemplate amqpTemplate;
    
    	@Autowired
    	public MessageSender(AmqpTemplate amqpTemplate){
    		this.amqpTemplate = amqpTemplate;
    	}
    
    	public void sendMessage(String message){
    		/**
    			将消息发送到名为 "myExchange" 的交换机,并使用 "myRoutingKey" 进行路由
    		*/
    		amqpTemplate.convertAndSend("myExchange", "myRoutingKey", message);
    		System.out.println("Message sent: " + message);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    **创建消息接收者:**监听RabbitMQ中的(Queue)队列中的消息,然后去消费

    @Component
    public class MessageReceiver{
    
    	/**
    		使用了名为 "myQueue" 的队列来接收消息
    		当有消息到达队列时,receiveMessage 方法将被自动调用,并将消息作为参数传入。
    	*/	
    	@RabbitListener(queues="myQueue") //将该方法标记为消息监听器
    	public void receiveMessage(String message){
    		System.out.println("Message received: " + message);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    发送和接收消息:消息(Message)会一直留在队列里,直到被消费者(Consumer)消费

    @Autowired
    private MessageSender messageSender;
    
    messageSender.sendMessage("Hello, RabbitMQ!");
    
    • 1
    • 2
    • 3
    • 4

    并观察控制台输出来确认消息是否被成功接收:

    单机抢票系统

    一、依赖和配置文件

    spring-boot-starter
    spring-boot-starter-test
    spring-boot-starter-web
    mysql-connector-java
    spring-boot-starter-data-jpa
    lombok
    spring-boot-starter-amqp
    fastjson 阿里的JSON工具

    server.port=10000
    
    spring.datasource.url=jdbc:mysql://xxx/xxx?characterEncoding=utf-8
    spring.datasource.username=xxx
    spring.datasource.password=xxx
    spring.datasource.drive-class-name=com.mysql.jdbc.Driver
    
    spring.jpa.properties.hibernate.hbm2ddl.quto=update
    spring.jpa.show-sql=true
    
    spring.rabbit.host=localhost
    spring.rabbit.username=root
    spring.rabbit.password=root
    spring.rabbit.port=5672
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    二、数据表

    create table if not result(
    	id int auto_increment primary key,
    	ticket_id int null,
    	user_id int null
    );
    create table if not exists ticket{
    	id int auto_increment primary key,
    	name varchar(255)null,
    	content varchar(255)null,
    	user_name varchar(20)null,
    	count int default '6666' not null
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    根据数据表可以Generate出JavaBean
    在这里插入图片描述
    启动类

    @SpringBootApplication
    @EntityScan("com.fantj.springbootjpa.pojo")
    @EnableRabbit //开启对rabbit注解的支持
    public class AMQP{
    	public static void main(String[] args){
    		SpringApplication.run(AMQP.class,args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    三、Controller

    实现查询和抢票功能

    @RestController
    @RequestMapping("/ticket")
    public class TicketController{
    
    	@Autowired
    	private TicketService ticketService;
    
    	@Autowired
    	private MQSender mqSender;
    
    	@RequestMapping("/get/{id}")
    	public Ticket getByid(@PathVariable Integer id){
    		return ticketService.findById(id);
    	}
    
    	@RequestMapping("/reduce/{id}/{userId}")
    	public String reduceCount(@PathVariable Integer id,@PathVariable Integer userId){
    		
    		Message message = new Message(id,userId);
    		ticketService.reduceCount(id);
    		mqSneder.sendMessage(new Message(message.getTicketId(),message.getUserId()));
    		return "抢票成功";
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    四、Service

    @Service
    public class ResultServiceImpl implements ResultService{
    	
    	@Autowired
    	private ResultRepository resultRepository;
    
    	@Override
    	public void add(Result result){
    		resultRepository.add(result.getTicketId(),result.getUserId());
    	}
    	
    	@Override
    	public Result findOneByUserId(Integer userId){
    		return resultRepository.findByUserId(userId);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    =====================================================

    邮件发送案例

    一、创建一个消息发送者类,用于将待发送的邮件放入任务队列:

    @Component
    public class EmailSender{
    	
    	@Autowired	
    	private final AmqpTemplate amqpTemplate;
    
    	 public void sendEmail(String email) {
    	 
    	 	//使用了名为 "emailQueue" 的队列来存储待发送的邮件
    	   	amqpTemplate.convertAndSend("emailQueue", email);
    	   	System.out.println("Email sent: " + email);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    二、创建消息接收者,用于从任务队列中取出待发送的邮件并进行发送操作:

    @Component
    public class EmailReceiver{
    	
    	/**
    		有邮件到达队列时,receiveEmail 方法将被自动调用,并将邮件作为参数传入
    	*/
    	@RabbitListener(queues="emailQueue") //将该方法标记为消息监听器
    	public void receiveEmail(String email){
    		System.out.println("Sending email to: " + email);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ===============================================

    SpringBoot整合Canal+RabbitMQ监听数据变更

    • 使用Canal来监听MySQL的binlog变化
    • RabbitMQ来处理保存变更记录的操作

    一、依赖&配置文件

    <amqp.version>2.3.4.RELEASEamqp.version>  
      
      
    <dependency>  
      <groupId>org.springframework.bootgroupId>  
      <artifactId>spring-boot-starter-amqpartifactId>  
      <version>${amqp.version}version>  
    dependency>  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    spring:  
      rabbitmq:  
        #    host: myserverhost  
        host: 192.168.0.108  
        port: 5672  
        username: admin  
        password: RabbitMQ密码  
        # 消息确认配置项  
        # 确认消息已发送到交换机(Exchange)  
        publisher-confirm-type: correlated  
        # 确认消息已发送到队列(Queue)  
        publisher-returns: true  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    二、RabbitMQ配置类

    @Configuration
    public class RabbitConfig{
    	
    	@Bean
    	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    		
    		RabbitTemplate template = new RabbitTemplate();
    		template.setConnectionFactory(connectionFactory);
    		
    		//解决RabbitListener循环报错的问题
    		template .setMessageConverter(new Jackson2JsonMessageConverter());
    
    		return template;
    	}
    
    	@Bean
    	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    		
    		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    		factory.setConnectionFactory(connectionFactory); 
    		//解决RabbitListener循环报错的问题 
            factory.setMessageConverter(new Jackson2JsonMessageConverter());  
            return factory;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Canal消息生产者

    @Configuration
    public class CanalProvider{
    
    	public static final String CanalQueue = "canal-queue";  
    	public static final String CanalExchange = "canal-exchange";  
    	public static final String CanalRouting = "canal-routing-key"; 
    
    	@Bean
    	public Queue canalQueue(){
    		return new Queue(RabbitConstant.CanalQueue, true);
    	}
    
    	@Bean
    	DirectExchange canalExchange() {  
            return new DirectExchange(RabbitConstant.CanalExchange, true, false);  
        }  
    
    	@Bean
    	Binding bindingCanal(){
    		return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    Canal消息消费者

    @Component
    @RabbitListener(queue = RabbitConstant.CanalQueue)
    public class CanalConmsumer{
    	
    	private final SysBackupService sysBackupService; 
    
    	public CanalComsumer(SysBackupService sysBackupService) {  
            this.sysBackupService = sysBackupService;  
        } 
    
    	@RabbitHandler  
        public void process(Map<String, Object> msg) {  
            System.out.println("收到canal消息:" + msg);  
            boolean isDdl = (boolean) msg.get("isDdl");  
      
            // 不处理DDL事件  
            if (isDdl) {  
                return;  
            }  
      
            // TiCDC的id,应该具有唯一性,先保存再说  
            int tid = (int) msg.get("id");  
            // TiCDC生成该消息的时间戳,13位毫秒级  
            long ts = (long) msg.get("ts");  
            // 数据库  
            String database = (String) msg.get("database");  
            // 表  
            String table = (String) msg.get("table");  
            // 类型:INSERT/UPDATE/DELETE  
            String type = (String) msg.get("type");  
            // 每一列的数据值  
            List<?> data = (List<?>) msg.get("data");  
            // 仅当type为UPDATE时才有值,记录每一列的名字和UPDATE之前的数据值  
            List<?> old = (List<?>) msg.get("old");  
      
            // 跳过sys_backup,防止无限循环  
            if ("sys_backup".equalsIgnoreCase(table)) {  
                return;  
            }  
      
            // 只处理指定类型  
            if (!"INSERT".equalsIgnoreCase(type)  
                    && !"UPDATE".equalsIgnoreCase(type)  
                    && !"DELETE".equalsIgnoreCase(type)) {  
                return;  
            }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    修改MySQL中的一条消息,Canal就会发送信息到RabbitMQ,我们就能从监听的RabbitMQ队列中得到该条消息

    100%投递成功并被消费

    在这里插入图片描述

  • 相关阅读:
    【数据结构初阶-线性表】顺序表和链表,多图详解
    Typora设置标题自动标号
    C风格数组和std::array有什么区别
    React如何命令式调用自定义的Antd-Modal组件
    SOME/IP 协议介绍(三)参数和数据结构的序列化
    【vue2 】如何创建自定义项目
    IDEA JRebel安装使用教程
    Vue 3.0前的 TypeScript 最佳入门实践
    OpenCL专题04:ViennaCL与Eigen双剑合璧
    uniapp tabBar app页面滚动闪屏的问题
  • 原文地址:https://blog.csdn.net/usa_washington/article/details/132884629