注:仓储服务是进行发货逻辑的。
也就是说服务消费者和服务提供者之间没有任何依赖关系了,服务消费者只需要向Broker发送成功的事件就可以了,其他的和服务消费者就没有关系了,就是Broker和服务提供者之间的关系问题了。
几种常见的MQ对比:
MQ解决了什么问题:
就是上面异步中(新添加了Broker)的优点那些,因为我们知道MQ也就是事件驱动架构中的Broker,所以上面的 耦合度低、流量削锋、服务之间没有依赖关系了,不再担心级联问题了、性能提升,吞吐量提高就是Broker所起的作用,也可以说就是MQ所起的作用。
注:exchange交换机是通过路由的方式把消息发送给队列中的。
注:exchange交换机是通过路由的方式把消息发送给队列中的。
最终消息消费者可以在队列中获取消息,然后进行处理消息了。
VirtualHost:虚拟主机(也就是说交换机和队列会放在一个虚拟主机中),每创建一个用户就会有一个新的虚拟主机,各个虚拟主机之间是相互隔离的互不影响,这样可以避免干扰。
虚拟主机中放置的就是exchange交换机和消息队列,exchange交换机会通过路由的方式向queue消息队列发送消息提供者发送的消息(请求)。
写代码之前,我们还必须要理清楚消息是什么:
还是从这个图我们可以看出,RabbitMQ充当的就是下面的这个Broker,RabbitMQ就是消息队列,我们这个支付服务功能模块发送的支付成功事件就是一个消息(因此我们这个支付服务功能模块就可以叫做publisher消费发布者)然后这个支付成功的消息就暂存在queue队列中,后面的订单服务或者仓储服务等就可以叫做consumer消息消费者了,消息消费者通过向队列中订阅事件就可以拿到MQ队列中消费发布者发送的消息了,然后消息消费者拿到消息后,就可以处理消息了。
理清楚之后,我们就可以用代码演示了:
- package cn.itcast.mq.helloworld;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import org.junit.Test;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class PublisherTest {
- @Test
- public void testSendMessage() throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.150.101");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("itcast");
- factory.setPassword("123321");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
-
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
-
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 4.发送消息
- String message = "hello, rabbitmq!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
-
- // 5.关闭通道和连接
- channel.close();
- connection.close();
-
- }
- }
- package cn.itcast.mq.helloworld;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class ConsumerTest {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.150.101");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("itcast");
- factory.setPassword("123321");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
-
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
-
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 4.订阅消息
- channel.basicConsume(queueName, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 5.处理消息
- String message = new String(body);
- System.out.println("接收到消息:【" + message + "】");
- }
- });
- System.out.println("等待接收消息。。。。");
- }
- }
第一步:首先第一步建立连接,也就是说我们消息提供者想要把消息发送到MQ队列当中暂存的话,那么就需要先通过 ConnectionFactory 连接工厂这个对象进行与MQ建立联系:
第二步:设置连接参数
第三步:让消息提供者与RabbitMQ建立连接关系,然后建立一个通道Channel
通道Channel: 我们就可以理解为消息提供者和RabbitMQ建立好连接关系后,消息提供者就走这个通道向MQ消息队列中发送消息的(可以把这个通道比作一条指引的路)
第四步:然后我们就可以在RabbitMQ上创建一个队列了(用来暂存消息提供者发送的消息)
第五步:有了队列之后,消息提供者就可以向队列当中发送消息进行暂存了(发送完消息后记得把通道关闭即可)
发送完消息后,我们知道我们simple.queue队列中就已经有了消息提供者发送的消息了,我们可以查看一下是否有了:
同理消息消费者如果想在RabbitMQ消息队列中订阅获取消息,那么我们这个消息消费者同样需要先和RabbitMQ建立连接,然后创建一个通道Channel(和消息提供者一样,不过这里这个消息消费者等会就通过走这个通道向RabbitMQ消息队列中订阅获取消息)
注:一定要还和刚才那个RabbitMQ对应的ip值一样,要不然就连接不到刚才那个RabbitMQ了,如果连接都连接不上的话,更别说拿那个RabbitMQ消息队列中的消息了。
第三步:创建一个队列
思考:我们这个simple.queue队列,刚才消息提供者已经创建好了,为什么这里这个消息消费者还要再次创建呢:
原因就是实际开发中,我们不知道是消息提供者代码先执行,还是消息消费者的代码先执行,如果是消息消费者的代码先执行的话那么这个队列如果消费者不创建的话,那么RabbitMQ当中就没有这个simple.queue队列(因为消息提供者代码没有先执行,所以没有创建),因此为了保险起见,在消息消费者中也创建一下这个simple.queue队列。
注:如果这个队列已经在RabbitMQ存在的话,那么就不会再次创建了,因此不会有什么坏处的。
第四步: 就可以订阅RabbitMQ消息队列中的消息了
因为我们这个消息消费者已经和RabbitMQ建立了联系了,因此就可以订阅拉取RabbitMQ中simple.queue队列中刚才消息提供者发送的消息了
也就是上面消息提供者和消息消费者的流程: