通讯方式有两种方式,分为为:
这两个不同的通信方式各有优缺点
在以往的学习过程以及生活中,同步通信我们时常会遇到,除了前面说到的通话,远程调用技术Feign同样属于同步方式。虽然调用可以实时得到结果,时效性强,但还是存在一些不可避免的问题。
异步通信则可以有效的解决上述问题。以购买商品的业务为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。在事件模式中:
支付服务是事件发布者(publisher) ,在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
订单服务和物流服务是事件订阅者(Consumer) ,订阅支付成功的事件,监听到事件后完成自己业务即可。
同时为了事件发布者与订阅者之间的解耦,两者并不是直接通信,而是有一个中间商(Broker) 。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
中间商Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控,这些特性使得它具备以下好处:
吞吐量提升:无需等待订阅者处理完成,响应更快速;
故障隔离:服务没有直接调用,不存在级联失败问题;
调用间没有阻塞,不会造成无效的资源占用;
耦合度极低,每个服务都可以灵活插拔,可替换;
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件。
当然了,不能好处全被它拿了是吧,对于异步通信,同样具备着不可避免的缺点:
MQ(Message Queue),翻译过来叫做消息队列。顾名思义,它就是一个存放消息的队列,对应着事件驱动架构中的Broker。而市场上主流的MQ技术有如下几种,我们可以根据不同的需求选择适合的MQ。当然,这里我们使用的肯定是RabbitMQ。
追求高可用:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
在下面的操作之前,我们需要了解一下RabbitMQ的基本结构以及它的一些角色:
在快速入门(fen)之前,我们肯定得先对RabbitMQ进行安装,这里使用Docker进行安装。当然我们肯定是可以使用在线拉取的方式进行安装,同时也可以使用镜像包进行本地加载,这里提供了网盘链接有需自取:
docker pull rabbitmq:3-management
;docker load -i mq.tar
加载完成RabbitMQ镜像之后,我们需要通过命令来创建并运行RabbitMQ容器,命令对应的解释如下:
docker run \
-e RABBITMQ_DEFAULT_USER=xbaozi \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
在浏览器中输入服务器IP+管理平台端口号,如http://192.168.150.100:15672
,即可看到管理平台的登录界面
输入账号密码之后即可进入我们RabbitMQ管理平台的界面,这里的账号密码是上面自己设置的,如我这里账号是xbaozi,密码123456。
这里入门使用的案例是官方提供的一个**Hello World!**示例,为简单队列模式,是基于最基础的消息队列模型来实现的,因此其只包含了三个角色:
基本消息队列的消息发送,即发布者publisher的实现流程如下:
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xbaozi");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
在运行测试类发送消息成功之后我们再访问RabbitMQ的管理平台,可以发现在MQ中成功将消息纳入队列中了。
基本消息队列的消息接收,即订阅者consumer的实现流程如下:
这里事先将结果放到这里,因为这里有一个小细节,在这里提前解释可能对下面阅读代码会更有帮助。运行结果中是先等待接收后接收到信息,而代码中写的顺序是先接收信息后输出等待接收。这时就会有小伙伴疑问了:这是不是出bug了???
其实并不是,不知道大家是否还记得我们现在的通信方式是异步通信,而在订阅消息这一步骤中正是异步的表现,我发送了请求对消息进行订阅,发送完了我就可以执行下一步即输出等待接收信息……
这一信息,而接收的事情我不着急着要所以就没必要等着先输出接收到信息后输出等待接收信息。这与Java的多线程很相似,可以对应着理解。
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("xbaozi");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
在运行成功接收到信息后,我们再次进入到管理平台中查看,RabbitMQ确实将信息从队列中取出并发送给了订阅者。