• rabbitMQ


    安装

    docker

    rabbitmq_delayed_message_exchange插件需要从外部安装,github地址

    github地址 :https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.9.0

    • Dockerfile
    FROM rabbitmq:3.8.11
    ADD ["rabbitmq_delayed_message_exchange-3.9.0.ez","/plugins/"]
    RUN ["ls","/plugins"]
    RUN ["rabbitmq-plugins","enable","rabbitmq_management"]
    RUN ["rabbitmq-plugins","enable","rabbitmq_delayed_message_exchange"]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # docker run 的时候暴露两个端口 15762 5762 
    # 添加用户 root root
    rabbitmqctl add_user root root
    # 为用户添加权限
    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
    rabbitmqctl set_user_tags root administrator
    
    #开通防火墙端口
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --zone=public --add-port=4369/tcp --permanent
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --zone=public --add-port=25672/tcp --permanent
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    概念

    在这里插入图片描述
    broker: 接收和分发消息的应用类似server
    channel:每次访问时会建立一个连接(connection),一个连接会有多个信道(channel)
    virtualhost: 小型的server 类似于mysql中有多个数据库

    测试

    • 工具类,主要提供channel
    public class MQUtil {
    //    获取信道
        public static Channel getChannel() throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("43.143.132.3");
            factory.setUsername("root");
            factory.setPassword("root");
            Connection connection = factory.newConnection();
            return connection.createChannel();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 生产者
    public class Producer implements Callable<String> {
        String QUEUE_NAME;
        Channel channel;
        Producer(String QUEUE_NAME, Channel channel){
            this.QUEUE_NAME = QUEUE_NAME;
            this.channel = channel;
        }
    
        @Override
        public String call() throws Exception {
            String msg;
            do {
                msg = WorkTest.buffer();
                if(Integer.parseInt(msg) > 10) break;
                System.out.println(Thread.currentThread().getName()+":"+new String(msg));
    //        队列名称,是否持久化,是否可以被多个消费者消费,是都断开连接,队列中的消息删除
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //        发送到哪个交换机,路由的key值,本次是队列的名称,其他参数信息,发送消息的消息体(二进制)
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            } while (true);
    
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 消费者
    public class Consumer implements Callable<String> {
        String QUEUE_NAME;
        Channel channel;
        Consumer(String QUEUE_NAME,Channel channel){
            this.QUEUE_NAME = QUEUE_NAME;
            this.channel = channel;
        }
    	@Override
    	public String call() throws Exception{
            System.out.println(Thread.currentThread().getName());
            DeliverCallback deliverCallback = (consumerTag,message)->{
                System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));
            };
            CancelCallback cancelCallback = (consumerTag)-> System.out.println("消息被取消");
    //        队列名,成功后是否自动应答(推荐false),消费成功的回调,消费未成功的回调
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 测试
    public class WorkTest {
        static int msg = 0;
        public static void main(String[] args) throws Exception{
            String QUEUE_NAME = "demo2";
            Channel channel = MQUtil.getChannel();
            for (int i = 0; i < 3; i++) {
                System.out.println("----开始----");
                FutureTask<String> ProductTask = new FutureTask<>(new Producer(QUEUE_NAME,channel));
                FutureTask<String> ConsumerTask = new FutureTask<>(new Consumer(QUEUE_NAME,channel));
                Thread p = new Thread(ProductTask,"producer"+i);
                Thread c = new Thread(ConsumerTask, "consumer" + i);
                p.start();
                c.start();
            }
        }
        public static synchronized String buffer(){//    线程共享
            msg++;
            return ""+msg;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    消息应答

    当消费者处理完消息之后,要向队列发送应答,自动应答是接收完消息之后就会应答,这里尝试手动应答

          DeliverCallback deliverCallback = (consumerTag,message)->{
                        System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));
            //            对哪一个消息进行应答,是否进行批量应答
                        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    };
    
    • 1
    • 2
    • 3
    • 4
    • 5

    持久化

    对生产者进行修改

    • 队列持久化
     # 第二参数改为true
     channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    • 1
    • 2

    之前声明的非持久化队列不能改,要新建队列

    • 消息持久化
    # 修改props为 MessageProperties.PERSISTENT_TEXT_PLAIN
    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    
    • 1
    • 2

    分发策略

    默认为轮询,不公平分发就是能者多劳

    • 不公平
    int prefetchCount = 0;
    channel.basicQos(prefetchCount);//0公平分发 1不公平分发
    
    • 1
    • 2
    • 预期值
      设置 prefetchCount ,预期值此消费者即为最大值,如果消息不够预期值,则分发取决于处理效率

    发布确认

    确保消息不会丢失
    就算 队列持久化,消息持久化,等到保存到磁盘上之后,收到确认才会确保消息不会丢失

    • 开启发布确认
    channel.confirmSelect();
    
    • 1

    每条要确认还是一次确认很多条呢

    单个确认(同步)

    每次发送的时候发布确认 ` channel.waitForConfirms();

    批量发布确认

    在发送多个消息后,确认一次

    异步确认发布

    发布单个确认,但是异步,而不是每次发一条数据都要等待确认(同步)

    • 使用
    1. 开启发布确认
    2. 设置针对发布成功和发布失败的回调函数
    3. 创建确认监听器

    注意不要每次创建队列!!!

      public static void main(String[] args) throws Exception {
            String QUEUE_NAME = "c4";
            Channel channel = MQUtil.getChannel();
    //        1.开启发布确认
            channel.confirmSelect();
    
            long begin = System.currentTimeMillis();
    
    //        2.异步确认监听
    //        成功回调函数  tag:消息的编号  multiple:是否批量确认
            ConfirmCallback ackCallback = (tag,multiple)->{};
    //        失败回调函数
            ConfirmCallback nackCallback = (tag,multiple)->{};
    //        3.创建监听器
            channel.addConfirmListener(ackCallback,nackCallback);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 1000; i++) {
                String msg = i+"";
                System.out.println(msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println(end-begin);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    交换机

    生产者会将消息发送到交换机,交换机再转发给队列。

    • 临时队列 :不能持久化,没有具体名字,断开连接后自动删除
      创建 channel.queueDeclare().getQueue();这个方法返回随机分配的队列名

    • 绑定:交换机和队列的绑定,交换器就可以转发到绑定队列

    • 创建交换机 channel.exchangeDeclare(EXCHANGE_NAME, "类型");// 名字 类型

    三种订阅模式

    • fanout
      广播到所有队列
    • direct
      可以选择发送给哪一个队列,绑定需要设置一个rountingKey 生产需要一个rountingKey参数,这样消息就会发送到对应的queueu中
    • topic
      类似direct,但可以多播,解决了direct只能单播的问题
      • key规则
        . 连接多个key
        * 代替一个字符
        #代替0个或者多个字符(如果一个队列的绑定键为#,它将接收所有的消息)

    通配符用于队列绑定时,也就是绑定key匹配发送消息的key

    死信队列

    在这里插入图片描述
    死信需要的额外步骤

    1. 队列中传props要包含死信交换机名字(声明死信交换机)
    2. 队列中传props要包含死信交换机和死信队列绑定的key
    • consumer1
      dead_e 和 dead_q绑定
      norm_e norm_q 绑定
      新建交换机和队列
      配置死信key
      声明死信交换机
    public class c1 {
        public static void main(String[] args) throws Exception {
    //        初始化
            String NORMAL_EXCHANGE = "normal_exchange";
            String NORMAL_QUEUE = "normal_queue";
            String NORMAL_KEY = "normal_key";
            String DEAD_EXCHANGE = "dead_exchange";
            String DEAD_QUEUE = "dead_queue";
            String DEAD_KEY = "dead_key";
    //        配置props
            Map<String,Object> props = new HashMap<>();
            props.put("x-dead-letter-exchange",DEAD_EXCHANGE);//声明死信交换机`
            props.put("x-dead-letter-routing-key",DEAD_KEY); //设置死信key[死信交换机转发到死信队列的key]
            Channel channel = MQUtil.getChannel();
    //        配置交换机和队列
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,props);
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,NORMAL_KEY);//正常队列和正常交换机绑定
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_KEY);//死信队列和死信交换机绑定
            System.out.println("创建完毕");
            MQUtil.simpleConsumer(channel,NORMAL_QUEUE);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • consumer2
      正常接收死信消息
    MQUtil.simpleConsumer(MQUtil.getChannel(),DEAD_QUEUE);
    
    • 1
    • producer
      设置消息过期时间,如果消息过期将被设置为死信
      Channel channel = MQUtil.getChannel();
    //        设置ttl时间,将消息变为死信
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
            for (int i = 0; i < 10; i++) {
                String msg = i+"";
                channel.basicPublish(NORMAL_EXCHANGE,NORMAL_KEY,properties,msg.getBytes());
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    springboot 整合

    • 配置文件:创建交换机,队列,绑定交换机和队列
    • 生产者:只负责发送消息到交换机
    • 消费者:只负责从队列中接收消息
    配置文件
      //   创建交换机交换机
        @Bean("X")
        public DirectExchange XExchange() {
            return new DirectExchange(X_EXCHANGE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
      //  创建队列
        @Bean("dead_q")
        public Queue DeadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    //    绑定
        @Bean
        public Binding XA(@Qualifier("X") DirectExchange exchange,@Qualifier("A") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(A_KEY);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    一个配置文件的实例

    @Configuration
    public class conf01 {
        //    交换机配置
        public static final String X_EXCHANGE = "X";
        public static final String DEAD_EXCHANGE = "dead_exchange";
        public static final String A_QUEUE = "A";
        public static final String B_QUEUE = "B";
        public static final String DEAD_QUEUE = "dead_queue";
        public static final String DEAD_KAY = "dead_key";
        public static final String A_KEY = "A";
        public static final String B_KEY = "B";
    
        //    交换机
        @Bean("X")
        public DirectExchange XExchange() {
            return new DirectExchange(X_EXCHANGE);
        }
    
        @Bean("dead_e")
        public DirectExchange DeadExchange() {
            return new DirectExchange(DEAD_EXCHANGE);
        }
    
    //    队列
        @Bean("dead_q")
        public Queue DeadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        @Bean("A")
        public Queue AQueue() {
            Map<String, Object> props = new HashMap<>();
            props.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信交换机
            props.put("x-dead-letter-routing-key", DEAD_KAY);//设置死信key
            props.put("x-message-ttl", 20000);
            return QueueBuilder.durable(A_QUEUE).withArguments(props).build();
        }
    
        @Bean("B")
        public Queue BQueue() {
            Map<String, Object> props = new HashMap<>();
            props.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信交换机
            props.put("x-dead-letter-routing-key", DEAD_KAY);//设置死信key
            props.put("x-message-ttl", 1000);//设置有效时间
            return QueueBuilder.durable(B_QUEUE).withArguments(props).build();
        }
    //    绑定
        @Bean
        public Binding XA(@Qualifier("X") DirectExchange exchange,@Qualifier("A") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(A_KEY);
        }
        @Bean
        public Binding XB(@Qualifier("X") DirectExchange exchange,@Qualifier("B") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(B_KEY);
        }
        @Bean
        public Binding Dead(@Qualifier("dead_e") DirectExchange exchange,@Qualifier("dead_q") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_KAY);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    生产者
    @RestController
    public class Producer {
        public static final String X_EXCHANGE = "X";
        public static final String A_KEY = "A";
        public static final String B_KEY = "B";
        @Autowired
        private RabbitTemplate template;
        @GetMapping("rabbit/{msg}")
        public String setMessage(@PathVariable String msg){
            template.convertAndSend(X_EXCHANGE,A_KEY,msg);//向两个队列发送消息
            template.convertAndSend(X_EXCHANGE,B_KEY,msg);
            return msg+"发送成功";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    消费者

    消费者在springboot中是一种监听器的形式,用来监听队列

    @Component
    public class Consumer {
        public static final String DEAD_QUEUE = "dead_queue";
        public static final String A_QUEUE = "A";
        public static final String B_QUEUE = "B";
        @RabbitListener(queues = DEAD_QUEUE)//消费者接收来自两个队列的死信
        public void D(Message msg,Channel channel){
            System.out.println("DEAD:"+new String(msg.getBody()));
            System.out.println("---");
        }
        @RabbitListener(queues = A_QUEUE)//消费者接收来自两个队列的死信
        public void A(Message msg,Channel channel){
            System.out.println("A:"+new String(msg.getBody()));
        }
        @RabbitListener(queues = B_QUEUE)//消费者接收来自两个队列的死信
        public void B(Message msg,Channel channel){
            System.out.println("B:"+new String(msg.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    延迟队列(死信实现)

    • 需求:在注册十分钟后没有登录则发送通知信息
    • 死信实现:注册后发消息到队列,并设置队列ttl为10分钟,10分钟后消息过期自动转发到死信交换机,死信交换机路由到死信队列,最后被监听器获取
    • 代码实现:上面那个例子
    • 优化:生产信息创建消息时就设置ttl,而不是在队列中设置死ttl

    在这里插入图片描述

    • 缺点 死信只能排队执行,如果第二个消息ttl短于第一个死信消息,第二个消息不会按时过期

    用插件解决(延迟队列)

    前提是安装了插件rabbitmq_delayed_message_exchange

    无需新建死信交换机和死信队列,只需要以下结构
    p — x交换机—A队列----c

    修改的地方
    • 新建延迟交换机
        @Bean("X")
        public CustomExchange XExchange() {
            Map<String,Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type","direct");
            return new CustomExchange(X_EXCHANGE,"x-delayed-message",true,false,arguments);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 绑定
       @Bean
        public Binding XA(@Qualifier("X") CustomExchange exchange, @Qualifier("A") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(A_KEY).noargs();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 生产者
        @GetMapping("rabbitDelay/{ttl}")//设置消息延迟时间
        public String setDelayMessage(@PathVariable String ttl){
            template.convertAndSend(X_EXCHANGE,A_KEY,ttl,message -> {
                message.getMessageProperties().setDelay(Integer.parseInt(ttl));
                return message;
            });//向两个队列发送消息
            return ttl+"发送成功";
        }   
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    发布确认的回调函数

    目的还是为了防止消息丢失,如果交换机或者队列丢失,生产者不会有察觉,基于springboot的发布确认

    针对交换机
    • springboot配置文件中开启交换机确认消息
        publisher-confirm-type: correlated 
    
    • 1
    • 回调函数
    //定义回调接口
    @Component
    public class MyCallback implements RabbitTemplate.ConfirmCallback {
        /*
        -- 无论是否成功都会调用confirm函数
        1. correlationData 返回的信息,由发送方决定
        2. ack 成功为true
        3,失败原因
         */
        @Autowired
        RabbitTemplate template;
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("成功");
            }else{
                System.out.println("失败原因:"+cause);
            }
        }
    
        /**
         * callback是template内部接口,就算在这定义了也不会被调用,所以要将自定义的回调函数注入到template中
         * "@PostConstruct" 这个注解下的方法放在最后执行,因为如果template如果没有注入就执行inti,回报空指针异常
         */
        @PostConstruct
        public void init(){
            template.setConfirmCallback(this);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    针对队列

    和交换机很像

    1. 注入
            template.setReturnCallback(this);
    
    • 1
    1. 重写回调函数
    //    注意实现RabbitTemplate.ReturnCallback接口
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println(message+"被退回");
            System.out.println("原因:"+replyText);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    备用交换机

    也是针对交换机到队列这个过程
    在这里插入图片描述
    唯一要设置的就是当正常交换机发送失败后,应转发到备份交换机(fanout),备份交换机和备份队列正常写即可

       @Bean("E")
        public DirectExchange Exchange() {
            Map<String,Object> arguments = new HashMap<>();
            arguments.put("alternate-exchange",BACKUP_EXCHANGE);
            return ExchangeBuilder.directExchange(EXCHANGE).withArguments(arguments).build();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    一般备份交换机是fanout,绑定备份队列和报警队列,当收到消息时广播到这两个队列

    优先级队列

  • 相关阅读:
    分布式(一致性协议)之领导人选举( DotNext.Net.Cluster 实现Raft 选举 )
    小黑子—springMVC:第一章 请求处理与响应数据
    IDEA 新建 Maven 项目没有文件结构 pom 文件为空 解决方法
    物联网AI MicroPython传感器学习 之 ADXL345 3轴加速度传感器
    【刷题笔记9.25】LeetCode:相交链表
    Docker学习-Docker部署Golang项目和Mysql镜像
    修改Office2021 默认安装路径
    Flink 基础 -- 应用开发(项目配置)
    Django-(5)
    N皇后问题
  • 原文地址:https://blog.csdn.net/qq_51682771/article/details/127096790