• MQ(二)RabbitMQ快速入门


    一、RabbitMQ 概述和安装

    RabbitMQ 是基于 Erlang 语言开发的开源消息通信中间件

    1. RabbitMQ的结构和概念

    RabbitMQ中的几个概念:

    (1)channel:操作MQ的工具

    (2)exchange:路由消息到队列中

    (3)queue:缓存消息

    (4)virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组 

    二、常见消息模型

    MQ的官方文档中给出了5个 MQ 的 Demo 示例,对应了几种不同的用法:

    (1)基本消息队列(BasicQueue)

    (2) 工作消息队列(WorkQueue)

     

     (3)发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:

    1️⃣Fanout Exchange:广播

    2️⃣Direct Exchange:路由

     

    3️⃣Topic Exchange:主题

     

     

    三、快速入门

    1. HelloWorld 案例

    官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:

    (1)publisher:消息发布者,将消息发送到队列 queue

    (2)queue:消息队列,负责接受并缓存消息

    (3)consumer:订阅队列,处理队列中的消息

    (1)基本消息队列的消息发送流程:

    1️⃣建立 connection

    2️⃣创建 channel

    3️⃣利用 channel 声明队列

    4️⃣利用 channel 向队列发送消息

    1. public class PublisherTest {
    2. @Test
    3. public void testSendMessage() throws IOException, TimeoutException {
    4. // 1.建立连接
    5. ConnectionFactory factory = new ConnectionFactory();
    6. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    7. factory.setHost("192.168.150.101");
    8. factory.setPort(5672);
    9. factory.setVirtualHost("/");
    10. factory.setUsername("itcast");
    11. factory.setPassword("123321");
    12. // 1.2.建立连接
    13. Connection connection = factory.newConnection();
    14. // 2.创建通道Channel
    15. Channel channel = connection.createChannel();
    16. // 3.创建队列
    17. String queueName = "simple.queue";
    18. channel.queueDeclare(queueName, false, false, false, null);
    19. // 4.发送消息
    20. String message = "hello, rabbitmq!";
    21. channel.basicPublish("", queueName, null, message.getBytes());
    22. System.out.println("发送消息成功:【" + message + "】");
    23. // 5.关闭通道和连接
    24. channel.close();
    25. connection.close();
    26. }
    27. }

     (2)基本消息队列的消息接收流程:

    1️⃣建立 connection

    2️⃣创建 channel

    3️⃣利用 channel 声明队列

    4️⃣定义 consumer 的消费行为 handleDelivery()

    5️⃣利用 channel 将消费者与队列绑定

    1. public class ConsumerTest {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. // 1.建立连接
    4. ConnectionFactory factory = new ConnectionFactory();
    5. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    6. factory.setHost("192.168.150.101");
    7. factory.setPort(5672);
    8. factory.setVirtualHost("/");
    9. factory.setUsername("itcast");
    10. factory.setPassword("123321");
    11. // 1.2.建立连接
    12. Connection connection = factory.newConnection();
    13. // 2.创建通道Channel
    14. Channel channel = connection.createChannel();
    15. // 3.创建队列
    16. String queueName = "simple.queue";
    17. channel.queueDeclare(queueName, false, false, false, null);
    18. // 4.订阅消息
    19. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
    20. @Override
    21. public void handleDelivery(String consumerTag, Envelope envelope,
    22. AMQP.BasicProperties properties, byte[] body) throws IOException {
    23. // 5.处理消息
    24. String message = new String(body);
    25. System.out.println("接收到消息:【" + message + "】");
    26. }
    27. });
    28. System.out.println("等待接收消息。。。。");
    29. }
    30. }

  • 相关阅读:
    【计算机网络】——传输层
    C++11 auto和decltype
    在vue3中如何使用百度地图API(详细步骤+demo示例)
    使用docker安装RocketMQ
    SQLite System.Data.SQLite和sqlite-net-pcl之间的区别
    深入理解Linux网络技术内幕(九)——中断和网络驱动程序
    CUDA-矩阵乘2
    C++ 中迭代器的使用
    这一次,彻底梳理各种布局问题
    <学习笔记>从零开始自学Python-之-web应用框架Django( 十)通用模板
  • 原文地址:https://blog.csdn.net/yirenyuan/article/details/127903484