• JAVA发送消息到RabbitMq


    项目中,作为生产者自定义消息发送到RabbitMq

    1.引入rmq依赖

    1. <!-- rabbitmq 依赖 -->
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.9.0</version>
    6. </dependency>

    2.创建链接、断开连接工具类。

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. /**
    7. * RabbitMq 工具类
    8. *
    9. * @author Klay
    10. * @date 2023/6/25
    11. */
    12. public class RabbitmqUtils {
    13. private static Channel channel = null;
    14. private static Connection connection = null;
    15. /**
    16. * 获取连接
    17. *
    18. * @author Klay
    19. * @date 2023/6/25 10:37
    20. */
    21. public static Channel getChannel() {
    22. //定义连接池
    23. ConnectionFactory factory = new ConnectionFactory();
    24. //设置主机地址
    25. factory.setHost("127.0.0.1");
    26. //设置端口
    27. factory.setPort(5672);
    28. //设置用户名
    29. factory.setUsername("guest");
    30. //密码
    31. factory.setPassword("guest");
    32. //虚拟机路径
    33. factory.setVirtualHost("/");
    34. try {
    35. connection = factory.newConnection();
    36. //创建信道
    37. channel = connection.createChannel();
    38. } catch (IOException e) {
    39. e.printStackTrace();
    40. } catch (TimeoutException e) {
    41. e.printStackTrace();
    42. }
    43. return channel;
    44. }
    45. /**
    46. * 关闭连接
    47. *
    48. * @author Klay
    49. * @date 2023/6/25 10:37
    50. */
    51. public static void closeConnection() {
    52. try {
    53. channel.close();
    54. connection.close();
    55. } catch (IOException e) {
    56. e.printStackTrace();
    57. } catch (TimeoutException e) {
    58. e.printStackTrace();
    59. }
    60. }
    61. }

    3.发送消息。大部分业务应用场景中,只需要发送消息到指定的交换机(exchange)中。如果业务需要创建交换机,则将注释的代码打开,创建交换机、队列,并绑定。发送消息时,将对应的交换机、路由进行替换即可。

    1. import com.hikvision.ardatatormq.utils.RabbitmqUtils;
    2. import com.rabbitmq.client.Channel;
    3. import lombok.extern.slf4j.Slf4j;
    4. import java.io.IOException;
    5. /**
    6. * @author Klay
    7. * @date 2023/10/16 016
    8. */
    9. @Slf4j
    10. public class SendMessageTest {
    11. public static void main(String[] args){
    12. //利用写好工具类获取信道连接
    13. Channel channel = RabbitmqUtils.getChannel();
    14. try {
    15. /**
    16. *创建一个交换机
    17. *1.交换机名称
    18. *2.交换机类型有fanout,direct,topic,headers
    19. *3.是否持久化
    20. *4.设置是自动删除,当没有队列与当前交换机绑定时自动删除
    21. *5.设置是否内置,表示内置的交换机
    22. *6.设置其他的一些结构化参数
    23. */
    24. // channel.exchangeDeclare("text_pubsub", BuiltinExchangeType.FANOUT, false, false, false, null);
    25. /**
    26. *1.队列名称
    27. *2.是否持久化,持久化会存盘,重启也还存在
    28. *3.exclusive 是否排他如果一个队列被声明为排他的队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除这里需要注意三点:排他的队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其它连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
    29. *4.是否自动删除至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
    30. *5.设置队列的其他配置参数
    31. */
    32. // channel.queueDeclare("pubsub_queue1", false, false, false, null);
    33. /**
    34. *将交换机与队列绑定
    35. *1.队列名称
    36. *2.交换机名称
    37. *3.routerkey(路由key
    38. *4.其他的绑定参数
    39. */
    40. // channel.queueBind("pubsub_queue1", "text_pubsub", "routingKeyTest");
    41. /**
    42. *发送消息
    43. *1.交换机名称
    44. *2.routerkey路由key,目前没有指定双引号即可
    45. *3.无额外配置写null
    46. *4.消息体
    47. */
    48. String msg = "发布订阅模式!!!";
    49. channel.basicPublish("amq.topic", "routingKeyTest", null, msg.getBytes());
    50. log.info("消息发送成功!:{}", msg);
    51. } catch (IOException e) {
    52. log.error("发送消息IOException:{}", e);
    53. } finally {
    54. //关闭连接
    55. RabbitmqUtils.closeConnection();
    56. }
    57. }
    58. }

    4.进行测试。

            4.1为交换机amq.topic创建一个测试队列并绑定。

         4.2发送消息

  • 相关阅读:
    「PAT乙级真题解析」Basic Level 1086 就不告诉你 (问题分析+完整步骤+伪代码描述+提交通过代码)
    基于CNTK/C#实现逻辑回归【附源码】
    效率提升的好物分享
    一个奇葩的线上问题,导致我排查了一天
    python基础及网络爬虫
    Java性能优化的过程方法与求职面经总结
    基于JAVA爱馨敬老院网站计算机毕业设计源码+系统+lw文档+部署
    Chrome代码分析(二)——EscapeAnalysisPhase
    【Ambari】Python调用Rest API 获取集群状态信息并发送钉钉告警
    uni-app 使用 scss 实现推荐标签区域显示效果
  • 原文地址:https://blog.csdn.net/weixin_40052298/article/details/133851433