• 【开发篇】二十、SpringBoot整合RocketMQ



    在这里插入图片描述

    1、整合

    首先导入起步依赖,RocketMQ的starter不是Spring维护的,这一点从starter的命名可以看出来(不是spring-boot-starter-xxx,而是xxx-spring-boot-starter,和MyBatisPlus、Druid一样),因此version值得自己加:

    <dependency>   
    	<groupId>org.apache.rocketmqgroupId>    
    	<artifactId>rocketmq-spring-boot-starterartifactId> 
    	<version>2.2.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    添加相关配置:

    rocketmq:  
      name-server: localhost:9876  
      producer:    
        group: group_rocketmq  # 设置一个自定义的生产者默认组名,省掉这个启动会报错
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在需要的地方注入RocketMQTemplate操作对象:

    @Autowired    
    private RocketMQTemplate rocketMQTemplate;
    
    • 1
    • 2

    2、消息的生产

    发送消息继续convertAndSend方法,接着上篇在Service层来演示:

    @Service
    @Slf4j
    public class MessageServiceRocketmqImpl implements MessageService {    
    
    	@Autowired    
    	private RocketMQTemplate rocketMQTemplate;    
    
    	@Override    
    	public void sendMessage(String id) {        
    		
    		rocketMQTemplate.convertAndSend("order_sm_id",id);      
    		
    		log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);    
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    convertAndSend方法依旧重载,可以直接传一个Object,也可以先传一个destination参数,即发到哪儿,再传要发的message

    3、消费

    这里不演示手动receive方法拿消息,直接用监听器自动拿来消费:实现RocketMQListener接口,泛型为Message类型,重写onMessage方法,加@RocketMQMessageListener注解,两个属性为主题名称和消费者组

    @Component
    @Slf4j
    @RocketMQMessageListener(topic="order_sm_id",consumerGroup = "group_rocketmq")
    public class RocketmqMessageListener implements RocketMQListener<String> {  
      
    	@Override    
    	public void onMessage(String id) {        
    		log.info("已完成短信发送业务,id:"+id);    
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4、发送异步消息

    @Service
    @Slf4j
    public class MessageServiceRocketmqImpl implements MessageService {   
    
    	@Autowired    
    	private RocketMQTemplate rocketMQTemplate;    
    
    	@Override   
    	public void sendMessage(String id) {        
    		
    		//回调逻辑
    		SendCallback callback = new SendCallback() {            
    			@Override            
    			public void onSuccess(SendResult sendResult) {                
    				//消息发送成功后你要做的业务
    				//...
    				log.info("消息发送成功");            
    			}     
    			       
    			@Override            
    			public void onException(Throwable throwable) {               
    				log.info("消息发送失败!!!!!!!!!!!");            
    			}        
    		};     
    				
    		//异步发送
    		rocketMQTemplate.asyncSend("order_sm_id",id,callback);  
    		log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);      
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    asyncSend异步发消息,有个参数是callback回调方法,类型是一个接口,创建这个对象的时候重写onSuccess和OnException方法,即消息发送成功以后的逻辑和消息发送失败以后的逻辑(异步的体现,不用等,来个回调)。

    5、补充:安装RocketMQ

    建议以Docker方式启动,下面备份下在Windows的安装(安装为一个系统服务):

    • 下载
    下载地址:https://rocketmq.apache.org/
    
    • 1
    • 安装:解压缩即可
    默认服务端口:9876
    
    • 1
    • 环境变量配置
    ROCKETMQ_HOME
    PATH
    NAMESRV_ADDR (建议): 127.0.0.1:9876
    
    • 1
    • 2
    • 3
    • 启动命名服务器:
    mqnamesrv
    
    • 1
    • 启动Broker
    mqbroker
    
    • 1
    • 服务器功能测试:生产数据
    tools org.apache.rocketmq.example.quickstart.Producer
    
    • 1
    • 服务器功能测试:消费数据
    tools org.apache.rocketmq.example.quickstart.Consumer
    
    • 1
  • 相关阅读:
    【Python】pyecharts 模块 ① ( ECharts 简介 | pyecharts 简介 | pyecharts 中文网站 | pyecharts 画廊网站 | pyecharts 画 )
    SSM流程
    Android Java 多线程常见问题
    C++知识篇--强制类型转换
    想在期货市场存活 需要关注什么数据?
    C专家编程 第8章 为什么程序员无法分清万圣节和圣诞节 8.2 根据位模式构筑图形
    Java精进-手写持久层框架
    CPU设计(单周期和流水线)
    Redis----布隆过滤器
    在排序数组中查找元素的第一个和最后一个位置
  • 原文地址:https://blog.csdn.net/llg___/article/details/133634668