• rabbitMQ:绑定Exchange发送和接收消息(fanout)


    编写fanout消息接收类

    由于fanout是广播机制,所以需要先有接收方接收,发送方才不会丢失数据

    1.编写接收类Receive1

    1. package com.it.rabbitmq.fanout;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receive1 {
    6. public static void main(String[] args) {
    7. //创建链接工厂对象
    8. ConnectionFactory factory=new ConnectionFactory();
    9. factory.setUsername("root");
    10. factory.setPassword("123456");
    11. factory.setHost("192.168.174.129");
    12. factory.setPort(5672);
    13. Connection connection=null;//定义链接对象
    14. Channel channel=null;//定义通道对象
    15. try {
    16. connection=factory.newConnection();//实例化链接对象
    17. channel=connection.createChannel();//实例化通道对象
    18. /**
    19. * 由于fanout类型的交换机的消息是类似于广播的模式,它不需要绑定RoutingKey
    20. * 而又有可能会有很多个消费来接收这个交换机中的数据
    21. * 因此我们创建队列时,要创建一个随机的队列名称
    22. * 没有参数的方法会创建一个名称随机的队列
    23. * 这个队列的数据是非持久化的,是排外的,自动删除的
    24. */
    25. String name=channel.queueDeclare().getQueue();
    26. channel.exchangeDeclare("fanoutExchange", "fanout", true);
    27. channel.queueBind(name,"fanoutExchange","");
    28. channel.basicConsume(name,true, "",new DefaultConsumer(channel) {
    29. public void handleDelivery(String consumerTag,
    30. Envelope envelope,
    31. AMQP.BasicProperties properties,
    32. byte[] body) throws IOException {
    33. //获取消息数据
    34. String bodyStr = new String(body);
    35. System.out.println("消费者1:"+bodyStr);
    36. }
    37. });
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. } catch (TimeoutException e) {
    41. e.printStackTrace();
    42. }
    43. }
    44. }

    编写接收类Receive2

    1. package com.it.rabbitmq.fanout;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receive2 {
    6. public static void main(String[] args) {
    7. //创建链接工厂对象
    8. ConnectionFactory factory=new ConnectionFactory();
    9. factory.setUsername("root");
    10. factory.setPassword("123456");
    11. factory.setHost("192.168.174.129");
    12. factory.setPort(5672);
    13. Connection connection=null;//定义链接对象
    14. Channel channel=null;//定义通道对象
    15. try {
    16. connection=factory.newConnection();//实例化链接对象
    17. channel=connection.createChannel();//实例化通道对象
    18. /**
    19. * 由于fanout类型的交换机的消息是类似于广播的模式,它不需要绑定RoutingKey
    20. * 而又有可能会有很多个消费来接收这个交换机中的数据
    21. * 因此我们创建队列时,要创建一个随机的队列名称
    22. * 没有参数的方法会创建一个名称随机的队列
    23. * 这个队列的数据是非持久化的,是排外的,自动删除的
    24. */
    25. String name=channel.queueDeclare().getQueue();
    26. channel.exchangeDeclare("fanoutExchange", "fanout", true);
    27. channel.queueBind(name,"fanoutExchange","");
    28. channel.basicConsume(name,true, "",new DefaultConsumer(channel) {
    29. public void handleDelivery(String consumerTag,
    30. Envelope envelope,
    31. AMQP.BasicProperties properties,
    32. byte[] body) throws IOException {
    33. //获取消息数据
    34. String bodyStr = new String(body);
    35. System.out.println("消费者2:"+bodyStr);
    36. }
    37. });
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. } catch (TimeoutException e) {
    41. e.printStackTrace();
    42. }
    43. }
    44. }

    2.分别启动这两个接收类

     

    3.进入控制台查看交换机和队列

     

     

     

    注意:

    1、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。

    2、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchang即可让所有接收者同时接收同一个消息是一种广播的消息机制

    编写fanout消息发送类

    1.发送类

    1. package com.it.rabbitmq.exchange.fanout;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class Send {
    8. public static void main(String[] args) {
    9. //创建连接工厂
    10. ConnectionFactory factory=new ConnectionFactory();
    11. //配置rabbitMQ的连接信息
    12. factory.setHost("192.168.174.129");
    13. factory.setPort(5672);
    14. factory.setUsername("root");
    15. factory.setPassword("123456");
    16. //定义连接
    17. Connection connection=null;
    18. //定义通道
    19. Channel channel=null;
    20. try {
    21. connection=factory.newConnection();
    22. channel=connection.createChannel();
    23. /**
    24. * 由于使用fanout类型的交换机,因此消息的接收方可能会有多个
    25. * 因此不建议在消息发送时来创建队列,以及绑定交换机
    26. * 但是发送消息时,至少保证交换机是存在的
    27. */
    28. // channel.queueDeclare("myDirectQueue",true,false,false,null);
    29. channel.exchangeDeclare("fanoutExchange","fanout",true);
    30. // channel.queueBind("myDirectQueue","directExchange","directRoutingKey");
    31. String message="fanout的测试消息!";
    32. channel.basicPublish("fanoutExchange","",null,message.getBytes("utf-8"));
    33. System.out.println("消息发送成功,direct");
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. } catch (TimeoutException e) {
    37. e.printStackTrace();
    38. }finally {
    39. if (channel!=null){
    40. try {
    41. channel.close();
    42. } catch (IOException e) {
    43. e.printStackTrace();
    44. } catch (TimeoutException e) {
    45. e.printStackTrace();
    46. }
    47. }
    48. if (connection!=null){
    49. try {
    50. connection.close();
    51. } catch (IOException e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. }
    56. }
    57. }

    2.测试发送

    3.查看两个接收类的情况

    注意:

    fanout模式的消息需要将一个消息同时绑定到多个队列中因此这里不能创建并指定某个队列

  • 相关阅读:
    Flutter 处理异步操作并根据异步操作状态动态构建界面的方法FutureBuilder
    因果系列文章(9)——反事实(下)
    常见荧光染料修饰多种基团及其激发和发射波长数据一览数据
    32.同步FIFO-IP核的调用
    微服务负载均衡器LoadBalancer实战
    vue2中怎么使用css样式实现给元素穿衣的功能
    CDN加速怎么实现缓存Range请求
    如何将拥抱脸模型用于 NLP、音频分类和计算机视觉
    element show-overflow-tooltip 复制
    【cfeng-work】架构演进和漫谈
  • 原文地址:https://blog.csdn.net/weixin_59334478/article/details/127721763