• RabbitMQ - Spring boot 整合 RabbitMQ


    在这里插入图片描述

    一、RabbitMQ

    1、RabbitMQ 使用场景

    1.1、服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密
    在这里插入图片描述

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可在这里插入图片描述

    1.2、流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对
    在这里插入图片描述
    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
    在这里插入图片描述
    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景
    在这里插入图片描述

    1.3、异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢
    在这里插入图片描述
    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
    在这里插入图片描述

    2、rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    在这里插入图片描述

    2.1、Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    在这里插入图片描述

    2.2、Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    2.3、Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    2.4、Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    3、rabbitmq安装

    3.1、Docker 启动Rabbitmq

    下载镜像,rabbitmq:management 镜像中已经安装了管理界面

    docker pull rabbitmq:management
    
    
    • 1
    • 2

    关闭防火墙

    systemctl stop firewalld
    systemctl disable firewalld
     
    # 重启 docker 系统服务
    systemctl restart docker
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    配置管理员用户名和密码

    mkdir /etc/rabbitmq
    vim /etc/rabbitmq/rabbitmq.conf
    
    # 添加两行配置:
    default_user = admin
    default_pass = admin
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    启动Rabbitmq

    docker run -d --name rabbit \
    -p 5672:5672 \
    -p 15672:15672 \
    -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
    -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
    --restart=always \
    rabbitmq:management
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    访问管理控制台 http://192.168.64.140:15672
    用户名密码是 admin

    3.2、离线安装

    下载离线安装包文件(https://download.csdn.net/download/weixin_38305440/12265906)

    上传离线安装包
    rabbitmq-install 目录上传到 /root

    切换到rabbitmq-install目录

    cd rabbitmq-install
    
    • 1

    安装

    rpm -ivh *.rpm
    
    • 1

    3.3、Yum在线安装

    以下内容来自 RabbitMQ 官方手册

    rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
    
    
    # centos7 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    # centos6 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    yum makecache
    
    yum install socat
    
    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
    rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
    
    yum install rabbitmq-server
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    3.4、启动rabbitmq服务器

    # 设置服务,开机自动启动
    systemctl enable rabbitmq-server
    
    # 启动服务
    systemctl start rabbitmq-server
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.5、rabbitmq管理界面

    3.5.1、启用管理界面
    # 开启管理界面插件
    rabbitmq-plugins enable rabbitmq_management
    
    # 防火墙打开 15672 管理端口
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    3.5.2、重启RabbitMQ服务
    systemctl restart rabbitmq-server
    
    • 1
    3.5.3、访问

    访问服务器的15672端口,例如:
    http://192.168.64.140:15672

    3.6、添加用户

    3.6.1、添加用户
    # 添加用户
    rabbitmqctl add_user admin admin
    
    # 新用户设置用户为超级管理员
    rabbitmqctl set_user_tags admin administrator
    
    • 1
    • 2
    • 3
    • 4
    • 5
    3.6.2、设置访问权限

    在这里插入图片描述
    在这里插入图片描述
    用户管理参考(https://www.cnblogs.com/AloneSword/p/4200051.html)

    3.7、开放客户端连接端口

    # 打开客户端连接端口
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --reload
    
    • 1
    • 2
    • 3

    主要端口介绍:
    4369 – erlang发现口
    5672 – client端通信口
    15672 – 管理界面ui端口
    25672 – server间内部通信口

    4、rabbitmq六种工作模式

    4.1、简单模式

    在这里插入图片描述
    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    发送消息的程序是生产者
    队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    消费者等待从队列接收消息
    在这里插入图片描述

    4.1.1、pom.xml

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.tedu</groupId>
    	<artifactId>rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>5.4.3</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.8.0</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    4.1.2、生产者发送消息
    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
       
    	public static void main(String[] args) throws Exception {
       
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    4.1.3消费者接收消息
    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
       
    	public static void main(String[] args) throws Exception {
       
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
       
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
       
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
       
    			@Override
    			public void handle(String consumerTag) throws IOException {
       
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    4.2、工作模式

    在这里插入图片描述
    在这里插入图片描述
    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    4.2.1、生产者发送消息

    这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
       
    	public static void main(String[] args) throws Exception {
       
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		//参数:queue,durable,exclusive,autoDelete,arguments
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		while (true) {
       
    		    //控制台输入的消息发送到rabbitmq
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			//如果输入的是"exit"则结束生产者进程
    			if ("exit".equals(msg)) {
       
    				break;
    			}
    			//参数:exchage,routingKey,props,body
    			ch.basicPublish("", "helloworld", null, msg.getBytes());
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    4.2.2、消费者接收消息
    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
       
    	public static void main(String[] args) throws Exception {
       
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
       
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
       
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    
    				//遍历字符串中的字符,每个点使进程暂停一秒
    				for (int i = 0; i < msg.length(); i++) {
       
    					if (msg.charAt(i)=='.') {
       
    						try {
       
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
       
    						}
    					}
    				}
    				System.out.println("处理结束");
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
       
    			@Override
    			public void handle(String consumerTag) throws IOException {
       
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    4.2.3、运行测试
       运行:
               **一个生产者**
               **两个消费者**
               
       生产者发送多条消息,
       如: 1,2,3,4,5. 两个消费者分别收到: 
              **消费者一: 1,3,5**  
              **消费者二: 2,4**   
              
       rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    4.2.4、消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?
    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

        如果生产者发送以下消息:
        1…
    
        2
    
        3
    
        4
    
        5
        两个消费者分别收到:
                                   消费者一: 1…, 3, 5
                                   消费者二: 2, 4
        当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理                           
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
       
    	public static void main(String[] args) throws Exception {
       
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
       
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
       
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
       
    					if (msg.charAt(i)=='.') {
       
    						try {
       
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
       
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
       
    			@Override
    			public void handle(String consumerTag) throws IOException {
       
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("helloworld", false, callback, cancel);
    	}
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

       忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, 
       rabbitmq会吃  掉越来越多的内存
    
       可以使用下面命令打印工作队列中未确认消息的数量:
    
    • 1
    • 2
    • 3
    • 4

    rabbitmqctl list_queues name messages_ready messages_unacknowledged

       当处理消息时异常中断, 可以选择让消息重回队列重新发送.    
       nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:
    
    • 1
    • 2

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)

    4.2.5、合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    在这里插入图片描述

    4.2.6、消息持久化

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    
                    
  • 相关阅读:
    01_网络编程_传统IO
    DP:解决路径问题
    【JAVA】快速排序
    Java:如何加速Java中的大型集合处理
    AWS云服务器EC2实例实现ByConity快速部署
    读书笔记:《浪潮之巅:上》
    【深度学习实验】网络优化与正则化(三):随机梯度下降的改进——Adam算法详解(Adam≈梯度方向优化Momentum+自适应学习率RMSprop)
    民宿租赁系统全栈开发:Java+SpringBoot+Vue+MySQL
    PX4模块设计之三十:Hysteresis类
    从零开始写 Docker(十七)---容器网络实现(中):为容器插上”网线“
  • 原文地址:https://blog.csdn.net/weixin_54685622/article/details/137646817