大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
---|---|
作业侠系列最新文章😉 | Java实现聊天程序 |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战 |
环境搭建大集合 | 环境搭建大集合(持续更新) |
在本栏中,我们之前已经完成了:
【SpringBoot实战系列】之发送短信验证码
【SpringBoot实战系列】之从Async组件应用实战到ThreadPoolTaskExecutor⾃定义线程池
【SpringBoot实战系列】之图形验证码开发并池化Redis6存储
【SpringBoot实战系列】阿里云OSS接入上传图片实战
【SpringBoot实战系列】Sharding-Jdbc实现分库分表到分布式ID生成器Snowflake自定义wrokId实战
本片速览:
1.RabbitMQ交换机类型
2.RabbitMQ配置开发实战
3.对应controller,service开发
4.业务逻辑消费者编写
5.部署rabbitMQ,并在application中配置rabbitMQ
6.异常监控队列配置
7.异常队列消费者代码如下,实现邮箱发送,监控报警
8.邮箱组件代码以及配置
9.测试结果
RabbitMQ交换机类型
- 简介
⽣产者将消息发送到 Exchange,交换器将消息路由到⼀个或者多个队列中,交换机有多个类型,队列和交换机是多对多的
关系。
交换机只负责转发消息,不具备存储消息的能⼒,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不⽤- 交换机类型
- Direct Exchange 定向
将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配
例⼦:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb- 处理路由健
Fanout Exchange ⼴播
只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得了⼀份复制的消息
Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形
不处理路由健
- Topic Exchange 通配符
主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上
符号“#”匹配⼀个或多个词,符号 ∗ * ∗匹配不多不少⼀个词
例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*”只会匹配到“abc.def”。
RabbitMQ配置开发实战
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
private String shortLinkEventExchange = "short_link.event.exchange";
/**
* 创建交换机 Topic类型
* ⼀般⼀个微服务⼀个交换机
*
* @return
*/
@Bean
public Exchange shortLinkEventExchange() {
return new TopicExchange(shortLinkEventExchange, true, false);
//return newFanoutExchange(shortLinkEventExchange,true,false);
}
/**
* 新增短链 队列
*/
private String shortLinkAddLinkQueue = "short_link.add.link.queue";
/**
* 新增短链映射 队列
*/
private String shortLinkAddMappingQueue = "short_link.add.mapping.queue";
/**
* 新增短链具体的routingKey,【发送消息使⽤】
*/
private String shortLinkAddRoutingKey = "short_link.add.link.mapping.routing.key";
/**
* topic类型的binding key,⽤于绑定队列和交换机,是⽤
* 于 link 消费者
*/
private String shortLinkAddLinkBindingKey = "short_link.add.link.*.routing.key";
/**
* topic类型的binding key,⽤于绑定队列和交换机,是⽤
* 于 mapping 消费者
*/
private String shortLinkAddMappingBindingKey = "short_link.add.*.mapping.routing.key";
/**
* 新增短链api队列和交换机的绑定关系建⽴
*/
@Bean
public Binding shortLinkAddApiBinding() {
return new Binding(shortLinkAddLinkQueue, Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddLinkBindingKey, null);
}
/**
* 新增短链mapping队列和交换机的绑定关系建⽴
*/
@Bean
public Binding shortLinkAddMappingBinding() {
return new Binding(shortLinkAddMappingQueue, Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddMappingBindingKey, null);
}
/**
* 新增短链api 普通队列,⽤于被监听
*/
@Bean
public Queue shortLinkAddLinkQueue() {
return new Queue(shortLinkAddLinkQueue, true, false, false);
}
/**
* 新增短链mapping 普通队列,⽤于被监听
*/
@Bean
public Queue shortLinkAddMappingQueue() {
return new Queue(shortLinkAddMappingQueue, true, false, false);
}
}
你要发送的消息,用一个类封装起来即可
对应controller,service开发
@PostMapping("/add")
public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest) {
JsonData jsonData = shortLinkService.createShortLink(shortLinkAddRequest);
return jsonData;
}
对应service,注入配置好的RabbitConfig以及rabbitTemplate即可
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMQConfig rabbitMQConfig;
发送代码
@Override
public JsonData createShortLink(ShortLinkAddRequest shortLinkAddRequest) {
/**
* 使用lombok建造者模式构建你要发送的消息,然后调用rabbitTemplate.convertAndSend方法,配置对应的交换机,路由key与消息即可
*/
Long account_no = LoginInterceptor.threadLocal.get().getAccountNo();
EventMessage eventMessage = EventMessage.builder().accountNo(account_no).messageId(IDUtil.geneSnowFlakeID().toString()).content(JsonUtil.obj2Json(shortLinkAddRequest)).eventMessageType(EventMessageType.SHORT_LINK_ADD.name()).build();
rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(),rabbitMQConfig.getShortLinkAddRoutingKey(),eventMessage);
return JsonData.buildSuccess();
}
消费者编写
@RabbitListener(queuesToDeclare = {@Queue("short_link.add.link.queue")})
@Slf4j
@Component
public class ShortLinkAddLinkMQListener {
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel)throws Exception{
log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:{}", message);
try {
int i=1/0;
//TODO 处理业务
} catch (Exception e) {
// 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
log.error("消费失败{}", eventMessage);
throw new Exception(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());
}
log.info("消费成功{}", eventMessage);
//手动确认消息消费成功
// channel.basicAck(msgTag, false);
}
}
application中配置rabbitMQ,部署可见环境搭建大集合(持续更新)
##----------rabbit配置--------------
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#需要⼿⼯创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=${admin}
spring.rabbitmq.password=${password}
#消息确认⽅式,manual(⼿动ack) 和auto(⾃动ack)
spring.rabbitmq.listener.simple.acknowledgemode=auto
#开启重试,消费者代码不能添加try catch捕获不往外抛异常
spring.rabbitmq.listener.simple.retry.enabled=true
#最⼤重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=4
# 重试消息的时间间隔,5秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000
将配置中host,port,username,password改为你自己的即可,并且记得去RabbitMQ可视化控制台创建一个名为dev的host,具体操作如下:
注意:
加了@bean配置交换机和queue,启动项⽬却没⾃动化创建队列
RabbitMQ懒加载模式, 需要配置消费者监听才会创建,
@RabbitListener(queues =“short_link.add.link.queue”)
另外种⽅式(若Mq中⽆相应名称的队列,也会⾃动创建Queue)
@RabbitListener(queuesToDeclare = {@Queue(“short_link.add.link.queue”) })
因为我们消费者代码逻辑中有1/0,用于模拟业务过程出错,这样即实战了消息发送也实现了异常监控
异常监控队列配置
@Configuration
@Data
public class RabbitMQErrorConfig {
private String shortLinkErrorExchange = "short_link.error.exchange";
private String shortLinkErrorQueue = "short_link.error.queue";
private String shortLinkErrorRoutingKey = "short_link.error.routing.key";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 异常交换机
*
* @return
*/
@Bean
public TopicExchange errorTopicExchange() {
return new TopicExchange(shortLinkErrorExchange, true, false);
}
/**
* 异常队列
*
* @return
*/
@Bean
public Queue errorQueue() {
return new Queue(shortLinkErrorQueue, true);
}
/**
* 队列与交换机进⾏绑定
*
* @return
*/
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange) {
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(shortLinkErrorRoutingKey);
}
/**
* 配置 RepublishMessageRecoverer
* ⽤途:消息重试⼀定次数后,⽤特定的routingKey转发到指
* 定的交换机中,⽅便后续排查和告警
*
* 顶层是 MessageRecoverer接⼝,多个实现类
*
* @return
*/
@Bean
public MessageRecoverer messageRecoverer() {
return new RepublishMessageRecoverer(rabbitTemplate, shortLinkErrorExchange, shortLinkErrorRoutingKey);
}
}
对应消费者代码如下,实现邮箱发送,监控报警
@RabbitListener(queuesToDeclare = {@Queue("short_link.error.queue")})
@Slf4j
@Component
public class ShortLinkErrorMQListener {
public static final String SUBJECT = "短链监控告警";
public static final String CONTENT = "用户%s短链创建%s,消息消费出现异常";
@Autowired
private ErrorNotifyComponent errorNotifyComponent;
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel)throws Exception{
log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:{}", message);
try {
errorNotifyComponent.sendMail("110@qq.com",SUBJECT,String.format(CONTENT,eventMessage.getAccountNo(),eventMessage.getContent()));
log.info("发送成功");
//TODO 处理业务
} catch (Exception e) {
// 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
log.error("消费失败{}", eventMessage);
throw new Exception(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());
}
log.info("消费成功{}", eventMessage);
//确认消息消费成功
// channel.basicAck(msgTag, false);
}
}
ErrorNotifyComponent实现邮箱发送代码及配置,发送端使⽤⽹易邮箱https://mail.126.com/
具体如何获得授权码,请见博客网易邮箱获取授权码
application中添加配置如下
spring.mail.host=smtp.126.com
spring.mail.username=${你注册的网易邮箱账号}
spring.mail.password=${得到的授权码}
spring.mail.from=${你注册的网易邮箱账号}
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.ssl.enable=true
spring.mail.default-encoding=utf-8
邮箱组件代码
@Component
@Slf4j
public class ErrorNotifyComponent {
@Autowired
private JavaMailSender mailSender;
@Value("${spring.mail.from}")
private String from;
@Async
public void sendMail(String to, String subject, String content) {
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(subject);
message.setText(content);
mailSender.send(message);
log.info("邮件发送成功:{}",message.toString());
}
}
使用postman测试后,发现重试之后报了错,但是进入了异常队列并成功发送邮件