• RabbitMQ快速入门


    介绍

    RabbitMQ是一款成熟可靠的消息中间件,现在已经被全世界几亿用户使用。

    可互操作的(Interoperable)

    RabbitMQ支持了多个开放的标准协议,不同系统、语言可以按照这个协议进行消息传递和交互。RabbitMQ本身是使用Erlang语言写的,但提供了其他各种语言版本:Python、Java、Go........

    灵活的(Flexible)

    RabbitMQ提供了多种选项来进行配置消息转发。在路由方式中:支持简单模式、工作模式、发布/订阅模式和主题模式,在筛选中,通过routineKey进行筛选,主要有Direct、Fanout、Topic、Headers.

    可靠的(Reliable)

    RabbitMQ通过一系列机制能够保证消息的可靠性和安全性。如:消息持久化、消息确认、死信队列等,数据需要进行二进制化.


    在RabbitMQ中有这几个重要的角色:

    1.虚拟主机virtualHost:类似数据库中database的作用,主要用来进行隔离交换机、队列

    2.交换机exchange:主要用于消息的转发。

    3.队列queue:用来存放消息的地方。

    4.绑定bind:维护交换机和队列之间的关系。

    5.消息message:传递过程中的数据。

    消息队列主要是用来实现生产者消费者模型,在RabbitMQ中仅支持消息推送的方式(pull),即消费者通过订阅某个队列,当有消息来的时候,将消息发送给消费者。

    常用API

    在RabbitMQ中最基础最常用的API,大致有如下几种:

    连接相关、交换机、队列、绑定、发布消息、消费消息


    ConnectionFactory:

    ConnectionFactory负责的是配置当前连接的一些信息。

    方法功能
    setHost(String host)设置连接服务器ip
    setPort(int port)设置连接服务器的端口
    setUsername(String username)设置登录服务器的用户名
    setPassword(String password)设置登录服务器的密码
    setVirtualHost(String virtualHost)设置访问服务器的虚拟主机(用来隔离数据)
    newConnection()创建与服务器的连接,一次TCP通信

    Connection:

    Connection本质就是一次TCP通信,持有本次通信的socket,用来管理channel。

    方法功能
    createChannel()创建channel,复用每一次TCP连接
    close()关闭本次连接

    Channel:

    Channel并不是真正物理上的连接,只是逻辑上的连接,我们要操作消息队列,需要去调用API,而这些API大部分都是在channel下的,在下面讲解。


    Exchange:

    方法功能
    exchangeDeclare(String exchange, String type);创建交换机
    exchangeDeclare(String exchange, BuiltinExchangeType type);创建交换机

    exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments    

    exchangeDelete(String exchange)

    exchangeDeclare(String exchange, String type);

    exchange -> 交换机的名字

    Type -> 交换机的类型。有如下几种:"direct", "fanout", "topic", "headers"

    exchangeDeclare(String exchange, BuiltinExchangeType type);

    此处使用的是枚举类型,和上述字符串形式是一样的,底层都是字符串。

    exchangeDeclare( String exchange,

                                    String type,

                                    boolean durable,

                                    boolean autoDelete,

                                    boolean internal,

                                    Map arguments)

    durable -> 是否进行持久化,当服务器重启,会进行加载

    autoDelete -> 是否自动删除,当交换机不再被使用会进行删除

    internal -> 是否为内部交换机,即不能被用户推送消息

    arguments -> 一些额外的配置参数


    Queue:

    方法功能
    queueDeclare()创建一个匿名队列
    queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)创建一个队列
    queueDelete()删除一个队列

    Bind:

    方法攻能
    queueBind(String queue, String exchange, String routingKey)将一个队列和一个交换机进行绑定
    queueUnbind(String queue, String exchange, String routingKey)解除绑定

    发布消息:

    方法        功能
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);发布一个消息到指定交换机中

    routingKey -> 指的是与Bind中的routingKey相同,类似一个口令

    Properties -> 消息的一些属性

    body -> 消息本体

    在RabbitMQ中,一个消息大概是由三部分组成:Envelope、Properties、body

    Envelop:“信封”,描述的是消息的目的地(交换机)、消息的标识、routingKey等

    Properties:“特性”,描述的是消息的一些是否持久、内容编码、优先级等

    body:“内容”,描述的是这个消息的二进制形式


    消费消息:

    消费消息主要可以使用两种API:

    方法功能
    basicConsumer(String queue, boolean autoAck, DeliverCallback deliverCallback, CannelCallback cancelCallback);消费消息
    basicConsumer(String queue, boolean autoAck, Consumer consumer);消费消息

    deliverCallback -> 当消息被运送到客户端,这个回调接口将被执行

    canncelCallback -> 当消费者取消时执行。

    第二种则是需要写一个接口,而这个接口中有很多需要实现的方法,但我们一般使用的方法是handleDelivery,因此我们可以使用一个实现类DefaultConsumer,在这个类中对Consumer的方法都进行了重写,我们可以再将handleDelivery进行重写,自定义内容即可。

    六种消息发送模型

    一、Hello World!(基本模型:一对一)

    官方称作是"Hello World!"。涉及到的角色:一个生产者、一个消费者、一个队列。

    Producer生产者,将消息发送到Queue队列中,Consumer消费者再订阅队列,从而接收到消息。

    由于不区分生产者或者消费者谁先谁后的问题,因此一在两边都会去申明队列。

    生产者:

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. public class Producer {
    7. private final static String QUEUE_NAME = "hello";
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. //1.创建连接工厂并配置
    10. ConnectionFactory connectionFactory = new ConnectionFactory();
    11. connectionFactory.setHost("localhost");
    12. connectionFactory.setPort(5672);
    13. connectionFactory.setUsername("guest");
    14. connectionFactory.setPassword("guest");
    15. connectionFactory.setVirtualHost("/");
    16. //2.通过工厂创建连接,并获取channel
    17. Connection connection = connectionFactory.newConnection();
    18. Channel channel = connection.createChannel();
    19. //3.创建队列
    20. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    21. //4.发送消息给队列
    22. System.out.println("===生产者开始生产消息===");
    23. long startTime = System.currentTimeMillis();
    24. channel.basicPublish("", QUEUE_NAME, null, "hello world".getBytes());
    25. long endTime = System.currentTimeMillis();
    26. System.out.println("===生产者完成生产消息===");
    27. System.out.println("生产时间:" + (endTime - startTime) + "ms");
    28. }
    29. }

    消费者:

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeoutException;
    4. public class Consumer {
    5. private final static String QUEUE_NAME = "hello";
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. //1.创建连接工厂并配置
    8. ConnectionFactory connectionFactory = new ConnectionFactory();
    9. connectionFactory.setHost("localhost");
    10. connectionFactory.setPort(5672);
    11. connectionFactory.setUsername("guest");
    12. connectionFactory.setPassword("guest");
    13. connectionFactory.setVirtualHost("/");
    14. //2.通过工厂创建连接,并获取channel
    15. Connection connection = connectionFactory.newConnection();
    16. Channel channel = connection.createChannel();
    17. //3.创建队列
    18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    19. //4.消费消息
    20. //处理消息的回调
    21. DeliverCallback deliverCallback = new DeliverCallback() {
    22. @Override
    23. public void handle(String consumerTag, Delivery message) throws IOException {
    24. System.out.println("消息是:" + new String(message.getBody()));
    25. }
    26. };
    27. channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag -> {}));
    28. }
    29. }

    在这种模型下,我们可以很方便的进行数据的传输。但是我们来细看生产者的代码,在routineKey的参数上我们填写的是队列名,这本质上用到了默认交换机的消息转发。

    默认交换机:

    1.在RabbitMQ管理面板中,有一个交换机叫 AMQP Default,它是一个Direct类型的交换机。

    2.当我们创建了一个新的队列,这个队列会绑定到着这个默认交换机(后面可以通过API修改绑定到其他交换机),绑定的routineKey是该队列的名称。

    3.默认交换机不能通过调用API来进行绑定,也不能解除绑定

    4.默认交换机也不能被删除

    RabbitMQ这样做的意图可能是为了简化代码、快速上手,开发人员可以聚焦在其他方面,但我们究其所以然,可以得出,这个模式本质上使用的是后面的Routing模式。

    二、Work Queues(基本模型:一对多)

    涉及的角色:一个生产者、一个队列、多个消费者。

    当有多个消费者去订阅一个队列,那数据该怎么传递呢?通过轮训的方式!

    例如:C1和C2订阅了Queue,此时生产者生产了1-10个数,C1就会获取里面所有的奇数,C2就会获取到里面所有的偶数。

    当然,我们也可以通过一些配置,不通过轮训的方式。

    生产者:

    1. import com.example.rabbitmqtest02.constant.Constant;
    2. import com.example.rabbitmqtest02.utils.ConnectionUtils;
    3. import com.rabbitmq.client.Channel;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. public class Producer {
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. //将之前的连接进行封装
    9. Channel channel = ConnectionUtils.getChannel();
    10. //创建队列
    11. channel.queueDeclare(Constant.QUEUE_NAME_1, false, false, false, null);
    12. //生产消息
    13. for(int i = 1; i <= 10; i++) {
    14. String message = i + "";
    15. channel.basicPublish("", Constant.QUEUE_NAME_1, null, message.getBytes());
    16. }
    17. System.out.println("====消息生产完毕===");
    18. }
    19. }

    消费者1 和 消费者2:

    1. public class Consumer01 {
    2. private static final String CONSUMER_TAG = "Consumer01";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //建立连接
    5. Channel channel = ConnectionUtils.getChannel();
    6. //声明队列
    7. channel.queueDeclare(Constant.QUEUE_NAME_1, false, false, false, null);
    8. //消费消息
    9. channel.basicConsume(Constant.QUEUE_NAME_1, true, new DefaultConsumer(channel) {
    10. @Override
    11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    12. System.out.println(CONSUMER_TAG + "的消息:" + new String(body));
    13. }
    14. });
    15. }
    16. }
    17. public class Consumer02 {
    18. private static final String CONSUMER_TAG = "Consumer02";
    19. public static void main(String[] args) throws IOException, TimeoutException {
    20. //建立连接
    21. Channel channel = ConnectionUtils.getChannel();
    22. //声明队列
    23. channel.queueDeclare(Constant.QUEUE_NAME_1, false, false, false, null);
    24. //消费消息
    25. channel.basicConsume(Constant.QUEUE_NAME_1, true, new DefaultConsumer(channel) {
    26. @Override
    27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    28. System.out.println(CONSUMER_TAG + "的消息:" + new String(body));
    29. }
    30. });
    31. }
    32. }

    三、Publish/Subscribe(Fanout 交换机)

    从现在开始,我们才需要开始注意交换机。

    主要角色:一个生产者、一个fanout类型的交换机、多个队列、(多个消费者)

    我们创建了多个队列,可以将这多个队列与创建的交换机建立绑定关系,当一条消息被发送到交换机上,交换机会将消息转发给与之绑定的所有队列中。

    生产者:

    1. import com.example.rabbitmqtest02.constant.Constant;
    2. import com.example.rabbitmqtest02.utils.ConnectionUtils;
    3. import com.rabbitmq.client.Channel;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. public class Producer {
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. //建立连接
    9. Channel channel = ConnectionUtils.getChannel();
    10. //创建交换机
    11. channel.exchangeDeclare(Constant.EXCHANGE_NAME_1, "fanout");
    12. //创建队列并建立绑定
    13. channel.queueDeclare(Constant.QUEUE_NAME_1, false, false, false, null);
    14. channel.queueBind(Constant.QUEUE_NAME_1, Constant.EXCHANGE_NAME_1, "xxx");
    15. channel.queueDeclare(Constant.QUEUE_NAME_2, false, false, false, null);
    16. channel.queueBind(Constant.QUEUE_NAME_2, Constant.EXCHANGE_NAME_1, "xxxxxx");
    17. //生产消息
    18. for(int i = 1; i <= 10; i++) {
    19. String message = i + "";
    20. channel.basicPublish(Constant.EXCHANGE_NAME_1, "", null, message.getBytes());
    21. }
    22. System.out.println("====消息生产完毕===");
    23. }
    24. }

    四、Routing(Direct交换机)

    这种方式主要是通过routineKey来进行消息转发的。routineKey类似于一个口令,当我们发送消息时的routineKey要与一开始绑定的时候的routineKey对得上(一模一样)才能进行转发。

    生产者:

    1. import com.example.rabbitmqtest02.constant.Constant;
    2. import com.example.rabbitmqtest02.utils.ConnectionUtils;
    3. import com.rabbitmq.client.Channel;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. public class Producer {
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. //建立连接
    9. Channel channel = ConnectionUtils.getChannel();
    10. //创建交换机
    11. channel.exchangeDeclare(Constant.EXCHANGE_NAME_1, "direct");
    12. //创建队列并建立绑定
    13. channel.queueDeclare(Constant.QUEUE_NAME_1, false, false, false, null);
    14. channel.queueBind(Constant.QUEUE_NAME_1, Constant.EXCHANGE_NAME_1, "111");
    15. channel.queueDeclare(Constant.QUEUE_NAME_2, false, false, false, null);
    16. channel.queueBind(Constant.QUEUE_NAME_2, Constant.EXCHANGE_NAME_1, "222");
    17. channel.queueDeclare(Constant.QUEUE_NAME_3, false, false, false, null);
    18. channel.queueBind(Constant.QUEUE_NAME_3, Constant.EXCHANGE_NAME_1, "111");
    19. //生产消息
    20. for(int i = 1; i <= 10; i++) {
    21. String message = i + "";
    22. channel.basicPublish(Constant.EXCHANGE_NAME_1, "111", null, message.getBytes());
    23. }
    24. System.out.println("====消息生产完毕===");
    25. }
    26. }

    五、Topics(Topic交换机)

    Topic模式下的交换机与Direct模式的交换机有点类似,不同的是Topic采用了特定形式的routineKey,因此路由的功能更加强大,可以支持通配符,当然也能进行“模糊匹配”了~

    特定形式形如:aaa.bbb.ccc.*.ddd.#

    1.单词列表通过 点 来分隔。

    2.* 表示能匹配上任意一个单词

    3.#表示能匹配 零个或任意多个单词

    举例:

    被匹配:aaa.bbbb.ccc

    需匹配:# ->匹配成功

                  *.bbb.* -> 匹配成功

                  *.aaa.* ->匹配失败

    生产者:

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. import java.util.concurrent.TimeoutException;
    8. public class Producer {
    9. private static final String EXCHANGE_NAME = "TopicExchange";
    10. private static final String QUEUE_NAME01 = "Queue01";
    11. private static final String QUEUE_NAME02 = "Queue02";
    12. public static void main(String[] args) throws IOException, TimeoutException {
    13. //创建工厂类并使用默认配置
    14. ConnectionFactory factory = new ConnectionFactory();
    15. Connection connection = factory.newConnection();
    16. Channel channel = connection.createChannel();
    17. //创建Topic类型的交换机
    18. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    19. //创建队列
    20. channel.queueDeclare(QUEUE_NAME01, false, false, false, null);
    21. channel.queueBind(QUEUE_NAME01, EXCHANGE_NAME, "*.aaa.bbb.#");
    22. channel.queueDeclare(QUEUE_NAME02, false, false, false, null);
    23. //可以匹配任意的
    24. channel.queueBind(QUEUE_NAME02, EXCHANGE_NAME, "#");
    25. Map pair = new HashMap<>();
    26. pair.put("bbb.aaa.bbb", "1"); //queue1 queue2 都接收
    27. pair.put("bbb.aaa.bbb.ccc.ddd", "2"); //queue2 接收
    28. pair.put("aaa.aaa.ccc", "3"); //queue1 queue2 都接收
    29. pair.put("aaa.bbb.ccc", "4"); //queue2 接收
    30. pair.put("aaa.bbb", "5"); //queue2 接收
    31. pair.put("aaa", "6"); //queue2 接收
    32. for(Map.Entry e : pair.entrySet()){
    33. String routineKey = e.getKey();
    34. String message = e.getValue();
    35. channel.basicPublish(EXCHANGE_NAME, routineKey, null, message.getBytes());
    36. }
    37. System.out.println("发送成功~");
    38. }
    39. }

    六、RPC

    在RPC这个模式中,就并不特意区分消费者和生产者了。因为一个客户端既是生产者又是消费者。

    RPC即远程程序调用,在这个模式下,主要分为客户端和服务器。

    客户端将服务器上需要调用的函数的参数通过网络传输过去,服务器接受并调用函数计算,将结果返回给客户端。

    流程如下:

    1.客户端创建连接,并声明队列

    2.客户端创建corrlationID,这个用于标识每一次RPC的。在客户端与服务器交互中,可能需要多次进行远程调用,为了提高效率,就采用异步的方式,需要使用corrlationID来区分每次调用。

    3.客户端还需要创建回调队列,这个队列里面存放的是服务器端计算的数据。

    4.客户端将消息发送到消息队列服务器

    5.客户端订阅回调队列,等待服务器端计算完的数据


    1.服务器创建连接,并声明队列

    2.服务器订阅上述声明的队列

    3.服务器需要在basicConsume方法的回调函数中,将响应结果发布到回调队列。

    客户端:

    1. import com.rabbitmq.client.AMQP;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.UUID;
    7. import java.util.concurrent.CompletableFuture;
    8. import java.util.concurrent.ExecutionException;
    9. import java.util.concurrent.TimeoutException;
    10. public class Client {
    11. private static ConnectionFactory factory;
    12. private static Connection connection;
    13. private static Channel channel;
    14. private static final String QUEUE = "queue";
    15. //用来存放lambda的计算结果
    16. private static int ret;
    17. public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
    18. factory = new ConnectionFactory();
    19. connection = factory.newConnection();
    20. channel = connection.createChannel();
    21. //创建队列
    22. channel.queueDeclare(QUEUE, false, false, false, null);
    23. //这里让服务器计算一下sum(1, num)
    24. int ans = Integer.parseInt(rpcCall(100));
    25. System.out.println("计算结果:" + ans);
    26. }
    27. private static String rpcCall(int num) throws IOException, ExecutionException, InterruptedException {
    28. //生成本次调用的唯一请求ID
    29. String corrID = UUID.randomUUID().toString();
    30. //生成响应回调队列
    31. String replyQueueName = channel.queueDeclare().getQueue();
    32. //配置消息的属性
    33. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
    34. .Builder()
    35. .correlationId(corrID)
    36. .replyTo(replyQueueName)
    37. .build();
    38. //发送消息
    39. channel.basicPublish("", QUEUE, basicProperties, ("" + num).getBytes());
    40. //由于消费消息会创建一个单独的线程,需要进行阻塞main线程
    41. CompletableFuture response = new CompletableFuture<>();
    42. //消费计算的消息
    43. channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
    44. if(delivery.getProperties().getCorrelationId().equals(corrID)){
    45. response.complete(new String(delivery.getBody()));
    46. }
    47. }, (consumerTag) -> {
    48. });
    49. return response.get();
    50. }
    51. }

    服务器:

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. public class Server {
    7. private static ConnectionFactory factory;
    8. private static Connection connection;
    9. private static Channel channel;
    10. private static final String QUEUE = "queue";
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. //创建连接
    13. factory = new ConnectionFactory();
    14. connection = factory.newConnection();
    15. channel = connection.createChannel();
    16. //创建队列
    17. channel.queueDeclare(QUEUE, false, false, false, null);
    18. //接收消息
    19. //此处需要做的是,对客户端那边的消息计算并响应
    20. //将响应结果发送到客户端那边的响应回调队列
    21. channel.basicConsume(QUEUE, true, (consumerTag, delivery) -> {
    22. //1.接收消息并计算响应
    23. String message = new String(delivery.getBody());
    24. int ans = Sum(Integer.parseInt(message));
    25. //2.将响应发送到回调队列中
    26. channel.basicPublish("", delivery.getProperties().getReplyTo(), delivery.getProperties(), ("" + ans).getBytes());
    27. }, (consumerTag) -> {
    28. });
    29. }
    30. private static int Sum(int num) {
    31. int tmp = 0;
    32. for(int i = 1; i <= num; i++){
    33. tmp += i;
    34. }
    35. return tmp;
    36. }
    37. }

  • 相关阅读:
    git:一、GIT介绍+安装+全局配置+基础操作
    string(讲解)
    一文了解企业云盘和大文件传输哪个更适合企业传输
    【无标题】scp的使用
    【字符串匹配算法】KMP、哈希
    【MySQL】数据库服务器硬件优化与实战详解(调优篇)(实战篇)(MySQL专栏启动)
    净重新分类指数NRI的计算
    河北大学选择ZStack Cube超融合一体机打造实训云平台
    郑州分销系统开发|电商行业能做分销系统吗?
    【LLM教程】为什么做大语言模型fine tuning时,要将 drop_last_batch设置为True?
  • 原文地址:https://blog.csdn.net/Smarmot/article/details/139576691