最近在准备面试,发现之前学习的RabbitMQ基本都忘了,于是乎,趁着复习准备的机会,顺便做一些RabbitMQ的知识整理工作
要了解RabbitMQ,首先需要了解什么是MQ
异步处理,想必大家都了解,就是把同步处理的事情变成异步来做,有效降低了处理时间,常见的如注册的时候发送注册邮件,发送注册短信码等



总结:
如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ;
如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或zeroMQ,kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!

<dependencies>
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>5.7.3version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.7.25version>
<scope>compilescope>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-lang3artifactId>
<version>3.9version>
dependency>
dependencies>
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password)
factory.setHost("192.168.204.141");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("test"); factory.setPassword("123123");
//3.通过工厂获得与MQ的连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception{
Connection connection = getConnection();
System.out.println("connection = " + connection);
connection.close();
}
}


即发送、传递、接收的过程
public class Sender {
public static void main(String[] args) throws Exception{
String msg = "Hello,RabbitMQ!";
// 1.获得连接
Connection connection = ConnectionUtil.getConnection();
// 2.在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 3.创建消息队列(1,2,3,4,5)
/*
参数1:队列的名称
参数2:队列中的数据是否持久化
参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
参数5:队列参数(没有参数为null)
*/
channel.queueDeclare("queue1",false,false,false,null);
// 4.向指定的队列发送消息(1,2,3,4)
/*
参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
参数2:目标队列的名称
参数3:设置消息的属性(没有属性则为null)
参数4:消息的内容(只接收字节数组)
*/
channel.basicPublish("","queue1",null,msg.getBytes());
System.out.println("发送:" + msg);
// 5.释放资源
channel.close();
connection.close();
}
}
启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认

public class Recer {
public static void main(String[] args) throws Exception{
// 1.获得连接
Connection connection = ConnectionUtil.getConnection();
// 2.在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 3.从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
//交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收 = " + s);
}
};
// 4.监听队列 true:自动消息确认
channel.basicConsume("queue1", true,consumer);
}
}
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0

从上面可以看出,消息一旦被消费,消息就会立刻从队列中移除,那么问题来了,RabbitMQ是如何感知到消息被消费了呢?
解决方案
基于上面的问题,RabbitMQ提供了一种ACK消息确认机制,当消费者获得消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
分类
代码
// false:手动消息确认
channel.basicConsume("queue1", false, consumer);

如果生产者生产消息过多,而消费者来不及消费,这时候堆积的消息就会越来越多,那这时候我们该如何做呢?可以考虑增加消费者,让更多的消费者来消费消息便可解决这一情况
注意:如何解决消息堆积的情况?
采用工作队列的模式,多个消费者监听同一个队列,接收到消息后,通过线程池,进行异步消费

发布订阅模式其实本质上就是在工作队列的模式上进行了改造,不同的是,工作队列模式是每个任务都被准确地交付给一个工作者,而发布订阅模式是可以将每个任务交付给binding的每一个工作者,类似于广播的机制,广播给每一个消费者
流程
代码
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型)
// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该 路由绑定的所有队列上)
channel.exchangeDeclare("test_exchange_fanout", "fanout");
String msg = "hello,大家好!";
channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
System.out.println("生产者:" + msg);
channel.close();
connection.close();
}
}
public class Recer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
// 绑定路由(关注)
/*
参数1:队列名
参数2:交换器名称
参数3:路由key(暂时无用,""即可)
*/
channel.queueBind("test_exchange_fanout_queue_1", "test_exchange_fanout", "");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者】 = " + s);
}
};
// 4.监听队列 true:自动消息确认
channel.basicConsume("test_exchange_fanout_queue_1", true,consumer);
}
}

路由会根据类型进行定向分发消息给不同的队列
代码实现
// 声明路由(路由名,路由类型)
// direct:根据路由键进行定向分发消息
channel.exchangeDeclare("test_exchange_direct", "direct");
// 绑定路由(如果路由键的类型是添加的话,绑定到这个队列1上)
channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert");
注意:

生产者
// 声明路由(路由名,路由类型,持久化)
channel.exchangeDeclare("test_exchange_topic", "topic",true); String msg = "商品降价";
// 发送消息(第三个参数作用是让消息持久化)
channel.basicPublish("test_exchange_topic", "product.price",MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
消费者
// 声明队列( 第二个参数为true:支持持久化)
channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
代码示例
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_transaction", "topic");
channel.txSelect(); // 开启事务
try {
channel.basicPublish("test_transaction", "product.price", null, "商品 降价1".getBytes());
// System.out.println(1/0); // 模拟异常!
channel.basicPublish("test_transaction", "product.price", null, "商品 降价2".getBytes());
System.out.println("消息全部发出!");
channel.txCommit(); // 事务提交
}
catch (Exception e){
System.out.println("由于系统异常,消息全部撤回!");
channel.txRollback(); // 事务回滚
e.printStackTrace();
}finally {
channel.close();
connection.close();
}
}
}
RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量。试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚,太浪费性能了。那么有没有其它更好的解放方法呢 ?有,那就是Confirm模式,confirm模式采用补发第10条的措施来完成10条消息的送达
使用步骤
<rabbit:connection-factory id="connectionFactory"
host="192.168.204.141" port="5672"
username="test" password="123123"
virtual-host="/test" publisher-confirms="true" />
<rabbit:template id="rabbitTemplate" connection-
factory="connectionFactory" exchange="spring_topic_exchange"
message-converter="jsonMessageConverter" confirm-callback="msgSendConfirmCallback"/>
<bean id="msgSendConfirmCallback" class="confirm.MsgSendConfirmCallback"/>
@Component
public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback {
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("消息确认成功!!");
} else {
System.out.println("消息确认失败。。。");
// 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发
// 采用递归(固定次数,不可无限)或 redis+定时任务
}
}
}
生产者
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
//获得rabbitTemplate的实例后,通过rabbitTemplate.convertAndSend()发送消息
假设一直情况:消费者客户端宕机了,Rabbitmq 服务器积压了成千上万条未处理的消息,修好消费者客户端后,随便打开一个消费者客户端,就会
出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据,就会被压垮崩溃,再次宕机
解决方案:
<rabbit:listener-container connection-factory="connectionFactory"
prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue- names="test_spring_queue_1" />
rabbit:listener-container>
订单中常用于过期订单,如30分钟自动取消
spring-rabbitmq-producer.xml
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000">entry>
rabbit:queue-arguments>
rabbit:queue>


ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 创建消息配置对象
MessageProperties messageProperties = new MessageProperties();
// 设置消息过期时间
messageProperties.setExpiration("6000");
// 创建消息
Message message = new Message("6秒后自动删除".getBytes(), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("msg.user", message);
注意:如果同时设置了queue和message的TTL值,则二者中较小的才会起作用

<rabbit:queue name="dlx_queue"/>
<rabbit:direct-exchange name="dlx_exchange" >
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue">rabbit:binding>
<rabbit:binding key="dlx_max" queue="dlx_queue">rabbit:binding>
rabbit:bindings>
rabbit:direct-exchange>
使用步骤:
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue" />
rabbit:listener-container>