• 【初始RabbitMQ】了解和安装RabbitMQ


    RabbitMQ的概念

    RabbitMQ是一个消息中间件:他可以接受并转发消息。例如你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据

    四大核心概念

    生产者:产生数据发送消息的程序

    交换机交换机是RabbitMQ中的一个重要的部件,一方面它接受来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接受到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,这个得有交换机类型决定

    队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但是它们只能存储在的队列中。队列仅受主机内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

    消费者:消费者大多数是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在一个机器上。同一个程序既可以是生产者也可以是消费者

    RabbitMQ核心部分

    各种名词介绍

    Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

    Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

    Connection:publisher/consumer 和 broker 之间的 TCP 连接

    Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是非常大的,效率是很低效的。Channel是在connect内部建立的逻辑连接,如果应用成型鼓支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销

    Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

    Queue:消息最终被送到这里等待 consumer 取走

    Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

    RabbitMQ的安装

    官网地址:rabbitmq.com/download.html

    百度网盘大家自取:

    链接:https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwd=d0jd 
    提取码:d0jd

    文件上传

    以CentOS系统举例:将以上的软件安装使用命令scp

    scp D:\Java学习课件\MQ\软件\rabbitmq-server-3.8.8-1.el7.noarch.rpm root@118.31.6.100:/root

    安装文件

    1. rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    2. yum install socat -y
    3. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

    常用命令(按照以下顺序)

    添加开机启动 RabbitMQ 服务

    chkconfig rabbitmq-server on

    启动服务

    /sbin/service rabbitmq-server start

    查看服务状态

    /sbin/service rabbitmq-server status

     停止服务(选择执行)

    /sbin/service rabbitmq-server stop

    开启 web 管理插件

    rabbitmq-plugins enable rabbitmq_management

    用默认账号密码(guest)访问地址 http://真实IP:15672/出现权限问题使用云服务器的一定要把云服务器的防火墙的打开

    添加一个新的用户

    创建账号

    rabbitmqctl add_user admin 123

    设置用户角色

    rabbitmqctl set_user_tags admin administrator

    设置用户权限

    1. set permissions-p <vhostpath><user><conf> <write><read>
    2. rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

    用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

    当前用户和角色

    rabbitmqctl list_users

    利用 admin 用户登录

    重置命令

    关闭应用的命令

    rabbitmqctl stop_app

    清除的命令

    rabbitmqctl reset

    重新启动命令

    rabbitmqctl start_app 

    Hello World

    引入依赖

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0modelVersion>
    6. <groupId>com.atguigu.rabbitmqgroupId>
    7. <artifactId>rabbitmq-helloartifactId>
    8. <version>1.0-SNAPSHOTversion>
    9. <build>
    10. <plugins>
    11. <plugin>
    12. <groupId>org.apache.maven.pluginsgroupId>
    13. <artifactId>maven-compiler-pluginartifactId>
    14. <configuration>
    15. <source>8source>
    16. <target>8target>
    17. configuration>
    18. plugin>
    19. plugins>
    20. build>
    21. <dependencies>
    22. <dependency>
    23. <groupId>com.rabbitmqgroupId>
    24. <artifactId>amqp-clientartifactId>
    25. <version>5.8.0version>
    26. dependency>
    27. <dependency>
    28. <groupId>commons-iogroupId>
    29. <artifactId>commons-ioartifactId>
    30. <version>2.6version>
    31. dependency>
    32. <dependency>
    33. <groupId>ch.qos.logbackgroupId>
    34. <artifactId>logback-classicartifactId>
    35. <version>1.2.6version>
    36. dependency>
    37. dependencies>
    38. project>

    生产者代码:

    1. package com.rabbitmq.one;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * 生产者:发消息
    10. */
    11. public class Produce {
    12. public static final String QUEUE_NAME="hello";
    13. public static void main(String[] args) throws IOException, TimeoutException {
    14. //创建工厂
    15. ConnectionFactory factory = new ConnectionFactory();
    16. //工厂IP连接rabbitmq
    17. factory.setHost("118.31.6.132");
    18. //用户名
    19. factory.setUsername("admin");
    20. //密码
    21. factory.setPassword("123");
    22. //创建连接
    23. Connection connection = factory.newConnection();
    24. //获取信道
    25. Channel channel = connection.createChannel();
    26. /*
    27. *生成一个队列
    28. * 1.队列名称
    29. * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
    30. * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
    31. * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
    32. * 5.其他参数 延迟消息等
    33. */
    34. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    35. //发消息
    36. String message = "hello world";
    37. /**
    38. * 发送一个消息
    39. * 1.发送到那个交换机
    40. * 2.路由的KEY值是哪个 本次是队列的名称
    41. * 3.其他参数信息
    42. * 4.发送消息的消息体
    43. */
    44. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    45. System.out.println("消息发送完毕!");
    46. }
    47. }

    消费者代码:

    1. package com.rabbitmq.one;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.Collections;
    5. import java.util.concurrent.TimeoutException;
    6. /**
    7. * 消费者
    8. */
    9. public class Consume {
    10. //队列名称
    11. public static final String QUEUE_NAME = "hello";
    12. //接受消息
    13. public static void main(String[] args) throws IOException, TimeoutException {
    14. //创建连接工厂
    15. ConnectionFactory factory = new ConnectionFactory();
    16. factory.setHost("118.31.6.132");
    17. factory.setUsername("admin");
    18. factory.setPassword("123");
    19. Connection connection = factory.newConnection();
    20. Channel channel = connection.createChannel();
    21. //声明 接受消息
    22. DeliverCallback deliverCallback = (consumerTag,message)->{
    23. System.out.println(new String(message.getBody()));
    24. };
    25. //取消消息的回调
    26. CancelCallback cancelCallback = consumerTag->{
    27. System.out.println("消息消费被中断");
    28. };
    29. /**
    30. * 消费者信息
    31. * 1.消费哪个队列
    32. * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
    33. * 3.消费者微车才能更改消费的回调
    34. * 4.消费者取消消费回调
    35. */
    36. channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);
    37. }
    38. }

    运行结果:

  • 相关阅读:
    运营商IMS网间互联互通组网关键技术研究
    MyBatis 源码分析之 Select 语句执行(上)
    vue路由组件传递参数四种方式(九)(布尔模式,命名视图,对象模式,函数模式)详细讲解使用
    我的创作纪念日
    flutter系列之:flutter中常用的Stack layout详解
    全链路压测:构建三大模型
    将CSV文件快速导入MySQL中
    2023/09/21 day5 qt
    DialogFragment方便地完成自定义弹窗
    元素的盒子模型+标签的尺寸大小和偏移量+获取页面滚动距离+JS直接修改标签样式 + 让页面滚动到指定位置
  • 原文地址:https://blog.csdn.net/loss_rose777/article/details/136137958