• RabbitMQ:使用Java进行操作


    使用Java操作消息队列

    现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:

    1. <dependency>
    2. <groupId>com.rabbitmqgroupId>
    3. <artifactId>amqp-clientartifactId>
    4. <version>5.14.2version>
    5. dependency>

    依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:

    1. public static void main(String[] args) {
    2. //使用ConnectionFactory来创建连接
    3. ConnectionFactory factory = new ConnectionFactory();
    4. //设定连接信息,基操
    5. factory.setHost("192.168.0.12");
    6. factory.setPort(5672); //注意这里写5672,是amqp协议端口
    7. factory.setUsername("admin");
    8. factory.setPassword("admin");
    9. factory.setVirtualHost("/test");
    10. //创建连接
    11. try(Connection connection = factory.newConnection()){
    12. }catch (Exception e){
    13. e.printStackTrace();
    14. }
    15. }

    这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。

    1. try(Connection connection = factory.newConnection();
    2. Channel channel = connection.createChannel()){ //通过Connection创建新的Channel
    3. //声明队列,如果此队列不存在,会自动创建
    4. channel.queueDeclare("yyds", false, false, false, null);
    5. //将队列绑定到交换机
    6. channel.queueBind("yyds", "amq.direct", "my-yyds");
    7. //发布新的消息,注意消息需要转换为byte[]
    8. channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
    9. }catch (Exception e){
    10. e.printStackTrace();
    11. }

    其中queueDeclare方法的参数如下:

    • queue:队列的名称(默认创建后routingKey和队列名称一致)
    • durable:是否持久化。
    • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
    • autoDelete:是否自动删除。
    • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

    其中queueBind方法参数如下:

    • queue:需要绑定的队列名称。
    • exchange:需要绑定的交换机名称。
    • routingKey:不用多说了吧。

    其中basicPublish方法的参数如下:

    • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
    • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
    • props:其他的配置。
    • body:消息本体。

    执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

    并且此消息队列已经成功与amq.direct交换机进行绑定:

    那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. ConnectionFactory factory = new ConnectionFactory();
    3. factory.setHost("10.37.129.4");
    4. factory.setPort(5672);
    5. factory.setUsername("admin");
    6. factory.setPassword("admin");
    7. factory.setVirtualHost("/test");
    8. //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
    9. //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
    10. Connection connection = factory.newConnection();
    11. Channel channel = connection.createChannel();
    12. //创建一个基本的消费者
    13. channel.basicConsume("yyds", false, (s, delivery) -> {
    14. System.out.println(new String(delivery.getBody()));
    15. //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
    16. //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
    17. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    18. //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
    19. //为false,那么消息就会被丢弃
    20. //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
    21. //跟上面一样,最后一个参数为false,只不过这里省了
    22. //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    23. }, s -> {});
    24. }

    其中basicConsume方法参数如下:

    ●queue  -  消息队列名称,直接指定。
    ●autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
    ●deliver  -  消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
    ●cancel  -  当消费者取消订阅时进行的函数回调,这里暂时用不到。

    现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:


    我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

    现在我们把刚刚创建好的消息队列删除。

    官方文档:Spring AMQP

    前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:
     

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    接着我们需要配置RabbitMQ的地址等信息:

    1. spring:
    2. rabbitmq:
    3. addresses: 192.168.0.4
    4. username: admin
    5. password: admin
    6. virtual-host: /test

    这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:

    1. @Configuration
    2. public class RabbitConfiguration {
    3. @Bean("directExchange") //定义交换机Bean,可以很多个
    4. public Exchange exchange(){
    5. return ExchangeBuilder.directExchange("amq.direct").build();
    6. }
    7. @Bean("yydsQueue") //定义消息队列
    8. public Queue queue(){
    9. return QueueBuilder
    10. .nonDurable("yyds") //非持久化类型
    11. .build();
    12. }
    13. @Bean("binding")
    14. public Binding binding(@Qualifier("directExchange") Exchange exchange,
    15. @Qualifier("yydsQueue") Queue queue){
    16. //将我们刚刚定义的交换机和队列进行绑定
    17. return BindingBuilder
    18. .bind(queue) //绑定队列
    19. .to(exchange) //到交换机
    20. .with("my-yyds") //使用自定义的routingKey
    21. .noargs();
    22. }
    23. }

    接着我们来创建一个生产者,这里我们直接编写在测试用例中:

    1. @SpringBootTest
    2. class SpringCloudMqApplicationTests {
    3. //RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
    4. @Resource
    5. RabbitTemplate template;
    6. @Test
    7. void publisher() {
    8. //使用convertAndSend方法一步到位,参数基本和之前是一样的
    9. //最后一个消息本体可以是Object类型,真是大大的方便
    10. template.convertAndSend("amq.direct", "my-yyds", "Hello World!");
    11. }
    12. }

    现在我们来运行一下这个测试用例:

    可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

    现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:

    1. @Component //注册为Bean
    2. public class TestListener {
    3. @RabbitListener(queues = "yyds") //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    4. public void test(Message message){
    5. System.out.println(new String(message.getBody()));
    6. }
    7. }

    接着我们启动服务器:

    可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

    接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

    当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:

    1. @Test
    2. void publisher() {
    3. //会等待消费者消费然后返回响应结果
    4. Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
    5. System.out.println("收到消费者响应:"+res);
    6. }

    消费者这边只需要返回一个对应的结果即可:

    1. @RabbitListener(queues = "yyds")
    2. public String receiver(String data){
    3. System.out.println("一号消息队列监听器 "+data);
    4. return "收到!";
    5. }

    测试没有问题:

    那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?

    1. @Data
    2. public class User {
    3. int id;
    4. String name;
    5. }

    1. @Configuration
    2. public class RabbitConfiguration {
    3. ...
    4. @Bean("jacksonConverter") //直接创建一个用于JSON转换的Bean
    5. public Jackson2JsonMessageConverter converter(){
    6. return new Jackson2JsonMessageConverter();
    7. }
    8. }

    接着我们只需要指定转换器就可以了:

    1. @Component
    2. public class TestListener {
    3. //指定messageConverter为我们刚刚创建的Bean名称
    4. @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
    5. public void receiver(User user){ //直接接收User类型
    6. System.out.println(user);
    7. }
    8. }

    现在我们直接在管理页面发送:

    {"id":1,"name":"LB"}

    可以看到成功完成了转换,并输出了用户信息:

    同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:

    1. @Test
    2. void publisher() {
    3. template.convertAndSend("amq.direct", "yyds", new User());
    4. }

    可以看到后台的数据类型为:

    这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。

  • 相关阅读:
    基于ISO13209(OTX)实现EOL下线序列
    mysql什么时候会发生file sort
    玩转 MaxCompute SQL 训练营! 数据分析挖掘迅速出师
    CCF推荐国际会议接收率(持续更新)
    centos8/centos9修改了静态IP地址,不生效,nmcli配置静态IP
    Spring使用(三)
    【OpenGL】七、混合
    考研回忆录【二本->211】
    可观测产品合集
    刷题知识回顾《二》LRU缓存详解
  • 原文地址:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/126082979