• 【开发篇】十八、SpringBoot整合ActiveMQ


    1、安装ActiveMQ

    docker安装

    docker pull webcenter/activemq
    
    • 1
    docker run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
    # * 61616是 activemq 的服务端口(映射为61616)
    # * 8161是 管理后台端口(对外映射为8161)
    
    • 1
    • 2
    • 3

    访问控制台http://IP:8161

    常规安装(安装为系统服务)

    下载:

    https://activemq.apache.org/components/classic/download/
    
    • 1

    在这里插入图片描述

    解压缩即安装:

    在这里插入图片描述

    启动服务:双击bin/win64/activemq.bat

    在这里插入图片描述

    访问控制台:http://127.0.0.1:8161/,用户名&密码:admin

    2、整合

    导入ActiveMQ的起步依赖:

    <dependency>   
    	<groupId>org.springframework.bootgroupId>    
    	<artifactId>spring-boot-starter-activemqartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    加入相关配置:

    spring:  
      activemq:    
        broker-url: tcp://localhost:61616  
      jms:    
        pub-sub-domain: false    # JMS的两种模型,点对点、发布订阅模型,这里为true即发布订阅模型
        template:      
          default-destination: codetest #默认的消息队列的名称
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注入JmsMessagingTemplate操作对象,进行消息的发送与接收

    @Autowired    
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    • 1
    • 2

    业务逻辑代码就实现上篇的MessageService接口就行,上篇在拿一个队列模拟MQ,来进行消息的生产与消费,这里有了真正的队列来存储消息了。

    3、发送消息到队列

    convertAndsend方法,发送消息到消息队列,convert,即转换,转换成能接受的数据类型然后发送,因此这个方法的形参类型可为Object,调用方便。

    @Service
    @Slf4j
    public class MessageServiceActivemqImpl implements MessageService {    
    	
    	@Autowired    
    	private JmsMessagingTemplate jmsMessagingTemplate;   
    	 
    	public void sendMessage(String id) {    
    	   
    		log.info("使用Active将待发送短信的订单纳入处理队列,id:"+id);
    		       
    		jmsMessagingTemplate.convertAndSend(id);    
    		
    	}    
    	
    	public String doMessage() {        
    	
    		String id = jmsMessagingTemplate.receiveAndConvert(String.class);
    		
    		log.info("已完成短信发送业务,订单id:" + id);
    		
    		return id;  
    		  	
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    取消息消费则相反,先receive接收,再convert转换,调用receiveAndConvert,形参为要转的目标类型。调用下上篇的接口(其实controller里就只是调用了一下service层方法),可以看到消息写入成功:

    在这里插入图片描述

    需要指定队列存储消息,则convertAndSend方法和receiveAndConvert方法传参多一个队列名即可。

    jmsMessagingTemplate.convertAndSend("order.queue.id",id); 
    
    • 1
    String id = jmsMessagingTemplate.receiveAndConvert("order.queue.id",String.class);
    
    • 1

    4、使用消息监听器对消息队列监听

    实际开发时,自然不用每次去手动取消息,直接@JmsListener注解监听队列,有消息了就自动拿出来消费(执行逻辑代码)就行:

    @Component
    @Slf4j
    public class MessageListener {    
    
    	@JmsListener(destination = "order.queue.id")    
    	public void receive(String id){ 
    		//实际业务逻辑代码       
    		log.info("已完成短信发送业务,id:"+id);    
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    重启服务,发现上面发送的消息已被自动消费:

    在这里插入图片描述
    在这里插入图片描述

    5、流程性业务消息消费完转入下一个消息队列

    @SendTo注解将一个方法的返回值给自动加到某个队列里,搭配@JmsListener注解,就可实现消费消息的流转。 当然也可单独使用,就发送就好,像@CachePut

    @Component
    @Slf4j
    public class MessageListener {   
     
    	@JmsListener(destination = "order.queue.id")   
    	@SendTo("order.other.queue.id")    
    	public String receive(String id){   
    	     
    		log.info("已完成短信发送业务,id:"+id); 
    		
    		//把这条消息处理完以后,返回
    		return  " a handled new id :" + id;     
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里对比下发布订阅模型与交换机:消息被消费以后,队列中就没了,如果其他程序也要用,又该如何处理? ==> 发布订阅模型,加入交换机来实现。比如RabbitMQ的Fanout Exchange广播、Direct Exchange路由、Topic Exchange话题。这里是把消息广播或路由到队列,而@SendTo搭配@JmsListener是在代码中处理完消息后的流转。

    在这里插入图片描述

    这种流水线的关系,体现的是一种顺序和依赖的业务。

    6、发布订阅模型

    spring:  
      activemq:    
        broker-url: tcp://localhost:61616  
      jms:    
        pub-sub-domain: true    #发布订阅模型
        template:      
          default-destination: codetest #默认的消息队列的名称
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此时发消息就在Topics中了:

    在这里插入图片描述

  • 相关阅读:
    SpringBoot 3.0最低版本要求的JDK 17,这几个新特性不能不知道
    BeanFactory与FactoryBean
    python作业
    GaiaDB-X 获选北京国家金融科技认证中心「数据领域首批专项示范与先行单位」
    C++ Reference: Standard C++ Library reference: C Library: cwchar: wcsncmp
    ECMAScript6(ES6)基础语法
    生态兼容性进一步提升!白鲸开源 WhaleStudio 与火山引擎ByteHouse完成产品互认
    【数据库原理及应用】——安全性与完整性(学习笔记)
    Python JSON
    python脚本(渗透测试)
  • 原文地址:https://blog.csdn.net/llg___/article/details/133618107