

(1)应用解耦
在没有MQ时,订单系统直接与其他系统交互,当其他任意一个系统出错时,都会影响到订单系统。加入MQ后,订单系统将信息都放入MQ中,其它系统从MQ取,即使出错也不会影响到订单系统。并且当再添加一个系统时也不会跟订单系统有所影响,直接根据MQ中的信息添加。

(2)异步提速
不加MQ,用户响应需要920s

加上MQ,仅需25s,就得到响应

(3)削峰填补
不加MQ,当请求瞬间过多,系统容易崩溃。

加上MQ,直接解决此问题,所有请求装进MQ,系统慢慢取慢慢处理。




RabbitMQ基于AMQP标准。

RabbitMQ基础架构图:



1、下面3个包传到虚拟机

2、在线安装依赖环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
3、安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
4、安装RabbitMQ
# 安装
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
5、开启管理界面及配置
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
6、启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
7、设置配置文件
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码。
可以在上面创建用户并为其分配权限、创建Virtual hosts、并给用户设置权限。可以查看交换机、队列、连接信息。

producer端:
package com.liu.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.182.134");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hellooo", true, false, false, null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称,这里简单模式跟队列名称相同
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 1 ;i<=5; i++) {
String body = "hello lmhlmh~~~";
//6. 发送消息
channel.basicPublish("", "hellooo", null, body.getBytes());
}
//7.发送到MQ中后就可以释放资源了。消费者直接去MQ中取
channel.close();
connection.close();
}
}
consumer端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.182.134");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//生产者刚才创建的队列,也可以不指定(但有个小bug是当producer来回多次启动后再取得话,就取不出来)
channel.queueDeclare("hellooo",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hellooo",true,consumer);
//关闭资源?不要
}
}

producer端:
package com.liu.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
//6. 发送消息
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
consumer1端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
consumer2端
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
小结:



producer端:
这里是广播模式:
package com.liu.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
consumer1端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 队列名称
String queue1Name = "test_fanout_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
consumer2端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息保存数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
小结:


producer端:
package com.liu.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_direct";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_direct_queue_first";
String queue2Name = "test_direct_queue_second";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
这里是direct类型,routingKey设置为特定的值
*/
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
//8. 发送消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
consumer1端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 找到队列,取其中的消息
String queue2Name = "test_direct_queue_second";
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("test_direct_queue_second",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
consumer2端:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 找到队列,取其中的消息
String queue1Name = "test_direct_queue_first";
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
channel.queueDeclare("test_direct_queue_first",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
小结:




producer端:
package com.liu.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_topic";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6. 创建队列
String queue1Name = "test_topic_queue_first";
String queue2Name = "test_topic_queue_second";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库; #代表任意个数据,*代表一个数据
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "日志信息:张三调用了delete方法...日志级别:error...";
//8. 发送消息
channel.basicPublish(exchangeName,"bb.aa",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
consumer1:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 找到队列,取消息
String queue1Name = "test_topic_queue_first";
String queue2Name = "test_topic_queue_second";
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
channel.queueDeclare(queue1Name,true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存入数据库.......");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
consumer2:
package com.liu.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("master");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/menghao");//虚拟机 默认值/
factory.setUsername("lmh");//用户名 默认 guest
factory.setPassword("123456");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
// 找到队列,取消息
String queue1Name = "test_topic_queue_first";
String queue2Name = "test_topic_queue_second";
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
channel.queueDeclare(queue2Name,true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台.......");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}

生产者和消费者 pom.xml文件添加依赖:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
生产者和消费者 application.yam 文件配置如下:
# 配置RabbitMQ的基本信息 ip 端口 username password..
spring:
rabbitmq:
host: master # ip
port: 5672
username: lmh
password: 123456
virtual-host: /menghao
生产者主要是在配置类中定义队列、交换机、二者绑定、以及一些参数规则。
package com.liu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// 定义的 队列、交换机、绑定规则 等都放在这个配置类中
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.交换机
@Bean("bootExchange") // 注入bean,为了后面绑定时的获取
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
//2.Queue 队列
@Bean("bootQueue") // 注入bean,为了后面绑定时的获取
public Queue bootQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
.build();
}
//3. 队列和交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("boot.#")
.noargs();
}
public static final String EXCHANGE_NAME_CONFIRM = "boot_exchange_confirm";
public static final String QUEUE_NAME_CONFIRM = "boot_queue_confirm";
//1.交换机
@Bean("confirmExchange") // 注入bean,为了后面绑定时的获取
public Exchange confirm_Exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_CONFIRM)
.durable(true)
.build();
}
//2.Queue 队列
@Bean("confirmQueue") // 注入bean,为了后面绑定时的获取
public Queue confirm_Queue(){
return QueueBuilder
.durable(QUEUE_NAME_CONFIRM)
.build();
}
@Bean
public Binding bindQueueExchangeConfirm(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("confirm.#")
.noargs();
}
// 设置过期时间的交换机
public static final String EXCHANGE_NAME_TTL = "boot_exchange_ttl";
public static final String QUEUE_NAME_TTL = "boot_queue_ttl";
//1.交换机
@Bean("ttlExchange") // 注入bean,为了后面绑定时的获取
public Exchange ttl_Exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_TTL)
.durable(true)
.build();
}
//2.Queue 队列
@Bean("ttlQueue") // 注入bean,为了后面绑定时的获取
public Queue tll_Queue(){
return QueueBuilder
.durable(QUEUE_NAME_TTL)
.ttl(10000) // 10s内队列里的消息没被消费的话 就统一全过期
.deadLetterExchange(EXCHANGE_NAME_DLX) // 绑定死信交换机
.deadLetterRoutingKey("dlx.#") // 定义死信队列路由
.maxLength(10) // 设置队列最大长度
.build();
}
@Bean
public Binding bindQueueExchangeTtl(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("ttl.#")
.noargs();
}
// 死信队列,当某一个队列 消息过期/达到队列长度限制/消息被消费者拒收 这些消息可以被转为与之绑定的死信队列中
public static final String EXCHANGE_NAME_DLX = "boot_exchange_dlx";
public static final String QUEUE_NAME_DLX = "boot_queue_dlx";
//1.交换机
@Bean("dlxExchange") // 注入bean,为了后面绑定时的获取
public Exchange dlx_Exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX)
.durable(true)
.build();
}
//2.Queue 队列
@Bean("dlxQueue") // 注入bean,为了后面绑定时的获取
public Queue dlx_Queue(){
return QueueBuilder
.durable(QUEUE_NAME_DLX)
.build();
}
@Bean
public Binding bindQueueExchangeDlx(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("dlx.#")
.noargs();
}
}
这里定义了四对 交换机&队列。
(1)第一对是一个简单的通配符模式的匹配规则。
通过测试方法,来向队列发送消息:
// 注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 1、普通发送
@Test
public void testSend(){
// 三个参数:交换机名称、匹配的规则、发送的信息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
}
然后在consumer将消息取出:
package com.liu.listener;
//注意这里引入的Channel 包
import com.rabbitmq.client.Channel;
// 注意这里引入的 Message 包
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName: RabbitMQListener
* @Author: liumenghao
* @Description:
* @Date: 2022/11/18 13:12
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue") // 项目启动后,就监听此队列,监听到消息就输出
public void ListenerQueue(Message message){
//System.out.println(message);
System.out.println(new String(message.getBody())); // 输出监听到的消息
}
}
主要用到了 @RabbitListener 此注解,来找到队列!
(2)第二对是开启确认模式

在生产者的yam文件中增添:
publisher-confirm-type: correlated # 开启确认模式
通过测试方法,像队列发送消息:
// 2、 确认模式下发送
/*
* 确认模式:当消息发送给Exchange后,返回给发送者的结果(仅仅判断是否成功发送到Exchange,而不管Exchange是否成功路由到Queue)
* 1、在 yam文件中设置 publisher-confirm-type: correlated
* 2、在 rabbitTemplate 中定义 confirmCallBack
* */
@Test
public void testConfirm(){
// 在确认模式下,定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/*
* correlationData: 相关配置信息
* b: 交换机是否成功收到了消息。
* s: 失败原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被执行了.....");
if(b) {
System.out.println("接受成功消息" + s);
} else {
System.out.println("接收失败消息" + s);
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm.aaa.bb","boot mq confirm~~~");
}
(2)第二对还开启回退模式
在生产者的yam文件中添加:
publisher-returns: true # 开启回退模式
通过测试方法,向队列发送消息:
// 3、回退模式下发送
/*
* 回退模式:处理成功被Exchange接收,而路由到Queue失败的消息
* 当消息发送给Exchange后,Exchange路由到Queue失败时,才会执行ReturnCallBack
* 步骤:
* 1、开启回退模式,在yam文件中 publisher-returns: true
* 2、设置Exchange处理消息的模式
* 1、如果消息没有路由到Queue,则丢弃消息(默认)
* 2、如果消息没有路由到Queue,返回给消息发送方(在ReturnCallBack中写)
* 3、如果处理消息模式为处理失败消息,定义 ReturnCallBack
* */
@Test
public void testReturn(){
// 设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println(returnedMessage.getMessage());
System.out.println(returnedMessage.getReplyCode());
System.out.println(returnedMessage.getReplyText());
System.out.println(returnedMessage.getExchange());
System.out.println(returnedMessage.getRoutingKey());
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm","boot mq confirm~~~");
}

生产端用的还是第二对的交换机&队列
消费端yam文件增加配置:
listener:
simple:
acknowledge-mode: manual # consumer ack 手动确认
还是上述测试方法的方式发送消息。
发送过之后,在consumer端取消息,这里在取消息的时候故意制造错误,会发现生产端不停重发
package com.liu.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ClassName: ConsumerAckListener
* @Author: liumenghao
* @Description:
* @Date: 2022/11/18 15:58
*/
@Component
public class ConsumerAckListener {
/*
* consumer ACK确认机制
* 1、默认情况下是自动签收,收到就返回,不管后续处理如何
* 2、手动签收:yam文件 配置 listener.simple.acknowledge-mode: manual
* 3、还是通过RabbitListener注解
* 4、如果消息处理成功,调用channel的 basicAck() 签收
* 5、如果消息处理失败,调用channel的 basicNack() 拒绝签收,第三方重新发送给consumer
* */
@RabbitListener(queues = "boot_queue_confirm")
// @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag 将请求头AmqpHeaders中的DELIVERY_TAG参数 映射到 deliveryTag
public void ListenerACKQueue(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception{
// Thread.sleep(1000); // 延迟1s看的更清楚
try {
// 1、接受转换消息
System.out.println("ack: " + new String(message.getBody()));
// 2、处理业务逻辑
System.out.println("处理业务逻辑...");
// 设置一个错误,会看到第三方会无线发消息
// int i = 3/0 ;
// 3、手动签收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
// 4、一旦出现异常就拒绝签收
// 第三个参数含义:设为true,消息重新返回到queue, 第三方会重新发送该消息给consumer
channel.basicNack(deliveryTag, true, true);
}
}
}
所有请求都先发送到MQ,消费端自己定义每次处理多少请求。避免了突然崩溃的情况。

通过测试方法向队列发送消息:
// 4、发送多条消息,测试 consumer 限流
@Test
public void testSendMany() {
for(int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm","boot mq confirm~~~");
}
}
在消费者的yam文件中增加如下配置:
prefetch: 1 # 限流量,consumer 每次从mq拉取的消息数,直到手动确认完毕后,才会继续拉取下一条消息
消费者获取消息代码:
package com.liu.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ClassName: QosListener
* @Author: liumenghao
* @Description:
* @Date: 2022/11/18 16:01
*/
/*
* Consumer 限流机制
* 1、确保consumer ack机制为手动确认
* 2、在 yam 文件中配置 listener.simple.prefetch
* prefetch = 1,表示consumer 每次从mq拉取一条消息来消费,直到手动确认消费完毕后才会继续拉取下一条消息。
* */
@Component
public class QosListener {
@RabbitListener(queues = "boot_queue_confirm")
public void qosListener(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception{
Thread.sleep(1000); // 每次处理后,休息1s,看的更清楚
// 获取消息
System.out.println("qos: " + new String(message.getBody()));
// 处理业务逻辑
// 签收. 如果把这行注释,则控制台只会打印一条消息,因为每次接收一个,没签收之前不会去接收下一个
channel.basicAck(deliveryTag, true);
}
}
消费端的确认模式一定要为手动确认!
第三对交换机&队列 设置了过期时间。在队列中的消息一段时间没被取出就过期。

对队列整体设置过期时间:

测试代码(可以为单个发送的消息设置过期时间)
// 5、发送多条消息,测试 过期时间
/*
* TTL: 过期时间
* 1、队列统一过期
*
* 2、消息单独过期
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* */
@Test
public void testTtl() {
// 单个消息过期时间的设置
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 1、设置message的信息
message.getMessageProperties().setExpiration("5000"); // 消息过期时间为5s
// 2、返回该消息
return message;
}
};
for(int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq ttl~~~", messagePostProcessor);
}
}




测试代码向队列发送消息:
在三种方式下可以看到,到达指定时间、或超出队列长度、或消息被拒收都会转移到死信队列中
// 6、测试死信队列
@Test
public void testDlx() {
// 1、超出过期时间从而转入死信队列测试
// rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
// 2、超出队列长度从而转入死信队列测试
// for(int i=1 ;i<=20 ;i++) {
// rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
// }
// 3、测试消费者拒收消息从而转入死信队列测试
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
}
小结:



