• docker启动rabbitmq及使用


    搜索rabbitmq镜像

    docker search rabbitmq:management

    在这里插入图片描述

    下载镜像

    docker pull rabbitmq:management

    在这里插入图片描述

    启动容器

    docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

    在这里插入图片描述
    在这里插入图片描述

    打印容器

    docker logs rabbitmq

    在这里插入图片描述
    在这里插入图片描述

    访问RabbitMQ Management

    http://localhost:15672
    账户密码默认:guest
    在这里插入图片描述

    编写生产者类

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
            /**
             * 生成一个queue队列
             * 1、队列名称 QUEUE_NAME
             * 2、队列里面的消息是否持久化(默认消息存储在内存中)
             * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
             * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
             * 5、其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message = "Hello world!";
            /**
             * 发送一个消息
             * 1、发送到哪个exchange交换机
             * 2、路由的key
             * 3、其他的参数信息
             * 4、消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println(" [x] Sent '"+message+"'");
    
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    运行该方法,可以看到控制台的打印
    在这里插入图片描述
    name=hello的队列收到Message
    在这里插入图片描述

    消费者

    package com.xun.rabbitmqdemo.example;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Receiver {
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setConnectionTimeout(600000);//milliseconds
            factory.setRequestedHeartbeat(60);//seconds
            factory.setHandshakeTimeout(6000);//milliseconds
            factory.setRequestedChannelMax(5);
            factory.setNetworkRecoveryInterval(500);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            System.out.println("Waiting for messages. ");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME,true,consumer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    在这里插入图片描述
    在这里插入图片描述

    工作队列

    RabbitMqUtils工具类

    package com.xun.rabbitmqdemo.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMqUtils {
        public static Channel getChannel() throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    启动2个工作线程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
            };
            System.out.println("C1 消费者启动等待消费....");
            /**
             * 消费者消费消息
             * 1、消费哪个队列
             * 2、消费成功后是否自动应答
             * 3、消费的接口回调
             * 4、消费未成功的接口回调
             */
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    
    
    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    
    public class Work02 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String receivedMessage = new String(delivery.getBody());
                System.out.println("接收消息:"+receivedMessage);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
            };
            System.out.println("C2 消费者启动等待消费....");
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    启动工作线程
    在这里插入图片描述

    启动发送线程

    package com.xun.rabbitmqdemo.workQueue;
    
    import com.rabbitmq.client.Channel;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task01 {
        private static final String QUEUE_NAME = "hello";
        public static void main(String[] args) throws Exception{
            try(Channel channel= RabbitMqUtils.getChannel();){
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //从控制台接收消息
                Scanner scanner = new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.next();
                    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                    System.out.println("发送消息完成:"+message);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动发送线程,此时发送线程等待键盘输入
    在这里插入图片描述
    发送4个消息
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    可以看到2个工作线程按照顺序分别接收message。

    消息应答机制

    rabbitmq将message发送给消费者后,就会将该消息标记为删除。
    但消费者在处理message过程中宕机,会导致消息的丢失。
    因此需要设置手动应答。

    生产者

    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;
    
    public class Task02 {
        private static final String TASK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            try(Channel channel = RabbitMqUtils.getChannel()){
                channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入信息");
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                    System.out.println("生产者task02发出消息"+ message);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者

    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work03 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work03 等待接收消息处理时间较短");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(1);
                System.out.println("接收到消息:"+message);
                /**
                 * 1、消息的标记tag
                 * 2、是否批量应答
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
            };
            //采用手动应答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    
    
    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;
    
    public class Work04 {
        private static final String ACK_QUEUE_NAME = "ack_queue";
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("Work04 等待接收消息处理时间较长");
            DeliverCallback deliverCallback = (consumerTag,delivery)->{
                String message = new String(delivery.getBody());
                SleepUtils.sleep(30);
                System.out.println("接收到消息:"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            };
            CancelCallback cancelCallback = (consumerTag)->{
                System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
            };
            //采用手动应答
            boolean autoAck = false;
            channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    工具类SleepUtils

    package com.xun.rabbitmqdemo.utils;
    public class SleepUtils {
        public static void sleep(int second){
            try{
                Thread.sleep(1000*second);
            }catch (InterruptedException _ignored){
                Thread.currentThread().interrupt();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    模拟
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    work04等待30s后发出ack
    在这里插入图片描述
    在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    不公平分发

    上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
    不公平分发:

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    
    • 1
    • 2

    通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。
    在这里插入图片描述

  • 相关阅读:
    mysql—视图
    我干了8年测试,告诉你现在软件测试还能不能找到工作!
    RuoYi 若依后台管理系统存在SQL注入漏洞CVE-2023-49371
    Mysql的函数方法
    BOM与DOM--记录
    基于Halcon的喷码识别方法
    Java开发之框架(spring、springmvc、springboot、mybatis)【面试篇 完结版】
    Java 初学者必备核心基础知识有哪些?
    Scala基础教程--14--隐式转换
    证券相关业务知识
  • 原文地址:https://blog.csdn.net/web15286201346/article/details/126327425