百度云文档地址:
链接:https://pan.baidu.com/s/1gwKHzMiwUB-hw9ZFIJ9SxQ
提取码:y07h
gitee基本代码地址:
https://gitee.com/liushanshan126/rabbitmq-test
gitee整合springboot代码地址:
https://gitee.com/liushanshan126/rabbitmq-springboot-test
/sbin/service rabbitmq-server start
rabbitmq-plugins enable rabbitmq_management
package com.bear.简单模式;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <简述> 简单模式测试
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
public class HelloWorldTest {
private final static String QUEUE_NAME = "chijiuhua";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.48.77.231");
factory.setUsername("admin");
factory.setPassword("123");
//channel实现了自动close接口自动关闭不需要显示关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/***生成一个队列
* *1.队列名称*
* 2.队列里面的消息是否 持久化 默认消息存储在内存中
* *3.该队列是否只供一个消费者进行消费是否进行共享 false可以多个消费者消费
* 注意:exclusive:有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,
* 没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常。
* *4.是否自动删除最后一个消费者端开连接以后该队列是否自动删除 true自动删除
* *5.其他参数*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message="helloworld测试";
/***发送一个消息
* *1.发送到那个交换机
* *2.路由的key是哪个
* *3.其他的参数信息
* *4.发送消息的消息体*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
// 关闭连接
// connection.close();
}
}
package com.bear.简单模式;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 测试多个消费者消费一个队列
* 测试结果:多个消费者可以消费同一个队列,但是队列里面的数据只能给一个消费者,多个消费者得进行争抢
* <简述>
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
public class Consumer2 {
private final static String QUEUE_NAME="hello3";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("120.48.77.231");
factory.setUsername("admin");
factory.setPassword("123");
//channel实现了自动close接口自动关闭不需要显示关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
// 成功回调的函数式接口
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String message = new String(delivery.getBody());
System.out.println("获取的队列里面的参数为:" + message);
} ;
// 失败的函数式接口
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("失败的参数consumerTag:" + consumerTag);
};
/***消费者消费消息
* *1.消费哪个队列
* *2.消费成功之后是否要自动应答true代表自动应答false手动应答
* *3.消费者成功消费的回调
* 4.消费者失败的回调*/
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
注意:channel.basicQos(2); 代表一次可以有多个消息进来等待被消费,叫做不公平分发,这里是2。2可以换成其他的数字
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
Channel channel = RabbitMqUtil.getConnection().createChannel();
String queueName= UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
// 开启异步方式
channel.addConfirmListener((deliveryTag, multiple) -> {
System.out.println("成功处理:" + deliveryTag);
}, (deliveryTag, multiple) -> {
System.out.println("未成功处理的数据:" + deliveryTag);
});
long begin=System.currentTimeMillis();
for(int i = 0; i< 1000 ;i ++){
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
}
long end=System.currentTimeMillis();
System.out.println("发布" + 10+ "个单独确认消息,耗时" + (end-begin) +"ms"); //耗时290ms
package com.bear.发布确认.异步发布确认;
import com.bear.utils.RabbitMqUtil;
import com.rabbitmq.client.Channel;
import jdk.management.resource.internal.inst.SocketOutputStreamRMHooks;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/** 异步发布确认 最常用的发布确认方式
* <简述>
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
public class AsynConfirm {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getConnection().createChannel();
String queueName= UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
// 线程安全的哈希表保存信道中的标识+传输数据
ConcurrentSkipListMap<Long,String> outstandingConfirms=new ConcurrentSkipListMap<>();
// 开启异步方式
channel.addConfirmListener((deliveryTag, multiple) -> {
// 批量回调和单个回调
if(!multiple){
System.out.println("单个数据回调:" + deliveryTag);
outstandingConfirms.remove(deliveryTag);
}else {
System.out.println("多个数据回调:" + deliveryTag);
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
outstandingConfirms.headMap(deliveryTag);
longStringConcurrentNavigableMap.clear();
}
System.out.println("成功处理:" + deliveryTag);
}, (deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未成功处理的发送的数据为:" + message);
System.out.println("未成功处理的数据:" + deliveryTag);
});
long begin=System.currentTimeMillis();
for(int i = 0; i< 1000 ;i ++){
String message = i + "";
// 将信道中的数据的标识 和 放入的数据放入线程安全的map中
/***channel.getNextPublishSeqNo()获取下一个消息的序列号 *通过序列号与消息体进行一个关联 *全部都是未确认的消息体 */
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("",queueName,null,message.getBytes());
}
long end=System.currentTimeMillis();
System.out.println("发布" + 10+ "个单独确认消息,耗时" + (end-begin) +"ms"); //耗时290ms
}
}
消费者代码:
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
/***生成一个临时的队列队列的名称是随机的 *当消费者断开和该队列的连接时队列自动删除 */
String queueName=channel.queueDeclare().getQueue();
//把该临时队列绑定我们的exchange其中routingkey(也称之为bindingkey)为空字符串
channel.queueBind(queueName, "fanout-test", "fanout");
System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
// 成功回调的函数式接口
DeliverCallback deliverCallback = (consumerTag, delivery)->{
System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
} ;
// 获取队列中待消费的数据
channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {});
}
生产者代码:
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
/***声明一个exchange*1.exchange的名称 *2.exchange的类型 */
channel.exchangeDeclare("fanout-test", "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while(sc.hasNext()){
String message=sc.nextLine();
channel.basicPublish("fanout-test", "",null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);}
}
代码:
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//交换机定义
channel.exchangeDeclare("direct-test", BuiltinExchangeType.DIRECT);
// 队列名称
String queueName="test1";
// 队列定义
channel.queueDeclare(queueName, false, false, false, null);
// 队列绑定交换机,设置routingkey
channel.queueBind(queueName, "direct-test","error");
channel.queueBind(queueName, "direct-test","warning");
System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
// 成功回调的函数式接口
DeliverCallback deliverCallback = (consumerTag, delivery)->{
System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
} ;
// 获取队列中待消费的数据
channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {});
}
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
/***声明一个exchange*1.exchange的名称 *2.exchange的类型 */
channel.exchangeDeclare("direct-test", BuiltinExchangeType.DIRECT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while(sc.hasNext()){
String message=sc.nextLine();
channel.basicPublish("direct-test", "error",null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);}
}
queue中的某些消息无法被消费,然后消息就会放入死信队列中
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//声明死信和普通交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue1";
// 队列定义
channel.queueDeclare(deadQueue,false,false,false,null);
//死信队列绑定死信交换机与routingkey
channel.queueBind(deadQueue, DEAL_EXCHANGE,"lisi");
// 正常队列
//正常队列绑定死信队列信息
Map<String,Object> params=new HashMap<>();
//正常队列设置死信交换机参数key是固定值
params.put("x-dead-letter-exchange",DEAL_EXCHANGE);
//正常队列设置死信routing-key参数key是固定值
params.put("x-dead-letter-routing-key","lisi");
//声明正常队列
String deadNormalQueue = "dead-normal-queue";
// 队列定义 ---如果有死信,则设置arguments里面的值
channel.queueDeclare(deadNormalQueue,false,false,false,params);
//正常队列与交换机绑定routingkey
channel.queueBind(deadNormalQueue, NORMAL_EXCHANGE,"zhangsan");
System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
// 成功回调的函数式接口
DeliverCallback deliverCallback = (consumerTag, delivery)->{
System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
} ;
// 获取队列中待消费的数据
channel.basicConsume(deadNormalQueue, true, deliverCallback, (consumerTag) -> {});
重点信息红框标出来的图片:
消费者重要截图:
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bear</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ测试依赖-->
<dependency><groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
properties:
spring.rabbitmq.host=120.48.77.231
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
package com.bear.configure;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* <简述> 定义队列,定义交换机,定义队列和交换机的关系的配置类
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
@Configuration
public class TtlQueueConfig {
// 正常的1个交换机和2个队列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B="QB";
// 没有ttl的队列
public static final String QUEUE_C="QC";
// 死信交换机和队列
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE="QD";
// 声明direct交换机 X
@Bean
public DirectExchange xExchange(){
// 广播交换机
// FanoutExchange fanoutExchange = new FanoutExchange(X_EXCHANGE);
// topic交换机
// TopicExchange topicExchange = new TopicExchange(X_EXCHANGE);
return new DirectExchange(X_EXCHANGE);
}
// 声明direct交换机 Y
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列 QA ,ttl时间为10s
@Bean
public Queue queueA(){
Map<String,Object> args=new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key","YD");
//声明队列的TTL
args.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列A绑定X交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列 QB , ttl时间为40s
@Bean
public Queue queueB(){
Map<String,Object> args=new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key","YD");
//声明队列的TTL
args.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// 声明队列B绑定X交换机
@Bean
public Binding queueaBindingX2(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明队列 QC , 没有ttl
@Bean
public Queue queueC(){
Map<String,Object> args=new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key","YD");
//不声明队列的TTL
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
// 声明队列C绑定X交换机
@Bean
public Binding queueaBindingX3(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
//声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列QD绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
package com.bear.configure;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* <简述> 死信队列接收消息的地方
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues="QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg=new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
package com.bear.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* <简述> 生产者生产信息
* <详细描述>
*
* @author LiuShanshan
* @version $Id$
*/
@Slf4j
@RequestMapping("ttl")
@RestController
public class TtlController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个 TTL队列:{}", new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自 ttl为 10S的队列: "+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl为 40S的队列: "+message);
}
@GetMapping("sendMsgTtl/{message}/{ttlTime}")
public void sendMsgTtl(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime){
log.info("当前时间:{},发送一条设置了ttl的信息给队列:{}", new Date(),message);
rabbitTemplate.convertAndSend("X","XC","消息来自 ttl为" + ttlTime +"S的队列: "+message, correlationData ->{
// 设置信息的过期时间
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;});
log.info("当前时间:{},发送一条时长{}毫秒 TTL信息给队列 C:{}", new Date(),ttlTime, message);
}
}
因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
代码跟上面的没有啥不同,改的有,修改交换机的类型:
在发送消息的时候修改:
其他的都一样
增加了:
其他的跟上面的代码一样,这样就可以处理交换机是否收到消息和队列是否收到消息,双重处理.
声明正常队列时绑定备份交换机:
声明备份交换机为fanout模式,然后绑定2个交换机:
其他的跟上面的3.6.2没有区别
使用redis中的原子性来处理
使用场景:
如何使用: