• SpringBoot 中使用 RabbitTemplate


    SpringBoot 中 使用RabbtiMq 

    如图使用redisTemplate 一样的简单方便

    模拟发送邮件的情况

    pom.xml  

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.springframework.amqpgroupId>
    7. <artifactId>spring-rabbit-testartifactId>
    8. <scope>testscope>
    9. dependency>

    application.properties

    1. spring.rabbitmq.username=guest
    2. spring.rabbitmq.password=guest
    3. spring.rabbitmq.host=192.168.91.128
    4. spring.rabbitmq.port=5672
    5. ## 根据自己情况而定,可以不用
    6. spring.rabbitmq.listener.simple.acknowledge-mode=manual
    7. spring.rabbitmq.listener.simple.prefetch=100

    写在配置文件中,由 RabbitProperties 这个类进行读取,封装到ConnectionFactory 中。

    MailConstants (常量)

    1. public class MailConstants {
    2. public static final Integer DELIVERING = 0;//消息投递中
    3. public static final Integer SUCCESS = 1;//消息投递成功
    4. public static final Integer FAILURE = 2;//消息投递失败
    5. public static final Integer MAX_TRY_COUNT = 3;//最大重试次数
    6. public static final Integer MSG_TIMEOUT = 1;//消息超时时间
    7. public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue";
    8. public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange";
    9. public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key";
    10. }


    RabbitConfig (rabbitMq的配置类)

    1. import org.javaboy.vhr.model.MailConstants;
    2. import org.javaboy.vhr.service.MailSendLogService;
    3. import org.slf4j.Logger;
    4. import org.slf4j.LoggerFactory;
    5. import org.springframework.amqp.core.Binding;
    6. import org.springframework.amqp.core.BindingBuilder;
    7. import org.springframework.amqp.core.DirectExchange;
    8. import org.springframework.amqp.core.Queue;
    9. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    11. import org.springframework.beans.factory.annotation.Autowired;
    12. import org.springframework.context.annotation.Bean;
    13. import org.springframework.context.annotation.Configuration;
    14. @Configuration
    15. public class RabbitConfig {
    16. public final static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    17. @Autowired
    18. CachingConnectionFactory cachingConnectionFactory;
    19. //发送邮件的
    20. @Autowired
    21. MailSendLogService mailSendLogService;
    22. @Bean
    23. RabbitTemplate rabbitTemplate() {
    24. RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
    25. //手动应答返回的标志
    26. rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
    27. String msgId = data.getId();
    28. if (ack) {
    29. logger.info(msgId + ":消息发送成功");
    30. mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功
    31. } else {
    32. logger.info(msgId + ":消息发送失败");
    33. }
    34. });
    35. rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {
    36. logger.info("消息发送失败");
    37. });
    38. return rabbitTemplate;
    39. }
    40. @Bean
    41. Queue mailQueue() {
    42. return new Queue(MailConstants.MAIL_QUEUE_NAME, true);
    43. }
    44. @Bean
    45. DirectExchange mailExchange() {
    46. return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME, true, false);
    47. }
    48. @Bean
    49. Binding mailBinding() {
    50. return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
    51. }
    52. }

    MailSendTask(定时任务,发送)

    1. @Component
    2. public class MailSendTask {
    3. @Autowired
    4. MailSendLogService mailSendLogService;
    5. @Autowired
    6. RabbitTemplate rabbitTemplate;
    7. @Autowired
    8. EmployeeService employeeService;
    9. @Scheduled(cron = "0/10 * * * * ?")
    10. public void mailResendTask() {
    11. List logs = mailSendLogService.getMailSendLogsByStatus();
    12. if (logs == null || logs.size() == 0) {
    13. return;
    14. }
    15. logs.forEach(mailSendLog->{
    16. if (mailSendLog.getCount() >= 3) {
    17. mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败
    18. }else{
    19. mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date());
    20. Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId());
    21. /**
    22. * 参数1:交换机名称
    23. * 参数2 :路由key
    24. * 参数三:数据
    25. * 参数4:作为唯一标识
    26. *
    27. */
    28. rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId()));
    29. }
    30. });
    31. }
    32. }

    MailReceiver(接收端)

    1. @Component
    2. public class MailReceiver {
    3. public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class);
    4. @Autowired
    5. JavaMailSender javaMailSender;
    6. @Autowired
    7. MailProperties mailProperties;
    8. @Autowired
    9. TemplateEngine templateEngine;
    10. @Autowired
    11. StringRedisTemplate redisTemplate;
    12. @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
    13. public void handler(Message message, Channel channel) throws IOException {
    14. Employee employee = (Employee) message.getPayload();
    15. MessageHeaders headers = message.getHeaders();
    16. Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    17. String msgId = (String) headers.get("spring_returned_message_correlation");
    18. if (redisTemplate.opsForHash().entries("mail_log").containsKey(msgId)) {
    19. //redis 中包含该 key,说明该消息已经被消费过
    20. logger.info(msgId + ":消息已经被消费");
    21. channel.basicAck(tag, false);//确认消息已消费
    22. return;
    23. }
    24. //收到消息,发送邮件
    25. MimeMessage msg = javaMailSender.createMimeMessage();
    26. MimeMessageHelper helper = new MimeMessageHelper(msg);
    27. try {
    28. helper.setTo(employee.getEmail());
    29. helper.setFrom(mailProperties.getUsername());
    30. helper.setSubject("入职欢迎");
    31. helper.setSentDate(new Date());
    32. Context context = new Context();
    33. context.setVariable("name", employee.getName());
    34. context.setVariable("posName", employee.getPosition().getName());
    35. context.setVariable("joblevelName", employee.getJobLevel().getName());
    36. context.setVariable("departmentName", employee.getDepartment().getName());
    37. //根据模板发送
    38. String mail = templateEngine.process("mail", context);
    39. helper.setText(mail, true);
    40. javaMailSender.send(msg);
    41. redisTemplate.opsForHash().put("mail_log", msgId, "javaboy");
    42. channel.basicAck(tag, false);
    43. logger.info(msgId + ":邮件发送成功");
    44. } catch (MessagingException e) {
    45. //手动应答, tag 消息id ,、
    46. channel.basicNack(tag, false, true);
    47. e.printStackTrace();
    48. logger.error("邮件发送失败:" + e.getMessage());
    49. }
    50. }
    51. }

    使用总结

    0. rabbtMq的本地服务,得开启。(跟redis差不多)

    1. 写 application.properties中的rabbitMq的连接配置等

    2. rabbitConfig配置文件。(包括:交换机选择与队列的配置,绑定),选择的模式在这里配置

    3. 直接使用,导入rabbitTemplate类,使用rabbitTemplate.convertAndSend()方法

    4. 接收类

    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
     public void handler(Message message, Channel channel) throws IOException {

            业务逻辑了

            手动接收等等

    }

    相关文章:

    1. rabbitMq基础结构图

    2. channel接口常用方法

    3. rabbitTemplate模板

    4. rabbitMq的笔记1

    5. rabbitMq笔记2

  • 相关阅读:
    数字图像处理——基本运算
    玻璃生产过程中的窑内压力高精度恒定控制解决方案
    计算机网络——TCP/IP模型
    前后端分离 基础(新增与查询)
    奇异码,非奇异码和唯一可译码和即时码的区别
    财务费用分析怎么分析
    Python连接Clickhouse遇坑篇,耗时一天成功连接!
    分治算法详解
    P2P实现远程控制
    N点复序列求2个N点实序列的快速傅里叶变换
  • 原文地址:https://blog.csdn.net/qq_46539281/article/details/125998621