• RabbitMQ入门与进阶实战


    1 分布式消息队列认知提升

    1.1 MQ的应用场景与MQ性能衡量指标

    1.1.1 分布式消息队列MQ应用场景:

    解耦; 服务解耦

    消峰; 消峰填谷:流量高峰和低谷均衡;

    异步:   异步化缓冲; (最终一致性)

    1.1.2 应用思考点

    1、生产端可靠性投递:消息不能丢失;

    2、消费端幂等:避免消息消费多次;

    3、高可用

    4、低延迟

    5、可靠性(一般都是副本方式)

    6、堆积能力

    7、扩展性(如简单的扩容)

    1.2 主流的分布式消息队列MQ:

    ActiveMQ :  古老,强大,Apach

    RabbitMQ:

    RocketMQ: 阿里巴巴---->Apach

    Kafka:    高吞吐,海量数据的存储;

    1.2.1 技术选型:

    (集群架构模式需要考虑的几点: 分布式、可扩展、高可用、可维护性)

    ActiveMQ: 适合中小型的传统行业,不适合高并发业务;

    RabbitMQ:   横向扩展能力不好,可用性和可维护性很棒;

    Kafka和RocketMQ: 可拓展性很强,具备高可用性,但是可维护性比较麻烦一点;

    (Kafka可以在廉价服务器上有着非常高和吞吐性能,如果对消息可靠性要求不是很高,可以考虑使用Kafka,Kafka也可以保证消息一条也不丢失,但是对性能影响比较大。)

    1.3 ActiveMQ集群架构与原理解析

    1.3.1 初始JMS与其专业术语

            现在我们和大家了解一下古老而又神秘的消息中间件"ActiveMQ"。首先,说起ActiveMQ,就必须先聊聊JMS(Java Message Service)规范,也就是Java消息服务,它定义了java中访问消息中间件的接口规范。在这里注意哦,JMS只是接口,并没有给予实现,实现JMS接口的消息中间件为"JMS Provider",目前知名的开源MOM(Message Oriented Middleware,也就是消息中间件)系统包括Apache的ActiveMQ、RocketMQ、Kafka,以及RabbitMQ,可以说它们都是"基本遵循"或"参考"JMS规范,都有自己的特点和优势。

    1.3.2 专业术语

    • JMS(Java Message Service):实现JMS接口的消息中间件;
    • Provider(MessageProvider):消息的生产者;
    • Consumer(MessageConsumer):消息的消费者;
    • PTP(Point to Point):即点对点的消息模型,这也是非常经典的模型;
    • Pub/Sub(Publish/Subscribe):即发布/订阅的消息模型;
    • Queue:队列目标,也就是我们常说的消息队列,一般都是会真正的进行物理存储;
    • Topic:主题目标;
    • ConnectionFactory:连接工厂,JMS用它创建连接;
    • Connection:JMS客户端到JMS Provider的连接;
    • Destination:消息的目的地;
    • Session:会话,一个发送或接收消息的线程。

    1.3.3 JMS消息格式定义

    • StreamMessage原始值得数据流
    • MapMessage一套名称/值对
    • TextMessage一个字符串对象
    • BytesMessage一个未解释字节的数据流
    • ObjectMessage一个序列化的java对象

    1.4 了解ActiveMQ

            ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在早些年的"J2EE应用"时期扮演着特殊的地位,可以说那个年代ActiveMQ在业界应用最广泛,当然如果现在想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断地升级版本,不断地提升性能和架构设计的重构。

            就算现在我们80%以上的业务我们是用ActiveMQ已经足够满足需求,其丰富的API、多种集群构建模式使得它成为业界老牌消息中间件,在中小型企业中应用广泛。

            当然如果你想针对大规模、高并发应用服务做消息中间件技术选型,譬如淘宝、京东这种大型的电商网站,尤其是双11这种特殊时期,ActiveMQ可能就显得力不从心了,当然我们这里后续还会和大家介绍其他非常优秀的MOM。

    1.5 消息投递模式

            我们首先要了解JMS规范里最经典的两种消息投递模式。即"点对点"与"发布订阅"。

    • 点对点:生产者向队列投递一条消息,只有一个消费者能够监听到这条消息(PTP),下图所示:

    •  发布订阅:生产者向队列投递一条消息,所有监听该队列的消费者都能得到这条消息(Pub/Sub)

     1.6 ActiveMQ各项指标

    衡量一个MOM,我们主要从三方面考虑即可,即服务性能、存储堆积能力、可扩展性。

    • 服务性能

            ActiveMQ的性能一般,在早期传统行业为王的时代还是比较流行的,但现如今面对高并发、大数据的业务场景,往往力不从心!

    • 数据存储

            默认采用kahadb(索引文件形式存储),也可以使用高性能的google level db(内存数据库存储)。

    • 集群架构

            ActiveMQ可以与zookeeper进行构建 主备集群 模型,并且多套的主备模型直接采用Network的方式构建分布式集群。

    1.7 ActiveMQ集群架构模式

            ActiveMQ最经典的两种集群架构模式,Master-Slave、Network集群模式!

    • Master-Slave

     Master-Slave:顾名思义,就是主从方式,当然这里要理解为主备的方式,也就是双机热备机制;Master-Slave背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。Master-Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。

    • 架构思考:Master-Slave集群模型的关键点:

            上图(Master-Slave)绿色为主节点,灰色的则为备份节点,这两个节点都是运行状态。

            zookeeper的作用就是为了当绿色的主节点宕机时,进行及时切换到备份的灰色节点上去,使其进行主从角色的互换,用于实现高可用性的方案。

            Master-Slave集群模型的缺点也显而易见,就是不能做到分布式的topic、queue,当消息量巨大时,我们的MQ集群压力过大,没办法满足分布式的需求。

    • Network

    •  Network:这里可以理解为网络通信方式,也可以说叫Network of brokers。这种方式真正解决了分布式消息存储和故障转移、roker切换的问题。可以理解消息会进行均衡;从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的topics和queues。一个broker会相同对待所有的订阅(subscription):不管它们是来自本地的客户端连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。
    • 架构思考:Network集群模型的关键点:

            首先,这种方案需要两套或多套(Master-Slave)的集群模型才可以搞定,部署非常麻烦,需要两套或多套集群直接相互交叉配置,相互间能够感知到彼此的存在。下面我给出一段XML配置,简单来说就是在ActiveMQ的配置文件里要进行多套(Master-Slave)之间的networkConnector配置工作:


         
             
         

         
                           uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
         

            其次,Network虽然解决了分布式消息队列这个难题,但是还有很多潜在的问题,最典型的就是资源浪费问题,并且也可能达不到所预期的效果;通常采用Master-Slave模型是传统型互联网公司的首选,作为互联网公司往往会选择开箱即用的消息中间件,从运维、部署、使用各个方面都要优于ActiveMQ,当然ActiveMQ毕竟是"老牌传统强Q",Apache的顶级项目之一,目前正在进行新版本的重构(对于5.X版本)与落地,下一代"Artemis代理",也可以理解为"6.X";官网如下:https://activemq.apache.org/

    1.8 RabbitMQ集群架构
    1、主备模式:热备份(master-slave)
    master 对外提供读写,salve作为一个备份,当出现异常的时候,master宕机,做一个切换,slave节点切换为master节点
    2、远程模式:数据异地容灾,提升性能,单个节点处理不过来,可以转移到下游的集群中(架构简单、配置复杂)
    3、镜像模式:业界使用最广泛(消息可靠传递)
    4、多活模式:异地容灾、数据转储,类似于远程模式       

    1.8.1  主备模式

            warren(兔子窝),一个主、备方案(主节点如果挂掉,从系欸但提供服务,和ActiveMQ利用Zookeeper做主/备一样)

    主备模式-HaProxy配置

     主备模式-HaProxy配置

    listen  rabbitmq_cluster

    bind  0.0.0.0:5672    #配置TCP模式

    mode  tcp    #简单的轮询

    balance  roundrobin    #主节点

    server  bhz76  192.168.11.76:5672  check  inter  5000 rise  2  fall 2

    server  bhz77  192.168.11.77:5672  backup  check  5000 rise  2  fall 2    #备用节点

    1.8.2  远程模式

            远程通信和复制,可以实现双活的一种模式,简称Shovel模式

            所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,可以跨地域的让两个mq集群互联。

     

     

     

    1.8.3  镜像模式

            集群模式非常经典的就是Mirror镜像模式,保证百分之100数据据不丢失。

            在实际工作中用的最多,并且实现集群非常简单,一般互联网大厂都会构建这种镜像集群模式。

     

     

    1.8.4  多活模式

     

     

     

     

     

     

    1.9 Kafka高性能核心pageCache与zeroCopy原理解析

    1.9.1 Kafka有哪些特点?

    特点:

    1、分布式,支持消息分区

    2、跨平台,java、python,php

    3、实时性很好

    4、伸缩性(上亿)

    1.9.2 Kafka高性能原因:

    顺序写:

            顺序写盘的过程,consumer顺序消费数据,但是不从盘中删除数据,避免随机写盘。阿里云支持rocket删除某条消息,有可能进行的是打标记转储的方式,而不是物理删除。

    Page Cache空中接力:

            producer生产消息时,会使用pwrite()系统调用按偏移量写入数据,并且都会先写入page cache里。

            consumer消费消息时,会使用sendfile()系统调用,零拷贝地将数据从page cache传输到broker的Socket buffer,再通过网络传输。同时,page cache中的数据会随着内核中flusher线程的调度以及对sync()/fsync()的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。

            如果consumer要消费的消息不在page cache里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入page cache,以方便下一次读取。

    重要的结论:

            如果Kafka producer的生产速率与consumer的消费速率相差不大,那么就能几乎只靠对broker page cache的读写完成整个生产-消费过程,磁盘访问非常少。这个结论俗称为“读写空中接力”。 

    3、后台异步、主动FLUSH

    4、预读策略,IO调度

    Page Cache空中接力过程:

    普通程序,一次需要4次copy

            pageCache 是操作系统实现的一种主要的磁盘缓存机制/策略,以此减少磁盘IO操作应用程序寻找数据,先去pageCache中寻找数据,如果没有命中,再去磁盘中读。

            pageCache 就是把本应该从磁盘读取改为从内存读,把对磁盘的访问改为对内存的访问。

            kafka的零copy,如果有N个消费者进程,则只需要N+1次copy操作,1次copy到内核读取缓冲区,N次从缓冲区copy到网卡接口。

    1.10 Kafka集群模式

            kafka 生产者  消费者 速率相当时候,甚至都用不到磁盘,磁盘只是做个异步的备份而已。

            kafka一定有可靠性的考量,相同的消息可以存在集群中每个节点,只要不是整个集群挂掉,消息不回丢失。

    2.1 RabbitMQ进阶与实战

    2.1.1 初识RabbitMQ核心概念

            RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

    RabbitMQ,不太适合大量的消息堆积。

    AMQP : Advanced Message Queuing Protocol(高级消息队列协议)

     定义:具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,

    是应用层协议的一个开放标准,为面向消息中间件设计。

    server:代表rabbitMQ

    virtual host : 可以当成一个虚拟主机,划分模型域的概念。example: /order, /production

    Message Queue:消息队列。

    Exchange:主题的概念。

     
    

    2.1.2 AMQP核心概念

    • Server:又称Broker,接受客户端的连接,实现AMQP实体服务。
    • Connection:连接,应用程序与Broker的网络连接。
    • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
    • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、过期时间、延迟等高级特性;Body侧则就是消息体内容。
    • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。

            一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。

    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

    RabbitMQ的整体架构

            虽然Exchange与队列支持多对多,但是在实际业务中最好做到一个队列只对应一个Exchange,一个Exchange对应多个队列。设计复杂化了。

            一个消费者(Consumer)也可以消费多个队列(Queue)中的数据,但是实际业务中也不建议这么做。通常一个消费者只消费一个队列中的数据。

    RabbitMQ消息转换

    2.1.3 RabbitMQ急速安装与入门

    2.1.3.1 安装Erlang

    官网下载地址:Downloads - Erlang/OTP

    因为最新的RabbitMQ3.10.7版本要求的Erlang版本为25.0,所以我们下载Erlang的25.0.4版本。

     

            下载之后是一个opt_src_25.0.4.tar.gz的压缩包。我上传到自己的虚拟机上,上传路径为:/home/software/6-Erlang25.0.4

            解压安装包,并配置安装目录,这里我们预安装到/usr/local/erlang目录下:

    cd  /home/software/6-Erlang25.0.4/

    tar -zxvf otp_src_25.0.4.tar.gz

            安装依赖:

    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc tz

    cd otp_src_25.0.4

    ./configure --prefix=/usr/local/erlang

            执行./configure --prefix=/usr/local/erlang命令报错:configure: error: No curses library functions found

            

            重新执行:

    ./configure --prefix=/usr/local/erlang

            安装Erlang:

    make

    make install

            修改/etc/profile配置文件,添加下面的环境变量:

    vim /etc/profile

    # Erlang environment
    ERLANG_HOME=/usr/local/erlang/otp_src_25.0.4/
    export PATH=$PATH:$ERLANG_HOME/bin
    export ERLANG_HOME

     

            最后执行如下命令让配置文件生效:

    source /etc/profile

            可以输入erl命令来验证Erlang是否安装成功,如果出现类似以下的提示即表示安装成功:

    2.1.3.2 RabbitMQ安装

            安装包下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.10.7

            上传文件包rabbitmq-server-generic-unix-3.10.7.tar.xz到虚拟机/home/software/7-RabbitMQ/路径下,这个路径可以自己指定。

            解压tar.xz文件:先 xz -d xxx.tar.xz 将 rabbitmq-server-generic-unix-3.10.7.tar.xz解压成 rabbitmq-server-generic-unix-3.10.7.tar 然后,再用 tar xvf rabbitmq-server-generic-unix-3.10.7.tar来解包。

            将解压出来的文件夹移动到/usr/local/rabbitmq/路径下:

    mv rabbitmq_server-3.10.7 /usr/local/rabbitmq/

            修改/etc/profile文件,添加下面的环境变量:

    # RabbitMQ environment
    export PATH=$PATH:/usr/local/rabbitmq/rabbitmq_server-3.10.7/sbin/
    export RABBITMQ_HOME=/usr/local/rabbitmq/rabbitmq_server-3.10.7

            之后执行下面命令让配置文件生效:

    source /etc/profile

            修改主机名称:

    vim /etc/hostname

             修改/etc/hosts文件,添加如下配置:

    192.168.110.130 centos130

    2.1.3.3 RabbitMQ运行

            在修改完/etc/profile配置文件之后,可以任意打开Shell窗口,输入如下命令以运行RabbitMQ服务:

    rabbitmq-server -detached

            在rabbitmq-server命令后面添加一个"-detached"参数是为了让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。

            运行如下命令查看RabbitMQ是否正常启动:

    rabbitmqctl status

            如果RabbitMQ正常启动,会输出如下信息。当然也可以通过rabbitmqctl cluster_status命令来查看集群信息,目前只有一个RabbitMQ服务节点,可以看做单节点的集群:

    1. Status of node rabbit@localhost ...
    2. Runtime
    3. OS PID: 3166
    4. OS: Linux
    5. Uptime (seconds): 17
    6. Is under maintenance?: false
    7. RabbitMQ version: 3.10.7
    8. Node name: rabbit@localhost
    9. Erlang configuration: Erlang/OTP 25 [erts-13.0.4] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-thread s:1]
    10. Crypto library: OpenSSL 1.0.2k-fips 26 Jan 2017
    11. Erlang processes: 265 used, 1048576 limit
    12. Scheduler run queue: 1
    13. Cluster heartbeat timeout (net_ticktime): 60
    14. Plugins
    15. Enabled plugin file: /usr/local/rabbitmq/rabbitmq_server-3.10.7/etc/rabbitmq/enabled_plugins
    16. Enabled plugins:
    17. Data directory
    18. Node data directory: /usr/local/rabbitmq/rabbitmq_server-3.10.7/var/lib/rabbitmq/mnesia/rabbit@localhos t
    19. Raft data directory: /usr/local/rabbitmq/rabbitmq_server-3.10.7/var/lib/rabbitmq/mnesia/rabbit@localhos t/quorum/rabbit@localhost
    20. Config files
    21. Log file(s)
    22. * /usr/local/rabbitmq/rabbitmq_server-3.10.7/var/log/rabbitmq/rabbit@localhost.log
    23. * /usr/local/rabbitmq/rabbitmq_server-3.10.7/var/log/rabbitmq/rabbit@localhost_upgrade.log
    24. *
    25. Alarms
    26. (none)
    27. Memory
    28. Total memory used: 0.0808 gb
    29. Calculation strategy: rss
    30. Memory high watermark setting: 0.4 of available memory, computed to: 1.5816 gb
    31. code: 0.0294 gb (36.33 %)
    32. other_proc: 0.0192 gb (23.78 %)
    33. other_system: 0.0119 gb (14.78 %)
    34. allocated_unused: 0.0109 gb (13.48 %)
    35. reserved_unallocated: 0.0046 gb (5.74 %)
    36. other_ets: 0.0031 gb (3.78 %)
    37. atom: 0.0013 gb (1.65 %)
    38. binary: 0.0001 gb (0.18 %)
    39. mnesia: 0.0001 gb (0.11 %)
    40. metrics: 0.0001 gb (0.07 %)
    41. plugins: 0.0 gb (0.05 %)
    42. msg_index: 0.0 gb (0.04 %)
    43. quorum_ets: 0.0 gb (0.01 %)
    44. quorum_queue_dlx_procs: 0.0 gb (0.0 %)
    45. stream_queue_procs: 0.0 gb (0.0 %)
    46. stream_queue_replica_reader_procs: 0.0 gb (0.0 %)
    47. connection_channels: 0.0 gb (0.0 %)
    48. connection_other: 0.0 gb (0.0 %)
    49. connection_readers: 0.0 gb (0.0 %)
    50. connection_writers: 0.0 gb (0.0 %)
    51. mgmt_db: 0.0 gb (0.0 %)
    52. queue_procs: 0.0 gb (0.0 %)
    53. queue_slave_procs: 0.0 gb (0.0 %)
    54. quorum_queue_procs: 0.0 gb (0.0 %)
    55. stream_queue_coordinator_procs: 0.0 gb (0.0 %)
    56. File Descriptors
    57. Total: 2, limit: 65439
    58. Sockets: 0, limit: 58893
    59. Free Disk Space
    60. Low free disk space watermark: 0.05 gb
    61. Free disk space: 30.7848 gb
    62. Totals
    63. Connection count: 0
    64. Queue count: 0
    65. Virtual host count: 1
    66. Listeners
    67. Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    68. Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

            默认情况下,访问RabbitMQ服务的用户名和密码都是"guest",这个账户有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。

            添加新用户,用户名为"root",密码为"1TdhblkFcdhx2a":

    rabbitmqctl add_user root

            为root用户设置所有权限:

    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

            设置root用户为管理员角色:

    rabbitmqctl set_user_tags root administrator

            安装rabbitmq_management插件:

    cd /usr/local/rabbitmq/rabbitmq_server-3.10.7 

    rabbitmq-plugins enable rabbitmq_management

            通过浏览器访问RabbitMQ控制台:

     http://IP:15672

            安装成功!

    2.1.3.4 新建exchanges

            点击Exchanges标签页->Add a new exchange

            Name:exchange-test

            Type:topic

            Durability:Durable

            Auto delete:No

            Internal:No

             最后点击"Add exchange",然后可以看到新建的exchange-test这个exchange。

    2.1.3.4 新建Queues

            点击Exchanges标签页->Add a new queue

            Type:Classic

            Name:queue-test

            Durability:Durable

            Auto delete:No

             最后点击"Add queue",然后可以看到新建的queue-test这个queue。

    2.1.3.5 建立exchange和Queue的关联

            点击 "exchanges"标签页,找到exchange-test后点进去。

             之后点击Bindings,填写如下:

            点击Bind。

     2.1.3.6 在exchange里发条消息到Queue

             然后到队列里面看到已经有一条消息了。

     

    2.1.3.7 生产和消费消息

    maven依赖

    1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.15.0</version>
    6. </dependency>

    生产者客户端代码

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import com.rabbitmq.client.MessageProperties;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class RabbitProducer {
    8. private static final String EXCHANGE_NAME = "exchange_test";
    9. private static final String ROUTING_KEY = "text.*";
    10. private static final String QUEUE_NAME = "queue_test";
    11. private static final String IP_ADDRESS = "192.168.110.130";
    12. private static final int PORT = 5672; //RabbitMQ服务默认端口为5672
    13. public static void main(String[] args) throws IOException,
    14. TimeoutException, InterruptedException {
    15. ConnectionFactory factory = new ConnectionFactory();
    16. factory.setHost(IP_ADDRESS);
    17. factory.setPort(PORT);
    18. factory.setUsername("root");
    19. factory.setPassword("1TdhblkFcdhx2a");
    20. Connection connection = factory.newConnection();//创建连接
    21. Channel channel = connection.createChannel();//创建信道
    22. //创建一个type="topic"、持久化的、非自动删除的交换器。
    23. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
    24. //创建一个持久化、非排他的、非自动删除的队列
    25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    26. //将交换机与队列通过路由键绑定
    27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    28. //发送一条持久化消息:Hello World!
    29. String message = "Hello World!";
    30. channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
    31. MessageProperties.PERSISTENT_TEXT_PLAIN,
    32. message.getBytes());
    33. //关闭资源
    34. channel.close();
    35. connection.close();
    36. }
    37. }

    消费者客户端代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receiver {
    6. private static final String QUEUE_NAME = "queue_test";
    7. private static final String IP_ADDRESS = "192.168.110.130";
    8. private static final int PORT = 5672;
    9. public static void main(String[] args) throws IOException, TimeoutException,
    10. InterruptedException {
    11. Address[] address = new Address[]{
    12. new Address(IP_ADDRESS, PORT)
    13. };
    14. ConnectionFactory factory = new ConnectionFactory();
    15. factory.setUsername("root");
    16. factory.setPassword("1TdhblkFcdhx2a");
    17. // 这里的连接方式与生产者的demo略有不同,注意区别。
    18. Connection connection = factory.newConnection(address); //创建连接
    19. final Channel channel = connection.createChannel();//创建信道
    20. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    21. Consumer consumer = new DefaultConsumer(channel) {
    22. @Override
    23. public void handleDelivery(String consumerTag,
    24. Envelope envelope,
    25. AMQP.BasicProperties properties,
    26. byte[] body)
    27. throws IOException {
    28. System.out.println("recvive message:" + new String(body));
    29. try {
    30. TimeUnit.SECONDS.sleep(1);
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. channel.basicAck(envelope.getDeliveryTag(), false);
    35. }
    36. };
    37. channel.basicConsume(QUEUE_NAME, consumer);
    38. //等待回调函数执行完毕之后,关闭资源。
    39. TimeUnit.SECONDS.sleep(5);
    40. channel.close();
    41. connection.close();
    42. }
    43. }

    2.1.4 RabbitMQ核心API

     

     

     

    Direct Exchange消费者客户端代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. public class RabbitMQ4DirectExchangeConsumer {
    5. public static void main(String[] args) throws Exception {
    6. ConnectionFactory connectionFactory = new ConnectionFactory() ;
    7. connectionFactory.setHost("192.168.110.130");
    8. connectionFactory.setPort(5672);
    9. connectionFactory.setVirtualHost("/");
    10. connectionFactory.setUsername("root");
    11. connectionFactory.setPassword("1TdhblkFcdhx2a");
    12. connectionFactory.setAutomaticRecoveryEnabled(true);
    13. connectionFactory.setNetworkRecoveryInterval(3000);
    14. Connection connection = connectionFactory.newConnection();
    15. Channel channel = connection.createChannel();
    16. //4 声明
    17. String exchangeName = "test_direct_exchange";
    18. String exchangeType = "direct";
    19. String queueName = "test_direct_queue";
    20. String routingKey = "test_direct_routingKey";
    21. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    22. channel.queueDeclare(queueName, false, false, false, null);
    23. channel.queueBind(queueName, exchangeName, routingKey);
    24. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    25. Consumer consumer = new DefaultConsumer(channel) {
    26. @Override
    27. public void handleDelivery(String consumerTag,
    28. Envelope envelope,
    29. AMQP.BasicProperties properties,
    30. byte[] body)
    31. throws IOException {
    32. System.out.println("recvive message:" + new String(body));
    33. try {
    34. TimeUnit.SECONDS.sleep(1);
    35. } catch (InterruptedException e) {
    36. e.printStackTrace();
    37. }
    38. channel.basicAck(envelope.getDeliveryTag(), false);
    39. }
    40. };
    41. channel.basicConsume(queueName, consumer);
    42. //等待回调函数执行完毕之后,关闭资源。
    43. TimeUnit.SECONDS.sleep(50);
    44. channel.close();
    45. connection.close();
    46. }
    47. }

    Direct Exchange生产者客户端代码

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. public class RabbitMQ4DirectExchangeProducer {
    5. public static void main(String[] args) throws Exception {
    6. //1 创建ConnectionFactory
    7. ConnectionFactory connectionFactory = new ConnectionFactory();
    8. connectionFactory.setHost("192.168.110.130");
    9. connectionFactory.setPort(5672);
    10. connectionFactory.setVirtualHost("/");
    11. connectionFactory.setUsername("root");
    12. connectionFactory.setPassword("1TdhblkFcdhx2a");
    13. //2 创建Connection
    14. Connection connection = connectionFactory.newConnection();
    15. //3 创建Channel
    16. Channel channel = connection.createChannel();
    17. //4 声明
    18. String exchangeName = "test_direct_exchange";
    19. String routingKey = "test_direct_routingKey";
    20. //5 发送
    21. String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
    22. channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    23. }
    24. }

     

     

    可以通过模糊匹配进行队列和exchange多对多的链接。这种方式会乱。
    

    Topic Exchange消费者客户端1代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. public class Receiver4TopicExchange1 {
    5. public static void main(String[] args) throws Exception {
    6. ConnectionFactory connectionFactory = new ConnectionFactory() ;
    7. connectionFactory.setHost("192.168.110.130");
    8. connectionFactory.setPort(5672);
    9. connectionFactory.setVirtualHost("/");
    10. connectionFactory.setUsername("root");
    11. connectionFactory.setPassword("1TdhblkFcdhx2a");
    12. connectionFactory.setAutomaticRecoveryEnabled(true);
    13. connectionFactory.setNetworkRecoveryInterval(3000);
    14. Connection connection = connectionFactory.newConnection();
    15. Channel channel = connection.createChannel();
    16. //4 声明
    17. String exchangeName = "test_topic_exchange";
    18. String exchangeType = "topic";
    19. String queueName = "test_topic_queue";
    20. //String routingKey = "user.*";
    21. String routingKey = "user.#";
    22. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    23. channel.queueDeclare(queueName, false, false, false, null);
    24. channel.queueBind(queueName, exchangeName, routingKey);
    25. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    26. Consumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag,
    29. Envelope envelope,
    30. AMQP.BasicProperties properties,
    31. byte[] body)
    32. throws IOException {
    33. System.err.println("consumer1 start.. ");
    34. System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
    35. try {
    36. TimeUnit.SECONDS.sleep(1);
    37. } catch (InterruptedException e) {
    38. e.printStackTrace();
    39. }
    40. channel.basicAck(envelope.getDeliveryTag(), false);
    41. }
    42. };
    43. channel.basicConsume(queueName, consumer);
    44. //等待回调函数执行完毕之后,关闭资源。
    45. TimeUnit.SECONDS.sleep(50);
    46. channel.close();
    47. connection.close();
    48. }
    49. }

    Topic Exchange消费者客户端2代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. public class Receiver4TopicExchange2 {
    5. public static void main(String[] args) throws Exception {
    6. ConnectionFactory connectionFactory = new ConnectionFactory() ;
    7. connectionFactory.setHost("192.168.110.130");
    8. connectionFactory.setPort(5672);
    9. connectionFactory.setVirtualHost("/");
    10. connectionFactory.setUsername("root");
    11. connectionFactory.setPassword("1TdhblkFcdhx2a");
    12. connectionFactory.setAutomaticRecoveryEnabled(true);
    13. connectionFactory.setNetworkRecoveryInterval(3000);
    14. Connection connection = connectionFactory.newConnection();
    15. Channel channel = connection.createChannel();
    16. //4 声明
    17. String exchangeName = "test_topic_exchange";
    18. String exchangeType = "topic";
    19. String queueName = "test_topic_queue";
    20. String routingKey = "user.*";
    21. // String routingKey = "user.#";
    22. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    23. channel.queueDeclare(queueName, false, false, false, null);
    24. channel.queueBind(queueName, exchangeName, routingKey);
    25. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    26. Consumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag,
    29. Envelope envelope,
    30. AMQP.BasicProperties properties,
    31. byte[] body)
    32. throws IOException {
    33. System.err.println("consumer2 start.. ");
    34. System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
    35. try {
    36. TimeUnit.SECONDS.sleep(1);
    37. } catch (InterruptedException e) {
    38. e.printStackTrace();
    39. }
    40. channel.basicAck(envelope.getDeliveryTag(), false);
    41. }
    42. };
    43. channel.basicConsume(queueName, consumer);
    44. //等待回调函数执行完毕之后,关闭资源。
    45. TimeUnit.SECONDS.sleep(50);
    46. channel.close();
    47. connection.close();
    48. }
    49. }

    Topic Exchange生产者客户端代码

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. public class Sender4TopicExchange {
    5. public static void main(String[] args) throws Exception {
    6. //1 创建ConnectionFactory
    7. ConnectionFactory connectionFactory = new ConnectionFactory();
    8. connectionFactory.setHost("192.168.110.130");
    9. connectionFactory.setPort(5672);
    10. connectionFactory.setVirtualHost("/");
    11. connectionFactory.setUsername("root");
    12. connectionFactory.setPassword("1TdhblkFcdhx2a");
    13. //2 创建Connection
    14. Connection connection = connectionFactory.newConnection();
    15. //3 创建Channel
    16. Channel channel = connection.createChannel();
    17. //4 声明
    18. String exchangeName = "test_topic_exchange";
    19. String routingKey1 = "user.save";
    20. String routingKey2 = "user.update";
    21. String routingKey3 = "user.delete.abc";
    22. //5 发送
    23. String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
    24. channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
    25. channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
    26. channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
    27. channel.close();
    28. connection.close();
    29. }
    30. }

    Fanout Exchange消费者客户端1代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. public class Receiver4FanoutExchange1 {
    5. public static void main(String[] args) throws Exception {
    6. ConnectionFactory connectionFactory = new ConnectionFactory() ;
    7. connectionFactory.setHost("192.168.110.130");
    8. connectionFactory.setPort(5672);
    9. connectionFactory.setVirtualHost("/");
    10. connectionFactory.setUsername("root");
    11. connectionFactory.setPassword("1TdhblkFcdhx2a");
    12. connectionFactory.setAutomaticRecoveryEnabled(true);
    13. connectionFactory.setNetworkRecoveryInterval(3000);
    14. Connection connection = connectionFactory.newConnection();
    15. Channel channel = connection.createChannel();
    16. //4 声明
    17. String exchangeName = "test_fanout_exchange";
    18. String exchangeType = "fanout";
    19. String queueName = "test_fanout_queue";
    20. String routingKey = ""; // 不设置路由键
    21. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    22. channel.queueDeclare(queueName, false, false, false, null);
    23. channel.queueBind(queueName, exchangeName, routingKey);
    24. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    25. Consumer consumer = new DefaultConsumer(channel) {
    26. @Override
    27. public void handleDelivery(String consumerTag,
    28. Envelope envelope,
    29. AMQP.BasicProperties properties,
    30. byte[] body)
    31. throws IOException {
    32. System.err.println("--------------- consumer 1 --------------");
    33. System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
    34. try {
    35. TimeUnit.SECONDS.sleep(1);
    36. } catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. channel.basicAck(envelope.getDeliveryTag(), false);
    40. }
    41. };
    42. //参数:队列名称、是否自动ACK、Consumer
    43. channel.basicConsume(queueName, true, consumer);
    44. //等待回调函数执行完毕之后,关闭资源。
    45. TimeUnit.SECONDS.sleep(50);
    46. channel.close();
    47. connection.close();
    48. }
    49. }

    Fanout Exchange消费者客户端2代码

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeUnit;
    4. public class Receiver4FanoutExchange2 {
    5. public static void main(String[] args) throws Exception {
    6. ConnectionFactory connectionFactory = new ConnectionFactory();
    7. connectionFactory.setHost("192.168.110.130");
    8. connectionFactory.setPort(5672);
    9. connectionFactory.setVirtualHost("/");
    10. connectionFactory.setUsername("root");
    11. connectionFactory.setPassword("1TdhblkFcdhx2a");
    12. connectionFactory.setAutomaticRecoveryEnabled(true);
    13. connectionFactory.setNetworkRecoveryInterval(3000);
    14. Connection connection = connectionFactory.newConnection();
    15. Channel channel = connection.createChannel();
    16. //4 声明
    17. String exchangeName = "test_fanout_exchange";
    18. String exchangeType = "fanout";
    19. String queueName = "test_fanout_queue";
    20. String routingKey = ""; // 不设置路由键
    21. channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    22. channel.queueDeclare(queueName, false, false, false, null);
    23. channel.queueBind(queueName, exchangeName, routingKey);
    24. channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
    25. Consumer consumer = new DefaultConsumer(channel) {
    26. @Override
    27. public void handleDelivery(String consumerTag,
    28. Envelope envelope,
    29. AMQP.BasicProperties properties,
    30. byte[] body)
    31. throws IOException {
    32. System.err.println("--------------- consumer 2 --------------");
    33. System.out.println("recvive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
    34. try {
    35. TimeUnit.SECONDS.sleep(1);
    36. } catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. channel.basicAck(envelope.getDeliveryTag(), false);
    40. }
    41. };
    42. //参数:队列名称、是否自动ACK、Consumer
    43. channel.basicConsume(queueName, true, consumer);
    44. //等待回调函数执行完毕之后,关闭资源。
    45. TimeUnit.SECONDS.sleep(50);
    46. channel.close();
    47. connection.close();
    48. }
    49. }

    Fanout Exchange生产者客户端代码

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. public class Sender4FanoutExchange {
    5. public static void main(String[] args) throws Exception {
    6. //1 创建ConnectionFactory
    7. ConnectionFactory connectionFactory = new ConnectionFactory();
    8. connectionFactory.setHost("192.168.110.130");
    9. connectionFactory.setPort(5672);
    10. connectionFactory.setVirtualHost("/");
    11. connectionFactory.setUsername("root");
    12. connectionFactory.setPassword("1TdhblkFcdhx2a");
    13. //2 创建Connection
    14. Connection connection = connectionFactory.newConnection();
    15. //3 创建Channel
    16. Channel channel = connection.createChannel();
    17. //4 声明
    18. String exchangeName = "test_fanout_exchange";
    19. //5 发送
    20. for(int i = 0; i < 10; i ++) {
    21. String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
    22. channel.basicPublish(exchangeName, "" , null , msg.getBytes());
    23. }
    24. channel.close();
    25. connection.close();
    26. }
    27. }

    2.1.5 RabbitMQ核心API-其它关键概念讲解

  • 相关阅读:
    IOS面试题object-c 146-150
    分布式.高并发&高可用
    多目标优化算法:基于非支配排序的鱼鹰优化算法(NSOOA)MATLAB
    (swjtu西南交大)数据库实验(概念数据库设计及逻辑关系转换):音乐软件数据管理系统
    Manacher算法
    马斯克的这波神操作,让我意识到保持写代码的能力有多重要
    【图像分割】基于萤火虫算法实现图像分割附matlab代码
    【面试篇】地平线深度学习模型算法开发一面
    【zookeeper】zookeeper监控指标查看
    C Primer Plus(6) 中文版 第7章 C控制语句:分支和跳转 7.5 条件运算符 ?:
  • 原文地址:https://blog.csdn.net/Xx13624558575/article/details/126685993