项目中,作为生产者自定义消息发送到RabbitMq。
1.引入rmq依赖
- <!-- rabbitmq 依赖 -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.9.0</version>
- </dependency>
2.创建链接、断开连接工具类。
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * RabbitMq 工具类
- *
- * @author Klay
- * @date 2023/6/25
- */
- public class RabbitmqUtils {
- private static Channel channel = null;
- private static Connection connection = null;
-
- /**
- * 获取连接
- *
- * @author Klay
- * @date 2023/6/25 10:37
- */
- public static Channel getChannel() {
- //定义连接池
- ConnectionFactory factory = new ConnectionFactory();
- //设置主机地址
- factory.setHost("127.0.0.1");
- //设置端口
- factory.setPort(5672);
- //设置用户名
- factory.setUsername("guest");
- //密码
- factory.setPassword("guest");
- //虚拟机路径
- factory.setVirtualHost("/");
- try {
- connection = factory.newConnection();
- //创建信道
- channel = connection.createChannel();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- return channel;
- }
-
- /**
- * 关闭连接
- *
- * @author Klay
- * @date 2023/6/25 10:37
- */
- public static void closeConnection() {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
-
- }
- }
3.发送消息。大部分业务应用场景中,只需要发送消息到指定的交换机(exchange)中。如果业务需要创建交换机,则将注释的代码打开,创建交换机、队列,并绑定。发送消息时,将对应的交换机、路由进行替换即可。
-
- import com.hikvision.ardatatormq.utils.RabbitmqUtils;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import java.io.IOException;
-
- /**
- * @author Klay
- * @date 2023/10/16 016
- */
- @Slf4j
- public class SendMessageTest {
- public static void main(String[] args){
- //利用写好工具类获取信道连接
- Channel channel = RabbitmqUtils.getChannel();
- try {
- /**
- *创建一个交换机
- *1.交换机名称
- *2.交换机类型有fanout,direct,topic,headers
- *3.是否持久化
- *4.设置是自动删除,当没有队列与当前交换机绑定时自动删除
- *5.设置是否内置,表示内置的交换机
- *6.设置其他的一些结构化参数
- */
- // channel.exchangeDeclare("text_pubsub", BuiltinExchangeType.FANOUT, false, false, false, null);
- /**
- *1.队列名称
- *2.是否持久化,持久化会存盘,重启也还存在
- *3.exclusive 是否排他如果一个队列被声明为排他的队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除这里需要注意三点:排他的队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其它连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
- *4.是否自动删除至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
- *5.设置队列的其他配置参数
- */
- // channel.queueDeclare("pubsub_queue1", false, false, false, null);
- /**
- *将交换机与队列绑定
- *1.队列名称
- *2.交换机名称
- *3.routerkey(路由key)
- *4.其他的绑定参数
- */
- // channel.queueBind("pubsub_queue1", "text_pubsub", "routingKeyTest");
-
- /**
- *发送消息
- *1.交换机名称
- *2.routerkey路由key,目前没有指定双引号即可
- *3.无额外配置写null
- *4.消息体
- */
- String msg = "发布订阅模式!!!";
- channel.basicPublish("amq.topic", "routingKeyTest", null, msg.getBytes());
- log.info("消息发送成功!:{}", msg);
- } catch (IOException e) {
- log.error("发送消息IOException:{}", e);
- } finally {
- //关闭连接
- RabbitmqUtils.closeConnection();
- }
-
- }
- }
4.进行测试。
4.1为交换机amq.topic创建一个测试队列并绑定。


4.2发送消息
