• RabbitMQ快速使用代码手册


    本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
    内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu

    参考文档#

    csdn博客:

    基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134

    高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

    application.yml#

    server:
    
    port: 8021
    
    spring:
    
    #给项目来个名字
    
    application:
    
    name: rabbitmq-provider
    
    #配置rabbitMq 服务器
    
    rabbitmq:
    
    host: 127.0.0.1
    
    port: 5672
    
    username: root
    
    password: root
    
    #虚拟host 可以不设置,使用server默认host
    
    virtual-host: JCcccHost
    
    #确认消息已发送到交换机(Exchange)
    
    #publisher-confirms: true
    
    publisher-confirm-type: correlated
    
    #确认消息已发送到队列(Queue)
    
    publisher-returns: true
    

    完善更多信息

    spring:
    
    rabbitmq:
    
    host: localhost
    
    port: 5672
    
    virtual-host: /
    
    username: guest
    
    password: guest
    
    publisher-confirm-type: correlated
    
    publisher-returns: true
    
    template:
    
    mandatory: true
    
    retry:
    
    #发布重试,默认false
    
    enabled: true
    
    #重试时间 默认1000ms
    
    initial-interval: 1000
    
    #重试最大次数 最大3
    
    max-attempts: 3
    
    #重试最大间隔时间
    
    max-interval: 10000
    
    #重试的时间隔乘数,比如配2,0
    第一次等于10s,第二次等于20s,第三次等于40s
    
    multiplier: 1
    
    listener:
    
    \# 默认配置是simple
    
    type: simple
    
    simple:
    
    \# 手动ack Acknowledge mode of container. auto none
    
    acknowledge-mode: manual
    
    #消费者调用程序线程的最小数量
    
    concurrency: 10
    
    #消费者最大数量
    
    max-concurrency: 10
    
    #限制消费者每次只处理一条信息,处理完在继续下一条
    
    prefetch: 1
    
    #启动时是否默认启动容器
    
    auto-startup: true
    
    #被拒绝时重新进入队列
    
    default-requeue-rejected: true
    

    相关注解说明#

    @RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
    里面的消息。

    @RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。

    \@Component
    
    public class PointConsumer {
    
    //监听的队列名
    
    \@RabbitListener(queues = \"point.to.point\")
    
    public void processOne(String name) {
    
    System.out.println(\"point.to.point:\" + name);
    
    }
    
    }
    

    @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用

    @RabbitListener 标注在类上面表示当有收到消息的时候,就交给
    @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。

    \@Component
    
    \@RabbitListener(queues = \"consumer_queue\")
    
    public class Receiver {
    
    \@RabbitHandler
    
    public void processMessage1(String message) {
    
    System.out.println(message);
    
    }
    
    \@RabbitHandler
    
    public void processMessage2(byte\[\] message) {
    
    System.out.println(new String(message));
    
    }
    
    }
    

    @Payload

    可以获取消息中的 body 信息

    \@RabbitListener(queues = \"debug\")
    
    public void processMessage1(@Payload String body) {
    
    System.out.println(\"body:\"+body);
    
    }
    

    @Header,@Headers

    可以获得消息中的 headers 信息

    \@RabbitListener(queues = \"debug\")
    
    public void processMessage1(@Payload String body, \@Header String token)
    {
    
    System.out.println(\"body:\"+body);
    
    System.out.println(\"token:\"+token);
    
    }
    
    \@RabbitListener(queues = \"debug\")
    
    public void processMessage1(@Payload String body, \@Headers
    Map\ headers) {
    
    System.out.println(\"body:\"+body);
    
    System.out.println(\"Headers:\"+headers);
    
    }
    

    快速使用#

    配置xml文件#

    
    
    \org.springframework.boot\
    
    \spring-boot-starter-amqp\
    
    \
    

    配置exchange、queue#

    注解快速创建版本#

    \@Configuration
    
    public class RabbitmqConfig {
    
    //创建交换机
    
    //通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机
    
    \@Bean(\"bootExchange\")
    
    public Exchange bootExchange() {
    
    return
    ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();
    
    }
    
    //创建队列
    
    \@Bean(\"bootQueue\")
    
    public Queue bootQueue() {
    
    return QueueBuilder.durable(\"zx_queue\").build();
    
    }
    
    /\*\*
    
    \* 将队列与交换机绑定
    
    \*
    
    \* \@param queue
    
    \* \@param exchange
    
    \* \@return
    
    \*/
    
    \@Bean
    
    public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,
    \@Qualifier(\"bootExchange\") Exchange exchange) {
    
    return
    BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();
    
    }
    
    }
    

    Direct#

    import org.springframework.amqp.core.Binding;
    
    import org.springframework.amqp.core.BindingBuilder;
    
    import org.springframework.amqp.core.DirectExchange;
    
    import org.springframework.amqp.core.Queue;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    /\*\*
    
    \* \@Author : JCccc
    
    \* \@CreateTime : 2019/9/3
    
    \* \@Description :
    
    \*\*/
    
    \@Configuration
    
    public class DirectRabbitConfig {
    
    //队列 起名:TestDirectQueue
    
    \@Bean
    
    public Queue TestDirectQueue() {
    
    //
    durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    
    //
    exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除此参考优先级高于durable
    
    //
    autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除
    
    // return new Queue(\"TestDirectQueue\",true,true,false);
    
    //一般设置一下队列的持久化就好,其余两个就是默认false
    
    return new Queue(\"TestDirectQueue\",true);
    
    }
    
    //Direct交换机 起名:TestDirectExchange
    
    \@Bean
    
    DirectExchange TestDirectExchange() {
    
    // return new DirectExchange(\"TestDirectExchange\",true,true);
    
    return new DirectExchange(\"TestDirectExchange\",true,false);
    
    }
    
    //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    
    \@Bean
    
    Binding bindingDirect() {
    
    return
    BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");
    
    }
    
    \@Bean
    
    DirectExchange lonelyDirectExchange() {
    
    return new DirectExchange(\"lonelyDirectExchange\");
    
    }
    
    }
    

    Fanout#

    import org.springframework.amqp.core.Binding;
    
    import org.springframework.amqp.core.BindingBuilder;
    
    import org.springframework.amqp.core.FanoutExchange;
    
    import org.springframework.amqp.core.Queue;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    /\*\*
    
    \* \@Author : JCccc
    
    \* \@CreateTime : 2019/9/3
    
    \* \@Description :
    
    \*\*/
    
    \@Configuration
    
    public class FanoutRabbitConfig {
    
    /\*\*
    
    \* 创建三个队列 :fanout.A fanout.B fanout.C
    
    \* 将三个队列都绑定在交换机 fanoutExchange 上
    
    \* 因为是扇型交换机, 路由键无需配置,配置也不起作用
    
    \*/
    
    \@Bean
    
    public Queue queueA() {
    
    return new Queue(\"fanout.A\");
    
    }
    
    \@Bean
    
    public Queue queueB() {
    
    return new Queue(\"fanout.B\");
    
    }
    
    \@Bean
    
    public Queue queueC() {
    
    return new Queue(\"fanout.C\");
    
    }
    
    \@Bean
    
    FanoutExchange fanoutExchange() {
    
    return new FanoutExchange(\"fanoutExchange\");
    
    }
    
    \@Bean
    
    Binding bindingExchangeA() {
    
    return BindingBuilder.bind(queueA()).to(fanoutExchange());
    
    }
    
    \@Bean
    
    Binding bindingExchangeB() {
    
    return BindingBuilder.bind(queueB()).to(fanoutExchange());
    
    }
    
    \@Bean
    
    Binding bindingExchangeC() {
    
    return BindingBuilder.bind(queueC()).to(fanoutExchange());
    
    }
    
    }
    

    Topic#

    import org.springframework.amqp.core.Binding;
    
    import org.springframework.amqp.core.BindingBuilder;
    
    import org.springframework.amqp.core.Queue;
    
    import org.springframework.amqp.core.TopicExchange;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    /\*\*
    
    \* \@Author : JCccc
    
    \* \@CreateTime : 2019/9/3
    
    \* \@Description :
    
    \*\*/
    
    \@Configuration
    
    public class TopicRabbitConfig {
    
    //绑定键
    
    public final static String man = \"topic.man\";
    
    public final static String woman = \"topic.woman\";
    
    \@Bean
    
    public Queue firstQueue() {
    
    return new Queue(TopicRabbitConfig.man);
    
    }
    
    \@Bean
    
    public Queue secondQueue() {
    
    return new Queue(TopicRabbitConfig.woman);
    
    }
    
    \@Bean
    
    TopicExchange exchange() {
    
    return new TopicExchange(\"topicExchange\");
    
    }
    
    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    
    \@Bean
    
    Binding bindingExchangeMessage() {
    
    return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    
    }
    
    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    
    // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    
    \@Bean
    
    Binding bindingExchangeMessage2() {
    
    return
    BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");
    
    }
    
    }
    

    生产者发送消息#

    直接发送给队列

    //指定消息队列的名字,直接发送消息到消息队列中
    
    \@Test
    
    public void testSimpleQueue() {
    
    // 队列名称
    
    String queueName = \"simple.queue\";
    
    // 消息
    
    String message = \"hello, spring amqp!\";
    
    // 发送消息
    
    rabbitTemplate.convertAndSend(queueName, message);
    
    }
    

    发送给交换机,然后走不同的模式

    ////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息
    
    \@Test
    
    public void testSendDirectExchange() {
    
    // 交换机名称,有三种类型
    
    String exchangeName = \"itcast.direct\";
    
    // 消息
    
    String message =
    \"红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!\";
    
    // 发送消息,red为队列的key,因此此队列会得到消息
    
    rabbitTemplate.convertAndSend(exchangeName, \"red\", message);
    
    }
    

    也可以将发送的消息封装到HashMap中然后发送给交换机

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.web.bind.annotation.GetMapping;
    
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    
    import java.time.format.DateTimeFormatter;
    
    import java.util.HashMap;
    
    import java.util.Map;
    
    import java.util.UUID;
    
    /\*\*
    
    \* \@Author : JCccc
    
    \* \@CreateTime : 2019/9/3
    
    \* \@Description :
    
    \*\*/
    
    \@RestController
    
    public class SendMessageController {
    
    \@Autowired
    
    RabbitTemplate rabbitTemplate;
    //使用RabbitTemplate,这提供了接收/发送等等方法
    
    \@GetMapping(\"/sendDirectMessage\")
    
    public String sendDirectMessage() {
    
    String messageId = String.valueOf(UUID.randomUUID());
    
    String messageData = \"test message, hello!\";
    
    String createTime =
    LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd
    HH:mm:ss\"));
    
    Map\ map=new HashMap\<\>();
    
    map.put(\"messageId\",messageId);
    
    map.put(\"messageData\",messageData);
    
    map.put(\"createTime\",createTime);
    
    //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
    
    rabbitTemplate.convertAndSend(\"TestDirectExchange\",
    \"TestDirectRouting\", map);
    
    return \"ok\";
    
    }
    
    }
    

    消费者接收消息#

    //使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
    
    \@Component
    
    public class MessageListener {
    
    \@RabbitListener(queues = \"direct_queue\")
    
    public void receive(String id){
    
    System.out.println(\"已完成短信发送业务(rabbitmq direct),id:\"+id);
    
    }
    
    }
    
    参数用Map接收也可以
    
    \@Component
    
    \@RabbitListener(queues = \"TestDirectQueue\")//监听的队列名称
    TestDirectQueue
    
    public class DirectReceiver {
    
    \@RabbitHandler
    
    public void process(Map testMessage) {
    
    System.out.println(\"DirectReceiver消费者收到消息 : \" +
    testMessage.toString());
    
    }
    
    }
    

    高级特性#

    消息可靠性传递#

    有confirm和return两种

    在application.yml中添加以下配置项:

    server:
    
    port: 8021
    
    spring:
    
    #给项目来个名字
    
    application:
    
    name: rabbitmq-provider
    
    #配置rabbitMq 服务器
    
    rabbitmq:
    
    host: 127.0.0.1
    
    port: 5672
    
    username: root
    
    password: root
    
    #虚拟host 可以不设置,使用server默认host
    
    virtual-host: JCcccHost
    
    #确认消息已发送到交换机(Exchange)
    
    #publisher-confirms: true
    
    publisher-confirm-type: correlated
    
    #确认消息已发送到队列(Queue)
    
    publisher-returns: true
    

    有两种配置方法:

    写到配置类中

    写到工具类或者普通类中,但是这个类得实现那两个接口

    写法一#

    编写消息确认回调函数

    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    \@Configuration
    
    public class RabbitConfig {
    
    \@Bean
    
    public RabbitTemplate createRabbitTemplate(ConnectionFactory
    connectionFactory){
    
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    
    rabbitTemplate.setConnectionFactory(connectionFactory);
    
    //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    
    rabbitTemplate.setMandatory(true);
    
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
    \@Override
    
    public void confirm(CorrelationData correlationData, boolean ack, String
    cause) {
    
    System.out.println(\"ConfirmCallback:
    \"+\"相关数据:\"+correlationData);
    
    System.out.println(\"ConfirmCallback: \"+\"确认情况:\"+ack);
    
    System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);
    
    }
    
    });
    
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    
    \@Override
    
    public void returnedMessage(Message message, int replyCode, String
    replyText, String exchange, String routingKey) {
    
    System.out.println(\"ReturnCallback: \"+\"消息:\"+message);
    
    System.out.println(\"ReturnCallback: \"+\"回应码:\"+replyCode);
    
    System.out.println(\"ReturnCallback: \"+\"回应信息:\"+replyText);
    
    System.out.println(\"ReturnCallback: \"+\"交换机:\"+exchange);
    
    System.out.println(\"ReturnCallback: \"+\"路由键:\"+routingKey);
    
    }
    
    });
    
    return rabbitTemplate;
    
    }
    
    }
    

    写法二#

    \@Component
    
    \@Slf4j
    
    public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
    RabbitTemplate.ReturnsCallback {
    
    \@Resource
    
    private RedisTemplate\<String, String\> redisTemplate;
    
    \@Resource
    
    private RabbitTemplate rabbitTemplate;
    
    private String finalId = null;
    
    private SmsDTO smsDTO = null;
    
    /\*\*
    
    \* 发布者确认的回调
    
    \*
    
    \* \@param correlationData 回调的相关数据
    
    \* \@param b ack为真,nack为假
    
    \* \@param s 一个可选的原因,用于nack,如果可用,否则为空
    
    \*/
    
    \@Override
    
    public void confirm(CorrelationData correlationData, boolean b, String
    s) {
    
    // 消息发送成功,将redis中消息的状态(status)修改为1
    
    if (b) {
    
    redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
    finalId, \"status\", 1);
    
    } else {
    
    // 发送失败,放入redis失败集合中,并删除集合数据
    
    log.error(\"短信消息投送失败:{}\--\>{}\", correlationData, s);
    
    redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
    
    redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
    this.smsDTO);
    
    }
    
    }
    
    /\*\*
    
    \* 发生异常时的消息返回提醒
    
    \*
    
    \* \@param returnedMessage
    
    \*/
    
    \@Override
    
    public void returnedMessage(ReturnedMessage returnedMessage) {
    
    log.error(\"发生异常,返回消息回调:{}\", returnedMessage);
    
    // 发送失败,放入redis失败集合中,并删除集合数据
    
    redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
    
    redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
    this.smsDTO);
    
    }
    
    \@PostConstruct
    
    public void init() {
    
    rabbitTemplate.setConfirmCallback(this);
    
    rabbitTemplate.setReturnsCallback(this);
    
    }
    
    }
    

    消息确认机制#

    手动确认

    yml配置
    
    #手动确认 manual
    
    listener:
    
    simple:
    
    acknowledge-mode: manual
    

    写法一#

    首先在消费者项目中创建MessageListenerConfig

    import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    
    import org.springframework.amqp.core.Queue;
    
    import
    org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    
    import
    org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    \@Configuration
    
    public class MessageListenerConfig {
    
    \@Autowired
    
    private CachingConnectionFactory connectionFactory;
    
    \@Autowired
    
    private MyAckReceiver myAckReceiver;//消息接收处理类
    
    \@Bean
    
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    
    SimpleMessageListenerContainer container = new
    SimpleMessageListenerContainer(connectionFactory);
    
    container.setConcurrentConsumers(1);
    
    container.setMaxConcurrentConsumers(1);
    
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
    RabbitMQ默认是自动确认,这里改为手动确认消息
    
    //设置一个队列
    
    container.setQueueNames(\"TestDirectQueue\");
    
    //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
    
    //
    container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");
    
    //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
    
    //container.setQueues(new Queue(\"TestDirectQueue\",true));
    
    //container.addQueues(new Queue(\"TestDirectQueue2\",true));
    
    //container.addQueues(new Queue(\"TestDirectQueue3\",true));
    
    container.setMessageListener(myAckReceiver);
    
    return container;
    
    }
    
    }
    

    然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)

    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.core.Message;
    
    import
    org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    
    import org.springframework.stereotype.Component;
    
    import java.io.ByteArrayInputStream;
    
    import java.io.ObjectInputStream;
    
    import java.util.Map;
    
    \@Component
    
    public class MyAckReceiver implements ChannelAwareMessageListener {
    
    \@Override
    
    public void onMessage(Message message, Channel channel) throws Exception
    {
    
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
    
    byte\[\] body = message.getBody();
    
    ObjectInputStream ois = new ObjectInputStream(new
    ByteArrayInputStream(body));
    
    Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
    
    String messageId = msgMap.get(\"messageId\");
    
    String messageData = msgMap.get(\"messageData\");
    
    String createTime = msgMap.get(\"createTime\");
    
    ois.close();
    
    System.out.println(\" MyAckReceiver messageId:\"+messageId+\"
    messageData:\"+messageData+\" createTime:\"+createTime);
    
    System.out.println(\"消费的主题消息来自:\"+message.getMessageProperties().getConsumerQueue());
    
    channel.basicAck(deliveryTag, true);
    //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认
    delivery_tag 小于等于传入值的所有消息
    
    //channel.basicReject(deliveryTag,
    true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
    
    } catch (Exception e) {
    
    channel.basicReject(deliveryTag, false);
    
    e.printStackTrace();
    
    }
    
    }
    
    }
    

    如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:

    首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可

    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.core.Message;
    
    import
    org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    
    import org.springframework.stereotype.Component;
    
    import java.io.ByteArrayInputStream;
    
    import java.io.ObjectInputStream;
    
    import java.util.Map;
    
    \@Component
    
    public class MyAckReceiver implements ChannelAwareMessageListener {
    
    \@Override
    
    public void onMessage(Message message, Channel channel) throws Exception
    {
    
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    try {
    
    byte\[\] body = message.getBody();
    
    ObjectInputStream ois = new ObjectInputStream(new
    ByteArrayInputStream(body));
    
    Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
    
    String messageId = msgMap.get(\"messageId\");
    
    String messageData = msgMap.get(\"messageData\");
    
    String createTime = msgMap.get(\"createTime\");
    
    ois.close();
    
    if
    (\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){
    
    System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());
    
    System.out.println(\"消息成功消费到 messageId:\"+messageId+\"
    messageData:\"+messageData+\" createTime:\"+createTime);
    
    System.out.println(\"执行TestDirectQueue中的消息的业务处理流程\...\...\");
    
    }
    
    if
    (\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){
    
    System.out.println(\"消费的消息来自的队列名为:\"+message.getMessageProperties().getConsumerQueue());
    
    System.out.println(\"消息成功消费到 messageId:\"+messageId+\"
    messageData:\"+messageData+\" createTime:\"+createTime);
    
    System.out.println(\"执行fanout.A中的消息的业务处理流程\...\...\");
    
    }
    
    channel.basicAck(deliveryTag, true);
    
    //channel.basicReject(deliveryTag, true);//为true会重新放回队列
    
    } catch (Exception e) {
    
    channel.basicReject(deliveryTag, false);
    
    e.printStackTrace();
    
    }
    
    }
    
    }
    

    写法二#

    \@Component
    
    \@Slf4j
    
    public class SendSmsListener {
    
    \@Resource
    
    private RedisTemplate\<String, String\> redisTemplate;
    
    \@Resource
    
    private SendSmsUtils sendSmsUtils;
    
    /\*\*
    
    \* 监听发送短信普通队列
    
    \* \@param smsDTO
    
    \* \@param message
    
    \* \@param channel
    
    \* \@throws IOException
    
    \*/
    
    \@RabbitListener(queues = SMS_QUEUE_NAME)
    
    public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
    channel) throws IOException {
    
    String messageId = message.getMessageProperties().getMessageId();
    
    int retryCount = (int)
    redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
    messageId, \"retryCount\");
    
    if (retryCount \> 3) {
    
    //重试次数大于3,直接放到死信队列
    
    log.error(\"短信消息重试超过3次:{}\", messageId);
    
    //basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
    
    //该方法reject后,该消费者还是会消费到该条被reject的消息。
    
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    
    redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
    
    return;
    
    }
    
    try {
    
    String phoneNum = smsDTO.getPhoneNum();
    
    String code = smsDTO.getCode();
    
    if(StringUtils.isAnyBlank(phoneNum,code)){
    
    throw new RuntimeException(\"sendSmsListener参数为空\");
    
    }
    
    // 发送消息
    
    SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
    code);
    
    SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();
    
    SendStatus sendStatus = sendStatusSet\[0\];
    
    if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send
    success\".equals(sendStatus.getMessage())){
    
    throw new RuntimeException(\"发送验证码失败\");
    
    }
    
    //手动确认消息
    
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
    log.info(\"短信发送成功:{}\",smsDTO);
    
    redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
    
    } catch (Exception e) {
    
    redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);
    
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    
    }
    
    }
    
    /\*\*
    
    \* 监听到发送短信死信队列
    
    \* \@param sms
    
    \* \@param message
    
    \* \@param channel
    
    \* \@throws IOException
    
    \*/
    
    \@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)
    
    public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
    channel) throws IOException {
    
    try{
    
    log.error(\"监听到死信队列消息==\>{}\",sms);
    
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
    }catch (Exception e){
    
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    
    }
    
    }
    
    }
    

    消费端限流#

    #配置RabbitMQ
    
    spring:
    
    rabbitmq:
    
    host: 192.168.126.3
    
    port: 5672
    
    username: guest
    
    password: guest
    
    virtual-host: /
    
    #开启自动确认 none 手动确认 manual
    
    listener:
    
    simple:
    
    #消费端限流机制必须开启手动确认
    
    acknowledge-mode: manual
    
    #消费端最多拉取的消息条数,签收后不满该条数才会继续拉取
    
    prefetch: 5
    

    消息存活时间TTL#

    可以设置队列的存活时间,也可以设置具体消息的存活时间

    设置队列中所有消息的存活时间

    return QueueBuilder

    .durable(QUEUE_NAME)//队列持久化

    .ttl(10000)//设置队列的所有消息存活10s

    .build();

    即在创建队列时,设置存活时间

    设置某条消息的存活时间

    //发送消息,并设置该消息的存活时间

    \@Test
    
    public void testSendMessage()
    
    {
    
    //1.创建消息属性
    
    MessageProperties messageProperties = new MessageProperties();
    
    //2.设置存活时间
    
    messageProperties.setExpiration(\"10000\");
    
    //3.创建消息对象
    
    Message message = new
    Message(\"sendMessage\...\".getBytes(),messageProperties);
    
    //4.发送消息
    
    rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);
    
    }
    

    若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。

    在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准

    死信队列#

    死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能

    import org.springframework.amqp.core.\*;
    
    import org.springframework.beans.factory.annotation.Qualifier;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    \@Configuration//Rabbit配置类
    
    public class RabbitConfig4 {
    
    private final String DEAD_EXCHANGE = \"dead_exchange\";
    
    private final String DEAD_QUEUE = \"dead_queue\";
    
    private final String NORMAL_EXCHANGE = \"normal_exchange\";
    
    private final String NORMAL_QUEUE = \"normal_queue\";
    
    //创建死信交换机
    
    \@Bean(DEAD_EXCHANGE)
    
    public Exchange deadExchange()
    
    {
    
    return ExchangeBuilder
    
    .topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字
    topic为通配符模式的交换机
    
    .durable(true)//是否持久化,true即存到磁盘,false只在内存上
    
    .build();
    
    }
    
    //创建死信队列
    
    \@Bean(DEAD_QUEUE)
    
    public Queue deadQueue()
    
    {
    
    return QueueBuilder
    
    .durable(DEAD_QUEUE)//队列持久化
    
    //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
    
    .build();
    
    }
    
    //死信交换机绑定死信队列
    
    \@Bean
    
    //@Qualifier注解,使用名称装配进行使用
    
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
    exchange, \@Qualifier(DEAD_QUEUE) Queue queue)
    
    {
    
    return BindingBuilder
    
    .bind(queue)
    
    .to(exchange)
    
    .with(\"dead_routing\")
    
    .noargs();
    
    }
    
    //创建普通交换机
    
    \@Bean(NORMAL_EXCHANGE)
    
    public Exchange normalExchange()
    
    {
    
    return ExchangeBuilder
    
    .topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字
    topic为通配符模式的交换机
    
    .durable(true)//是否持久化,true即存到磁盘,false只在内存上
    
    .build();
    
    }
    
    //创建普通队列
    
    \@Bean(NORMAL_QUEUE)
    
    public Queue normalQueue()
    
    {
    
    return QueueBuilder
    
    .durable(NORMAL_QUEUE)//队列持久化
    
    //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
    
    .deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机
    
    .deadLetterRoutingKey(\"dead_routing\")//死信队列路由关键字
    
    .ttl(10000)//消息存活10s
    
    .maxLength(10)//队列最大长度为10
    
    .build();
    
    }
    
    //普通交换机绑定普通队列
    
    \@Bean
    
    //@Qualifier注解,使用名称装配进行使用
    
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
    exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)
    
    {
    
    return BindingBuilder
    
    .bind(queue)
    
    .to(exchange)
    
    .with(\"my_routing\")
    
    .noargs();
    
    }
    
    }
    

    延迟队列#

    RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能

    即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。

    消费者监听延迟队列

    \@Component
    
    public class ExpireOrderConsumer {
    
    //监听过期订单队列
    
    \@RabbitListener(queues = \"expire_queue\")
    
    public void listenMessage(String orderId)
    
    {
    
    //模拟处理数据库等业务
    
    System.out.println(\"查询\"+orderId+\"号订单的状态,如果已支付无需处理,如果未支付则回退库存\");
    
    }
    
    }
    
    控制层代码
    
    \@RestController
    
    public class OrderController {
    
    \@Autowired
    
    private RabbitTemplate rabbitTemplate;
    
    \@RequestMapping(value = \"/place/{orderId}\",method =
    RequestMethod.GET)
    
    public String placeOrder(@PathVariable String orderId)
    
    {
    
    //模拟service层处理
    
    System.out.println(\"处理订单数据\...\");
    
    //将订单id发送到订单队列
    
    rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);
    
    return \"下单成功,修改库存\";
    
    }
    
    }
    
  • 相关阅读:
    下载安装和使用Nvm
    JAVA:实现Damm达姆算法(附完整源码)
    C++模拟实现——list
    FIX三天日记-quick fix简介
    oracle查询历史SQL记录
    electron-vue operation not permitted
    一种改进多旋翼无人机动态仿真的模块化仿真环境研究(Matlab代码实现)
    Stable Diffusion教程
    南京邮电大学高级语言程序设计实验四(一维与二维数组实验)
    (SAR)Sentinel-1影像自动下载
  • 原文地址:https://www.cnblogs.com/Changes404/p/17486179.html