• rabbitmq安装部署和常用命令


     


     python操作rabbitmq

    rabbitmq实现可以使用java或者springboot的封装方法,自己创建实现,也可以使用中间件实现,相对于自建,使用rabbitmq应用场景及使用更系统安全。本文具体介绍rabbitmq中间件部署。

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ
    在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景

    以下本人linux部署步骤:

    安装erlang(rabbitmq需要该语言运行)

    配置相关依赖环境

    ##yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget
    yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel socat

    下载安装包(本人rabbitmq要求的版本是22以上)

    cd  /home/erlang
     wget http://www.erlang.org/download/otp_src_23.3.tar.gz

    解压并安装

    tar -xzvf otp_src_23.3.tar.gz
    cd otp_src_23.3
    ./configure --prefix=/usr/local/erlang
    make && make install

    配置环境

    复制代码
    vim /etc/profile
    
    ###为文件添加值start:
    ERL_PATH=/home/erlang/otp_src_23.3/bin
    PATH=$ERL_PATH:$PATH
    ###end
    
    source /etc/profile  使配置生效,在shell中使用
    复制代码

    查看安装情况

    erl -version 查看版本号
    #启动erlang命令
    ./bin/erl

     

    下载rabbitmq安装包

    rabbitmq官网下载地址:https://www.rabbitmq.com/ 下载压缩包(注意下载linux版的,类似:rabbitmq-server-3.9.6-1.el7.noarch.rpm
    )本人下载的 centos7相应版本:在window机中下载相关包上传到linux机中

    https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el7.noarch.rpm

    rpm包安装后没具象文件位置集,相关文件应该是融入主文件里其他位置中,使用模糊查询,查看安装位置,如: rpm -aq rabbitmq*

    安装rabbitmq

    ##安装命令
    rpm -Uvh rabbitmq-server-3.9.6-1.el7.noarch.rpm 
    ##若报错:erlang >= 23.2 被 rabbitmq-server-3.9.6-1.el7.noarch 需要;可使用如下命令:(nodeps是忽视依赖关系,可不加)
    rpm -ivh --nodeps rabbitmq-server-3.8.16-1.el7.noarch --force --nodeps

    启动rabbitmq

    复制代码
     service rabbitmq-server start
    # 配置用户及WEBUI插件
    ##启用插件:
    rabbitmq-plugins enable rabbitmq_management
    ##添加用户:
     rabbitmqctl add_user admin admin
    ##设置用户tag:
     rabbitmqctl set_user_tags admin administrator
    ##赋予用户默认vhost的全部操作权限:
     rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    
    ##开机自动启动
    chkconfig rabbitmq-server on
    复制代码

     找不到。启动程序找不到erl命令,那么添加erlang安装的bin目录,我这里是编译安装的,目录如下。

    vim /usr/lib/rabbitmq/bin/rabbitmq-server

    在set -e 下面加两行

    #erlang
    export PATH=$PATH:/opt/otp_src_23.3/bin

    无法访问页面,查看message日志,发现命令没有发现,rabbitmq失败 

    查看位置 

     在环境文件前面也加上他的环境变量

     这下重启没有报错了

     

    添加admin用户

    http://localhost:15672 登录,无法直接使用远程ip登录访问,需要进行配置,在配置之前先创建一个admin账号,并进行授权:

    查看用户 :rabbitmqctl list_users

    创建一个admin用户:rabbitmqctl add_user admin admin

    用户授权 :rabbitmqctl set_user_tags admin administrator

                #  rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"  #这条命令好像不用执行就可以

    创建用户之后需要进行配置,若使用rpm安装,则修改配置/etc/rabbitmq/rabbitmq.config,配置内容大致如下:

    复制代码
    [
    {rabbit,
    [%%
    %% Network Connectivity
    %% ====================
    %%
    %% By default, RabbitMQ will listen on all interfaces, using
    %% the standard (reserved) AMQP port.
    %%
    {tcp_listeners, [5672]},
    {loopback_users, ["admin"]}
    ]}
    ].
    复制代码

    最后面那个点必填的

    配置之后,如果rabbitmq无法正常启动,出现错误:

    init terminating in do_boot ()
    
    Crash dump is being written to: erl_crash.dump...done

    此时有可能是配置文件配置有问题,确保配置文件没问题之后,重启即可。

     

     可以看到,没有那个配置文件,创建并写上面的配置,然后重启rabbitmq,可以看到rabbitmq服务状态中,显示使用这个新建的配置

     再次查看用户,可以看到多了个admin用户

     

    开启管理页面

    需要开启15672 web管理页面端口,需要执行命令 ,不然页面无法访问,这个端口没有监听

    rabbitmq-plugins enable rabbitmq_management

     

     

    访问Rabbitmq http://xxx.xxx.xxx.xxx:15672/
    默认账号密码:guest guest 只能在本地访问
    使用新添用户:admin admin
    访问即可,创建成功:

     
    admin admin 进入的

     

     

     

     

     

     

     
     
     

     

    其他命令

    1、可以查看服务状态:
    [root@localhost ~]# rabbitmqctl status

    [root@mcw14 opt]# rabbitmqctl status
    Status of node rabbit@mcw14 ...
    Runtime
    
    OS PID: 114466
    OS: Linux
    Uptime (seconds): 1512
    Is under maintenance?: false
    RabbitMQ version: 3.8.16
    Node name: rabbit@mcw14
    Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
    Erlang processes: 371 used, 1048576 limit
    Scheduler run queue: 1
    Cluster heartbeat timeout (net_ticktime): 60
    
    Plugins
    
    Enabled plugin file: /etc/rabbitmq/enabled_plugins
    Enabled plugins:
    
     * rabbitmq_management
     * amqp_client
     * rabbitmq_web_dispatch
     * cowboy
     * cowlib
     * rabbitmq_management_agent
    
    Data directory
    
    Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
    Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14
    
    Config files
    
     * /etc/rabbitmq/rabbitmq.config
    
    Log file(s)
    
     * /var/log/rabbitmq/rabbit@mcw14.log
     * /var/log/rabbitmq/rabbit@mcw14_upgrade.log
    
    Alarms
    
    (none)
    
    Memory
    
    Total memory used: 0.0906 gb
    Calculation strategy: rss
    Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb
    
    code: 0.0282 gb (28.15 %)
    other_proc: 0.0273 gb (27.19 %)
    allocated_unused: 0.0202 gb (20.11 %)
    other_system: 0.0124 gb (12.36 %)
    plugins: 0.006 gb (5.97 %)
    other_ets: 0.0033 gb (3.26 %)
    atom: 0.0015 gb (1.45 %)
    binary: 0.0011 gb (1.1 %)
    mgmt_db: 0.0002 gb (0.21 %)
    mnesia: 0.0001 gb (0.09 %)
    metrics: 0.0001 gb (0.07 %)
    msg_index: 0.0 gb (0.03 %)
    quorum_ets: 0.0 gb (0.01 %)
    connection_other: 0.0 gb (0.0 %)
    connection_channels: 0.0 gb (0.0 %)
    connection_readers: 0.0 gb (0.0 %)
    connection_writers: 0.0 gb (0.0 %)
    queue_procs: 0.0 gb (0.0 %)
    queue_slave_procs: 0.0 gb (0.0 %)
    quorum_queue_procs: 0.0 gb (0.0 %)
    reserved_unallocated: 0.0 gb (0.0 %)
    
    File Descriptors
    
    Total: 2, limit: 32671
    Sockets: 0, limit: 29401
    
    Free Disk Space
    
    Low free disk space watermark: 0.05 gb
    Free disk space: 16.7942 gb
    
    Totals
    
    Connection count: 0
    Queue count: 0
    Virtual host count: 1
    
    Listeners
    
    Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    Interface: [::], port: 15672, protocol: http, purpose: HTTP API
    [root@mcw14 opt]# 
    命令执行结果


    2、重启服务:
    [root@localhost ~]# service rabbitmq-server restart
    Restarting rabbitmq-server (via systemctl):
    [ 确定 ]

     


    3、关闭服务
    [root@localhost ~]# service rabbitmq-server stop


    4、重置服务
    [root@localhost ~]# service rabbitmq-server reset
    force_reset
    强制RabbitMQ node还原到最初状态.
    不同于reset , force_reset 命令会无条件地重设node,不论当前管理数据库的状态和集群配置是什么。它只能在数据库或集群配置已损坏的情况下才可使用。
    执行reset和force_reset之前,必须停止RabbitMQ application
    将RabbitMQ node还原到最初状态.包括从所在群集中删除此node,从管理数据库中删除所有配置数据,如已配置的用户和虚拟主机,以及删除所有持久化消息.

    角色及权限

    1.RabbitMQ的用户角色分类:
    none、management、policymaker、monitoring、administrator
    2.RabbitMQ各类角色描述:
    none
    不能访问 management plugin
    management
    用户可以通过AMQP做的任何事外加:
    列出自己可以通过AMQP登入的virtual hosts
    查看自己的virtual hosts中的queues, exchanges 和 bindings
    查看和关闭自己的channels 和 connections
    查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
    policymaker
    management可以做的任何事外加:
    查看、创建和删除自己的virtual hosts所属的policies和parameters
    monitoring
    management可以做的任何事外加:
    列出所有virtual hosts,包括他们不能登录的virtual hosts
    查看其他用户的connections和channels
    查看节点级别的数据如clustering和memory使用情况
    查看真正的关于所有virtual hosts的全局的统计信息
    administrator
    policymaker和monitoring可以做的任何事外加:
    创建和删除virtual hosts
    查看、创建和删除users
    查看创建和删除permissions
    关闭其他用户的connections
    3.创建用户并设置角色
    可以创建管理员用户,负责整个MQ的运维;可以创建RabbitMQ监控用户,负责整个MQ的监控;可以创建某个项目的专用用户,只能访问项目自己的virtual hosts

     

    常用命令

    服务管理

    启动: service rabbitmq-server start 或 rabbitmq-service start
    关闭: service rabbitmq-server stop 或 rabbitmq-service stop
    重启: service rabbitmq-server restart
    状态: rabbitmqctl status

     

    [root@mcw14 ~]# rabbitmq-service stop
    -bash: rabbitmq-service: command not found
    [root@mcw14 ~]# service rabbitmq-server restart
    Redirecting to /bin/systemctl restart rabbitmq-server.service
    [root@mcw14 ~]# rabbitmqctl status
    Status of node rabbit@mcw14 ...
    Runtime
    
    OS PID: 81996
    OS: Linux
    Uptime (seconds): 28
    Is under maintenance?: false
    RabbitMQ version: 3.8.16
    Node name: rabbit@mcw14
    Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
    Erlang processes: 367 used, 1048576 limit
    Scheduler run queue: 1
    Cluster heartbeat timeout (net_ticktime): 60
    
    Plugins
    
    Enabled plugin file: /etc/rabbitmq/enabled_plugins
    Enabled plugins:
    
     * rabbitmq_management
     * amqp_client
     * rabbitmq_web_dispatch
     * cowboy
     * cowlib
     * rabbitmq_management_agent
    
    Data directory
    
    Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
    Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14
    
    Config files
    
     * /etc/rabbitmq/rabbitmq.config
    
    Log file(s)
    
     * /var/log/rabbitmq/rabbit@mcw14.log
     * /var/log/rabbitmq/rabbit@mcw14_upgrade.log
    
    Alarms
    
    (none)
    
    Memory
    
    Total memory used: 0.1046 gb
    Calculation strategy: rss
    Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb
    
    other_proc: 0.0357 gb (34.15 %)
    code: 0.0282 gb (27.01 %)
    other_system: 0.0124 gb (11.84 %)
    allocated_unused: 0.0105 gb (10.05 %)
    reserved_unallocated: 0.0069 gb (6.62 %)
    plugins: 0.0055 gb (5.27 %)
    other_ets: 0.0033 gb (3.12 %)
    atom: 0.0015 gb (1.4 %)
    binary: 0.0002 gb (0.22 %)
    mgmt_db: 0.0002 gb (0.15 %)
    mnesia: 0.0001 gb (0.09 %)
    metrics: 0.0001 gb (0.06 %)
    msg_index: 0.0 gb (0.03 %)
    quorum_ets: 0.0 gb (0.01 %)
    connection_other: 0.0 gb (0.0 %)
    connection_channels: 0.0 gb (0.0 %)
    connection_readers: 0.0 gb (0.0 %)
    connection_writers: 0.0 gb (0.0 %)
    queue_procs: 0.0 gb (0.0 %)
    queue_slave_procs: 0.0 gb (0.0 %)
    quorum_queue_procs: 0.0 gb (0.0 %)
    
    File Descriptors
    
    Total: 2, limit: 32671
    Sockets: 0, limit: 29401
    
    Free Disk Space
    
    Low free disk space watermark: 0.05 gb
    Free disk space: 16.5811 gb
    
    Totals
    
    Connection count: 0
    Queue count: 0
    Virtual host count: 1
    
    Listeners
    
    Interface: [::], port: 15672, protocol: http, purpose: HTTP API
    Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    [root@mcw14 ~]# 
    rabbitmqctl status

    或者systemctl管理

     

    用户管理

    新增账号: rabbitmqctl add_user username password
    删除用户: rabbitmqctl delete_user username
    所有用户: rabbitmqctl list_users
    修改密码: rabbitmqctl change_password username newpassword
    清除密码: rabbitmqctl clear_password {userName}

    admin登录

    创建用户和查看用户:

     rabbitmqctl add_user machangwei  123456

     页面上面也可以查看到这个用户

     刚刚创建的用户没有授权,不是管理用户,上面两个账号是管理用户,所以可以登录,好像是授权管理用户才能登录管理页面

     根据下面的角色管理,给账号赋予administrator角色后,就i你登录到管理页面了

    修改machangwei密码:

    rabbitmqctl change_password machangwei 12

     再看已经登录的页面,提示登录失败

     刷新页面就这样了

     使用新密码登录

     接下来演示清除密码

     清除密码之后

     页面又退出登录了

     admin用户登录查看,machangwei账号是没有密码了的

    重新加上密码,直接修改密码就可以

     清除了密码,跟你查看用户没有关系,照样是查到的

     清除密码必须接账号

    角色管理

    用户角色分为5中类型:
      none:无任何角色。新创建的用户的角色默认为 none。
      management:可以访问web管理页面。
      policymaker: 包含managerment所有权限,并且可以管理策略(Policy)和参数(Parameter)
      monitoring: 包含management所有权限,并且可以看到所有链接、信道及节点相关的信息
      administartor:包含monitoring所有权限,并且可以管理用户、虚拟机、权限、策略、参数等。(最高权限)
    设置用户角色: rabbitmqctl set_user_tags zhaojigang administrator
    设置多个角色: rabbitmqctl set_user_tags hncscwc monitoring policymaker
    查看用户角色: rabbitmqctl list_users

     

    设置上面创建的machangwei为管理用户

    页面上查看machangwei账号拥有的角色。

     继续执行命令,设置machangwei账号有多个角色

     页面上查看,可以看到,它这个是设置标签,每次都是重新设置,会将之前的角色清空,换成本次设置的角色,这里不是新增,这点需要注意

     我们再加上管理角色,

     这下machangwei账号有登录页面的权限了,查看页面,显示是machangwei登录。

    命令行查看用户,也可以看到用户有哪些角色

     Vhost管理

    所有虚拟主机: rabbitmqctl list_vhosts
    添加虚拟主机: rabbitmqctl add_vhost vhostname
    删除虚拟主机: rabbitmqctl delete_vhost vhostname

     

    查看虚拟主机

     页面上面查看下虚拟机主机

     添加一个虚拟主机

     可以看到刚刚创建的虚拟主机跟账号没有绑定,名字也能看到页面和命令行看到的一样

     

     

    权限管理

    命令格式如下:rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
    查询所有权限:rabbitmqctl list_permissions  [-p  VHostPath]
    查看用户权限:rabbitmqctl list_user_permissions username
    清除用户权限:rabbitmqctl clear_permissions [-p VHostPath] username

     

     rabbitmqctl set_permissions   -p mcw_vhost1  machangwei   /etc/rabbitmq/rabbitmq.config  read     [-p vhost] {user} {conf} {write} {read}

    rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
    # 表示设置用户权限。 {vhost} 表示待授权用户访问的vhost名称,默认为 "/"; {user} 表示待授权反问特定vhost的用户名称; {conf}表示待授权用户的配置权限,是一个匹配资源名称的正则表达式; {write} 表示待授权用户的写权限,是一个匹配资源名称的正则表达式; {read}表示待授权用户的读权限,是一个资源名称的正则表达式。
    # rabbitmqctl set_permissions -p /  admin "^mip-.*" ".*" ".*"
    # 例如上面例子,表示授权给用户 "admin" 具有所有资源名称以 "mip-" 开头的 配置权限;所有资源的写权限和读权限。

     

     
    设置相关 vhost 权限案例
    rabbitmqctl add_vhost /myhost # 添加 vhost
    rabbitmqctl add_user me me123 # 设置用户和密码
    rabbitmqctl set_permissions -p /myhost me ".*" ".*" ".*" # vhost 设置权限

    下面查看所有用户权限,查看用户权限:

     查看,me这个没有添加角色,列出权限的时候没有看到它,指定vhost的列出权限,就看到me了。也就是/myhost目前只有这个用户。而权限,是分为对vhost的配置权限,写权限,读权限。配置权限那里,可以用以什么开头的权限,这种来通配,就像之前在redis作为celery的bakend一样,写入的数据是以指定字符串开头,这样来区分消息,这里,不清楚是不是这样的作用。

     页面显示如下

     清除用户的vhost权限

     没有了

     

    其它几个页面

     

     

     这里有vhost

     也可以页面上添加用户

     也可以页面上添加vhost

     

    查看插件

    rabbitmq-plugins list

    如下,之前开启了管理页面的插件,才能通过页面访问rabbitmq,现在可以看到,前面是有标记的,也就是没有标记的应该就是没有开通,而且标记不同,大写的标记,应该是插件的服务端程序吧。

    复制代码
    
    [root@mcw14 ~]# rabbitmq-plugins list
    Listing plugins with pattern ".*" ...
     Configured: E = explicitly enabled; e = implicitly enabled
     | Status: * = running on rabbit@mcw14
     |/
    [  ] rabbitmq_amqp1_0                  3.8.16
    [  ] rabbitmq_auth_backend_cache       3.8.16
    [  ] rabbitmq_auth_backend_http        3.8.16
    [  ] rabbitmq_auth_backend_ldap        3.8.16
    [  ] rabbitmq_auth_backend_oauth2      3.8.16
    [  ] rabbitmq_auth_mechanism_ssl       3.8.16
    [  ] rabbitmq_consistent_hash_exchange 3.8.16
    [  ] rabbitmq_event_exchange           3.8.16
    [  ] rabbitmq_federation               3.8.16
    [  ] rabbitmq_federation_management    3.8.16
    [  ] rabbitmq_jms_topic_exchange       3.8.16
    [E*] rabbitmq_management               3.8.16
    [e*] rabbitmq_management_agent         3.8.16
    [  ] rabbitmq_mqtt                     3.8.16
    [  ] rabbitmq_peer_discovery_aws       3.8.16
    [  ] rabbitmq_peer_discovery_common    3.8.16
    [  ] rabbitmq_peer_discovery_consul    3.8.16
    [  ] rabbitmq_peer_discovery_etcd      3.8.16
    [  ] rabbitmq_peer_discovery_k8s       3.8.16
    [  ] rabbitmq_prometheus               3.8.16
    [  ] rabbitmq_random_exchange          3.8.16
    [  ] rabbitmq_recent_history_exchange  3.8.16
    [  ] rabbitmq_sharding                 3.8.16
    [  ] rabbitmq_shovel                   3.8.16
    [  ] rabbitmq_shovel_management        3.8.16
    [  ] rabbitmq_stomp                    3.8.16
    [  ] rabbitmq_top                      3.8.16
    [  ] rabbitmq_tracing                  3.8.16
    [  ] rabbitmq_trust_store              3.8.16
    [e*] rabbitmq_web_dispatch             3.8.16
    [  ] rabbitmq_web_mqtt                 3.8.16
    [  ] rabbitmq_web_mqtt_examples        3.8.16
    [  ] rabbitmq_web_stomp                3.8.16
    [  ] rabbitmq_web_stomp_examples       3.8.16
    [root@mcw14 ~]# 
    复制代码

     

     监控管理器

    这个就是前面开启和关闭rabbitmq的管理页面的命令

    rabbitmq-plugins enable rabbitmq_management #启动
    rabbitmq-plugins disable rabbitmq_management #关闭

     

    应用管理

    慎用,好像是开启和停止节点用的

     

    关闭应用:rabbitmqctl stop_app
    启动应用:rabbitmqctl start_app

     

    stop之后,就不能访问页面了,根据命令返回打印信息,好像是停止的这个节点

     stop之后,程序进程还是在的,再次start一下,又能正常访问页面了。我这里只有一个node,如果有多个node,停止一个,应该是可以在页面访问看到情况的

     

    队列管理

    查看所有队列:rabbitmqctl list_queues
    清除所有队列:rabbitmqctl reset #需要先执行rabbitmqctl stop_app
    强制清除队列:rabbitmqctl force_reset

    删除单个队列:rabbitmqctl delete_queue queue_name 

     

    目前没有队列

     添加之后,访问拒绝

     参考:https://blog.csdn.net/hefeng_aspnet/article/details/125865990

     查看权限

     有正则匹配的配置

     点击设置权限,把以mip-开头的配置,给重新设置了,为有所有权限

     再次添加队列

     成功添加队列,点击进入队列页面

     这个消息下面,我们看到了之前添加的三个键值对。

     再次查询消息队列,可以看到我们刚刚创建的队列,但是消息,显示的是0,不知道是为什么

     这里有个push消息,我们试一试

     

     消息被推送了

     之前添加的参数,不是消息,我们上面点击发布消息,才是真的往这个队列里面添加了消息,可以看到多了条消息,persistent应该是是否持久化的意思吧,而memory是指内存里面有条消息,但是没有持久化存储呢吧,持久化存储为0,总共一条消息

     此时再去Linux上查看,可以看到消息队列mcwcountage已经多了一条消息了。

     再次发布一条消息

     可以看到,两条消息了

     再次查看,两条消息了

     点击get消息,可以看到我们刚刚推送的消息

     两条消息都get一下。如果填的消息数据,大于实际 有的消息数量,那么只显示已存在的消息

     点击之后,消息被清空了

     再次push两个消息

     移除消息,没有安装插件,那么安装一下

     显示开启了两个插件在这个节点

     可以看到,这两个插件已经开启了

     刚刚的操作没有截图,我们将多出来的队列1删除掉

     看上面,可以知道此时只有一个队列。我们进入之后,点击移动,这时候会新建mcwqueue2队列,并把mcwcountage下的所有消息移动到新的队列,有点像是备份似的,不清楚,如果下面移动时,填写的是已有的队列名称,是不是只是新增到已有的队列里面,大概率是这样子吧。

     点击移动消息,跳转到展示所有消息队列的页面,可以看到新增的队列,并且两条 消息已经从旧的mcwcountage队列移走,移动到新的队列mcwqueue2里面去了。

     我们再测试一下删除,这里的删除好像是删除队列

     

     可以看到,进入队列的详情页面,点击删除,的确把该队列删除掉了,如下,已经看不到mcwqueue2队列了

    测试重启消息队列服务,对没有持久化消息的影响

     我们push两个消息进入消息队列

     我们重启之后,可以看到消息丢失。那么重启之后是否需要将消息持久化呢,怎么做持久化呢,持久化后是否上图中persistent就是2了呢

     消息如何持久化呢?

    下面看下有两条消息

     

    [root@controller ~]# rabbitmqctl  list_queues
    Listing queues ...
    q-agent-notifier-security_group-update_fanout_0134e1dc992d4f3291571f4f1150f033    0
    q-l3-plugin_fanout_67ad4da1e291429b9a561a0b03734552    0
    q-plugin    0
    conductor_fanout_e2d9de48537d4ec09edb946f2fdb5008    0
    q-agent-notifier-l2population-update_fanout_48739ab147b04dc895fe2fd340aa7eae    0
    q-agent-notifier-network-delete_fanout_ed0e5abc181c4fe0bb4a802ea928a2e8    0
    reply_20ac7e9ae5a349aea2ea680a5aec7d79    0
    consoleauth.controller    0
    q-agent-notifier-network-delete.controller    0
    q-plugin.controller    0
    compute_fanout_9f8cf7456e73451e8ce7d65460849817    0
    reply_2455b8e5c5dc40da8650f56ae4e45174    0
    q-agent-notifier-l2population-update.controller    0
    cinder-volume_fanout_8dc268f1cf7d46d8bbfe4a521da8b202    0
    cinder-volume.controller@lvm_fanout_b606cbf8a67c4600879b2a64a5320db4    0
    q-agent-notifier-network-update    0
    q-agent-notifier-port-update.controller    0
    conductor_fanout_2b33be46a0d448868f6dc32678a21500    0
    q-agent-notifier-port-update_fanout_2b5d1a5c039e4e2caddea9ed34185fe8    0
    q-agent-notifier-port-update_fanout_ebc99b8c158c48529e653e9ceb2f7760    0
    consoleauth_fanout_4964cf77ca7944f8a8d85fe555703a0c    0
    q-server-resource-versions_fanout_753834a40abd430998a0ba30955f1122    0
    cinder-volume.compute2@nfs_fanout_2868e267441e4e528f4d300f33282d19    0
    conductor.controller    0
    q-agent-notifier-l2population-update_fanout_9cae8c6abdc84202bc35ebbee373b422    0
    reply_c9910c029ba1432c98ae4c3576ef7df6    0
    conductor    0
    q-agent-notifier-network-update.controller    0
    scheduler_fanout_5eea12b3d0114d58ac0d10fc3799f0d1    0
    q-agent-notifier-security_group-update_fanout_1452a407fcd54a5ab47285254b86a8d3    0
    q-agent-notifier-security_group-update.compute1    0
    reply_52a5090b02f5436c9e7aa541e732ef52    0
    cinder-volume    0
    q-agent-notifier-network-update_fanout_a4a9e02adfc748fd9a8835996e802bfa    0
    cinder-volume_fanout_75d1c7f92a394bd088b7f65e121d6f9a    0
    q-server-resource-versions    0
    neutron-vo-Trunk-1.1_fanout_138759917974485faad26891827e7447    0
    compute.compute2    0
    neutron-vo-Trunk-1.1_fanout_676122d00eb34a59be324d0604c9111c    0
    scheduler.controller    0
    neutron-vo-SubPort-1.0_fanout_e49ddc715b1943389e7e0ddd6dfb5032    0
    cinder-volume.compute2@nfs.compute2    0
    dhcp_agent.controller    0
    neutron-vo-Trunk-1.1.controller    0
    q-agent-notifier-port-update.compute1    0
    neutron-vo-SubPort-1.0.compute2    0
    dhcp_agent_fanout_54b2d45e5d0d490f8d62283a9abcb0b8    0
    cinder-volume.compute2@nfs    0
    q-reports-plugin.controller    0
    neutron-vo-SubPort-1.0_fanout_496e5f4cee6445af8a1edcb5c312abe6    0
    q-reports-plugin_fanout_d4e460c745f445c8a22c39cbc15b9f92    0
    q-agent-notifier-network-update.compute1    0
    cinder-scheduler    0
    consoleauth    0
    q-reports-plugin_fanout_89d519d055464103a1ec5029a36390a8    0
    dhcp_agent    0
    q-agent-notifier-security_group-update.compute2    0
    q-agent-notifier-l2population-update.compute1    0
    q-agent-notifier-network-delete_fanout_d518f4f111774991971c7cdc5302fb0d    0
    q-reports-plugin    0
    q-agent-notifier-network-delete_fanout_4368adffe18e4742a12e2c4559b07985    0
    neutron-vo-SubPort-1.0_fanout_2a542add26d8456396efe5ef3cb4c23f    0
    cinder-volume.controller@lvm.controller    0
    q-agent-notifier-l2population-update_fanout_5e726a5d09f942e1afb8b02d89cdf9fe    0
    q-agent-notifier-network-update.compute2    0
    reply_f3414e08cc93430a914663877436085d    0
    cinder-volume.controller@lvm    0
    scheduler    0
    neutron-vo-SubPort-1.0    0
    q-l3-plugin.controller    0
    neutron-vo-Trunk-1.1.compute1    0
    q-agent-notifier-l2population-update.compute2    0
    neutron-vo-SubPort-1.0.controller    0
    q-agent-notifier-port-update.compute2    0
    reply_4961a61a307a428fb0b03043c7654f7d    0
    q-agent-notifier-security_group-update.controller    0
    q-agent-notifier-security_group-update    0
    compute_fanout_b980ee35b47c47688d4fda1a1eb9fe34    0
    cinder-scheduler_fanout_3b662fb1565b4a9f8c1224380cf8c8da    0
    neutron-vo-Trunk-1.1    0
    compute    0
    q-agent-notifier-network-delete.compute1    0
    cinder-scheduler.controller    0
    compute.compute1    0
    q-l3-plugin_fanout_31c332c953be4c9194085590c844f934    0
    neutron-vo-Trunk-1.1_fanout_a8dca45b75d8436a9ba44f8832d04d58    0
    q-agent-notifier-l2population-update    0
    q-agent-notifier-network-update_fanout_84a4aa333c7e435482fc64b04d120051    0
    q-server-resource-versions.controller    0
    neutron-vo-SubPort-1.0.compute1    0
    q-agent-notifier-port-update    0
    q-agent-notifier-security_group-update_fanout_49d673aeec024d8fa36a49523cf335fd    0
    q-agent-notifier-network-delete.compute2    0
    q-agent-notifier-port-update_fanout_e3f806abc57744b290540a21aba6626f    0
    q-l3-plugin    0
    reply_80e9590155d64a2480314f86613ea532    0
    neutron-vo-Trunk-1.1.compute2    0
    q-agent-notifier-network-delete    0
    q-agent-notifier-network-update_fanout_c60c9bcfa3eb40c7b5c025980bece3f5    0
    q-plugin_fanout_a07db20790814d52ba25d914e01fe82f    0
    [root@controller ~]# 
    查看openstack中有的消息队列

     

    集群管理

    查看集群状态: rabbitmqctl cluster_status
    摘除节点: rabbitmqctl forget_cluster_node [--offline]
    组成集群命令: rabbitmqctl join_cluster [--ram]
    修改节点存储形式: rabbitmqctl change_cluster_node_type disc | ram
    修改节点名称: rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2] [oldnode3 newnode3...]

     

    [root@mcw14 ~]# rabbitmqctl cluster_status
    Cluster status of node rabbit@mcw14 ...
    Basics
    
    Cluster name: rabbit@mcw14
    
    Disk Nodes
    
    rabbit@mcw14
    
    Running Nodes
    
    rabbit@mcw14
    
    Versions
    
    rabbit@mcw14: RabbitMQ 3.8.16 on Erlang 23
    
    Maintenance status
    
    Node: rabbit@mcw14, status: not under maintenance
    
    Alarms
    
    (none)
    
    Network Partitions
    
    (none)
    
    Listeners
    
    Node: rabbit@mcw14, interface: [::], port: 15672, protocol: http, purpose: HTTP API
    Node: rabbit@mcw14, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Node: rabbit@mcw14, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    
    Feature flags
    
    Flag: drop_unroutable_metric, state: disabled
    Flag: empty_basic_get_metric, state: disabled
    Flag: implicit_default_bindings, state: enabled
    Flag: maintenance_mode_status, state: enabled
    Flag: quorum_queue, state: enabled
    Flag: user_limits, state: enabled
    Flag: virtual_host_metadata, state: enabled
    [root@mcw14 ~]# 
    rabbitmqctl cluster_status

    看下我们部署的openstack,它的集群状态是什么样的,也就一个节点,跟上面命令访问结果不一样,那么上面可能是没有形成集群,只是一个单节点。

     加入集群失败

     

     

     

     

    信息查看

    rabbitmqadmin list connections #查看所有连接
    rabbitmqadmin show overview #概览 Overview
    rabbitmqadmin list nodes #查看所有节点 Node
    rabbitmqadmin list channels #查看所有通道 Channel
    rabbitmqadmin list consumers #查看所有消费者 Consumer
    rabbitmqadmin list exchanges #查看所有路由 Exchange
    rabbitmqadmin list bindings #查看所有路由与队列的关系绑定 Binding

     
    找到这个命令,然后加个软连接或者复制一份。软连接的话,也是要将源文件添加执行权限的,不然无法执行,没有权限。

     这样就可以了,之前节点被关闭了。开启节点后,然后查看有哪些节点,可以正常看到

     查看所有连接

     概览

     查看所有节点

     查看所有通道 

     查看所有消费者

     查看所有路由

     查看所有路由与队列的关系绑定

     

    python操作rabbitmq

    RabbitMq生产者消费者模型

    生产者(producter) 队列消息的产生者,复制生产消息,并将消息传入队列
    生产者代码:

    复制代码
    import pika
    import json
    
    credentials = pika.PlainCredentials('admin','admin')#mq用户名和密码,用于认证
    #虚拟队列需要指定参数virtual_host,如果是默认的可以不填
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.24',port=5672,virtual_host='/',credentials=credentials))
    channel = connection.channel()# 创建一个AMQP信道
    
    #声明队列,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True
    channel.queue_declare(queue='1',durable=True)
    for i in range(10):   # 创建10个q
        message = json.dumps({'OrderId':"1000%s"%i})
        # exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容
        channel.basic_publish(exchange='',routing_key='1',body=message)
        print(message)
    connection.close()
    复制代码
    操作前

    通过pika生命一个认证用的凭证,然后用pika创建rabbitmq的块连接,再用上面的连接创建一个AMQP信道 。创建消息队列的连接时,需要指定ip,断开,虚拟主机,凭证。

    然后根据上面的信道,声明一个队列,

    我们可以看到,下面信道点队列声明里的queue参数值就队列的名字。这里是遍历0到9,然后打印了下消息,这里的生成的消息,是json序列化后的数据。然后将数据作为i,信道点基础发布的body参数的值。上面信道点队列声明是创建一个队列,队列名字是’1‘,下面我们用信道点基本发布,是将我们创建的消息体发送到队列中,路由_key就是指定队列名称,指定发布消息到哪个队列,消息是作为body的参数,

    最后,需要将这个消息队列的连接关闭。

     我们通过页面可以看到,已经创建好了这个队列,队列名字为1,并且已经通过遍历生成的10个消息,调用十次信道点基础发布方法,将这十个产生的消息发布到消息队列中

     

     我们可以再看下,可以看到我们创建的消息的具体内容。

     消费者(consumer):队列消息的接收者,扶着接收并处理消息队列中的消息

    复制代码
    import pika
    credentials = pika.PlainCredentials('admin','admin')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.0.0.24',
        port=5672,
        virtual_host='/',
        credentials=credentials
    ))
    channel = connection.channel()
    #声明消息队列,消息在这个队列中传递,如果不存在,则创建队列
    channel.queue_declare(queue='1',durable=True)
    # 定义一个回调函数来处理消息队列中消息,这里是打印出来
    def callback(ch,method,properties,body):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(body.decode())
    #告诉rabbitmq,用callback来接收消息
    channel.basic_consume('1',callback)
    #开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()
    复制代码

    获取消息,创建凭证,连接,信道,然后什么一下队列。指定我们要获取哪个队列中的消息,如果没有这个队列,就会创建这个队列,存在,那么后面使用这个信道,就会从这个队列中获取数据。信道是通过rabbitmq的连接对象来生成的,连接对象中放了连接用的凭证。所以,信道点基础消费方法,指定是哪个消息队列,那么就会从这个队列中获取消息。然后传参回调函数。而回调函数中,

     我们可以看到,基础消费方法里面有消息回调,就是上面我们自定义的回调函数

     这个方法定义了回调函数的写法。第一个参数是信道

     第二个参数是方法,第三个参数是属性,第四个是body,这些不用管,只需要按如下格式,就可以从body,做个解码,就将信道点基础消费中指定的队列中的消息,取出来了,我们是用回调函数来接收消息,当需要获取消息的时候,就需要执行信道点开始消费的方法。这里好像是遍历队列一个一个的将消息获取出来。那么怎样实现,实时监听消息,实时消费呢

     

    RabbitMq持久化

     

    RabbitMq持久化
    MQ默认建立的临时的queue和exchange,如果不声明持久化,一旦rabbitmq挂掉,queue,exchange将会全部丢失,所以我们一般在创建queue或者exchange的时候会声明持久化
    1.queue声明持久化

    # 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
    result = channel.queue_declare(queue = 'python-test',durable = True)

    使用True

     重启消息队列服务

    消息队列还在,但是消息被清空了

     当我改为false的时候,因为队列1已经存在,并且是Tue声明的,所以这里就报错了

     我们设置为false,然后声明一个不存在的队列2

     创建好了队列,并且10个消息

     重启一下消息队列服务

     刚刚上面创建的队列2已经不存在,这已经不是消息被清空了,而是队列直接被清除了

     也就是这个Ture,是保留队列用的,持久化队列的。

    channel.queue_declare(queue='2',durable=True)

    2、exchange声明持久化

    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test', durable = True)

    注意:如果已存在一个非持久化的queue或exchange,执行上述代码会报错,因为当前状态不能更该queue 或 exchange存储属性,需要删除重建,如果queue和exchange中一个声明了持久化,另一个没有声明持久化,则不允许绑定

     

    我们在1处改了,但是在2处没有修改。结果有问题。

     队列2不存在,所以没有将消息放进去

     而exchange这里,没有写将消息推送到声明的python-test里面,所以里面也没有消息

     这次是声明的exchange,并且将消息推送到python-test里面

     还是没有看到有东西呀

     我们这里发布个消息,可以看到,是需要路由的

     加上路由,再次执行程序

     由于队列2 不存在,好像还是不行

     我在这里给它bind一个路由

     感觉还是没有弄明白,先放弃了

    原来是如下方式呀。

    首先,在python-test2里面,

     给exchange绑定队列1和2

     1和2目前的消息数量

     我往路由1里面push一个消息

     push成功

     然后再看队列1里面,可以看到多了一条刚刚push的消息

     接下来用程序实现,声明exchange,然后发布方法不变,发布到exchage中,因为已经绑定了两个路由了,这里指定路由key,根据路由key,可以将消息push到对应的队列中去

     我们可以看到,之前是页面点击push了一条,上面程序push了十条到exchange,现在这个队列就有11条数据。可是这个exchange和队列的绑定,是我自己在页面上绑定的,这个应该不合理。以后有时间看下,怎么用程序绑定。

     我们可以看到,应该是程序中缺少使用这个绑定方法吧

     

    3、消息持久化
    虽然exchange和queue都声明了持久化,但如果消息只存在内存里,rabbitmq重启后,内存里的东西还是会丢失,所以必须声明消息也是持久化,从内存转存到到硬盘

    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))

    4、acknowledgement消息不丢失
    消费者(consume)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息会丢失,但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。

    channel.basic_consume(callback,queue = 'python-test',
    # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
                 no_ack = False)

     



     

     
     
     
     
     
     
     


    参考链接:https://www.jianshu.com/p/cc322adf060c
    https://blog.csdn.net/weixin_45144837/article/details/104335115

     https://blog.csdn.net/weixin_45144837/article/details/104335115

  • 相关阅读:
    git自动pull同步远程若干分支与本地若干分支
    C# 委托
    Android Studio展示Activty生命周期
    10.网络编程套接字Socket
    计算机毕业设计选什么题目好?springboot 医院门诊在线预约挂号系统
    Jenkins自动化部署
    【畅购商城】订单模块之收货人列表
    人工神经网络理论设计及应用课后题答案韩力群
    Java自定义注解以及Spring的AOP详解,通过AOP和自定义注解实现日志记录
    数据结构(二叉树)——Java实现
  • 原文地址:https://www.cnblogs.com/machangwei-8/p/17471499.html