• 05-RabbitMQ之原生API使用


    使用RabbitMQ提供的原生客户端API进行交互。
    一、Maven依赖

    
       com.rabbitmq
       amqp-client
       5.9.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、创建连接以及声明队列
    1、首先创建连接,获取Channel

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(HOST_NAME);
    factory.setPort(HOST_PORT);
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setVirtualHost("/mirror");
    connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2、声明queue队列
    (1)classic队列

    channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    
    • 1

    (2)Quorum队列
    需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum

    Map params = new HashMap<>();
    params.put("x-queue-type","quorum");
    channel.queueDeclare(QUEUE_NAME, true, false, false, params); //durable参数必须是true,exclusive必须是false
    
    • 1
    • 2
    • 3

    (3)Stream队列
    如果要声明一个Stream队列,则 x-queue-type参数要设置为stream

    Map params = new HashMap<>();
    params.put("x-queue-type","stream");
    params.put("x-max-length-bytes", 20_000_000_000L);
    params.put("x-stream-max-segment-size-bytes", 100_000_000);
    channel.queueDeclare(QUEUE_NAME, true, false, false, params); //durable参数必须是true,exclusive必须是false
    
    • 1
    • 2
    • 3
    • 4
    • 5

    三、Producer根据应用场景发送消息到queue

    channel.basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
    
    • 1

    其中exchange是一个Producer与queue的中间交互机制。可以让Producer把消息按一定的规则发送到不同的queue,不需要的话就传空字符串
    四、Consumer消费消息
    1、被动消费模式
    Consumer等待rabbitMQ服务器将message推送过来再消费,一般是启动一个一直挂起的线程进行等待。

    channel.basicConsume(String queue, boolean autoAck, Consumer callback)
    
    //autoAck为true则表示消息发送到该Consumer后就被Consumer消费掉了,不需要再往其他Consumer转发,false则会继续往其他Consumer转发
    //要注意如果每个Consumer一直为false,会导致消息不停的被转发,不停的吞噬系统资源,最终造成宕机
    
    • 1
    • 2
    • 3
    • 4

    2、主动消费模式
    Comsumer主动到rabbitMQ服务器上去获取指定的messge进行消费

    GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)
    
    • 1

    3、Stream队列消费
    消费Stream队列时,需要注意以下三点的设置:

    • channel必须设置basicQos属性
    • 正确声明Stream队列
    • 消费时需要指定offset
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = connection.createChannel();
    //1、这个属性必须设置。
    channel.basicQos(1);
    //2、声明Stream队列
    Map params = new HashMap<>();
    params.put("x-queue-type","stream");
    params.put("x-max-length-bytes", 20_000_000_000L);
    params.put("x-stream-max-segment-size-bytes", 100_000_000); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, params);
    //Consumer接口还一个实现QueueConsuemr,但是代码注释过期了。
    Consumer myconsumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope,
         AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("========================");
            String routingKey = envelope.getRoutingKey();
            System.out.println("routingKey >" + routingKey);
            String contentType = properties.getContentType();
            System.out.println("contentType >" + contentType);
            long deliveryTag = envelope.getDeliveryTag();
            System.out.println("deliveryTag >" + deliveryTag);
            System.out.println("content:" + new String(body, "UTF-8"));
            // (process the message components here ...)
            //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
            //没有答复过的消息,服务器会一直不停转发。
            channel.basicAck(deliveryTag, false);
        }
    };
    //3、消费时,必须指定offset。 可选的值:
    // first: 从日志队列中第一个可消费的消息开始消费
    // last: 消费消息日志中最后一个消息
    // next: 相当于不指定offset,消费不到消息。
    // Offset: 一个数字型的偏移量
    // Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
    Map consumeParam = new HashMap<>();
    consumeParam.put("x-stream-offset","last");
    channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);
    //channel.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    五、关闭连接,释放资源

    channel.close();
    connection.close();
    
    • 1
    • 2
  • 相关阅读:
    Debian | Vscode 安装与配置 C 环境
    Android logd日志原理
    Vue 过渡与动画
    ArrayList与顺序表(2)
    NFT协议OMNI因重入攻击损失1300ETH
    编辑SRT字幕,添加在视频中播放
    【Android知识笔记】RecyclerView专题
    WIFI版本云音响设置教程阿里云平台版本
    Matlab学习笔记(不定期更新)
    Java如何创建支付接口
  • 原文地址:https://blog.csdn.net/qq_39234967/article/details/126295128