• rabbitmq概述和六种工作模式


    MQ的概述:
    MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信
    在这里插入图片描述
    MQ 的优势:
    1.服务解耦:提高系统容错性和可维护性
    系统的耦合性越高,容错性就越低,可维护性就越低。使用 MQ 使得应用间解耦,提升容错性和可维护性,服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
    在这里插入图片描述
    2.异步提速:提升用户体验和系统吞吐量
    在这里插入图片描述
    3.削峰填谷(流量削峰):提高系统稳定性
    假设我们有一个应用,平时访问量是每秒300请求,而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力;

    MQ 的劣势:
    系统可用性降低,系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
    系统复杂度提高,MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况?

    常见的 MQ 产品:
    在这里插入图片描述
    RabbitMQ使用的网络协议:
    AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
    在这里插入图片描述
    RabbitMQ 简介:
    2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
    RabbitMQ 基础架构如下图:
    在这里插入图片描述
    RabbitMQ 中的相关概念:
    Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker;
    Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等,可以理解成在数据库中创建的DB;
    Connection:publisher/consumer 和 broker 之间的 TCP 连接
    Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

    Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
    Queue:消息最终被送到这里等待 consumer 取走
    Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    RabbitMQ 提供了 6 种工作模式:

    简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
    官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
    在这里插入图片描述
    六种模式代码演示:
    添加依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.4.3</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    关于ACK:
    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者
    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。
    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以;
    当消费者接受消息出现异常,没有进行ack,则接下来这个消费者将不会接受消息;
    如果没有发生异常,但是没有ack,则这个消费者不会接受其它的消息;这个没有ack的消息只有在这个没有ack的消费者断开连接后,才会被其它消费者接收;

    1.简单模式:一个生产者一个消费者

    public class Producer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.136");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("bijian");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("bjjian-vi-host");
    
            //获取TCP长连接
            Connection connection = connectionFactory.newConnection();
            //创建通信“通道”,相当于TCP中的虚拟连接
            Channel channel = connection.createChannel();
            //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            //第一个参数:队列名称ID
            //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
            //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
            //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
            //其他额外的参数, null
            channel.queueDeclare(RabbitmqConstant.QUEUE_HELLOWORD, false, false, false, null);
            for (int i = 0; i < 10000; i++) {
                String message = "bijian" + i;
                //四个参数
                //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
                //队列名称
                //额外的设置属性
                //最后一个参数是要传递的消息字节数组
                channel.basicPublish("", RabbitmqConstant.QUEUE_HELLOWORD, null, message.getBytes());
            }
            channel.close();
            connection.close();
            System.out.println("发送消息结束");
        }
    }
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
            //连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.136");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("bijian");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("bjjian-vi-host");
    
            /*
             * 与rabbitmq服务器建立连接,
             * rabbitmq服务器端使用的是nio,会复用tcp连接,
             * 并开辟多个信道与客户端通信
             * 以减轻服务器端建立连接的开销
             */
            Connection connection = connectionFactory.newConnection();
            //创建通信“通道”,相当于TCP中的虚拟连接
            Channel channel = connection.createChannel();
            //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            //第一个参数:队列名称ID
            //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
            //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
            //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
            //其他额外的参数, null
            channel.queueDeclare(RabbitmqConstant.QUEUE_HELLOWORD, false, false, false, null);
    
            //从MQ服务器中获取数据
            //创建一个消息消费者
            //第一个参数:队列名
            //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
            //第三个参数要传入DefaultConsumer的实现类
            channel.basicConsume(RabbitmqConstant.QUEUE_HELLOWORD, false, new Reciver(channel));
        }
    }
    
    class Reciver extends DefaultConsumer {
        private Channel channel;
        //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
        public Reciver(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body);
            System.out.println("consumer接受到的消息为:" + message);
            System.out.println("消息的TgaId为:" + envelope.getDeliveryTag());
            //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    2.Work queues 工作队列模式:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
    应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
    在这里插入图片描述
    3.Pub/Sub 订阅模式:
    在这里插入图片描述
    在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
    P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    C:消费者,消息的接收者,会一直等待消息到来
    Queue:消息队列,接收消息、缓存消息
    Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    Fanout:广播,将消息交给所有绑定到交换机的队列
    Direct:定向,把消息交给符合指定routing key 的队列
    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
    在这里插入图片描述

    4.Routing 路由模式:
    队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key),
    Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息;
    Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
    在这里插入图片描述
    5.Topic 通配符模式:
    Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert;
    Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
    在这里插入图片描述
    工作模式总结
    1.简单模式 HelloWorld
    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
    2.工作队列模式 Work Queue
    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
    3.发布订阅模式 Publish/subscribe
    需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
    4.路由模式 Routing
    需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
    5.通配符模式 Topic
    需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

    生产者被正确发送到exchenge或进入队列的消息确认机制:
    在这里插入图片描述
    在这里插入图片描述
    代码如下

    //开启confirm监听模式
    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {
        public void handleAck(long l, boolean b) throws IOException {
            //第二个参数代表接收的数据是否为批量接收,一般我们用不到。
            System.out.println("消息已被Broker接收,Tag:" + l);
        }
        public void handleNack(long l, boolean b) throws IOException {
            System.out.println("消息已被Broker拒收,Tag:" + l);
        }
    });
    
    channel.addReturnListener(new ReturnCallback() {
        public void handle(Return r) {
            System.out.println("===========================");
            System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
            System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey());
            System.err.println("Return主题:" + new String(r.getBody()));
            System.err.println("===========================");
        }
    });
    Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
    while (itr.hasNext()) {
        Map.Entry<String, String> me = itr.next();
        //Routing key 第二个参数相当于数据筛选的条件
        //第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
        channel.basicPublish(RabbitmqConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), true, null, me.getValue().getBytes());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Rabbbitmq整合spring:
    引入maven依赖:

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.1.7.RELEASE</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.8.RELEASE</version>
    </dependency>
    
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.5</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    加入配置类

    @Configuration
    @EnableRabbit  //@RabbitListener注解打开使用
    @Slf4j
    public class RabbitMQConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setHost("192.168.244.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("bijian");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("bjjian-vi-host");
        return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
       RabbitTemplate rabbitTemplate = new RabbitTemplate();
    
            rabbitTemplate.setConnectionFactory(amqpConnectionFactory);
    
            rabbitTemplate.setRetryTemplate(retryTemplate);
    
            rabbitTemplate.setMessageConverter(messageConverter);
    
            rabbitTemplate.setChannelTransacted(false);
    
            rabbitTemplate.setReturnCallback(returnCallback);
    
            rabbitTemplate.setConfirmCallback(confirmCallback);
    
            rabbitTemplate.setMandatory(false);
    
            return rabbitTemplate;
    }
    setConnectionFactory:设置spring-amqp的ConnectionFactory。
    
    setRetryTemplate:设置重试机制,详情见后文。
    
    setMessageConverter:设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。
    
    
    /**
     * 消息监听容器
     * 一个监听器,一个容器
     *
     * @param connectionFactory
     * @return
     */
    //@Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // 设置监听的队列
        // simpleMessageListenerContainer.setQueueNames(RabbitmqConstant.QUEUE_SPRING_TOPIC);
        // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
        simpleMessageListenerContainer.setConcurrentConsumers(3);
        // 最大的并发消费者
        simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
        // 设置是否重回队列
        simpleMessageListenerContainer.setDefaultRequeueRejected(false);
        // 设置签收模式
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置非独占模式
        simpleMessageListenerContainer.setExclusive(false);
        // 设置consumer未被 ack 的消息个数
        simpleMessageListenerContainer.setPrefetchCount(1);
        // 接收到消息的后置处理
        simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> {
            message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理");
            return message;
        });
        // 设置 consumer 的 tag
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
            private AtomicInteger consumer = new AtomicInteger(1);
    
            @Override
            public String createConsumerTag(String queue) {
                return String.format("consumer:%s:%d", queue, consumer.getAndIncrement());
            }
        });
        // 设置消息监听器
        // simpleMessageListenerContainer.setMessageListener(springMQListener());
    
        /**  ================ 消息转换器的用法 ================
         simpleMessageListenerContainer.setMessageConverter(new MessageConverter() {
         // 将 java 对象转换成 Message 对象
         @Override public Message toMessage(Object object, MessageProperties messageProperties) {
         return null;
         }
    
         // 将 message 对象转换成 java 对象
         @Override public Object fromMessage(Message message) {
         return null;
         }
         });
         */
    
        /**  ================ 消息适配器的用法,用于处理各种不同的消息 ================
         MessageListenerAdapter adapter = new MessageListenerAdapter();
         // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等
         adapter.setDelegate(null);
         adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名");
    
         ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter();
         // 文本装换器
         MessageConverter txtMessageConvert = null;
         // json 转换器
         MessageConverter jsonMessageConvert = null;
    
         converters.addDelegate("text", txtMessageConvert);
         converters.addDelegate("html/text", txtMessageConvert);
         converters.addDelegate("text/plain", txtMessageConvert);
    
         converters.addDelegate("json", jsonMessageConvert);
         converters.addDelegate("json/*", jsonMessageConvert);
         converters.addDelegate("application/json", jsonMessageConvert);
    
         adapter.setMessageConverter(converters);
         simpleMessageListenerContainer.setMessageListener(adapter);
    
         */
        return simpleMessageListenerContainer;
    }
    
    
    
    
    
    
    这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    
        factory.setConnectionFactory(connectionFactory());
    
    
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());
    
        /**
         * 关于spring的AcknowledgeMode需要说明,他一共有三种模式:NONE,MANUAL,AUTO,默认是AUTO模式。这比RabbitMq原生多了一种。
         * 这一点很容易混淆,这里的NONE对应其实就是RabbitMq的自动确认,MANUAL是手动。而AUTO其实也是手动模式,
         * 只不过是Spring的一层封装,他根据你方法执行的结果自动帮你发送ack和nack。如果方法未抛出异常,则发送ack。
         * 如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队列,并且无限接收此消息。
         * 如果抛出异常时AmqpRejectAndDontRequeueException则发送nack不会重新入队列。
         */
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    
        factory.setConcurrentConsumers(1);
    
        factory.setMaxConcurrentConsumers(1);
    
        /**
         * 还有一点需要注意的是消费者有一个参数prefetch,
         * 它表示的是一个Channel预取的消息数量,
         * 这个参数只会在手动确认的消费者才生效。
         * 可以客户端利用这个参数来提高性能和做流量控制。
         * 如果prefetch设置的是10,当这个Channel上unacked的消息数量到达10条时,
         * RabbitMq便不会在向你发送消息,客户端如果处理的慢,便可以延迟确认在方法消息的接收。
         * 至于提高性能就非常容易理解,因为这个是批量获取消息,
         * 如果客户端处理的很快便不用一个一个去等着去新的消息。
         * SpringAMQP2.0开始默认是250,这个参数应该已经足够了。
         * 注意之前的版本默认值是1所以有必要重新设置一下值。
         * 当然这个值也不能设置的太大,RabbitMq是通过round robin这个策略来做负载均衡的,
         * 如果设置的太大会导致消息不多时一下子积压到一台消费者,不能很好的均衡负载。
         */
        factory.setPrefetchCount(250);
    
        factory.setChannelTransacted(false);
    
        factory.setTxSize(1);
    
        factory.setDefaultRequeueRejected(true);
    
        // factory.setErrorHandler(errorHandler);
    
        return factory;
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183

    setConnectionFactory:设置spring-amqp的ConnectionFactory。

    setMessageConverter:对于consumer端,MessageConverter也可以在这里配置。

    setAcknowledgeMode:设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。

    NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;

    AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。

    MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。

    setConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。

    setMaxConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。

    setPrefetchCount:设置每次请求发送给每个Consumer的消息数量。

    setChannelTransacted:设置Channel的事务。

    setTxSize:设置事务当中可以处理的消息数量。

    setDefaultRequeueRejected:设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。

    setErrorHandler:实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。

    
    /**
     * 这个比较简单,默认采用了Java序列化,我们一般使用的Json格式,所以配置了Jackson,根据自己的情况来,直接贴代码:
     *
     * @return
     */
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    }
    
    使用@RabbitListener注解
    
    public class RabbitMQListener {
        /**
         * 如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。
         * <p>
         * direct和topic类型的exchange需要routingKey
         *
         * @param message
         * @Header 注入消息头的单个属性
         * @Payload 注入消息体到一个JavaBean中
         * @Headers 注入所有消息头到一个Map中
         * 这里有一点主要注意,如果是com.rabbitmq.client.Channel,org.springframework.amqp.core.Message和org.springframework.messaging.Message这些类型,可以不加注解,直接可以注入。
         * 如果不是这些类型,那么不加注解的参数将会被当做消息体。不能多于一个消息体。如下方法ExampleEvent就是默认的消息体:
         */
        @RabbitListener(bindings = @QueueBinding(
    
                value = @Queue(value = "queue-fanout", durable = "true"),
    
                exchange = @Exchange(value = "exchange-fanout", durable = "true", type = ExchangeTypes.FANOUT),
                key = {"exchange.fanout.message"}
        )
        )
        public void listener1(Message message, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
                              @Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) {
            System.out.println("queue-exchange-fanout接收到的消息为:" + new String(message.getBody()));
            System.out.println(redelivered);
            try {
                int i = 1 / 0;
                channel.basicAck(deliveryTag, false);
            } catch (IOException e) {
                //这一步千万不要忘记,不会会导致消息未确认,消息到达连接的qos之后便不能再接收新消息
                //一般重试肯定的有次数,这里简单的根据是否已经重发过来来决定重发。第二个参数表示是否重新分发
                try {
                    channel.basicReject(deliveryTag, !redelivered);
                } catch (IOException ex) {
                    //ex.printStackTrace();
                }
                //这个方法我知道的是比上面多一个批量确认的参数
                //channel.basicNack(deliveryTag, false,!redelivered);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    【深入浅出Java并发编程指南】「原理分析篇」底层角度去分析线程的实现原理
    有了这个网站,妈妈再也不用担心我找不到好看的配图了!
    大型电商系统的订单设计
    SylixOS网卡多 IP 配置
    953a-954(fpdlink3)寄存器配置
    前端开发工程师:职业前景、工资、 具体工作
    XTU-OJ 1146-矩阵乘法
    Python3 Linux相关命令大全
    Restcloud ETL实践之数据行列转换
    【李航统计学习笔记】第二章:感知机
  • 原文地址:https://blog.csdn.net/qq_38166592/article/details/125623068