• rabbitMQ:绑定Exchange发送和接收消息(fanout)


    编写fanout消息接收类

    由于fanout是广播机制,所以需要先有接收方接收,发送方才不会丢失数据

    1.编写接收类Receive1

    1. package com.it.rabbitmq.fanout;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receive1 {
    6. public static void main(String[] args) {
    7. //创建链接工厂对象
    8. ConnectionFactory factory=new ConnectionFactory();
    9. factory.setUsername("root");
    10. factory.setPassword("123456");
    11. factory.setHost("192.168.174.129");
    12. factory.setPort(5672);
    13. Connection connection=null;//定义链接对象
    14. Channel channel=null;//定义通道对象
    15. try {
    16. connection=factory.newConnection();//实例化链接对象
    17. channel=connection.createChannel();//实例化通道对象
    18. /**
    19. * 由于fanout类型的交换机的消息是类似于广播的模式,它不需要绑定RoutingKey
    20. * 而又有可能会有很多个消费来接收这个交换机中的数据
    21. * 因此我们创建队列时,要创建一个随机的队列名称
    22. * 没有参数的方法会创建一个名称随机的队列
    23. * 这个队列的数据是非持久化的,是排外的,自动删除的
    24. */
    25. String name=channel.queueDeclare().getQueue();
    26. channel.exchangeDeclare("fanoutExchange", "fanout", true);
    27. channel.queueBind(name,"fanoutExchange","");
    28. channel.basicConsume(name,true, "",new DefaultConsumer(channel) {
    29. public void handleDelivery(String consumerTag,
    30. Envelope envelope,
    31. AMQP.BasicProperties properties,
    32. byte[] body) throws IOException {
    33. //获取消息数据
    34. String bodyStr = new String(body);
    35. System.out.println("消费者1:"+bodyStr);
    36. }
    37. });
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. } catch (TimeoutException e) {
    41. e.printStackTrace();
    42. }
    43. }
    44. }

    编写接收类Receive2

    1. package com.it.rabbitmq.fanout;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receive2 {
    6. public static void main(String[] args) {
    7. //创建链接工厂对象
    8. ConnectionFactory factory=new ConnectionFactory();
    9. factory.setUsername("root");
    10. factory.setPassword("123456");
    11. factory.setHost("192.168.174.129");
    12. factory.setPort(5672);
    13. Connection connection=null;//定义链接对象
    14. Channel channel=null;//定义通道对象
    15. try {
    16. connection=factory.newConnection();//实例化链接对象
    17. channel=connection.createChannel();//实例化通道对象
    18. /**
    19. * 由于fanout类型的交换机的消息是类似于广播的模式,它不需要绑定RoutingKey
    20. * 而又有可能会有很多个消费来接收这个交换机中的数据
    21. * 因此我们创建队列时,要创建一个随机的队列名称
    22. * 没有参数的方法会创建一个名称随机的队列
    23. * 这个队列的数据是非持久化的,是排外的,自动删除的
    24. */
    25. String name=channel.queueDeclare().getQueue();
    26. channel.exchangeDeclare("fanoutExchange", "fanout", true);
    27. channel.queueBind(name,"fanoutExchange","");
    28. channel.basicConsume(name,true, "",new DefaultConsumer(channel) {
    29. public void handleDelivery(String consumerTag,
    30. Envelope envelope,
    31. AMQP.BasicProperties properties,
    32. byte[] body) throws IOException {
    33. //获取消息数据
    34. String bodyStr = new String(body);
    35. System.out.println("消费者2:"+bodyStr);
    36. }
    37. });
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. } catch (TimeoutException e) {
    41. e.printStackTrace();
    42. }
    43. }
    44. }

    2.分别启动这两个接收类

     

    3.进入控制台查看交换机和队列

     

     

     

    注意:

    1、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。

    2、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchang即可让所有接收者同时接收同一个消息是一种广播的消息机制

    编写fanout消息发送类

    1.发送类

    1. package com.it.rabbitmq.exchange.fanout;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class Send {
    8. public static void main(String[] args) {
    9. //创建连接工厂
    10. ConnectionFactory factory=new ConnectionFactory();
    11. //配置rabbitMQ的连接信息
    12. factory.setHost("192.168.174.129");
    13. factory.setPort(5672);
    14. factory.setUsername("root");
    15. factory.setPassword("123456");
    16. //定义连接
    17. Connection connection=null;
    18. //定义通道
    19. Channel channel=null;
    20. try {
    21. connection=factory.newConnection();
    22. channel=connection.createChannel();
    23. /**
    24. * 由于使用fanout类型的交换机,因此消息的接收方可能会有多个
    25. * 因此不建议在消息发送时来创建队列,以及绑定交换机
    26. * 但是发送消息时,至少保证交换机是存在的
    27. */
    28. // channel.queueDeclare("myDirectQueue",true,false,false,null);
    29. channel.exchangeDeclare("fanoutExchange","fanout",true);
    30. // channel.queueBind("myDirectQueue","directExchange","directRoutingKey");
    31. String message="fanout的测试消息!";
    32. channel.basicPublish("fanoutExchange","",null,message.getBytes("utf-8"));
    33. System.out.println("消息发送成功,direct");
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. } catch (TimeoutException e) {
    37. e.printStackTrace();
    38. }finally {
    39. if (channel!=null){
    40. try {
    41. channel.close();
    42. } catch (IOException e) {
    43. e.printStackTrace();
    44. } catch (TimeoutException e) {
    45. e.printStackTrace();
    46. }
    47. }
    48. if (connection!=null){
    49. try {
    50. connection.close();
    51. } catch (IOException e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. }
    56. }
    57. }

    2.测试发送

    3.查看两个接收类的情况

    注意:

    fanout模式的消息需要将一个消息同时绑定到多个队列中因此这里不能创建并指定某个队列

  • 相关阅读:
    会员营销中,数字会员模式如何打造差异化会员服务
    百望云携手华为发布金融信创与数电乐企联合方案 创新金融合规变革
    Flux脚本语言基础使用-数据类型(InFluxDB 查询语言)
    spring复习(第二天下午)(黑马版)
    df = pd.read_xxx(“xxx“, dtype=xxx)dtype问题
    手写简单版SpringMVC
    PanNet: A deep network architecture for pan-sharpening(ICCV 2017)
    百度地图API-初步使用(2)
    迄今为止算是汇总了全网高级Java岗位面试题(附答案)建议收藏
    7.代理模式
  • 原文地址:https://blog.csdn.net/weixin_59334478/article/details/127721763