RabbitMQ系列汇总:RabbitMQ系列
/**
* 这是一个测试的生产者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyProducerTest_xiaoxiyingda {
/**
* 这里为了方便,我们使用main函数来测试
* 纯属看你个人选择
* @param args
*/
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
for (int i=0;i<6;i++){
String message="我是生产者,我告诉你一个好消息!"+i;
Thread.sleep( 1000 );
channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
}
/**
* 这是一个测试的消费者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_xiaoxiyingda01 {
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("我是消费者A,我在等待接收消息!");
DeliverCallback deliverCallback = (String var1, Delivery var2)->{
String message= new String(var2.getBody());
System.out.println(message);
};
CancelCallback cancelCallback = (String var1)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
Thread.sleep(1000);
channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,true,deliverCallback,cancelCallback);
}
}
执行结果


结论
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
return this.basicConsume(queue, autoAck, "", this.consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
}
当autoAck为true时,为自动应答
当autoAck为false时,为手动应答
不建议使用自动应答,实际业务场景中,一般我们使用手动应答。
生产者
/**
* 这是一个测试的生产者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyProducerTest_xiaoxiyingda {
/**
* 这里为了方便,我们使用main函数来测试
* 纯属看你个人选择
* @param args
*/
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());
System.out.println("生产者发出消息" + message);
}
}
}
消费者A
/**
* 这是一个测试的消费者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_xiaoxiyingda01 {
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("我是消费者A,我在等待接收消息!");
DeliverCallback deliverCallback = (String var1, Delivery var2)->{
String message= new String(var2.getBody());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
//true 代表批量应答 channel 上未应答的消息 false 单条应答
boolean multiple = false;
channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
};
CancelCallback cancelCallback = (String var1)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
}
}
设置手动应答,设置休眠时间较短,表示处理业务非常快。
消费者B
/**
* 这是一个测试的消费者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_xiaoxiyingda02 {
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
System.out.println("我是消费者B,我在等待接收消息!");
DeliverCallback deliverCallback = (String var1, Delivery var2)->{
String message= new String(var2.getBody());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
//true 代表批量应答 channel 上未应答的消息 false 单条应答
boolean multiple = false;
channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);
};
CancelCallback cancelCallback = (String var1)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);
}
}
设置手动应答,设置休眠时间较长,表示处理业务较慢。
执行顺序
执行结果


multiple 的 true 和 false 代表不同意思

消息自动重新入队

路漫漫其修远兮,吾必将上下求索~
如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah