• RabbitMQ消息发送和接收(实例)


    消息发送:

    1.首先启动rabbitmq

    2.查看防火墙状态,如果是开启状态则需要关闭防火墙

    3.通过浏览器访问rabbitmq控制台,ip+15672端口号 ,例如http://192.168.174.129:15672
    登录时输入自己的此前设置的登录名和密码

    4.打开idea,创建rabbitmq-product-java模块
    导入依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.rabbitmqgroupId>
    4. <artifactId>amqp-clientartifactId>
    5. <version>5.1.1version>
    6. dependency>
    7. dependencies>

    编写主程序代码

    1. package com.it.rabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class Product {
    8. public static void main(String[] args) {
    9. //创建连接工厂
    10. ConnectionFactory factory=new ConnectionFactory();
    11. //配置rabbitMQ的连接信息
    12. factory.setHost("192.168.174.129");
    13. factory.setPort(5672);
    14. factory.setUsername("root");
    15. factory.setPassword("123456");
    16. //定义连接
    17. Connection connection=null;
    18. //定义通道
    19. Channel channel=null;
    20. try {
    21. connection=factory.newConnection();//获取连接
    22. channel=connection.createChannel();//获取通道
    23. /**
    24. * 声明一个队列
    25. * 参数1:为队列名取任意值
    26. * 参数2:是否为持久化队列
    27. * 参数3:是否排外,如果排外则这个队列只允许一个消费者监听
    28. * 参数4:是否自动删除,如果为true则表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列。
    29. * 参数5:为队列的一些属性设置通常为null即可
    30. */
    31. channel.queueDeclare("myQueue",true,false,false,null);
    32. //定义消息
    33. String message="我的RabbitMQ的测试消息";
    34. /**
    35. * 发送消息
    36. * 参数1:为交换机名称,这里为空字符串表示不使用交换机
    37. * 参数2:为队列名或RoutingKey,当指定了交换机名称以后这个值就是RoutingKey
    38. * 参数3:为消息的属性信息,通常为空即可
    39. * 参数4:为具体的消息数据的字节数组
    40. */
    41. channel.basicPublish("","myQueue",null,message.getBytes("utf-8"));
    42. System.out.println("消息发送成功!");
    43. } catch (IOException e) {
    44. e.printStackTrace();
    45. } catch (TimeoutException e) {
    46. e.printStackTrace();
    47. }finally {
    48. if (channel!=null){
    49. try {
    50. channel.close();
    51. } catch (IOException e) {
    52. e.printStackTrace();
    53. } catch (TimeoutException e) {
    54. e.printStackTrace();
    55. }
    56. }
    57. if (connection!=null){
    58. try {
    59. connection.close();
    60. } catch (IOException e) {
    61. e.printStackTrace();
    62. }
    63. }
    64. }
    65. }
    66. }

    运行代码

     

    通过rabbitmq控制台查看添加的消息

    点击该条消息进入消息详情页

    点击消息出队

     

    再次返回消息队列,此时消息队列已经没有了消息,消息已经模拟出队了

    4.模拟连续向队列中放两次消息,这两条消息的队列名称相同,内容不同。

    修改Idea消息的内容再向队列里发送一次消息

     运行一次

    进入rabbitmq控制台

     修改Idea消息的内容再向队列里发送一次消息

     运行一次

    进入rabbitmq控制台

     

     点击进入消息详细页

    模拟从队列中取出消息,查看取出的消息内容

    由此可知每次只能模拟取一条数据,取消息时符合队列先进先出的理念

     消息接收:

    1.在idea中创建rabbitmq-consumer-java的模块
    2.导入Maven依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.rabbitmqgroupId>
    4. <artifactId>amqp-clientartifactId>
    5. <version>5.1.1version>
    6. dependency>
    7. dependencies>

    3.创建主函数

    1. package com.it.rabbitmq;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer {
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. ConnectionFactory factory = new ConnectionFactory();
    8. factory.setUsername("root");
    9. factory.setPassword("root");
    10. factory.setHost("192.168.222.128");
    11. //建立到代理服务器到连接
    12. Connection conn = factory.newConnection();
    13. //获得信道
    14. final Channel channel = conn.createChannel();
    15. //声明队列
    16. channel.queueDeclare("myQueue", true, false, false, null);
    17. //消费消息
    18. boolean autoAck = true;
    19. String consumerTag = "";
    20. //接收消息
    21. //参数1 队列名称
    22. //参数2 是否自动确认消息 true表示自动确认 false表示手动确认
    23. //参数3 为消息标签 用来区分不同的消费者这里暂时为""
    24. // 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)
    25. //注意:使用了basicConsume方法以后,会启动一个线程持续的监听队列,如果队列中有信息的数据进入则会自动接收消息
    26. //因此不能关闭连接和通道对象
    27. channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) {
    28. @Override
    29. public void handleDelivery(String consumerTag,
    30. Envelope envelope,
    31. AMQP.BasicProperties properties,
    32. byte[] body) throws IOException {
    33. //获取消息数据
    34. String bodyStr = new String(body, "UTF-8");
    35. System.out.println("消费者-- "+bodyStr);
    36. }
    37. });
    38. // channel.close();
    39. // conn.close();
    40. }
    41. }

    4.运行主函数

    5.运行发送消息主函数


    6.再次运行接收消息主函数


    7.此时进入控制台查看队列,发现队列中的消息已经全部取出

    注意:

    1Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果有消费者监听队列则立即消费发送到队列中的消息

    2Queue的消息可以保证每个消息都一定能被消费
    3、消息接收编码时,不能关闭连接,一旦关闭就无法持续监听,照成功能失效。

  • 相关阅读:
    【LeetCode75】第五十四题 咒语和药水的成功对数
    Golang代码漏洞扫描工具介绍——trivy
    JS-闭包的用法
    通过Vue自带服务器实现Ajax请求跨域(vue-cli)
    GRPC MacOS M1 处理器的问题
    语音合成:概述【不等长序列关系建模的生成任务】
    redis安装包
    c#的反编译工具ISPY和net reflector 使用比较
    开源框架(三) :你知道mybatis的初始化是怎么使用建造者模式创建对象的吗
    [RoarCTF 2019]Easy Calc
  • 原文地址:https://blog.csdn.net/weixin_59334478/article/details/127715591