【黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)】
RocketMQ 也是满足AMQP 协议的
但是

它默认支持的是RabbitMQ,即RocketMQ 它不是一个默认支持的东西
查一下坐标 https://mvnrepository.com/search?q=rocketmq


点进去

复制出来,贴到pom 文件中
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.2version>
dependency>

OK。
先把上次做的RabbitMQ 的bean 全部注掉,不然起两套会冲突

这些都注掉
【配置】
rocketmq:
name-server: localhost:9876

消息处理业务实现类
package com.dingjiaxiong.service.impl.rocketmq;
import com.dingjiaxiong.service.MessageService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* ClassName: MessageServiceRocketmqImpl
* date: 2022/10/23 12:49
*
* @author DingJiaxiong
*/
@Service
public class MessageServiceRocketmqImpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
//发送消息
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id " + id);
rocketMQTemplate.convertAndSend("order_id",id);
}
@Override
public String doMessage() {
return null;
}
}
现在直接启动服务器,看看是否可以发送成功

可以看到,启动报错了【老师也报了】
修改配置
rocketmq:
name-server: localhost:9876
producer:
group: group_rocketmq

再启动

OK
直接生产一个消息进去

OK, 没啥问题
【主动点击消费就不写了,直接监听】
package com.dingjiaxiong.service.impl.rocketmq.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* ClassName: MessageListener
* date: 2022/10/23 12:56
*
* @author DingJiaxiong
*/
@Component
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String id) {
System.out.println("已完成短信发送业务(rocketmq),id:" + id);
}
}
重启服务

可以看到上来就把之前发那个没消费的消费了
再生产几个

这就是完成了基本的生产消费队列操作
问题来了, 现在我们发的这种消息都属于同步消息【对于程序,还是得做成异步】
修改实现类
package com.dingjiaxiong.service.impl.rocketmq;
import com.dingjiaxiong.service.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* ClassName: MessageServiceRocketmqImpl
* date: 2022/10/23 12:49
*
* @author DingJiaxiong
*/
@Service
public class MessageServiceRocketmqImpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
//发送消息
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id " + id);
// rocketMQTemplate.convertAndSend("order_id",id);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//成功的时候做什么
System.out.println("消息发送成功!");
}
@Override
public void onException(Throwable throwable) {
//出现异常的时候做什么
System.out.println("消息发送失败");
}
};
rocketMQTemplate.asyncSend("order_id",id,callback);
}
@Override
public String doMessage() {
return null;
}
}
这就是发送异步消息,重启服务器看看
生产一个消息

没啥问题【简单的整合就是这样,后面还得自己深入研究】
回顾一下




