• (二)丶RabbitMQ的六大核心


    一丶什么是MQ

            Message Queue(消息队列)简称MQ,是一种应用程序对应用程序的消息通信机制。在MQ中,消息以队列形式存储,以便于异步传输,在MQ中,发布者(生产者)将消息放入队列,而消费者从队列中读取并处理这些消息,这种设计允许生产者和消费者之间解耦,提高系统的响应速度和吞吐量,MQ常用于解耦系统之间的依赖关系,提高系统的稳定性和可扩展性,MQ还支持消峰,即以稳定的系统资源应对突发的流量冲剂,然而使用MQ也可能带来一些挑战,如:系统可用性降低、系统复杂度提高、以及消息一致性问题等;

    二丶常见MQ有哪些

            目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等,也有直接使用Redis充当消息队列的案列,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ的产品特征等,综合考虑;

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是Erlang语言编写,而集群和故障转移是构建在开放电信平台框架上的。所有的主要变成语言均有与代理接口通讯的客户端库。

    三丶RabbitMQ六种消息模式

            1.Simple Work Queue(简单工作队列):常见的一对一模式,一条消息由一个消费者进行消费。如果有多个消费者,默认是使用轮询的方式将消息分配消费者

            2.Work Queues(工作队列模式):也叫公屏队列,能者多劳的消息队列模型。队列必须收到来自消费者的手动ACK(消息确认机制)才可以继续往消费者发送消息。

            3.Publish/Subscribe(发布订阅模式):一条消息被多和消费者消费。

            4.Ruoting(路由模式):有选择的接收消息。

            5.Topics(主题模式):通过一定的规则来选择性的接收消息

            6.RPC模式:发布者发布消息,并且通过RPC方式等待结果。

    (1)Simple Work Queue(简单工作队列)

            消息生产后将消息放入队列,消息的消费者(consumer)监听消息队列嘛,如果队列中有消息就消费掉,消息被消费后自动从消息队列中删除。(也可能出现异常)

    1. /**
    2. * @Description:获取RabbitMQ连接
    3. * @Author: xy丶
    4. */
    5. public class RabbitMQConnection {
    6. public final static String RABBITMQ_SERVER_HOST = "192.168.0.122";
    7. public final static int RABBITMQ_SERVER_PORT = 5672;
    8. public final static String VIRTUAL_HOST = "/XY_HOST";
    9. public static Connection getConnection() throws IOException, TimeoutException {
    10. //1、创建连接
    11. ConnectionFactory factory = new ConnectionFactory();
    12. //2、设置主机名
    13. factory.setHost(RABBITMQ_SERVER_HOST);
    14. //3、设置通讯端口,默认是5672,不专门设置也可以
    15. factory.setPort(RABBITMQ_SERVER_PORT);
    16. //4、设置账号和密码
    17. factory.setUsername("admin");
    18. factory.setPassword("admin");
    19. //4、设置Virtual Host
    20. factory.setVirtualHost(VIRTUAL_HOST);
    21. //5、创建连接
    22. return factory.newConnection();
    23. }
    24. }

           消费者

    1. /**
    2. * @Description:简单工作队列模式消费者
    3. * @Author: xy丶
    4. */
    5. @Slf4j
    6. public class Consumer {
    7. private final static String SIMPLE_WORK_QUEUE= "simple_work_queue";
    8. public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
    9. //获取连接
    10. Connection connection = RabbitMQConnection.getConnection();
    11. //获取通道
    12. Channel channel = connection.createChannel();
    13. channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
    14. QueueingConsumer consumer = new QueueingConsumer(channel);
    15. channel.basicConsume(SIMPLE_WORK_QUEUE, true, consumer);
    16. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    17. String message = new String(delivery.getBody());
    18. log.info("Consumer reception "+message);
    19. }
    20. }

    生产者

    1. /**
    2. * @Description:简单工作队列模式生产者
    3. * @Author: xy丶
    4. */
    5. @Slf4j
    6. public class Producer {
    7. private final static String SIMPLE_WORK_QUEUE = "simple_work_queue";
    8. /**
    9. * 简单工作队列模式
    10. * @param args
    11. * @throws IOException
    12. * @throws TimeoutException
    13. */
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //创建连接
    16. Connection connection = RabbitMQConnection.getConnection();
    17. //创建通道
    18. Channel channel = connection.createChannel();
    19. // 声明队列 String var1, 是否持久化
    20. // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
    21. // boolean var3, 是否自动删除
    22. // boolean var4, 消费完删除
    23. // Map<String, Object> var5 其他属性
    24. channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
    25. // 消息内容 String var1, 是否持久化
    26. // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
    27. // boolean var3, 是否自动删除
    28. // boolean var4, 消费完删除
    29. // Map<String, Object> var5 其他属性
    30. String message = "Hello Word!!!";
    31. channel.basicPublish("", SIMPLE_WORK_QUEUE,null,message.getBytes());
    32. log.info("Producer send "+message);
    33. //最后关闭通关和连接
    34. channel.close();
    35. connection.close();
    36. }
           (2).Work Queues(工作队列模式) 创建生产者两个消费者看看效果

            生产者

    1. package com.puwang.MQ.workQueue;
    2. import com.puwang.MQ.config.RabbitMQConnection;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import lombok.extern.slf4j.Slf4j;
    6. import java.io.IOException;
    7. import java.util.Scanner;
    8. import java.util.concurrent.TimeoutException;
    9. /**
    10. * @Description:工作队列模式生产者
    11. * @Author: xy丶
    12. */
    13. @Slf4j
    14. public class Producer {
    15. private final static String QUEUE_WORK = "QUEUE_WORK";
    16. public static void main(String[] args) throws Exception {
    17. Connection connection = RabbitMQConnection.getConnection();
    18. Channel channel = connection.createChannel();
    19. channel.queueDeclare(QUEUE_WORK, false, false, false, null);
    20. for(int i = 0; i < 20; i++){
    21. String message = "娃哈哈" + i;
    22. channel.basicPublish("", QUEUE_WORK, null, message.getBytes());
    23. System.out.println("send=============="+message);
    24. Thread.sleep(i*10);
    25. }
    26. channel.close();
    27. connection.close();
    28. }
    29. }
    1. @Slf4j
    2. public class WorkQueueConsumer1 {
    3. private final static String QUEUE_WORK = "QUEUE_WORK";
    4. /**
    5. * 结果:
    6. *
    7. * 1、一条消息只会被一个消费者接收;
    8. *
    9. * 2、rabbit采用轮询的方式将消息是平均发送给消费者的;
    10. *
    11. * 3、消费者在处理完某条消息后,才会收到下一条消息。
    12. * @param args
    13. * @throws IOException
    14. * @throws TimeoutException
    15. */
    16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    17. Connection connection = RabbitMQConnection.getConnection();
    18. Channel channel = connection.createChannel();
    19. channel.queueDeclare(QUEUE_WORK, false,false, false,null);
    20. //同一时刻服务器只会发送一条消息给消费者
    21. channel.basicQos(1);
    22. QueueingConsumer consumer = new QueueingConsumer(channel);
    23. //关于手工确认 待之后有时间研究下
    24. channel.basicConsume(QUEUE_WORK, false, consumer);
    25. while(true){
    26. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    27. String message = new String(delivery.getBody());
    28. log.info("[消费者1] Received1 '"+message+"'");
    29. Thread.sleep(10);
    30. //返回确认状态
    31. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    32. }
    33. }
    34. }
    35. @Slf4j
    36. public class WorkQueueConsumer2 {
    37. private final static String QUEUE_WORK = "QUEUE_WORK";
    38. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    39. Connection connection = RabbitMQConnection.getConnection();
    40. Channel channel = connection.createChannel();
    41. channel.queueDeclare(QUEUE_WORK, false,false, false,null);
    42. //同一时刻服务器只会发送一条消息给消费者
    43. channel.basicQos(1);
    44. QueueingConsumer consumer = new QueueingConsumer(channel);
    45. //关于手工确认 待之后有时间研究下
    46. channel.basicConsume(QUEUE_WORK, false, consumer);
    47. while(true){
    48. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    49. String message = new String(delivery.getBody());
    50. log.info("[消费者2] Received1 '"+message+"'");
    51. Thread.sleep(10);
    52. //返回确认状态
    53. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    54. }
    55. }
    56. }
           (3).Publish/Subscribe(发布订阅模式)

            生产者

    1. /**
    2. * 订阅模式 生产者
    3. * 订阅模式:一个生产者发送的消息会被多个消费者获取。
    4. * 消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
    5. * 相关场景:邮件群发,群聊天,广播(广告)
    6. * @Description:发布订阅模式生产者
    7. * @Author: xy丶
    8. */
    9. @Slf4j
    10. public class Producer {
    11. private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
    12. /**
    13. * 交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
    14. * 相关场景:邮件群发,群聊天,广播(广告)
    15. * @param args
    16. */
    17. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    18. //获取连接
    19. Connection connection = RabbitMQConnection.getConnection();
    20. //从连接中获取一个通道
    21. Channel channel = connection.createChannel();
    22. 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
    23. channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
    24. //发送消息
    25. for (int i = 0; i < 10; i++) {
    26. String message = "哇哈哈哈!!!"+i;
    27. log.info("send message:" + message);
    28. //发送消息
    29. channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE, "", null, message.getBytes("utf-8"));
    30. Thread.sleep(100 * i);
    31. }
    32. channel.close();
    33. connection.close();
    34. }
    35. }

    消费者

    1. /**
    2. * @Description:发布订阅模式消费者
    3. * @Author: xy丶
    4. */
    5. @Slf4j
    6. public class PublishSubscribeConsumer1 {
    7. //交换机名称
    8. private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
    9. //队列名称
    10. private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. //获取连接
    13. Connection connection = RabbitMQConnection.getConnection();
    14. //从连接中获取一个通道
    15. final Channel channel = connection.createChannel();
    16. //声明交换机(分发:发布/订阅模式)
    17. channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
    18. //声明队列
    19. channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
    20. //将队列绑定到交换机
    21. channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
    22. //保证一次只分发一个
    23. int prefetchCount = 1;
    24. channel.basicQos(prefetchCount);
    25. //定义消费者
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. //当消息到达时执行回调方法
    28. @Override
    29. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    30. String message = new String(body, "utf-8");
    31. log.info("[PublishSubscribeConsumer1] Receive message:" + message);
    32. try {
    33. //消费者休息2s处理业务
    34. Thread.sleep(1000);
    35. }
    36. catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. finally {
    40. System.out.println("[1] done");
    41. //手动应答
    42. channel.basicAck(envelope.getDeliveryTag(), false);
    43. }
    44. }
    45. };
    46. //设置手动应答
    47. boolean autoAck = false;
    48. //监听队列
    49. channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    50. }
    51. }
    52. /**
    53. * @Description:发布订阅模式消费者
    54. * @Author: xy丶
    55. */
    56. @Slf4j
    57. public class PublishSubscribeConsumer1 {
    58. //交换机名称
    59. private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
    60. //队列名称
    61. private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
    62. public static void main(String[] args) throws IOException, TimeoutException {
    63. //获取连接
    64. Connection connection = RabbitMQConnection.getConnection();
    65. //从连接中获取一个通道
    66. final Channel channel = connection.createChannel();
    67. //声明交换机(分发:发布/订阅模式)
    68. channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
    69. //声明队列
    70. channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
    71. //将队列绑定到交换机
    72. channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
    73. //保证一次只分发一个
    74. int prefetchCount = 1;
    75. channel.basicQos(prefetchCount);
    76. //定义消费者
    77. DefaultConsumer consumer = new DefaultConsumer(channel) {
    78. //当消息到达时执行回调方法
    79. @Override
    80. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    81. String message = new String(body, "utf-8");
    82. log.info("[PublishSubscribeConsumer1] Receive message:" + message);
    83. try {
    84. //消费者休息2s处理业务
    85. Thread.sleep(1000);
    86. }
    87. catch (InterruptedException e) {
    88. e.printStackTrace();
    89. }
    90. finally {
    91. System.out.println("[1] done");
    92. //手动应答
    93. channel.basicAck(envelope.getDeliveryTag(), false);
    94. }
    95. }
    96. };
    97. //设置手动应答
    98. boolean autoAck = false;
    99. //监听队列
    100. channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    101. }
    102. }
    103. /**
    104. * @Description:发布订阅模式消费者
    105. * @Author: xy丶
    106. */
    107. @Slf4j
    108. public class PublishSubscribeConsumer2 {
    109. //交换机名称
    110. private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
    111. //队列名称
    112. private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
    113. public static void main(String[] args) throws IOException, TimeoutException {
    114. //获取连接
    115. Connection connection = RabbitMQConnection.getConnection();
    116. //从连接中获取一个通道
    117. final Channel channel = connection.createChannel();
    118. //声明交换机(分发:发布/订阅模式)
    119. channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
    120. //声明队列
    121. channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
    122. //将队列绑定到交换机
    123. channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
    124. //保证一次只分发一个
    125. int prefetchCount = 1;
    126. channel.basicQos(prefetchCount);
    127. //定义消费者
    128. DefaultConsumer consumer = new DefaultConsumer(channel) {
    129. //当消息到达时执行回调方法
    130. @Override
    131. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    132. String message = new String(body, "utf-8");
    133. log.info("[PublishSubscribeConsumer2] Receive message:" + message);
    134. try {
    135. //消费者休息2s处理业务
    136. Thread.sleep(1000);
    137. }
    138. catch (InterruptedException e) {
    139. e.printStackTrace();
    140. }
    141. finally {
    142. System.out.println("[1] done");
    143. //手动应答
    144. channel.basicAck(envelope.getDeliveryTag(), false);
    145. }
    146. }
    147. };
    148. //设置手动应答
    149. boolean autoAck = false;
    150. //监听队列
    151. channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    152. }
    153. }

           (4).Ruoting(路由模式)

    消费者

    1. /**
    2. * 1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
    3. * 2)根据业务功能定义路由字符串
    4. * 3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;
    5. * 客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
    6. * @Description:发布订阅模式生产者
    7. * @Author: xy丶
    8. */
    9. @Slf4j
    10. public class Producer {
    11. //路由交换机名称
    12. static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    13. //队列名称1 发送
    14. static final String QUEUE_SEND = "queue_send";
    15. //队列名称2 接收
    16. static final String QUEUE_RECEIVE = "queue_receive";
    17. public static void main(String[] args) throws Exception {
    18. //创建连接
    19. Connection connection = RabbitMQConnection.getConnection();
    20. //创建频道
    21. Channel channel = connection.createChannel();
    22. /**
    23. * 声明交换机
    24. * 参数1:交换机名称
    25. * 参数2:交换机类型,fanout,toppic,direct,headers
    26. */
    27. channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE, "direct");
    28. /**
    29. * 声明(创建)队列
    30. * 参数1:队列名称
    31. * 参数2:是否定义持久化队列
    32. * 参数3:是否独占本次连接
    33. * 参数4:是否在不使用的时候自动删除队列
    34. * 参数5:队列其它参数
    35. */
    36. channel.queueDeclare(QUEUE_SEND,true,false,false,null);
    37. channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);
    38. /**
    39. * 队列绑定交换机
    40. * 参数1:队列名称
    41. * 参数2:是否定义持久化队列
    42. * 参数3:是否独占本次连接
    43. */
    44. channel.queueBind(QUEUE_SEND,ROUTING_DIRECT_EXCHANGE,"send");
    45. channel.queueBind(QUEUE_RECEIVE,ROUTING_DIRECT_EXCHANGE,"receive");
    46. //发送消息
    47. String message = "路由模式:routing key 为 send";
    48. /**
    49. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
    50. * 参数2:路由key,简单模式可以传递队列名称
    51. * 参数3:消息其它属性
    52. * 参数4:消息内容
    53. */
    54. channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"send",null,message.getBytes());
    55. log.info("已发送消息:"+message);
    56. //发送消息
    57. message = "路由模式:routing key 为 receive";
    58. /**
    59. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
    60. * 参数2:路由key,简单模式可以传递队列名称
    61. * 参数3:消息其它属性
    62. * 参数4:消息内容
    63. */
    64. channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"receive",null,message.getBytes());
    65. log.info("已发送消息:"+message);
    66. //关闭资源
    67. channel.close();
    68. connection.close();
    69. }
    70. }

    消费者

    1. /**
    2. *路由消费者
    3. */
    4. @Slf4j
    5. public class RoutingConsumer1 {
    6. //路由交换机名称
    7. static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    8. //队列名称1 发送
    9. static final String QUEUE_SEND = "queue_send";
    10. public static void main(String[] args) throws Exception {
    11. //创建连接
    12. Connection connection = RabbitMQConnection.getConnection();
    13. //创建通道(频道)
    14. Channel channel = connection.createChannel();
    15. //声明交换机
    16. channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
    17. /**
    18. * 声明(创建)队列
    19. * 参数1:队列名称
    20. * 参数2:是否定义持久化队列
    21. * 参数3:是否独占本次连接
    22. * 参数4:是否在不使用的时候自动删除队列
    23. * 参数5:队列其它参数
    24. */
    25. channel.queueDeclare(QUEUE_SEND,true,false,false,null);
    26. //队列绑定交换机
    27. channel.queueBind(QUEUE_SEND, ROUTING_DIRECT_EXCHANGE,"send");
    28. //创建消费这,并设置消息处理
    29. DefaultConsumer consumer = new DefaultConsumer(channel) {
    30. /**
    31. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
    32. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
    33. * 消息和重传标志(收到消息失败后是否需要重新发送)
    34. * properties 属性信息
    35. * body 消息
    36. */
    37. @Override
    38. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    39. //路由key
    40. log.info("路由key为:" + envelope.getRoutingKey());
    41. //交换机
    42. log.info("交换机为:" + envelope.getExchange());
    43. //消息id
    44. log.info("消息id为:" + envelope.getDeliveryTag());
    45. //收到的消息
    46. log.info("消费者1-接收到的消息为:" + new String(body, "utf8"));
    47. }
    48. };
    49. /**
    50. * 监听消息
    51. * 参数1:队列名称
    52. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
    53. * 参数3:消息接收到后回调
    54. */
    55. channel.basicConsume(QUEUE_SEND, true, consumer);
    56. }
    57. /**
    58. *路由消费者
    59. */
    60. @Slf4j
    61. public class RoutingConsumer2 {
    62. //路由交换机名称
    63. static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    64. //队列名称1 发送
    65. static final String QUEUE_RECEIVE = "queue_receive";
    66. public static void main(String[] args) throws Exception {
    67. //创建连接
    68. Connection connection = RabbitMQConnection.getConnection();
    69. //创建通道(频道)
    70. Channel channel = connection.createChannel();
    71. //声明交换机
    72. channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
    73. /**
    74. * 声明(创建)队列
    75. * 参数1:队列名称
    76. * 参数2:是否定义持久化队列
    77. * 参数3:是否独占本次连接
    78. * 参数4:是否在不使用的时候自动删除队列
    79. * 参数5:队列其它参数
    80. */
    81. channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);
    82. //队列绑定交换机
    83. channel.queueBind(QUEUE_RECEIVE, ROUTING_DIRECT_EXCHANGE,"receive");
    84. //创建消费这,并设置消息处理
    85. DefaultConsumer consumer = new DefaultConsumer(channel) {
    86. /**
    87. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
    88. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
    89. * 消息和重传标志(收到消息失败后是否需要重新发送)
    90. * properties 属性信息
    91. * body 消息
    92. */
    93. @Override
    94. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    95. //路由key
    96. log.info("路由key为:" + envelope.getRoutingKey());
    97. //交换机
    98. log.info("交换机为:" + envelope.getExchange());
    99. //消息id
    100. log.info("消息id为:" + envelope.getDeliveryTag());
    101. //收到的消息
    102. log.info("消费者2-接收到的消息为:" + new String(body, "utf8"));
    103. }
    104. };
    105. /**
    106. * 监听消息
    107. * 参数1:队列名称
    108. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
    109. * 参数3:消息接收到后回调
    110. */
    111. channel.basicConsume(QUEUE_RECEIVE, true, consumer);
    112. }
    113. }
           (5).Topics(主题模式/路由模式的一种)

    生产者

    1. /**
    2. * 跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系
    3. * 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;
    4. *
    5. *要求
    6. * Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。
    7. * 分隔符
    8. * “*(星号)”:可以代替一个单词
    9. * “#(井号)”:可以替代零个或多个单词
    10. * @Description:主题模式
    11. * @Author: xy丶
    12. */
    13. @Slf4j
    14. public class TopicProducer {
    15. public static final String TOPIC_EXCHANGE = "topic_exchange";
    16. public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
    17. public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
    18. public static void main(String[] args) throws Exception {
    19. //声明用作全局变量的队列变量和交换价变量
    20. //创建连接
    21. Connection connection = RabbitMQConnection.getConnection();
    22. //创建信道
    23. Channel channel = connection.createChannel();
    24. //声明队列
    25. channel.queueDeclare(TOPIC_QUEUE_ONE,true,false,false,null);
    26. channel.queueDeclare(TOPIC_QUEUE_TWO,true,false,false,null);
    27. //声明交换机
    28. channel.exchangeDeclare(TOPIC_EXCHANGE, "topic",true);
    29. //绑定队列
    30. channel.queueBind(TOPIC_QUEUE_ONE,TOPIC_EXCHANGE,"*.orange.*");
    31. channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"*.*.rabbit");
    32. channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"lazy.#");
    33. //发生消息
    34. for (int i = 0; i <10 ; i++) {
    35. String msg="goodnight!My love world===>"+i;
    36. channel.basicPublish(TOPIC_EXCHANGE,"ag.we.rabbit",null,msg.getBytes());
    37. }
    38. }
    39. }

    消费者

    1. @Slf4j
    2. public class TopicCustomer1 {
    3. public static final String TOPIC_QUEUE_ONE="topic_queue_one";
    4. public static void main(String[] args) throws Exception{
    5. //创建连接
    6. Connection connection = RabbitMQConnection.getConnection();
    7. //创建信道
    8. Channel channel = connection.createChannel();
    9. DefaultConsumer consumer = new DefaultConsumer(channel){
    10. @Override
    11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    12. log.info("TopicCustomer1=====>:"+new String(body));
    13. }
    14. };
    15. channel.basicConsume(TOPIC_QUEUE_ONE,true,consumer);
    16. }
    17. }
    18. @Slf4j
    19. public class TopicCustomer2 {
    20. public static final String TOPIC_QUEUE_TWO="topic_queue_two";
    21. public static void main(String[] args) throws Exception{
    22. //创建连接
    23. Connection connection = RabbitMQConnection.getConnection();
    24. //创建信道
    25. Channel channel = connection.createChannel();
    26. DefaultConsumer consumer = new DefaultConsumer(channel){
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. log.info("TopicCustomer22=====>:"+new String(body));
    30. }
    31. };
    32. channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
    33. }
    34. }
    35. @Slf4j
    36. public class TopicCustomer3 {
    37. public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
    38. public static void main(String[] args) throws Exception{
    39. //创建连接
    40. Connection connection = RabbitMQConnection.getConnection();
    41. //创建信道
    42. Channel channel = connection.createChannel();
    43. DefaultConsumer consumer = new DefaultConsumer(channel){
    44. @Override
    45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    46. log.info("TopicCustomer2=====>:"+new String(body));
    47. }
    48. };
    49. channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
    50. }
    51. }

           (6).RPC模式
  • 相关阅读:
    位运算(超详细)
    java 集合处理:
    【数据结构】手撕归并排序(含非递归)
    爬虫----js逆向某宝h5的sign参数
    Spring OAuth2 Resource Server 配置
    JMeter使用方法
    搞定面试官 - 可以介绍一下 MySQL InnoDB 引擎的索引模型嘛?
    Django笔记四十四之Nginx+uWSGI部署Django以及Nginx负载均衡操作
    操作系统权限提升(三十)之数据库提权-SQL Server sp_oacreate+sp_oamethod(dba权限)提权
    【Set】Set集合遍历与Set集合转数组(133)
  • 原文地址:https://blog.csdn.net/weixin_52466601/article/details/136674559