• SpringBoot整合RabbitMQ


    RabbitMQ安装部署详情可见RabbitMQ简介及在Linux中安装部署(yum)

     一、导入pom.xml依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.29</version>
    </dependency>

    二、入门测试案例

      发送消息类:Send.java

    1. public class Send {
    2. private final static String QUEUE_NAME = "hello";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置RabbitMQ地址
    7. factory.setHost("192.168.119.129");
    8. factory.setUsername("admin");
    9. factory.setPassword("password");
    10. //建立连接
    11. Connection connection = factory.newConnection();
    12. //获得信道
    13. Channel channel = connection.createChannel();
    14. //声明队列
    15. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    16. //发布消息
    17. String message = "你好,老6";
    18. channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    19. System.out.println("发送了消息:" + message);
    20. //关闭连接
    21. channel.close();
    22. connection.close();
    23. }
    24. }

      接收消息类:Recv.java

    1. public class Recv {
    2. private final static String QUEUE_NAME = "hello";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置RabbitMQ地址
    7. factory.setHost("192.168.119.129");
    8. factory.setUsername("admin");
    9. factory.setPassword("password");
    10. //建立连接
    11. Connection connection = factory.newConnection();
    12. //获得信道
    13. Channel channel = connection.createChannel();
    14. //声明队列
    15. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    16. //接收消息并消费
    17. channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
    18. @Override
    19. public void handleDelivery(String consumerTag, Envelope envelope,
    20. BasicProperties properties, byte[] body) throws IOException {
    21. String message = new String(body, "UTF-8");
    22. System.out.println("收到消息:" + message);
    23. }
    24. });
    25. }
    26. }

    运行结果:

     三、多任务消息队列案例

    发送多任务消息类NewTask.java

    1. public class NewTask {
    2. private final static String TASK_QUEUE_NAME = "task_queue";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置RabbitMQ地址
    7. factory.setHost("192.168.119.129");
    8. factory.setUsername("admin");
    9. factory.setPassword("password");
    10. //建立连接
    11. Connection connection = factory.newConnection();
    12. //获得信道
    13. Channel channel = connection.createChannel();
    14. //声明队列
    15. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    16. for (int i = 0; i < 10; i++) {
    17. String message;
    18. if (i % 2 == 0) {
    19. message = i + "...";
    20. } else {
    21. message = String.valueOf(i);
    22. }
    23. channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
    24. System.out.println("发送了消息:" + message);
    25. }
    26. channel.close();
    27. connection.close();
    28. }
    29. }

     接收多任务消息类Worker.java

    1. public class Worker {
    2. private final static String TASK_QUEUE_NAME = "task_queue";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置RabbitMQ地址
    7. factory.setHost("192.168.119.129");
    8. factory.setUsername("admin");
    9. factory.setPassword("password");
    10. //建立连接
    11. Connection connection = factory.newConnection();
    12. //获得信道
    13. final Channel channel = connection.createChannel();
    14. //声明队列
    15. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    16. System.out.println("开始接收消息");
    17. channel.basicQos(1);
    18. channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
    19. @Override
    20. public void handleDelivery(String consumerTag, Envelope envelope,
    21. BasicProperties properties, byte[] body) throws IOException {
    22. String message = new String(body, "UTF-8");
    23. System.out.println("收到了消息:" + message);
    24. try {
    25. doWork(message);
    26. } finally {
    27. System.out.println("消息处理完成");
    28. channel.basicAck(envelope.getDeliveryTag(), false);
    29. }
    30. }
    31. });
    32. }
    33. private static void doWork(String task) {
    34. char[] chars = task.toCharArray();
    35. for (char ch : chars) {
    36. if (ch == '.') {
    37. try {
    38. Thread.sleep(1000);
    39. } catch (InterruptedException e) {
    40. e.printStackTrace();
    41. }
    42. }
    43. }
    44. }
    45. }

    运行结果:

    四、接收指定消息案例

    发送消息类:EmitLogDirect.java

    1. public class EmitLogDirect {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("192.168.119.129");
    6. factory.setUsername("admin");
    7. factory.setPassword("password");
    8. Connection connection = factory.newConnection();
    9. Channel channel = connection.createChannel();
    10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    11. String message = "info:Hello World!";
    12. channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
    13. System.out.println("发送了消息," + "等级为info,消息内容:" + message);
    14. message = "warning:Hello World!";
    15. channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
    16. System.out.println("发送了消息," + "等级为warning,消息内容:" + message);
    17. message = "error:Hello World!";
    18. channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
    19. System.out.println("发送了消息," + "等级为error,消息内容:" + message);
    20. channel.close();
    21. connection.close();
    22. }
    23. }

    接收所有消息类:ReceiveLogsDirect1.java

    1. public class ReceiveLogsDirect1 {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("192.168.119.129");
    6. factory.setUsername("admin");
    7. factory.setPassword("password");
    8. Connection connection = factory.newConnection();
    9. Channel channel = connection.createChannel();
    10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    11. //生成一个随机的临时的queue
    12. String queueName = channel.queueDeclare().getQueue();
    13. //一个交换机同时绑定3个queue
    14. channel.queueBind(queueName, EXCHANGE_NAME, "info");
    15. channel.queueBind(queueName, EXCHANGE_NAME, "warning");
    16. channel.queueBind(queueName, EXCHANGE_NAME, "error");
    17. System.out.println("开始接收消息");
    18. Consumer consumer = new DefaultConsumer(channel) {
    19. @Override
    20. public void handleDelivery(String consumerTag, Envelope envelope,
    21. BasicProperties properties, byte[] body) throws IOException {
    22. String message = new String(body, "UTF-8");
    23. System.out.println("收到消息:" + message);
    24. }
    25. };
    26. channel.basicConsume(queueName, true, consumer);
    27. }
    28. }

    仅仅接收error消息类:ReceiveLogsDirect2.java

    1. public class ReceiveLogsDirect2 {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("192.168.119.129");
    6. factory.setUsername("admin");
    7. factory.setPassword("password");
    8. Connection connection = factory.newConnection();
    9. Channel channel = connection.createChannel();
    10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    11. //生成一个随机的临时的queue
    12. String queueName = channel.queueDeclare().getQueue();
    13. //一个交换机绑定1个queue
    14. channel.queueBind(queueName, EXCHANGE_NAME, "error");
    15. System.out.println("开始接收消息");
    16. Consumer consumer = new DefaultConsumer(channel) {
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope,
    19. BasicProperties properties, byte[] body) throws IOException {
    20. String message = new String(body, "UTF-8");
    21. System.out.println("收到消息:" + message);
    22. }
    23. };
    24. channel.basicConsume(queueName, true, consumer);
    25. }
    26. }

    运行结果:

     

  • 相关阅读:
    Docker 从构建开始导出一个镜像
    POI导出excel文件之取消合并单元格、删除列、移动列
    vue父子组件传值:父传子、子传父
    【路径最全用法】python代码讲解os.path包的最全用法
    基于java毕业生能力调查评价系统计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署
    【微服务解耦之事件启动】Spring Boot 解耦之事件驱动
    分库分表MySQL
    (十二)判断Python中变量是否是函数
    【密码学】DES 介绍
    Spring Security入门教程,springboot整合Spring Security
  • 原文地址:https://blog.csdn.net/yueyue763184/article/details/127915723