• Spring整合RabbitMQ-注解方式


    maven导入

                <dependency>
                    <groupId>org.springframework.amqpgroupId>
                    <artifactId>spring-rabbitartifactId>
                    <version>2.2.7.RELEASEversion>
                dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    5.2.1 消息的生产者
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.MessagePropertiesBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.support.AbstractApplicationContext;
    
    import java.nio.charset.StandardCharsets;
    
    public class ProducterApplication {
    
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
    		
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
    		
            //构造消息属性对象
            MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()
                	//设置消息的类型为文本
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                	//消息的编码方式为UTF-8
                    .setContentEncoding(StandardCharsets.UTF_8.name())
                	//自定义消息头信息
                    .setHeader("test.header", "test.value")
                    .build();
    		//对象消息进行编码操作
            Message msg = MessageBuilder.withBody("你好 RabbitMQ!".getBytes(StandardCharsets.UTF_8))
                    .andProperties(msgBuild)
                    .build();
    
            template.send("ex.anno.fanout", "routing.anno", msg);
    
            context.close();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    RabbitConfig

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Configurable;
    import org.springframework.context.annotation.Bean;
    import java.net.URI;
    
    @Configurable
    public class RabbitConfig {
    
        /**
         * 连接工厂
         *
         * @return
         */
        @Bean
        public ConnectionFactory getConnectionFactory() {
            URI uri = URI.create("amqp://root:123456@node1:5672/%2f");
            ConnectionFactory factory = new CachingConnectionFactory(uri);
            return factory;
        }
    
        /**
         * RabbitTemplate
         */
        @Bean
        @Autowired
        public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
            return rabbitTemplate;
        }
    
    
        /**
         * RabbitAdmin
         */
        @Bean
        @Autowired
        public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
            RabbitAdmin admin = new RabbitAdmin(factory);
            return admin;
        }
    
        /**
         * Queue
         */
        @Bean
        public Queue queue() {
            Queue queue = QueueBuilder.nonDurable("queue.anno")
                    //是否排外,即是否只有当前这个连接才能看到。
                    //.exclusive()
                    //是否自动删除
                    //.autoDelete()
                    .build();
    
            return queue;
        }
    
        /**
         * Exchange
         */
        @Bean
        public Exchange exchange() {
            Exchange exchange = new FanoutExchange("ex.anno.fanout", false, false, null);
            return exchange;
        }
    
        /**
         * Binding
         */
        @Bean
        @Autowired
        public Binding binding(Queue queue, Exchange exchange) {
            //创建一个不指定参数的绑定
            Binding binding = BindingBuilder.bind(queue).to(exchange).with("routing.anno").noargs();
            return binding;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    提示:

    ConnectionFactory有三个实现

    CachingConnectionFactory 基于channel的缓存模式 最常用是这个。

    LocalizedQueueConnectionFactory 直接连接某个节点的方式。如果是集群,此种不太适合。

    SimpleRoutingConnectionFactory 在当前的连接工厂中按查找的KEY获取连接工厂。

    运行消息的生产者,查看消息发送信息

    [root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
    Listing exchanges for vhost / ...
    ┌────────────────────┬─────────┐
    │ name               │ type    │
    ├────────────────────┼─────────┤
    │ amq.fanout         │ fanout  │
    ├────────────────────┼─────────┤
    │ ex.anno.fanout     │ fanout  │
    ├────────────────────┼─────────┤
    │ ex.busi.topic      │ topic   │
    ├────────────────────┼─────────┤
    │ amq.rabbitmq.trace │ topic   │
    ├────────────────────┼─────────┤
    │ amq.headers        │ headers │
    ├────────────────────┼─────────┤
    │ amq.topic          │ topic   │
    ├────────────────────┼─────────┤
    │ amq.direct         │ direct  │
    ├────────────────────┼─────────┤
    │ ex.direct          │ direct  │
    ├────────────────────┼─────────┤
    │                    │ direct  │
    ├────────────────────┼─────────┤
    │ ex.routing         │ direct  │
    ├────────────────────┼─────────┤
    │ amq.match          │ headers │
    └────────────────────┴─────────┘
    [root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
    Listing bindings for vhost /...
    ┌────────────────┬─────────────┬──────────────────┬──────────────────┬──────────────┬───────────┐
    │ source_name    │ source_kind │ destination_name │ destination_kind │ routing_key  │ arguments │
    ├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
    │                │ exchange    │ queue.msg        │ queue            │ queue.msg    │           │
    ├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
    │                │ exchange    │ queue.anno       │ queue            │ queue.anno   │           │
    ├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
    │ ex.anno.fanout │ exchange    │ queue.anno       │ queue            │ routing.anno │           │
    ├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
    │ ex.direct      │ exchange    │ queue.msg        │ queue            │ routing.q1   │           │
    └────────────────┴─────────────┴──────────────────┴──────────────────┴──────────────┴───────────┘
    [root@nullnull-os ~]# rabbitmqctl list_queues --formatter pretty_table
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
    ┌────────────┬──────────┐
    │ name       │ messages │
    ├────────────┼──────────┤
    │ queue.msg  │ 0        │
    ├────────────┼──────────┤
    │ queue.anno │ 1        │
    └────────────┴──────────┘
    [root@nullnull-os ~]# 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    通过检查发现,消息已经成功的发送到了队列

    5.2.2 使用拉模式获取消息
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.support.AbstractApplicationContext;
    
    public class ConsumerGetApplication {
    
        public static void main(String[] args) throws Exception {
            //从指定类加载配制信息
            AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
            RabbitTemplate rabbit = context.getBean(RabbitTemplate.class);
    
            Message receive = rabbit.receive("queue.anno");
            String encoding = receive.getMessageProperties().getContentEncoding();
            System.out.println("消息信息:" + new String(receive.getBody(), encoding));
    
            context.close();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    RabbitConfig的配制

    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Configurable;
    import org.springframework.context.annotation.Bean;
    import java.net.URI;
    
    @Configurable
    public class RabbitConfig {
    
        /**
         * 连接工厂
         *
         * @return
         */
        @Bean
        public ConnectionFactory getConnectionFactory() {
            URI uri = URI.create("amqp://root:123456@node1:5672/%2f");
            ConnectionFactory factory = new CachingConnectionFactory(uri);
            return factory;
        }
    
        /**
         * RabbitTemplate
         */
        @Bean
        @Autowired
        public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
            return rabbitTemplate;
        }
    
    
        /**
         * RabbitAdmin
         */
        @Bean
        @Autowired
        public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
            RabbitAdmin admin = new RabbitAdmin(factory);
            return admin;
        }
    
        /**
         * Queue
         */
        @Bean
        public Queue queue() {
            Queue queue = QueueBuilder.nonDurable("queue.anno")
                    //是否排外,即是否只有当前这个连接才能看到。
                    //.exclusive()
                    //是否自动删除
                    //.autoDelete()
                    .build();
    
            return queue;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    运行主程序,检查控制台的输出。

    消息信息:你好 RabbitMQ!
    
    • 1

    至此使用拉模式,已经成功的获取队列中的数据。

    **5.2.3 使用推模式获取数据 **

    消费者处理的代码

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageListener {
    
    
        /**
         * com.rabbitmq.client.Channel to get access to the Channel channel对象
         * org.springframework.amqp.core.Message  message对象,可以直接操作原生的AMQP消息
         * org.springframework.messaging.Message to use the messaging abstraction counterpart
         *
         * @Payload-annotated 注解方法参数,该参数的值就是消息体。   method arguments including the support of validation
         * @Header-annotated 注解方法参数,访问指定的消息头字段的值。 method arguments to extract a specific header value, including standard AMQP headers defined by AmqpHeaders
         * @Headers-annotated 该注解的参数获取该消息的消息头的所有字段,参数集合类型对应的MAP argument that must also be assignable to java.util.Map for getting access to all headers.
         * MessageHeaders 参数类型,访问所有消息头字段  arguments for getting access to all headers.
         * MessageHeaderAccessor or AmqpMessageHeaderAccessor  访问所有消息头字段。
         * 

    * 消息监听 */ @RabbitListener(queues = "queue.anno") public void whenMessageCome(Message msg) throws Exception { String encoding = msg.getMessageProperties().getContentEncoding(); System.out.println("收到的消息:" + new String(msg.getBody(), encoding)); } /** // * 使用payload进行消费 // * // * 不可同时存在相同的队列被两个监听 // * // * @param data // */ //@RabbitListener(queues = "queue.anno") //public void whenMessageConsumer(@Payload String data) { // System.out.println("收到的消息:" + data); //} }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    此处存在两种方式,一种是接收Message作为参数,还有一种是使用@Payload接收内容作为参数

    配制处理

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Configurable;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.amqp.core.Queue;
    import java.net.URI;
    
    @EnableRabbit
    //@ComponentScan("com.nullnull.learn")
    @ComponentScan
    @Configurable //xml中也可以使用 启用@RabbitListener注解
    public class RabbitConfig {
    
    
        @Bean
        public ConnectionFactory connectionFactory() {
            URI uriInfo = URI.create("amqp://root:123456@node1:5672/%2f");
            return new CachingConnectionFactory(uriInfo);
        }
    
    
        @Bean
        @Autowired
        public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
            return new RabbitAdmin(factory);
        }
    
        @Bean
        @Autowired
        public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
            return new RabbitTemplate(factory);
        }
    
    
        @Bean
        public Queue queue() {
            return QueueBuilder.nonDurable("queue.anno").build();
        }
    
    
        /**
         * RabbitListener的容器管理对象
         * 

    * 使用监听器监听推送过来的消息。在一个应用中可能会有多个监听器。这些监听器是需要一个工厂管理起来的。 * * @return */ @Bean("rabbitListenerContainerFactory") @Autowired public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectFactory) { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); //要管理容器就得有连接 containerFactory.setConnectionFactory(connectFactory); containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO); //containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //containerFactory.setAcknowledgeMode(AcknowledgeMode.NONE); //设置并发的消费者,即可以同时存在10个消费都消费消息。 containerFactory.setConcurrentConsumers(10); //设置并发的最大消费者。 containerFactory.setMaxConcurrentConsumers(15); //按照批次处理消息消息。 containerFactory.setBatchSize(10); return containerFactory; } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    启动类

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    
    public class ConsumerListenerApplication {
    
        public static void main(String[] args) {
            new AnnotationConfigApplicationContext(RabbitConfig.class);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    再启动生产者

    对生产者作一点改造,让其发送多条

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.MessagePropertiesBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.support.AbstractApplicationContext;
    
    import java.nio.charset.StandardCharsets;
    
    public class ProducterApplication {
    
        public static void main(String[] args) throws Exception {
            AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
    
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
    
            MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setContentEncoding(StandardCharsets.UTF_8.name())
                    .setHeader("test.header", "test.value")
                    .build();
    
            for (int i = 0; i < 20; i++) {
                Message msg = MessageBuilder.withBody(("你好 RabbitMQ! id :" + i).getBytes(StandardCharsets.UTF_8))
                        .andProperties(msgBuild)
                        .build();
    
                template.send("ex.anno.fanout", "routing.anno", msg);
            }
    
            context.close();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    客户端接收,查看控制台

    收到的消息:你好 RabbitMQ! id :4
    收到的消息:你好 RabbitMQ! id :9
    收到的消息:你好 RabbitMQ! id :8
    收到的消息:你好 RabbitMQ! id :7
    收到的消息:你好 RabbitMQ! id :6
    收到的消息:你好 RabbitMQ! id :2
    收到的消息:你好 RabbitMQ! id :3
    收到的消息:你好 RabbitMQ! id :5
    收到的消息:你好 RabbitMQ! id :14
    收到的消息:你好 RabbitMQ! id :17
    收到的消息:你好 RabbitMQ! id :1
    收到的消息:你好 RabbitMQ! id :0
    收到的消息:你好 RabbitMQ! id :13
    收到的消息:你好 RabbitMQ! id :15
    收到的消息:你好 RabbitMQ! id :12
    收到的消息:你好 RabbitMQ! id :16
    收到的消息:你好 RabbitMQ! id :18
    收到的消息:你好 RabbitMQ! id :19
    收到的消息:你好 RabbitMQ! id :11
    收到的消息:你好 RabbitMQ! id :10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    通过观察发现,此处接收的顺序与并非发送的顺序进行的接收,这是因为批量以及并发的控制在这里起的作用,如果要按顺序,去接批量及并发则就是按顺序接收。

  • 相关阅读:
    “Java基础全方位解析,从入门到精通“
    字节系列「夏日游乐场 周周赚88元」进度与骰子次数R语言回归分析
    c++中的重载
    算法与数据结构 --- 遍历二叉树和线索二叉树
    CAPL中的键值对(hash)数据类型
    基于ssm的设备信息管理系统
    TMS320F28374S之DAC
    liunx docker 安装 nginx:stable-alpine 后报500错误
    Linux系统Redis安装教程-附带后台启动
    Faster Rcnn
  • 原文地址:https://blog.csdn.net/bug_null/article/details/132841720