Message Queue(消息队列)简称MQ,是一种应用程序对应用程序的消息通信机制。在MQ中,消息以队列形式存储,以便于异步传输,在MQ中,发布者(生产者)将消息放入队列,而消费者从队列中读取并处理这些消息,这种设计允许生产者和消费者之间解耦,提高系统的响应速度和吞吐量,MQ常用于解耦系统之间的依赖关系,提高系统的稳定性和可扩展性,MQ还支持消峰,即以稳定的系统资源应对突发的流量冲剂,然而使用MQ也可能带来一些挑战,如:系统可用性降低、系统复杂度提高、以及消息一致性问题等;
目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等,也有直接使用Redis充当消息队列的案列,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ的产品特征等,综合考虑;

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是Erlang语言编写,而集群和故障转移是构建在开放电信平台框架上的。所有的主要变成语言均有与代理接口通讯的客户端库。
1.Simple Work Queue(简单工作队列):常见的一对一模式,一条消息由一个消费者进行消费。如果有多个消费者,默认是使用轮询的方式将消息分配消费者
2.Work Queues(工作队列模式):也叫公屏队列,能者多劳的消息队列模型。队列必须收到来自消费者的手动ACK(消息确认机制)才可以继续往消费者发送消息。
3.Publish/Subscribe(发布订阅模式):一条消息被多和消费者消费。
4.Ruoting(路由模式):有选择的接收消息。
5.Topics(主题模式):通过一定的规则来选择性的接收消息
6.RPC模式:发布者发布消息,并且通过RPC方式等待结果。

消息生产后将消息放入队列,消息的消费者(consumer)监听消息队列嘛,如果队列中有消息就消费掉,消息被消费后自动从消息队列中删除。(也可能出现异常)
- /**
- * @Description:获取RabbitMQ连接
- * @Author: xy丶
- */
- public class RabbitMQConnection {
- public final static String RABBITMQ_SERVER_HOST = "192.168.0.122";
- public final static int RABBITMQ_SERVER_PORT = 5672;
- public final static String VIRTUAL_HOST = "/XY_HOST";
-
- public static Connection getConnection() throws IOException, TimeoutException {
- //1、创建连接
- ConnectionFactory factory = new ConnectionFactory();
- //2、设置主机名
- factory.setHost(RABBITMQ_SERVER_HOST);
- //3、设置通讯端口,默认是5672,不专门设置也可以
- factory.setPort(RABBITMQ_SERVER_PORT);
- //4、设置账号和密码
- factory.setUsername("admin");
- factory.setPassword("admin");
- //4、设置Virtual Host
- factory.setVirtualHost(VIRTUAL_HOST);
- //5、创建连接
- return factory.newConnection();
- }
-
- }
消费者
- /**
- * @Description:简单工作队列模式消费者
- * @Author: xy丶
- */
- @Slf4j
- public class Consumer {
- private final static String SIMPLE_WORK_QUEUE= "simple_work_queue";
-
- public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
- //获取连接
- Connection connection = RabbitMQConnection.getConnection();
- //获取通道
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(SIMPLE_WORK_QUEUE, true, consumer);
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- log.info("Consumer reception "+message);
- }
- }
生产者
- /**
- * @Description:简单工作队列模式生产者
- * @Author: xy丶
- */
- @Slf4j
- public class Producer {
- private final static String SIMPLE_WORK_QUEUE = "simple_work_queue";
-
- /**
- * 简单工作队列模式
- * @param args
- * @throws IOException
- * @throws TimeoutException
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- // 声明队列 String var1, 是否持久化
- // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
- // boolean var3, 是否自动删除
- // boolean var4, 消费完删除
- // Map<String, Object> var5 其他属性
- channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
- // 消息内容 String var1, 是否持久化
- // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
- // boolean var3, 是否自动删除
- // boolean var4, 消费完删除
- // Map<String, Object> var5 其他属性
- String message = "Hello Word!!!";
- channel.basicPublish("", SIMPLE_WORK_QUEUE,null,message.getBytes());
- log.info("Producer send "+message);
- //最后关闭通关和连接
- channel.close();
- connection.close();
- }
生产者
- package com.puwang.MQ.workQueue;
-
- import com.puwang.MQ.config.RabbitMQConnection;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import lombok.extern.slf4j.Slf4j;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @Description:工作队列模式生产者
- * @Author: xy丶
- */
- @Slf4j
- public class Producer {
-
- private final static String QUEUE_WORK = "QUEUE_WORK";
-
- public static void main(String[] args) throws Exception {
- Connection connection = RabbitMQConnection.getConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_WORK, false, false, false, null);
- for(int i = 0; i < 20; i++){
- String message = "娃哈哈" + i;
- channel.basicPublish("", QUEUE_WORK, null, message.getBytes());
- System.out.println("send=============="+message);
- Thread.sleep(i*10);
- }
-
- channel.close();
- connection.close();
-
- }
- }
-
-
-
- @Slf4j
- public class WorkQueueConsumer1 {
- private final static String QUEUE_WORK = "QUEUE_WORK";
-
- /**
- * 结果:
- *
- * 1、一条消息只会被一个消费者接收;
- *
- * 2、rabbit采用轮询的方式将消息是平均发送给消费者的;
- *
- * 3、消费者在处理完某条消息后,才会收到下一条消息。
- * @param args
- * @throws IOException
- * @throws TimeoutException
- */
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- Connection connection = RabbitMQConnection.getConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_WORK, false,false, false,null);
- //同一时刻服务器只会发送一条消息给消费者
- channel.basicQos(1);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //关于手工确认 待之后有时间研究下
- channel.basicConsume(QUEUE_WORK, false, consumer);
-
- while(true){
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- log.info("[消费者1] Received1 '"+message+"'");
- Thread.sleep(10);
- //返回确认状态
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- }
-
- @Slf4j
- public class WorkQueueConsumer2 {
- private final static String QUEUE_WORK = "QUEUE_WORK";
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- Connection connection = RabbitMQConnection.getConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_WORK, false,false, false,null);
- //同一时刻服务器只会发送一条消息给消费者
- channel.basicQos(1);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //关于手工确认 待之后有时间研究下
- channel.basicConsume(QUEUE_WORK, false, consumer);
-
- while(true){
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- log.info("[消费者2] Received1 '"+message+"'");
- Thread.sleep(10);
- //返回确认状态
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- }
-
-
-
-
生产者
- /**
- * 订阅模式 生产者
- * 订阅模式:一个生产者发送的消息会被多个消费者获取。
- * 消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
- * 相关场景:邮件群发,群聊天,广播(广告)
- * @Description:发布订阅模式生产者
- * @Author: xy丶
- */
- @Slf4j
- public class Producer {
- private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
-
- /**
- * 交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
- * 相关场景:邮件群发,群聊天,广播(广告)
- * @param args
- */
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //获取连接
- Connection connection = RabbitMQConnection.getConnection();
- //从连接中获取一个通道
- Channel channel = connection.createChannel();
- 消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
- channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
- //发送消息
- for (int i = 0; i < 10; i++) {
- String message = "哇哈哈哈!!!"+i;
- log.info("send message:" + message);
- //发送消息
- channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE, "", null, message.getBytes("utf-8"));
- Thread.sleep(100 * i);
- }
- channel.close();
- connection.close();
- }
- }
消费者
- /**
- * @Description:发布订阅模式消费者
- * @Author: xy丶
- */
- @Slf4j
- public class PublishSubscribeConsumer1 {
- //交换机名称
- private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
-
- //队列名称
- private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- //获取连接
- Connection connection = RabbitMQConnection.getConnection();
- //从连接中获取一个通道
- final Channel channel = connection.createChannel();
- //声明交换机(分发:发布/订阅模式)
- channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
- //声明队列
- channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
- //将队列绑定到交换机
- channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
- //保证一次只分发一个
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
- //定义消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- //当消息到达时执行回调方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "utf-8");
- log.info("[PublishSubscribeConsumer1] Receive message:" + message);
- try {
- //消费者休息2s处理业务
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- finally {
- System.out.println("[1] done");
- //手动应答
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- //设置手动应答
- boolean autoAck = false;
- //监听队列
- channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
- }
- }
-
- /**
- * @Description:发布订阅模式消费者
- * @Author: xy丶
- */
- @Slf4j
- public class PublishSubscribeConsumer1 {
- //交换机名称
- private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
-
- //队列名称
- private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- //获取连接
- Connection connection = RabbitMQConnection.getConnection();
- //从连接中获取一个通道
- final Channel channel = connection.createChannel();
- //声明交换机(分发:发布/订阅模式)
- channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
- //声明队列
- channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
- //将队列绑定到交换机
- channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
- //保证一次只分发一个
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
- //定义消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- //当消息到达时执行回调方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "utf-8");
- log.info("[PublishSubscribeConsumer1] Receive message:" + message);
- try {
- //消费者休息2s处理业务
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- finally {
- System.out.println("[1] done");
- //手动应答
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- //设置手动应答
- boolean autoAck = false;
- //监听队列
- channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
- }
- }
-
-
-
- /**
- * @Description:发布订阅模式消费者
- * @Author: xy丶
- */
- @Slf4j
- public class PublishSubscribeConsumer2 {
- //交换机名称
- private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";
-
- //队列名称
- private static final String PUBLISH_SUBSCRIBE_QUEUE = "publish_subscribe_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- //获取连接
- Connection connection = RabbitMQConnection.getConnection();
- //从连接中获取一个通道
- final Channel channel = connection.createChannel();
- //声明交换机(分发:发布/订阅模式)
- channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
- //声明队列
- channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
- //将队列绑定到交换机
- channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
- //保证一次只分发一个
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
- //定义消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- //当消息到达时执行回调方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "utf-8");
- log.info("[PublishSubscribeConsumer2] Receive message:" + message);
- try {
- //消费者休息2s处理业务
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- finally {
- System.out.println("[1] done");
- //手动应答
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- //设置手动应答
- boolean autoAck = false;
- //监听队列
- channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
- }
- }
-
-
-
-
-
-
消费者
- /**
- * 1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
- * 2)根据业务功能定义路由字符串
- * 3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;
- * 客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
- * @Description:发布订阅模式生产者
- * @Author: xy丶
- */
- @Slf4j
- public class Producer {
- //路由交换机名称
- static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
- //队列名称1 发送
- static final String QUEUE_SEND = "queue_send";
- //队列名称2 接收
- static final String QUEUE_RECEIVE = "queue_receive";
-
- public static void main(String[] args) throws Exception {
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建频道
- Channel channel = connection.createChannel();
- /**
- * 声明交换机
- * 参数1:交换机名称
- * 参数2:交换机类型,fanout,toppic,direct,headers
- */
- channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE, "direct");
- /**
- * 声明(创建)队列
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_SEND,true,false,false,null);
- channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);
- /**
- * 队列绑定交换机
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- */
- channel.queueBind(QUEUE_SEND,ROUTING_DIRECT_EXCHANGE,"send");
- channel.queueBind(QUEUE_RECEIVE,ROUTING_DIRECT_EXCHANGE,"receive");
-
- //发送消息
- String message = "路由模式:routing key 为 send";
- /**
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"send",null,message.getBytes());
- log.info("已发送消息:"+message);
-
- //发送消息
- message = "路由模式:routing key 为 receive";
- /**
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"receive",null,message.getBytes());
- log.info("已发送消息:"+message);
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
-
消费者
-
- /**
- *路由消费者
- */
- @Slf4j
- public class RoutingConsumer1 {
-
- //路由交换机名称
- static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
- //队列名称1 发送
- static final String QUEUE_SEND = "queue_send";
-
- public static void main(String[] args) throws Exception {
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建通道(频道)
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
- /**
- * 声明(创建)队列
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_SEND,true,false,false,null);
-
- //队列绑定交换机
- channel.queueBind(QUEUE_SEND, ROUTING_DIRECT_EXCHANGE,"send");
-
- //创建消费这,并设置消息处理
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * consumerTag 消息者标签,在channel.basicConsume时候可以指定
- * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
- * 消息和重传标志(收到消息失败后是否需要重新发送)
- * properties 属性信息
- * body 消息
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //路由key
- log.info("路由key为:" + envelope.getRoutingKey());
- //交换机
- log.info("交换机为:" + envelope.getExchange());
- //消息id
- log.info("消息id为:" + envelope.getDeliveryTag());
- //收到的消息
- log.info("消费者1-接收到的消息为:" + new String(body, "utf8"));
- }
- };
-
- /**
- * 监听消息
- * 参数1:队列名称
- * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
- * 参数3:消息接收到后回调
- */
- channel.basicConsume(QUEUE_SEND, true, consumer);
- }
-
-
- /**
- *路由消费者
- */
- @Slf4j
- public class RoutingConsumer2 {
-
- //路由交换机名称
- static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
- //队列名称1 发送
- static final String QUEUE_RECEIVE = "queue_receive";
-
- public static void main(String[] args) throws Exception {
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建通道(频道)
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
- /**
- * 声明(创建)队列
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);
-
- //队列绑定交换机
- channel.queueBind(QUEUE_RECEIVE, ROUTING_DIRECT_EXCHANGE,"receive");
-
- //创建消费这,并设置消息处理
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * consumerTag 消息者标签,在channel.basicConsume时候可以指定
- * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
- * 消息和重传标志(收到消息失败后是否需要重新发送)
- * properties 属性信息
- * body 消息
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //路由key
- log.info("路由key为:" + envelope.getRoutingKey());
- //交换机
- log.info("交换机为:" + envelope.getExchange());
- //消息id
- log.info("消息id为:" + envelope.getDeliveryTag());
- //收到的消息
- log.info("消费者2-接收到的消息为:" + new String(body, "utf8"));
- }
- };
-
- /**
- * 监听消息
- * 参数1:队列名称
- * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
- * 参数3:消息接收到后回调
- */
- channel.basicConsume(QUEUE_RECEIVE, true, consumer);
- }
- }
生产者
- /**
- * 跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系
- * 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;
- *
- *要求
- * Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。
- * 分隔符
- * “*(星号)”:可以代替一个单词
- * “#(井号)”:可以替代零个或多个单词
- * @Description:主题模式
- * @Author: xy丶
- */
- @Slf4j
- public class TopicProducer {
-
- public static final String TOPIC_EXCHANGE = "topic_exchange";
- public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
- public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
-
- public static void main(String[] args) throws Exception {
- //声明用作全局变量的队列变量和交换价变量
-
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(TOPIC_QUEUE_ONE,true,false,false,null);
- channel.queueDeclare(TOPIC_QUEUE_TWO,true,false,false,null);
- //声明交换机
- channel.exchangeDeclare(TOPIC_EXCHANGE, "topic",true);
- //绑定队列
- channel.queueBind(TOPIC_QUEUE_ONE,TOPIC_EXCHANGE,"*.orange.*");
- channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"*.*.rabbit");
- channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"lazy.#");
- //发生消息
- for (int i = 0; i <10 ; i++) {
- String msg="goodnight!My love world===>"+i;
- channel.basicPublish(TOPIC_EXCHANGE,"ag.we.rabbit",null,msg.getBytes());
- }
- }
- }
消费者
- @Slf4j
- public class TopicCustomer1 {
-
- public static final String TOPIC_QUEUE_ONE="topic_queue_one";
-
- public static void main(String[] args) throws Exception{
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建信道
- Channel channel = connection.createChannel();
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- log.info("TopicCustomer1=====>:"+new String(body));
- }
- };
- channel.basicConsume(TOPIC_QUEUE_ONE,true,consumer);
- }
- }
-
-
- @Slf4j
- public class TopicCustomer2 {
- public static final String TOPIC_QUEUE_TWO="topic_queue_two";
-
- public static void main(String[] args) throws Exception{
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建信道
- Channel channel = connection.createChannel();
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- log.info("TopicCustomer22=====>:"+new String(body));
- }
- };
- channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
- }
- }
-
-
- @Slf4j
- public class TopicCustomer3 {
- public static final String TOPIC_QUEUE_TWO = "topic_queue_two";
-
- public static void main(String[] args) throws Exception{
- //创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //创建信道
- Channel channel = connection.createChannel();
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- log.info("TopicCustomer2=====>:"+new String(body));
- }
- };
- channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
- }
- }
-
-
-