目录
前面mq服务器的安装就相当于交换机与队列已经安装好了,接下来用代码的方式来编写两个程序,分别作为生产者与消费者,实现消息的通信。
- <!--指定 jdk 编译版本-->
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <!--rabbitmq 依赖客户端-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.8.0</version>
- </dependency>
- <!--操作文件流的一个依赖-->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.6</version>
- </dependency>
- </dependencies>
消息生产者端代码:
- package com.areio.rabbitmq.one;
-
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- //生产者:发消息到队列
- public class Producer {
-
- //转为大写字母的快捷键ctr+shift+u
- public static final String QUEUE_NAME="hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //2、设置工厂ip,即rabbitmq服务所在的地址
- factory.setHost("192.168.30.100");
- //3、设置rabbitmq的用户名
- factory.setUsername("admin");
- //4、设置rabbitmq的密码
- factory.setPassword("123");
- //5、创建连接
- Connection connection = factory.newConnection();
- //6、获取连接中的信道
- Channel channel = connection.createChannel();
- //7、创建队列(使用默认交换机)
- //队列名称、队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)、队列是否可供多个消费者消费、最后一个消费者端开链接后是否自动删除、其他参数
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- String message = "hello world";
- //8、发消息
- //发送到哪个交换机、路由的key(队列名)、其他参数、消息体
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println("消息发送完毕");
- }
- }
消息消费者端代码:
- package com.areio.rabbitmq.one;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
-
- public static final String QUEUE_NAME="hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //2、设置工厂ip,即rabbitmq服务所在的地址
- factory.setHost("192.168.30.100");
- //3、设置rabbitmq的用户名
- factory.setUsername("admin");
- //4、设置rabbitmq的密码
- factory.setPassword("123");
- //5、创建连接
- Connection connection = factory.newConnection();
- //6、获取连接中的信道
- Channel channel = connection.createChannel();
- //7、创建队列(使用默认交换机)
- //队列名称、队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)、队列是否可供多个消费者消费、最后一个消费者端开链接后是否自动删除、其他参数
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- //8、接受消息
- DeliverCallback deliverCallback = (consumerTag,message)->{
- System.out.println(new String(message.getBody()));
- };
- CancelCallback cancelCallback = consumerTag->{
- System.out.println("消费被中断");
- };
- //消费哪个队列、消费成功后是否自动应答、消费失败时的回调、消费者取消消费时回调
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
- System.out.println("消息接收完毕");
- }
- }
注意:即使有多个工作线程,一个消息也只会被处理一次,因此一个消息被某个工作线程接收了,其他线程就不会收到该消息了(线程之间是竞争关系)
消息生产者端代码:
- package com.areio.rabbitmq.workqueue;
-
- import com.areio.rabbitmq.utils.RabbitMqUtils;
- import com.rabbitmq.client.Channel;
-
- import java.util.Scanner;
-
- public class Producer {
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String mesg = scanner.next();
- channel.basicPublish("",QUEUE_NAME,null,mesg.getBytes());
- System.out.println("发送消息完成:"+mesg);
- }
- }
- }
消息消费者端代码:
- package com.areio.rabbitmq.workqueue;
-
- import com.areio.rabbitmq.utils.RabbitMqUtils;
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- //这是一个工作线程,即消费者
- public class Worker01 {
-
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
-
- DeliverCallback deliverCallback = (consumerTag, message)->{
- System.out.println("接收到的消息:"+new String(message.getBody()));
- };
- CancelCallback cancelCallback = consumerTag->{
- System.out.println(consumerTag+"消费者取消消费接口时回调此函数");
- };
- System.out.println("C2等待接收消息......");
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
-
- }
-
- }
此时如果我们运行两个工作线程,首先运行Worker01这个类,要再运行一次,其操作如下:
接着运行消息生产者,从控制台分别输入发送的消息AA,BB,CC,DD,此时两个工作线程轮询接收到消息:
比如有五个消息过来,第一个消息被线程1拿了,接下来第二消息就被线程2拿,第三个消息被线程3拿,一个线程分配一个消息。
①自动应答
②手动应答
multiple 的 true 和 false 代表不同意思true 代表批量应答 channel 上未应答的消息比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答(不推荐批量应答,因为8处理完成了不代表其他也完成了)false 同上面相比,只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
生产者代码:
- public class ManualAckPro01 {
- public static final String QUEUE_NAME="ack_queue";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- /**
- * 队列名称
- * 队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)
- * 队列是否可供多个消费者消费
- * 最后一个消费者端开链接后是否自动删除
- * 其他参数
- */
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String mesg = scanner.next();
- channel.basicPublish("",QUEUE_NAME,null,mesg.getBytes("UTF-8"));
- System.out.println("发送消息完成:"+mesg);
- }
- }
- }
消费者1代码:
- /*
- 自动应答:收到消息后不管任务是否处理完成都会立马应答并在队列中将消息删除
- 手动应答:任务处理完成才应答
- 实验目标:消息在手动应答模式下是不会丢失的,因为一旦消息在某个消费者处处理失败,它会将这个消息重新放回队列中,给其他队列消费,处理完成后,消费者才给出应答,此时消息才会被删除
- 异同:
- 自动应答 消费者收到消息后立马应答-->应答后删除消息(未处理完成的消息被删除则丢失消息)
- 立马应答 消费者处理完成后才会应答-->应答后删除消息(消息已处理完成,不丢失)
- */
- public class ManualAckCon01 {
-
- public static final String QUEUE_NAME="ack_queue";
-
- public static void main(String[] args) throws Exception {
-
- Channel channel = RabbitMqUtils.getChannel();
- System.out.println("C1等待接收消息处理时间较短");
-
- //采用手动应答
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME,autoAck,(consumerTag,message)->{
- SleepUtils.sleep(1);
- System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
- /**
- * 手动应答
- *
- * 1、消息的标记
- * 2、是否批量应答
- * 3、
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- },(consumerTag)->{
- System.out.println(consumerTag+":消费者取消消费时接口回调");
- });
- }
- }
消费者2代码:
- /*
- 自动应答:收到消息后不管任务是否处理完成都会立马应答并在队列中将消息删除
- 手动应答:任务处理完成才应答
- 实验目标:消息在手动应答模式下是不会丢失的,因为一旦消息在某个消费者处处理失败,它会将这个消息重新放回队列中,给其他队列消费,处理完成后,消费者才给出应答,此时消息才会被删除
- 异同:
- 自动应答 消费者收到消息后立马应答-->应答后删除消息(未处理完成的消息被删除则丢失消息)
- 立马应答 消费者处理完成后才会应答-->应答后删除消息(消息已处理完成,不丢失)
- */
- public class ManualAckCon02 {
-
- public static final String QUEUE_NAME="ack_queue";
-
- public static void main(String[] args) throws Exception {
-
- Channel channel = RabbitMqUtils.getChannel();
- System.out.println("C2等待接收消息处理时间较长");
-
- //采用手动应答
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME,autoAck,(consumerTag,message)->{
- SleepUtils.sleep(30);
- System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
- /**
- * 手动应答
- *
- * 1、消息的标记
- * 2、是否批量应答
- * 3、
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- },(consumerTag)->{
- System.out.println(consumerTag+":消费者取消消费时接口回调");
- });
- }
- }
操作:启动生产者,再启动两个消费者,接着在控制台分别输入AA、BB,看到C1收到AA,此时C2在30s后可看到BB。如果再发送CC、DD,且把C2的程序在30s内关闭,此时C1能收到CC,DD(C2没有处理完成DD,此时消息放回队列重新被消费)
预期:生产者发送AA、BB,此时消费者1间隔1s后收到AA,如果未出现错误,消费者2在30s后可收到BB,如果宕机等,那么BB回到队列,由消费者1来处理
操作:只需要在生成队列时将durable标志设为true
注意: 如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
此时这个队列在mq管理界面可以看到字母D
这里的2、5是堆积在信道的消息数,而不是最终处理的消息数,假如11 22分别进入C1、C2的信道,接着C1把11处理完成,22堆积在C2信道中,此时生产者再发送33 44 55, 33 44堆积在C1信道,(此时信道C1只有两个,因为前面的11已经被处理完成)
- //发布1000条单独确认的消息,耗时:1207ms
- public class SingleConfirmPro {
-
- //批量发消息的个数
- public static final int MESSAGE_COUNT=1000;
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName,true,false,false,null);
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long begin = System.currentTimeMillis();
- //批量发消息
- for(int i=0;i<MESSAGE_COUNT;i++){
- String message = i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- //单个消息就立马进行确认
- boolean flag = channel.waitForConfirms();
- if (flag){
- System.out.println("消息发送成功");
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"条单独确认的消息,耗时:"+(end-begin)+"ms");
- }
- }
- //发布1000条消息,间隔消息数量为100时批量确认,耗时:123ms
- public class BatchConfirmPro {
- //批量发消息的个数
- public static final int MESSAGE_COUNT=1000;
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName,true,false,false,null);
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long begin = System.currentTimeMillis();
- //多少条就批量确认一次
- int batchSize=100;
- //批量发消息并进行批量确认
- for(int i=0;i<MESSAGE_COUNT;i++){
- String message = i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- //消息积累100条再批量确认一次
- if (i%batchSize==0){
- channel.waitForConfirms();
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"条消息,间隔消息数量为100时批量确认,耗时:"+(end-begin)+"ms");
- }
- }