或者在docker上运行
# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \
-v /etc/localtime:/etc/localtime:ro \
-v rabbitmq-5672-data:/var/lib/rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.10-management
# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \
-v /etc/localtime:/etc/localtime:ro \
-v ~/docker/5672/data:/var/lib/rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.10-management
<dependency>
<groupId>org.springframework.amqpgroupId>
<artifactId>spring-rabbit-testartifactId>
<scope>testscope>
dependency>
## 连接rabbitmq服务器
spring.rabbitmq.host=192.168.12.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=hl
## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
## 确认消息已发送到交换机( Exchange )
spring.rabbitmq.publisher-confirm-type=CORRELATED
# 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
@Configuration
@EnableRabbit
@Slf4j
@Transactional
public class RabbitConfig {
@Resource
private MessageDao messageDao;
public static final String EMPLOYEE_LIST = "employee-list";
public static final String DEPARTMENT_DELETE = "department-delete";
@Bean
public Queue DepartmentDelete(){
return new Queue(DEPARTMENT_DELETE);
}
@Bean
public Queue employeeList(){
return new Queue(EMPLOYEE_LIST);
}
@Bean("rabbitTemplate")
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
//每次发送队列信息将触发此方法,需要添加配置属性
System.out.println(correlationData.getId());
Message message = messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));
if (ack){
message.setStatus("B");
}
message.setRetryCount(message.getRetryCount()-1);
log.info("剩余消息数:"+message.getRetryCount());
messageDao.save(message);
}
});
// rabbitTemplate.setMandatory(true);
//
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// @Override
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// log.info("ReturnCallback 消息:{}", message);
// log.info("ReturnCallback 回应码:{}", replyCode);
// log.info("ReturnCallback 回应信息:{}", replyText);
// log.info("ReturnCallback 交换机:{}", exchange);
// log.info("ReturnCallback 路由键:{}", routingKey);
// }
// });
return rabbitTemplate;
}
}
drop table if exists message;
create table message
(
id bigint auto_increment,
exchange varchar(64) ,
routing_key varchar(64) not null,
content varchar(128) not null,
retry_count int not null,
status varchar(32) not null,
primary key (id)
);
创建对应的DAO类和实体类
public void deleteById(Long id) {
departmentDao.deleteById(id);
Message message = new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id+"", 5, "A");
messageDao.save(message);
}
@Component
@Slf4j
@Transactional
@EnableScheduling
public class RabbitTimer {
@Resource
private MessageMysqlDao messageDao;
@Resource
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 6000)
private void process(){
//获取状态不等于C和次数大于0的信息
QueryWrapper<Message> wrapper = new QueryWrapper<>();
wrapper.ne("status", "C");
wrapper.gt("retry_count", 0);
List<Message> messageList = messageDao.selectList(wrapper);
if (messageList.size()==0){
log.info("暂无消息发送,请等待...");
}else {
//进行信息发送
for (Message message : messageList) {
String content = message.getId()+":"+message.getContent();
CorrelationData correlationData = new CorrelationData(message.getId()+"");
if (message.getExchange()==null) {
rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);
}
else{
rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);
}
log.info("消息 {} 已发送",content);
}
}
}
}
@RestController
@RequestMapping("/message")
@RequiredArgsConstructor
public class MessageController implements IAMessageController{
@Resource
private MessageMysqlDao messageMysqlDao;
@PostMapping("/update/{id}")
public String messageUpdate(@PathVariable("id")Long id){
QueryWrapper<Message> wrapper = new QueryWrapper<>();
wrapper.eq("id", id);
Message message = new Message();
message.setStatus("C");
messageMysqlDao.update(message,wrapper);
return "success";
}
}
drop table if exists recived_message;
create table recived_message
(
id bigint auto_increment,
recived_at datetime
);
消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息
@Configuration
@RequiredArgsConstructor
@Transactional
public class HarvestResultLister {
private final HarvestPlanMysqlDao harvestPlanMysqlDao;
private final ReceivedMessageMysqlDao receivedMessageMysqlDao;
private final HarvestCheckClient harvestCheckClient;
@RabbitListener(queues = RabbitConfig.HARVEST_CHECK)
public void harvestUpdateByCheck(String msg, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
System.out.println(msg);
String[] split = msg.split(":");
if (split.length != 3) {
throw new RabbitDataError("发送的数据异常");
}
String mesId = split[0];
//获取发送内容id
String contentId = split[1];
//获取被修改的采收id
String harvestId = split[2];
ReceivedMessage receivedMessage = receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));
if (receivedMessage != null){
throw new RabbitDataError("发送重复数据");
}
//存入数据
receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));
QueryWrapper<HarvestPlan> wrapper = new QueryWrapper<>();
wrapper.eq("id", Long.parseLong(harvestId));
HarvestPlan harvestPlan = new HarvestPlan();
harvestPlan.setPurchaseStatusId(3L);
String result = harvestCheckClient.messageUpdate(Long.parseLong(mesId));
if (!"success".equals(result)){
throw new RabbitDataError("确认消息未正常传回");
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}finally {
try {
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}