MQ(Message Quene):消息队列,是一种应用程序对应用程序的通信方法
使用场景:
RabittMQ采用Erlang语言开发,实现了高级消息队列协议(AMQP Advance Message Queuing Protocol)的开源消息中间件。
优点:
缺点
AMQP协议:高级消息队列协议

Virtual Host(虚拟主机:隔离不同用户的exchange、queue),每个Virtual Host里面可以有多个Exchange(交换机),每个交换机里面可以有多个消息队列。由于RabbitMQ是使用Erlang语言开发的,我们需要先安装Erlang,不同版本的RabbitMQ对应的Erlang版本也不一样,查看版本的对应链接如下:https://www.rabbitmq.com/which-erlang.html
RabittMQ下载地址:https://www.rabbitmq.com/download.html

Erlang的下载地址:https://www.erlang.org/downloads

Erlang的安装:
erl 打印出版本号即可RabittMQ安装:

rabbitmq-plugins.bat enable rabbitmq_management激活RabbitMQ的UI管理界面。net stop RabbitMQnet start RabbitMQlocalhost:15672
主页面:

Exchanges页面:创建Virtual Host之后,会给Virtual Host默认创建七个交换机,当然我们也可以手动创建交换机。

Admin页面:



官网Demo地址:https://www.rabbitmq.com/getstarted.html

官网第一种为例,https://www.rabbitmq.com/tutorials/tutorial-one-java.html,下面来创建一个HelloWorld使用案例:
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>5.7.1version>
dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*
* 如果发送消息的时候,不给队列绑定交换机,则默认将消息发送到default交换机之中
* */
public class MQSend {
// 定义队列的名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] arg) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// MQ服务器的IP地址
factory.setHost("localhost");
// 端口号
factory.setPort(5672);
// MQ服务器里面的虚拟主机
factory.setVirtualHost("Vhost01");
// 虚拟主机隶属用户的账号
factory.setUsername("admin");
// 虚拟主机隶属用户的密码
factory.setPassword("admin");
//通过工厂获取链接
Connection connection = factory.newConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
//创建并声明一个队列,如果队列存在则使用这个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息内容
String message = "Hello World!";
//发送消息:第一个参数可以设置交换机的名字,不设定发送到default交换机之中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
System.out.println(" Send '" + message + "'");
}
}


import com.rabbitmq.client.*;
import java.io.IOException;
/*
* 消费者
* */
public class MQRevc {
//消费的队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] arg) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("Vhost01");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建并声明一个队列,如果队列存在则使用这个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String mess = new String(body);
System.out.println("接收到消息" + mess);
}
};
// 创建一个消费者监听器
// 确认我们是否收到消息,true是自动确认,false是手动确认
String res = channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("消费成功:" + res);
}
}

小结:
基本消息队列的消息发送流程:
建立connection
创建channel
利用channel声明队列
利用channel向队列发送消息
基本消息队列的消息接收流程:
建立connection
创建channel
利用channel声明队列
定义consumer的消费行为handleDelivery()
利用channel将消费者与队列绑定
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
public static final String FANOUT_EXCHANGE = "fanout.exchange";
@Bean(name = FANOUT_EXCHANGE)
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
public static final String FANOUT_QUEUE1 = "fanout.queue1";
@Bean(name = FANOUT_QUEUE1)
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1, true, false, false);
}
@Bean
public Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,
@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Component
public class MQSender{
// 交换机
public static final String FANOUT_EXCHANGE = "fanout.exchange";
// 队列
public static final String FANOUT_QUEUE1 = "fanout.queue1";
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息,不需要实现任何接口,供外部调用。
public void send(String message) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, FANOUT_QUEUE1, message, correlationId);
System.out.println("消息发送成功 : " + message);
}
}
@Component
public class MQReceiver {
@RabbitListener(queues = MQConfig.FANOUT_QUEUE1)
public void receiver(String msg, Channel channel) {
// 只包含发送的消息
System.out.println("receiver消费成功:" + msg);
// channel 通道信息
// message 附加的参数信息
}
}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=Vhost01