• RabbitMQ 消息中间件


    消息中间件

    1、简介

    消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

    当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。

    2、作用

    1、消息中间件主要作用

    • 冗余(存储)

    • 扩展性

    • 可恢复性

    • 顺序保证

    • 缓冲

    • 异步通信

    2、消息中间件的两种模式

    1、P2P模式

    P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

    P2P的特点:
    • 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中

    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列

    • 接收者在成功接收消息之后需向队列应答成功

    • 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式

    2、Pub/Sub模式

    Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

    Pub/Sub的特点:

    • 每个消息可以有多个消费者

    • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息

    • 为了消费消息,订阅者必须保持运行的状态

    • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

    3、常用中间件介绍与对比

    1、Kafka

    Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

    2、RabbitMQ

    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    3、RocketMQ

    RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

    RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

    RabbiMQ

    RabbiMQ简介

    RabbiMQ是⽤Erang开发的,集群⾮常⽅便,因为Erlang天⽣就是⼀⻔分布式语⾔,但其本身并不⽀持负载均衡。支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    2、RabbitMQ 特点

    • 可靠性

    • 扩展性

    • 高可用性

    • 多种协议

    • 多语言客户端

    • 管理界面

    • 插件机制

    3、什么是消息队列

    MQ 全称为Message Queue, 。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

    消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。

    RabbiMQ模式

    注意:RabbitMQ模式⼤概分为以下三种:

    (1)单⼀模式。

    (2)普通模式(默认的集群模式)。

    (3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA⽅案,在对业务可靠性要求较⾼的场合中⽐较适⽤)。要实现镜像模式,需要先搭建⼀个普通集群模式,在这个模式的基础上再配置镜像模式以实现⾼可⽤。

    了解集群中的基本概念:

    RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

    一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制的。

    ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
    Channel(信道):消息推送使用的通道;
    Exchange(交换器):用于接受、分配消息;
    Queue(队列):用于存储生产者的消息;
    RoutingKey(路由键):用于把生成者的数据分配到交换器上;
    BindingKey(绑定键):用于把交换器的消息绑定到队列上;
    Broker:简单来说就是消息队列服务器实体
    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离.
    producer:消息生产者,就是投递消息的程序。
    consumer:消息消费者,就是接受消息的程序。
    user:用户
    

    安装Rabbitmq软件

    关闭防火墙与selinux

    上传了俩个包直接上传下载即可

    1. rz
    2. 上传上来,然后
    3. 安装依赖,安装rabbitmq
    4. [root@localhost ~]# ls
    5. erlang-21.3.8.21-1.el7.x86_64.rpm rabbitmq-server-3.7.10-1.el7.noarch.rpm
    6. [root@localhost ~]# yum install -y epel-release gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
    7. [root@localhost ~]# yum -y install erlang-21.3.8.21-1.el7.x86_64.rpm
    8. [root@localhost ~]# yum -y install rabbitmq-server-3.7.10-1.el7.noarch.rpm

    测试

    1. [root@localhost ~]# erl
    2. Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
    3. Eshell V9.3 (abort with ^G)
    4. 1>

    启动

    1. 启动,用systemctl管理
    2. [root@rabbitmq-1 ~]# systemctl daemon-reload
    3. [root@rabbitmq-1 ~]# systemctl start rabbitmq-server
    4. [root@rabbitmq-1 ~]# systemctl enable rabbitmq-server
    5. 启动方式二:
    6. [root@rabbitmq-1 ~]# /sbin/service rabbitmq-server status ---查看状态
    7. [root@rabbitmq-1 ~]# /sbin/service rabbitmq-server start ---启动

    每台都操作开启rabbitmq的web访问界面:

    1. [root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
    2. Enabling plugins on node rabbit@localhost:
    3. rabbitmq_management
    4. The following plugins have been configured:
    5. rabbitmq_management
    6. rabbitmq_management_agent
    7. rabbitmq_web_dispatch
    8. Applying plugin configuration to rabbit@localhost...
    9. The following plugins have been enabled:
    10. rabbitmq_management
    11. rabbitmq_management_agent
    12. rabbitmq_web_dispatch
    13. started 3 plugins.

    创建用户

    1. 添加用户和密码
    2. [root@localhost ~]# rabbitmqctl add_user zyq 123456
    3. Adding user "zyq" ...
    4. 设置管理员
    5. [root@localhost ~]# rabbitmqctl set_user_tags zyq administrator
    6. Setting tags for user "zyq" to [administrator] ...
    7. 查看用户
    8. [root@localhost ~]# rabbitmqctl list_users
    9. Listing users ...
    10. user tags
    11. zyq [administrator]
    12. guest [administrator]
    13. 此处设置权限时注意'.*'之间需要有空格 三个'.*'分别代表了conf权限,read权限与write权限 例如:当没有给
    14. newrain设置这三个权限前是没有权限查询队列,在ui界面也看不见
    15. [root@localhost ~]# rabbitmqctl set_permissions -p "/" zyq ".*" ".*" ".*"
    16. Setting permissions for user "zyq" in vhost "/" ...

    开启用户远程登录

    1. [root@rabbitmq-1 ~]# cd /etc/rabbitmq/
    2. [root@rabbitmq-1 rabbitmq]# cp /usr/share/doc/rabbitmq-server-3.7.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    3. [root@rabbitmq-1 rabbitmq]# ls
    4. enabled_plugins rabbitmq.config
    5. [root@rabbitmq-1 rabbitmq]# vim rabbitmq.config
    6. 修改如下:

    将61行的注释打开,并将最后行尾的 逗号 删除即可。

    重启服务

    systemctl restart rabbitmq-server

    查看端口

    1. [root@localhost rabbitmq]# netstat -nplt
    2. Active Internet connections (only servers)
    3. Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
    4. tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 14396/beam.smp
    5. tcp 0 0 0.0.0.0:4369 0.0.0.0:* LISTEN 14592/epmd
    6. tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 917/sshd
    7. tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 14396/beam.smp
    8. tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN 1137/master
    9. tcp6 0 0 :::5672 :::* LISTEN 14396/beam.smp
    10. tcp6 0 0 :::4369 :::* LISTEN 14592/epmd
    11. tcp6 0 0 :::22 :::* LISTEN 917/sshd
    12. tcp6 0 0 ::1:25 :::* LISTEN 1137/master

    4369 -- erlang发现端口
    5672 --程序连接端口
    15672 -- 管理界面ui端口
    25672 -- server间内部通信口

    访问:192.168.231.192:15672

    这里需要注意:

    rabbitmq默认管理员用户:guest   密码:guest

    新添加的用户为:newrain 密码:123456

    压力测试

    1. 第一步是获取源代码 ,执行完在前台运行,直接打开浏览器即可
    2. git clone https://gitea.beyourself.org.cn/newrain001/rabbitmq-test.git && \
    3. cd rabbitmq-test && yum install -y python3 python3-devel && \
    4. pip3 install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple && \
    5. export FLASK_ENV=development ; flask run --reload -p 80 -h 0.0.0.0

    观察

    开始部署集群 三台机器

    192.168.231.192  rabbit-1

    192.168.231.193  rabbit-2

    192.168.231.194  rabbit-3

    1. 安装上面的步骤
    2. 进行域名解析
    3. [root@rabbit-1 ~]# cat /etc/hosts
    4. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
    5. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
    6. 10.36.192.100 package.qf.com
    7. 192.168.231.192 rabbit-1
    8. 192.168.231.193 rabbit-2
    9. 192.168.231.194 rabbit-3
    10. 复制到另外俩台服务器
    11. scp /etc/hosts/ 192.168.231.193:/etc/hosts
    12. scp /etc/hosts/ 192.168.231.194:/etc/hosts

    创建数据存放目录和日志存放目录。三台机器都做相同的操作

    1. [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/data
    2. [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/logs
    3. [root@rabbitmq-1 ~]# chmod 777 -R /data/rabbitmq
    4. [root@rabbitmq-1 ~]# chown rabbitmq.rabbitmq /data/ -R
    5. 创建配置文件
    6. [root@rabbitmq-1 ~]# vim /etc/rabbitmq/rabbitmq-env.conf
    7. [root@rabbitmq-1 ~]# cat /etc/rabbitmq/rabbitmq-env.conf
    8. RABBITMQ_MNESIA_BASE=/data/rabbitmq/data
    9. RABBITMQ_LOG_BASE=/data/rabbitmq/logs
    10. 重启服务
    11. [root@rabbitmq-1 ~]# systemctl restart rabbitmq-server

    Rabbitmq的集群是依附于erlang的集群来⼯作的,所以必须先构建起erlang的集群景象。Erlang的集群中

    各节点是经由过程⼀个magic cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,⽂件是400的权限。所以必须保证各节点cookie⼀致,不然节点之间就⽆法通信.

    如果执行# rabbitmqctl stop_app 这条命令报错:需要执行

    1. #如果执行# rabbitmqctl stop_app 这条命令报错:需要执行
    2. #chmod 400 .erlang.cookie
    3. #chown rabbitmq.rabbitmq .erlang.cookie

    (官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。)

    1. [root@rabbit-1 ~]# cat /var/lib/rabbitmq/.erlang.cookie
    2. ISOGGRKUABBYNHYKQKZN
    3. ⽤scp的⽅式将rabbitmq-1节点的.erlang.cookie的值复制到其他两个节点中。
    4. [root@rabbit-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.231.193:/var/lib/rabbitmq/
    5. [root@rabbit-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.231.194:/var/lib/rabbitmq/

    将mq-2、mq-3作为内存节点加⼊mq-1节点集群中

    1. 在mq-2、mq-3执⾏如下命令:
    2. [root@rabbitmq-2 ~]# systemctl restart rabbitmq-server
    3. [root@rabbitmq-2 ~]# rabbitmqctl stop_app
    4. [root@rabbitmq-2 ~]# rabbitmqctl reset
    5. [root@rabbitmq-2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
    6. Clustering node 'rabbit@rabbitmq-2' with 'rabbit@rabbitmq-1' ...
    7. [root@rabbitmq-2 ~]# rabbitmqctl start_app
    8. Starting node 'rabbit@rabbitmq-2' ...
    9. ======================================================================
    10. [root@rabbitmq-3 ~]# systemctl restart rabbitmq-server
    11. [root@rabbitmq-3 ~]# rabbitmqctl stop_app
    12. Stopping node 'rabbit@rabbitmq-3' ...
    13. [root@rabbitmq-3 ~]# rabbitmqctl reset
    14. Resetting node 'rabbit@rabbitmq-3' ...
    15. [root@rabbitmq-3 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
    16. Clustering node 'rabbit@rabbitmq-3' with 'rabbit@rabbitmq-1' ...
    17. [root@rabbitmq-3 ~]# rabbitmqctl start_app
    18. Starting node 'rabbit@rabbitmq-3' ...
    19. 1)默认rabbitmq启动后是磁盘节点,在这个cluster命令下,mq-2和mq-3是内存节点,
    20. mq-1是磁盘节点。
    21. 2)如果要使mq-2、mq-3都是磁盘节点,去掉--ram参数即可。
    22. 3)如果想要更改节点类型,可以使⽤命令rabbitmqctl change_cluster_node_type
    23. disc(ram),前提是必须停掉rabbit应⽤
    24. 注:
    25. #如果有需要使用磁盘节点加入集群
    26. [root@rabbitmq-2 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1
    27. [root@rabbitmq-3 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1

    执行rabbitmqctl  stop_app 停止节点 出现报错,将终端退出 然后重新登录即可

    最好将三个服务器都退出终端 重新登录

    查看集群状态

    在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。
    在mq-1磁盘节点上面查看

    1. [root@rabbit-1 ~]# rabbitmqctl cluster_status
    2. Cluster status of node rabbit@rabbit-1 ...
    3. [{nodes,[{disc,['rabbit@rabbit-1']},
    4. {ram,['rabbit@rabbit-3','rabbit@rabbit-2']}]},
    5. {running_nodes,['rabbit@rabbit-2','rabbit@rabbit-3','rabbit@rabbit-1']},
    6. {cluster_name,<<"rabbit@rabbit-1">>},
    7. {partitions,[]},
    8. {alarms,[{'rabbit@rabbit-2',[]},
    9. {'rabbit@rabbit-3',[]},
    10. {'rabbit@rabbit-1',[]}]}]

    在web页面查看

    登录rabbitmq web管理控制台,创建新的队列

    只有一个

    在RabbitMQ集群集群中,必须⾄少有⼀个磁盘节点,否则队列元数据⽆法写⼊到集群中,当

    磁盘节点宕掉时,集群将⽆法写⼊新的队列元数据信息。

    创建镜像集群:

    1. rabbitmqctl set_permissions ".*" ".*" ".*"
    2. (后面三个”*”代表用户拥有配置、写、读全部权限)
    1. [root@rabbitmq-1 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
    2. [root@rabbitmq-2 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
    3. [root@rabbitmq-3 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

    再次查看队列已经同步到其他两台节点:

    则此时镜像队列设置成功。(这里的虚拟主机是代码中需要用到的虚拟主机,虚拟主机的作用是做一个消息的隔离,本质上可认为是一个rabbitmq-server,是否增加虚拟主机,增加几个,这是由开发中的业务决定,即有哪几类服务,哪些服务用哪一个虚拟主机,这是一个规划)。

    安装并且配置负载均衡器HA

    如果使用阿里云,可以使用阿里云的内网slb来实现负载均衡,不用自己搭建HA。

    在192.168.231.192上安装haproxy

    yum -y install haproxy
    

    修改 /etc/haproxy/haproxy.cfg

    1. 备份配置文件,防止出错
    2. [root@rabbitmq-1 ~]# cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
    3. 编写配置文件
    4. [root@rabbitmq-1 ~]# vim /etc/haproxy/haproxy.cfg
    5. global
    6. log 127.0.0.1 local2
    7. chroot /var/lib/haproxy
    8. pidfile /var/run/haproxy.pid
    9. maxconn 4000
    10. user haproxy
    11. group haproxy
    12. nbproc 4
    13. daemon
    14. # turn on stats unix socket
    15. stats socket /var/lib/haproxy/stats
    16. #---------------------------------------------------------------------
    17. defaults
    18. mode http
    19. log global
    20. retries 3
    21. timeout connect 10s
    22. timeout client 1m
    23. timeout server 1m
    24. timeout check 10s
    25. maxconn 2048
    26. #---------------------------------------------------------------------
    27. ##监控查看本地状态#####
    28. listen admin_stats
    29. bind *:80 ####与python3端口冲突,后续如果做python3实验可以改端口
    30. mode http
    31. option httplog
    32. option httpclose
    33. log 127.0.0.1 local0 err
    34. stats uri /haproxy
    35. stats auth admin:123456 ####登录监控的用户 密码
    36. stats refresh 30s
    37. ####################################
    38. ###反代监控
    39. frontend server
    40. bind *:5670 ####haproxy的监听端口
    41. log global
    42. mode tcp
    43. #option forwardfor
    44. default_backend rabbitmq
    45. maxconn 3
    46. backend rabbitmq
    47. mode tcp
    48. log global
    49. balance roundrobin
    50. server rabbitmq1 192.168.231.192:5672 check inter 2000s rise 2 fall 3
    51. server rabbitmq2 192.168.231.193:5672 check inter 2000s rise 2 fall 3
    52. server rabbitmq3 192.168.231.194:5672 check inter 2000s rise 2 fall 3
    53. 以上三行是负载均衡rabbitmq集群的IP

    重启haproxy

     systemctl start haproxy

    浏览器输入192.168.231.192/haproxy  查看rabbitmq状态

  • 相关阅读:
    Servlet实现一个简单的表白墙网站
    kafka消费的完整解决方案
    面试突击77:Spring 依赖注入有几种?各有什么优缺点?
    揭秘:为什么那么多公司都要招聘测试开发?
    校园论坛系统设计
    HTTPS(对称加密+非对称加密+证书)
    第一章:Spring流程执行步骤
    MP3算法及代码例程
    一文学会Canal怎么用
    【雷达检测】基于复杂环境下的雷达目标检测技术(Matlab代码实现)
  • 原文地址:https://blog.csdn.net/m0_59933574/article/details/134271046