• RabbitMQ快速入门(详解)


    RabbitMQ属于消息中间件

    原则:先进先出 (FIFO)

    作用:

    1.异步处理

    节省时间,异步去执行任务

    2.应用解耦

    避免因为一个系统故障,导致另外一个请求完成不了

    3.流量削峰

    4.日志处理

    5.消息通讯

    docker-compose安装RabbitMQ

    1.配置yaml文件

    进入docker-compose.yaml文件(vi docker-compose.yaml),将此段配置信息追加进去

    1. rabbitmq:
    2. hostname: rabbitmq
    3. environment:
    4. RABBITMQ_DEFAULT_VHOST: "root"
    5. RABBITMQ_DEFAULT_USER: "root"
    6. RABBITMQ_DEFAULT_PASS: "123456"
    7. image: "rabbitmq:3.9.14-management"
    8. restart: always
    9. volumes:
    10. - "/usr/local/bank/rabbitmq/data:/var/lib/rabbitmq"
    11. - "/usr/local/bank/rabbitmq/log:/var/lib/rabbitmq/log"
    12. ports:
    13. - "15672:15672"
    14. - "4369:4369"
    15. - "5672:5672"
    16. - "25672:25672"

     

    2.开启容器,拉取镜像

    docker-compose up -d

     

    3.开启25672,4369,5672,15672端口 

    15672  管理界面ui使用的端口

    5672   AMQP 0-9-1 without and with TLSclient端通信口

    4369 (epmd)epmd代表 Erlang端口映射守护进程,erlang发现口

    25672  ( Erlang distribution) server间内部通信口

     

    如果是contos7则需要手动开启 

     

    4.访问端口

     

     

    简单案例

    1.导入依赖

    1. <!--rabbitmq 依赖客户端-->
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.8.0</version>
    6. </dependency>
    7. <!--操作文件流的一个依赖-->
    8. <dependency>
    9. <groupId>commons-io</groupId>
    10. <artifactId>commons-io</artifactId>
    11. <version>2.6</version>
    12. </dependency>

     

     2.创建工具类

    1. /**
    2. * 功能描述: <br>
    3. * 〈rabbtimq工具类〉
    4. * @Param:
    5. * @Return:
    6. * @Author: Mr.Qiu
    7. * @Date:
    8. */
    9. public class RabbitMQUtil {
    10. public static Channel getChannel(){
    11. ConnectionFactory factory=new ConnectionFactory();
    12. //设置主机 用户名 密码以及虚拟分区(填入自己的)
    13. factory.setHost("106.52.242.189");
    14. factory.setUsername("root");
    15. factory.setPassword("123456");
    16. factory.setVirtualHost("root");
    17. System.out.println("链接成功");
    18. Connection connection= null;
    19. Channel channel=null;
    20. try {
    21. connection = factory.newConnection();
    22. //创建信道
    23. channel = connection.createChannel();
    24. } catch (IOException e) {
    25. e.printStackTrace();
    26. } catch (TimeoutException e) {
    27. e.printStackTrace();
    28. }
    29. return channel;
    30. }
    31. }

     

    3.建立生产者模块producer-service

    pom文件引入common

    4.创建测试类Producer

    1. package com.qiu.producer;
    2. import com.qiu.common.utils.RabbitMQUtil;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * 功能描述: <br>
    10. * 〈消息的生产者〉
    11. * @Param:
    12. * @Return:
    13. * @Author: Mr.Qiu
    14. * @Date:
    15. */
    16. public class Producer {
    17. private final static String QUEUE_NAME = "hello";
    18. public static void main(String[] args) {
    19. Channel channel= RabbitMQUtil.getChannel();
    20. //通过信道可以发送信息,先定义队列,再给队列发送消息
    21. /**
    22. * 生成一个队列
    23. * 1.队列名称
    24. * 2.队列里面的消息是否持久化 也就是是否用完就删除
    25. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
    26. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
    27. * 5.其他参数
    28. */
    29. try {
    30. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    31. String message = "hello aqiu";
    32. /**
    33. * 发送一个消息
    34. * 1.发送到那个交换机
    35. * 2.路由的 key 是哪个
    36. * 3.其他的参数信息
    37. * 4.发送消息的消息体
    38. */
    39. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    40. System.out.println("消息发送完毕");
    41. } catch (IOException e) {
    42. e.printStackTrace();
    43. }
    44. }
    45. }

    运行测试

     

     

    5.创建消费者模块consumer-service

    pom文件引入common-service

     

    6.建立Consumer测试类

    1. /**
    2. * 功能描述: <br>
    3. * 〈消息的消费者〉
    4. * @Param:
    5. * @Return:
    6. * @Author: Mr.Qiu
    7. * @Date:
    8. */
    9. public class Consumer {
    10. private final static String QUEUE_NAME = "hello";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel= RabbitMQUtil.getChannel();
    13. System.out.println("等待接收消息.........");
    14. //推送的消息如何进行消费的接口回调
    15. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    16. String message = new String(delivery.getBody());
    17. System.out.println(message);
    18. };
    19. //取消消费的一个回调接口 如在消费的时候队列被删除掉了
    20. CancelCallback cancelCallback = (consumerTag) -> {
    21. System.out.println("消息消费被中断");
    22. };
    23. /**
    24. * 消费者消费消息 - 接受消息
    25. * 1.消费哪个队列
    26. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
    27. * 3.消费者未成功消费的回调
    28. * 4.消息被取消时的回调
    29. */
    30. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    31. }
    32. }

    运行结果

     

    Work Queues任务队列

    1.创建消费者

    1. /**
    2. * 功能描述: <br>
    3. * 〈消息的消费者〉
    4. * @Param:
    5. * @Return:
    6. * @Author: Mr.Qiu
    7. * @Date:
    8. */
    9. public class Consumer {
    10. private final static String QUEUE_NAME = "hello";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel= RabbitMQUtil.getChannel();
    13. System.out.println("等待接收消息.........");
    14. //推送的消息如何进行消费的接口回调
    15. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    16. String message = new String(delivery.getBody());
    17. System.out.println(message);
    18. };
    19. //取消消费的一个回调接口 如在消费的时候队列被删除掉了
    20. CancelCallback cancelCallback = (consumerTag) -> {
    21. System.out.println("消息消费被中断");
    22. };
    23. /**
    24. * 消费者消费消息 - 接受消息
    25. * 1.消费哪个队列
    26. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
    27. * 3.消费者未成功消费的回调
    28. * 4.消息被取消时的回调
    29. */
    30. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    31. }
    32. }

     

    2.启动两个消费者

     

     3.建立一个生产者类

    1. /**
    2. * 功能描述: <br>
    3. * 〈发送信息线程〉
    4. * @Param:
    5. * @Return:
    6. * @Author: Mr.Qiu
    7. * @Date:
    8. */
    9. public class Task01 {
    10. public static final String QUEUE_NAME = "hello";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel = RabbitMQUtil.getChannel();
    13. Scanner scanner = new Scanner(System.in);
    14. while (scanner.hasNext()) {
    15. String message = scanner.next();
    16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    17. System.out.println("消息发送完成:" + message);
    18. }
    19. }
    20. }

     启动

     

     4.轮询消费

    持续发送消息,消费者轮询接收

     

     

    如其中一个消费者发生故障,那么另外一个消费者会全部接收

    C1停止

     生产者持续发送消息,C2全部接收

     

     

  • 相关阅读:
    尚医通 (五) --------- 医院模块
    程序猿读历史
    【docker系列】docker-compose的YAML配置文件v2、v3版本详解
    【uniapp基础篇】实现微信登录
    快速创建Jenkins Job
    ClickHouse 物化视图
    【Python零基础入门篇 · 6】:Python中的注释、字符串的常见操作、对象的布尔值
    176. 第二高的薪水
    Java获取时间戳、字符串和Date对象的相互转换、日期时间格式化、获取年月日
    React入门
  • 原文地址:https://blog.csdn.net/lu__lala/article/details/125405591