• 18-基于CentOS7搭建RabbitMQ3.10.7集群镜像队列+HaProxy+Keepalived高可用架构


    集群架构

    虚拟机规划

    IP

    hostname

    节点说明

    端口

    控制台地址

    192.168.247.150

    rabbitmq.master

    rabbitmq master

    5672

    http://192.168.247.150:15672

    192.168.247.151

    rabbitmq.s.o

    rabbitmq slave

    5672

    http://192.168.247.151:15672

    192.168.247.152

    rabbitmq.s.t

    rabbitmq slave

    5672

    http://192.168.247.152:15672

    192.168.247.153

    haproxy.k.m

    haproxy+keepalived

    8100

    http://192.168.247.153:8100/rabbitmq-stats

    192.168.247.154

    haproxy.k.s

    haproxy+keepalived

    8100

    http://192.168.247.154:8100/rabbitmq-stats

    镜像模式

    • 镜像模式: 集群模式非常经典的就是Mirror镜像模式, 保证100%数据不丢失, 在实际工作中也是用的最多的, 并且实现集群非常的简单, 一般互联网大厂都会构建这种镜像集群模式
    • Mirror镜像队列, 目的是为了保证RabbitMQ数据的高可用性解决方案, 主要就是实现数据的同步, 一般来讲是2-3个节点实现数据同步[一般都是3节点+(奇数个节点)](对于100%数据可靠性解决方案一般都是3节点)集群架构如下

    服务器规划

    • 架构: RabbitMQ Cluster + Queue HA + Haproxy + Keepalived
    • 解释: 3台rabbitMQ服务器构建broker集群,允许任意2台服务器故障而服务不受影响,在此基础上,通过Queue HA (queue mirror)实现队列的高可用,在本例中镜像到所有服务器节点(即1个master,2个slave);为保证客户端访问入口地址的唯一性,通过haproxy做4层代理来提供MQ服务,并通过简单的轮询方式来进行负载均衡,设置健康检查来屏蔽故障节点对客户端的影响;使用2台haproxy并且通过keepalived实现客户端访问入口的高可用机制。

    服务架构设计

    参考资料

    官方文档手册:

    集群配置文档:Clustering Guide — RabbitMQ

    镜像队列文档:Classic Queue Mirroring — RabbitMQ

    集群操作文档:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

    中文版AMQP用户手册:

    Spring AMQP文档:http://www.blogjava.net/qbna350816/category/55056.html?Show=All

    事务文档:http://www.blogjava.net/qbna350816/archive/2016/08/13/431567.html

    集群架构搭建

    配置HOST[5台]

    5台服务器配置Host, 参考虚拟机规划

    复制代码
    vi /etc/hostname
    150改为rabbitmqmaster
    151改为rabbitmqso
    152改为rabbitmqst
    153改为haproxykm
    154改为haproxyks
    
    vi /etc/hosts
    192.168.247.150 rabbitmqmaster
    192.168.247.151 rabbitmqso
    192.168.247.152 rabbitmqst
    192.168.247.153 haproxykm
    192.168.247.154 haproxyks
    复制代码

    都修改完成后重启: 一定要重启, 我在这里就碰到了个大坑, 应为没有重启, 所以导致hostname没有生效 添加集群节点一致报错,我丢 重启后关闭防火墙

    安装依赖环境[5台]

    yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

    安装RabbitMQ[三台]

    150,151,152

    # 我用的直接就是3.10.7版本, 我没有找到老版本的, 而且现在应该也都是新版本的了
    1: 安装参考安装与启动, 建议三台同时进行
    2: 关闭5台电脑的防火墙

    官方的部署文档, 应为是英文的, 看起来优点蒙圈, 哎, 探索新版本总是困难的, 但是也总要有人前行

    访问控制台[三台]

    在虚拟机规划中有控制台地址

    不知道为什么, 150,151,可以访问, 152又提示不是私密连接, 我就又创建了一个账户就可以了

    150 toor 123456
    
    151 toor 123456
    
    152 toor 123456

    重启所有的RabbitMQ[三台]

    复制代码
    # 停止
    rabbitmqctl stop
    # 检测端口
    lsof -i:5672
    lsof -i:15672
    # 启动
    rabbitmq-server -detached
    复制代码

    文件同步步骤[150->151,152]

    因为我是最新版本安装的所以.erlang.cookie并没有在var/lib/rabbitmq下面, 而是在root下

    scp /root/.erlang.cookie 192.168.247.151:/root/
    scp /root/.erlang.cookie 192.168.247.152:/root/

    选择150,151,152任意一个节点为Master, 我选择150, 也就是说需要把150的cookie文件同步到151,152节点上, 进入/root目录, ll -a才可以看到隐藏文件, 使用上面的命令发送就可以

    注意: 需要在三台服务器RabbitMQ都启动的情况下, 去同步

    节点加入集群[151,152]

    150直接启动即可

    # 启动
    rabbitmq-server -detached

    151,152, 将自身加入集群

    # 停止app
    rabbitmqctl stop_app
    # 将节点加入集群 --ram是采用内存的方式 151,采用内存, 152不加, 默认使用磁盘
    rabbitmqctl join_cluster --ram rabbit@rabbitmqmaster
    # 启动app
    rabbitmqctl start_app

    控制台查看集群

    查看集群状态

    # 任意节点
    rabbitmqctl cluster_status

    配置镜像队列

    设置镜像队列策略

    # 任意节点
    rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

    将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致,RabbitMQ高可用集群就已经搭建好了,我们可以重启服务,查看其队列是否在从节点同步

    移除集群节点[扩展]

    // 在要移除的节点上执行
    rabbitmqctl forget_cluster_node rabbit@bhz71

    修改集群名称(默认为第一个node名称)[扩展]

    # 任意节点
    rabbitmqctl set_cluster_name rabbitmq_cluster1

    RabbitMQ配置位置

    我一般不修改, 正式环境有运维, 开发的话, 能跑就行

    /usr/local/rabbitmq/plugins/rabbit-3.10.7/ebin/rabbit.app

    测试集群

    测试队列同步

    在150上新建队列

    队列开始同步

    同步镜像队列两个

    151,152都已经同步

    安装Ha-Proxy[153,154]

    简介

    • HAProxy是一款提供高可用性、负载均衡以及基于TCP和HTTP应用的代理软件,HAProxy是完全免费的、借助HAProxy可以快速并且可靠的提供基于TCP和HTTP应用的代理解决方案。
    • HAProxy适用于那些负载较大的web站点,这些站点通常又需要会话保持或七层处理。
    • HAProxy可以支持数以万计的并发连接,并且HAProxy的运行模式使得它可以很简单安全的整合进架构中,同时可以保护web服务器不被暴露到网络上。

    安装

    复制代码
    # 下载
    wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.6.tar.gz
    # 解压
    tar -zxvf haproxy-1.6.6.tar.gz
    # 进入文件夹
    cd haproxy-1.6.6/
    # 编译
    make target=linux31 prefix=/usr/local/haproxy
    # 安装
    make install prefix=/usr/local/haproxy
    # 创建文件夹
    mkdir /etc/haproxy
    # 权限
    groupadd -r -g 149 haproxy
    useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
    # 创建配置文件
    touch /etc/haproxy/haproxy.cfg
    # haproxy 配置文件haproxy.cfg详解
    # 编辑配置文件
    vi /etc/haproxy/haproxy.cfg
    复制代码

    配置文件

    复制代码
    #logging options
    global
        log 127.0.0.1 local0 info
        maxconn 5120
        chroot /usr/local/haproxy
        uid 99
        gid 99
        daemon
        quiet
        nbproc 20
        pidfile /var/run/haproxy.pid
    
    defaults
        log global
        #使用4层代理模式,”mode http”为7层代理模式
        mode tcp
        #if you set mode to tcp,then you nust change tcplog into httplog
        option tcplog
        option dontlognull
        retries 3
        option redispatch
        maxconn 2000
        contimeout 10s
         ##客户端空闲超时时间为 60秒 则HA 发起重连机制
         clitimeout 10s
         ##服务器端链接超时时间为 15秒 则HA 发起重连机制
         srvtimeout 10s    
    #front-end IP for consumers and producters
    
    listen rabbitmq_cluster
        bind 0.0.0.0:5672
        #配置TCP模式
        mode tcp
        #balance url_param userid
        #balance url_param session_id check_post 64
        #balance hdr(User-Agent)
        #balance hdr(host)
        #balance hdr(Host) use_domain_only
        #balance rdp-cookie
        #balance leastconn
        #balance source //ip
        #简单的轮询
        balance roundrobin
        #rabbitmq集群节点配置 #inter 每隔五秒对mq集群做健康检查, 2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
            server rabbitmqmaster 192.168.247.150:5672 check inter 5000 rise 2 fall 2
            server rabbitmqso 192.168.247.151:5672 check inter 5000 rise 2 fall 2
            server rabbitmqst 192.168.247.152:5672 check inter 5000 rise 2 fall 2
    #配置haproxy web监控,查看统计信息
    listen stats
        # 154改为154
        bind 192.168.247.153:8100
        mode http
        option httplog
        stats enable
        #设置haproxy监控地址为http://localhost:8100/rabbitmq-stats
        stats uri /rabbitmq-stats
        stats refresh 5s
    复制代码

    启动

    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

    查看服务

    http://192.168.247.153:8100/rabbitmq-stats
    http://192.168.247.154:8100/rabbitmq-stats

    停止[扩展]

    killall haproxy
    ps -ef | grep haproxy
    netstat -tunpl | grep haproxy
    ps -ef |grep haproxy |awk '{print $2}'|xargs kill -9

    安装Keepalived[153,154]

    简介

    • 之前在Nginx已经用过Keepalived了
    • Keepalived,它是一个高性能的服务器高可用或热备解决方案,Keepalived主要来防止服务器单点故障的发生问题,可以通过其与Nginx、Haproxy等反向代理的负载均衡服务器配合实现web服务端的高可用。Keepalived以VRRP协议为实现基础,用VRRP协议来实现高可用性(HA).VRRP(Virtual Router Redundancy Protocol)协议是用于实现路由器冗余的协议,VRRP协议将两台或多台路由器设备虚拟成一个设备,对外提供虚拟路由器IP(一个或多个)。

    安装

    我直接采用2.0.18版本, 上传到linux

    复制代码
    tar -zxvf keepalived-2.0.18.tar.gz
    
    cd keepalived-2.0.18/
    
    yum install libnl libnl-devel
    
    ./configure \
    --prefix=/usr/local/keepalived \
    --sysconf=/etc
    
    make && make install
    
    cd /etc/keepalived/
    
    vi keepalived.conf
    复制代码

    153配置

    复制代码
    ! Configuration File for keepalived
    
    global_defs {
       router_id haproxykm  ##标识节点的字符串,通常为hostname
    
    }
    
    vrrp_script chk_haproxy {
        script "/etc/keepalived/haproxy_check.sh"  ##执行脚本位置
        interval 2  ##检测时间间隔
        weight -20  ##如果条件成立则权重减20
    }
    
    vrrp_instance VI_1 {
        state MASTER  ## 主节点为MASTER,备份节点为BACKUP
        interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eth0)
        virtual_router_id 74  ## 虚拟路由ID号(主备节点一定要相同)
        mcast_src_ip 192.168.247.153 ## 本机ip地址
        priority 100  ##优先级配置(0-254的值)
        nopreempt
        advert_int 1  ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
            authentication {  ## 认证匹配
            auth_type PASS
            auth_pass bhz
        }
    
        track_script {
            chk_haproxy
        }
    
        virtual_ipaddress {
            192.168.247.160  ## 虚拟ip,可以指定多个
        }
    }
    复制代码

    154配置

    复制代码
    ! Configuration File for keepalived
    
    global_defs {
       router_id haproxyks  ##标识节点的字符串,通常为hostname
    
    }
    
    vrrp_script chk_haproxy {
        script "/etc/keepalived/haproxy_check.sh"  ##执行脚本位置
        interval 2  ##检测时间间隔
        weight -20  ##如果条件成立则权重减20
    }
    
    vrrp_instance VI_1 {
        state BACKUP  ## 主节点为MASTER,备份节点为BACKUP
        interface ens33 ## 绑定虚拟IP的网络接口(网卡),与本机IP地址所在的网络接口相同(我这里是eth0)
        virtual_router_id 74  ## 虚拟路由ID号(主备节点一定要相同)
        mcast_src_ip 192.168.247.154 ## 本机ip地址
        priority 80  ##优先级配置(0-254的值)
        nopreempt
        advert_int 1  ## 组播信息发送间隔,俩个节点必须配置一致,默认1s
            authentication {  ## 认证匹配
            auth_type PASS
            auth_pass bhz
        }
    
        track_script {
            chk_haproxy
        }
    
        virtual_ipaddress {
            192.168.247.160  ## 虚拟ip,可以指定多个
        }
    }
    复制代码

    编写自动检测脚本

    touch /etc/keepalived/haproxy_check.sh
    
    chmod +x /etc/keepalived/haproxy_check.sh
    
    vi /etc/keepalived/haproxy_check.sh

    脚本内容

    复制代码
    #!/bin/bash
    COUNT=`ps -C haproxy --no-header |wc -l`
    if [ $COUNT -eq 0 ];then
        /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
        sleep 2
        if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
            killall keepalived
        fi
    fi
    复制代码

    启动

    复制代码
    # 记得先启动haproxy, 再启动Keepalived
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    
    # 查看进程
    ps -ef | grep haproxy
    
    # 将Keepalived注册为系统服务
    cd /home/software/keepalived-2.0.18/keepalived/etc/
    cp init.d/keepalived /etc/init.d/
    cp sysconfig/keepalived /etc/sysconfig/
    systemctl daemon-reload
    
    # 启动Keepalived
    systemctl [start | stop | status | restart] keepalived
    systemctl start keepalived.service
    systemctl stop keepalived.service
    
    # 查看服务
    ps -ef|grep keepalived
    复制代码

    高可用测试

    160默认在153主Keepalived上

    节点宕机测试

    停止153上的keepalived

    查看153 ip

    160的vip已经不再了, 查看154

    已经自动绑定154了, 重启153

    重启后160重新漂移回153上, 高可用测试ok

    队列功能代码测试

    直接使用最简单的hello world程序测试, IP使用虚拟的VIP 160

    消费者

    复制代码
    package com.dance.redis.mq.rabbit.helloworld;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class Receiver {
        private static final String QUEUE_NAME = "queue-test";
        private static final String IP_ADDRESS = "192.168.247.160";
        private static final int PORT = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException,
                InterruptedException {
            Address[] address = new Address[]{
                    new Address(IP_ADDRESS, PORT)
            };
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("toor");
            factory.setPassword("123456");
            // 这里的连接方式与生产者的demo略有不同,注意区别。
            Connection connection = factory.newConnection(address); //创建连接
            final Channel channel = connection.createChannel();//创建信道
            channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    System.out.println("recvive message:" + new String(body));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);
            //等待回调函数执行完毕之后,关闭资源。
            TimeUnit.SECONDS.sleep(50);
            channel.close();
            connection.close();
        }
    }
    复制代码

    生产者

    复制代码
    package com.dance.redis.mq.rabbit.helloworld;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
     
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
     
    public class RabbitProducer {
        private static final String EXCHANGE_NAME = "exchange-test";
        private static final String ROUTING_KEY = "text.*";
        private static final String QUEUE_NAME = "queue-test";
        private static final String IP_ADDRESS = "192.168.247.160";
        private static final int PORT = 5672; //RabbitMQ服务默认端口为5672
     
        public static void main(String[] args) throws IOException,
                TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setVirtualHost("/");
            factory.setUsername("toor");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();//创建连接
            Channel channel = connection.createChannel();//创建信道
            //创建一个type="topic"、持久化的、非自动删除的交换器。
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
            //创建一个持久化、非排他的、非自动删除的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //将交换机与队列通过路由键绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            //发送一条持久化消息:Hello World!
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
            //关闭资源
            channel.close();
            connection.close();
        }
        
    }
    复制代码

    我的账户和密码是toor和123456, 请改为自己的

    启动消费者

    启动生产者

    查看消费者

    消费成功, ip也是160

    查看控制台 exchange

    多了exchange-test 功能是ha-all

    队列也是, 到此镜像队列+高可用已经实现

    也可以在HA的控制台上查看统计

    集群架构回顾

    我们实现的是RabbitMQ Cluster + Mirror Queue + Haproxy + Keepalived

    RabbitMQ Cluster 3台

    Mirror Queue RabbitMQ集群方式

    Haproxy 反向代理

    Keepalived Haproxy集群检测, 虚拟VIP, 实现统一IP对外提供

    架构图手绘

    emm, 就是这样一个架构, 我应该李姐的挺到位的

  • 相关阅读:
    【详细学习SpringBoot自动装配原理分析之核心流程初解析-1】
    2022年全球光纤连接市场将达50.1亿美元
    html菜单的基本制作
    【微服务专题】Nacos详解
    Python练习
    循序渐进介绍基于CommunityToolkit.Mvvm 和HandyControl的WPF应用端开发(3)--自定义用户控件
    Flask之路由(app.route)详解
    Studio One6.5安装源码教程
    RollBack Rx Professional RMC 安装教程
    基于SURF算法实现图像配准附Matlab代码
  • 原文地址:https://www.cnblogs.com/flower-dance/p/16754845.html