目录
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信) 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:
1. 消息通知系统:通知商家,你有一笔新的订单,请及时发货
2. 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
3. 会员系统:更新用户的积分和等级信息
- createOrder(...) {
- // 完成订单服务
- doCreateOrder(...);
- // 调用其他服务接口
- sendMsg(...);
- updateUserInterestedGoods(...);
- updateMemberCreditInfo(...);
- }
我们需要一个消息中间件,来实现解耦和缓冲的功能
小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走.久而久之,两人都觉得麻烦. 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看.
书架就是一个消息队列,小红是生产者,小明是消费者.
1. 小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了.这样小红小明的时间都更自由.
2. 小红相信小明的读书自觉和读书能力,不必亲眼观察小明的读书过程,小红只要做一个放书的动作,很节省时间.
3. 当明天有另一个爱读书的小伙伴小强加入,小红仍旧只需要把书放到书架上,小明和小强从书架上取书即可
4. 书架上的书放在那里,小明阅读速度快就早点看完,阅读速度慢就晚点看完,没关系,比起小红把书递给小明并监督小明读完的方式,小明的压力会小一些.
1. 解耦:每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系.
2. 提速:小红选只要做一个放书的动作,为自己节省了大量时间.
3. 广播:小红只需要劳动一次,就可以让多个小伙伴有书可读,这大大地节省了她的时间,也让新的小伙伴的加入成本很低.
4. 错峰与流控:小红给书的频率不稳定,如果今明两天连给了五本,之后隔三个月才又给一本,那小明只要在三个月内从书架上陆续取走五本书读完就行了,压力就不那么大了.
有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题.
例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况.
导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了.面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请 求,不会影响用户使用其他功能.
一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言.
Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
Message Queue:消息队列,用于存储还未被消费者消费的消息.
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.
1. 拉取 RabbitMQ 镜像
docker pull rabbitmq:management
注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
2. 创建容器
- docker run -d \
- --name my-rabbitmq \
- -p 5672:5672 -p 15672:15672 \
- -v /home/rabbitmq:/var/lib/rabbitmq \
- --hostname my-rabbitmq-host \
- -e RABBITMQ_DEFAULT_VHOST=my_vhost \
- -e RABBITMQ_DEFAULT_USER=admin \
- -e RABBITMQ_DEFAULT_PASS=admin \
- --restart=always \
- rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码
3. 容器启动后,可以通过 docker logs 容器 查看日志
docker logs my-rabbitmq
4. 进入管理后台
http://ip:15672
切记需要授权
- rabbitMQ (父工程maven工程搭建即可)
- - consumer(子工程:springboot项目搭建)消费者
- - provider(publisher)(子工程:springboot项目搭建)生产者
- "1.0" encoding="UTF-8"?>
-
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
-
- <groupId>com.jmhgroupId>
- <artifactId>rabbitMqartifactId>
- <version>1.0-SNAPSHOTversion>
- <packaging>pompackaging>
-
- <name>rabbitMq Maven Webappname>
-
- <url>http://www.example.comurl>
-
- <modules>
- <module>consumermodule>
- <module>providermodule>
- modules>
-
- <properties>
- <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
- <maven.compiler.source>1.8maven.compiler.source>
- <maven.compiler.target>1.8maven.compiler.target>
- <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
- <maven.compiler.source>1.7maven.compiler.source>
- <maven.compiler.target>1.7maven.compiler.target>
- <spring-boot.version>2.4.1spring-boot.version>
- <spring-cloud.version>2020.0.0spring-cloud.version>
- <spring-cloud-alibaba.version>2021.1spring-cloud-alibaba.version>
- properties>
-
- <dependencies>
- <dependency>
- <groupId>junitgroupId>
- <artifactId>junitartifactId>
- <version>4.11version>
- <scope>testscope>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-dependenciesartifactId>
- <version>${spring-boot.version}version>
- <type>pomtype>
- <scope>importscope>
- dependency>
- dependencies>
- dependencyManagement>
-
- project>
- server:
- port: 8080 #服务器端口
- spring:
- application:
- name: provider #服务名称
- rabbitmq:
- host: 192.168.119.128 #虚拟机ip
- password: 1234 rabbitmq管理页面密码
- port: 5672 # 端口
- username: springboot #rabbitmq管理页面账户名
- virtual-host: my_vhost #服务名 授权的
- package com.jmh.provider.config;
-
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- @SuppressWarnings("all")
- public class RabbitConfig {
- @Bean
- public Queue firstQueue() {
- return new Queue("firstQueue");
- }
- }
- package com.jmh.provider.utils;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * @author 蒋明辉
- * @data 2022/11/23 19:12
- */
- @Component
- @SuppressWarnings("all")
- public class Sender {
- @Autowired
- private AmqpTemplate rabbitTemplate;
- public void sendFirst() {
- rabbitTemplate.convertAndSend("firstQueue", "Hello World");
- }
-
- public void sendFirst(String json) {
- rabbitTemplate.convertAndSend("firstQueue", json);
- }
- }
-
- package com.jmh.provider.model;
-
- import lombok.*;
-
- import java.io.Serializable;
-
- /**
- * @author 蒋明辉
- * @data 2022/11/23 20:23
- */
- @SuppressWarnings("all")
- @Getter
- @Setter
- @AllArgsConstructor
- @NoArgsConstructor
- @ToString
- public class User implements Serializable {
- private String username;
- private String userpwd;
- }
- server:
- port: 8081
- spring:
- application:
- name: consumer
- rabbitmq:
- host: 192.168.119.128
- password: 1234
- port: 5672
- username: springboot
- virtual-host: my_vhost
-
- package com.jmh.consumer.model;
-
- import lombok.*;
-
- import java.io.Serializable;
-
- /**
- * @author 蒋明辉
- * @data 2022/11/23 20:23
- */
- @SuppressWarnings("all")
- @Getter
- @Setter
- @AllArgsConstructor
- @NoArgsConstructor
- @ToString
- public class User implements Serializable {
- private String username;
- private String userpwd;
- }
- package com.jmh.consumer.utils;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.jmh.consumer.model.User;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- @SuppressWarnings("all")
- @Slf4j
- @RabbitListener(queues = "firstQueue")
- public class Receiver {
- @RabbitHandler
- @SneakyThrows
- public void process(String json) {
- log.warn("接收到:" + json);
- ObjectMapper objectMapper=new ObjectMapper();
- log.warn("接收到:" + objectMapper.readValue(json, User.class));
- }
- }
启动consumer子工程服务(消费者)
- package com.jmh.provider;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.jmh.provider.model.User;
- import com.jmh.provider.utils.Sender;
- import lombok.SneakyThrows;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class ProviderApplicationTests {
-
- @Autowired
- private Sender sender;
-
- @Test
- @SneakyThrows
- void contextLoads() {
- /* sender.sendFirst();*/
- User user = new User("蒋明辉", "1234");
- ObjectMapper objectMapper=new ObjectMapper();
- sender.sendFirst(objectMapper.writeValueAsString(user));
- }
-
- }