RabbitMQ属于消息中间件
原则:先进先出 (FIFO)
作用:
1.异步处理
节省时间,异步去执行任务
2.应用解耦
避免因为一个系统故障,导致另外一个请求完成不了
3.流量削峰
4.日志处理
5.消息通讯
docker-compose安装RabbitMQ
1.配置yaml文件
进入docker-compose.yaml文件(vi docker-compose.yaml),将此段配置信息追加进去
- rabbitmq:
- hostname: rabbitmq
- environment:
- RABBITMQ_DEFAULT_VHOST: "root"
- RABBITMQ_DEFAULT_USER: "root"
- RABBITMQ_DEFAULT_PASS: "123456"
- image: "rabbitmq:3.9.14-management"
- restart: always
- volumes:
- - "/usr/local/bank/rabbitmq/data:/var/lib/rabbitmq"
- - "/usr/local/bank/rabbitmq/log:/var/lib/rabbitmq/log"
- ports:
- - "15672:15672"
- - "4369:4369"
- - "5672:5672"
- - "25672:25672"

2.开启容器,拉取镜像
docker-compose up -d

3.开启25672,4369,5672,15672端口
15672 管理界面ui使用的端口
5672 AMQP 0-9-1 without and with TLSclient端通信口
4369 (epmd)epmd代表 Erlang端口映射守护进程,erlang发现口
25672 ( Erlang distribution) server间内部通信口

如果是contos7则需要手动开启

4.访问端口


简单案例
1.导入依赖
- <!--rabbitmq 依赖客户端-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.8.0</version>
- </dependency>
- <!--操作文件流的一个依赖-->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.6</version>
- </dependency>

2.创建工具类
-
- /**
- * 功能描述: <br>
- * 〈rabbtimq工具类〉
- * @Param:
- * @Return:
- * @Author: Mr.Qiu
- * @Date:
- */
- public class RabbitMQUtil {
-
- public static Channel getChannel(){
- ConnectionFactory factory=new ConnectionFactory();
- //设置主机 用户名 密码以及虚拟分区(填入自己的)
- factory.setHost("106.52.242.189");
- factory.setUsername("root");
- factory.setPassword("123456");
- factory.setVirtualHost("root");
- System.out.println("链接成功");
- Connection connection= null;
- Channel channel=null;
- try {
- connection = factory.newConnection();
- //创建信道
- channel = connection.createChannel();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- return channel;
- }
- }
3.建立生产者模块producer-service
pom文件引入common

4.创建测试类Producer
- package com.qiu.producer;
-
- import com.qiu.common.utils.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 功能描述: <br>
- * 〈消息的生产者〉
- * @Param:
- * @Return:
- * @Author: Mr.Qiu
- * @Date:
- */
- public class Producer {
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] args) {
- Channel channel= RabbitMQUtil.getChannel();
-
- //通过信道可以发送信息,先定义队列,再给队列发送消息
- /**
- * 生成一个队列
- * 1.队列名称
- * 2.队列里面的消息是否持久化 也就是是否用完就删除
- * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
- * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
- * 5.其他参数
- */
- try {
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "hello aqiu";
- /**
- * 发送一个消息
- * 1.发送到那个交换机
- * 2.路由的 key 是哪个
- * 3.其他的参数信息
- * 4.发送消息的消息体
- */
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("消息发送完毕");
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
-
-
- }
- }

运行测试


5.创建消费者模块consumer-service
pom文件引入common-service

6.建立Consumer测试类
- /**
- * 功能描述: <br>
- * 〈消息的消费者〉
- * @Param:
- * @Return:
- * @Author: Mr.Qiu
- * @Date:
- */
- public class Consumer {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- Channel channel= RabbitMQUtil.getChannel();
- System.out.println("等待接收消息.........");
-
- //推送的消息如何进行消费的接口回调
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody());
- System.out.println(message);
- };
- //取消消费的一个回调接口 如在消费的时候队列被删除掉了
- CancelCallback cancelCallback = (consumerTag) -> {
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者消费消息 - 接受消息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
- * 3.消费者未成功消费的回调
- * 4.消息被取消时的回调
- */
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
-
- }
- }
运行结果

1.创建消费者
- /**
- * 功能描述: <br>
- * 〈消息的消费者〉
- * @Param:
- * @Return:
- * @Author: Mr.Qiu
- * @Date:
- */
- public class Consumer {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- Channel channel= RabbitMQUtil.getChannel();
- System.out.println("等待接收消息.........");
-
- //推送的消息如何进行消费的接口回调
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody());
- System.out.println(message);
- };
- //取消消费的一个回调接口 如在消费的时候队列被删除掉了
- CancelCallback cancelCallback = (consumerTag) -> {
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者消费消息 - 接受消息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
- * 3.消费者未成功消费的回调
- * 4.消息被取消时的回调
- */
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
-
- }
- }

2.启动两个消费者


3.建立一个生产者类
- /**
- * 功能描述: <br>
- * 〈发送信息线程〉
- * @Param:
- * @Return:
- * @Author: Mr.Qiu
- * @Date:
- */
- public class Task01 {
- public static final String QUEUE_NAME = "hello";
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtil.getChannel();
-
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("消息发送完成:" + message);
- }
-
- }
- }

启动

4.轮询消费
持续发送消息,消费者轮询接收

如其中一个消费者发生故障,那么另外一个消费者会全部接收
C1停止

生产者持续发送消息,C2全部接收

