整合背景:
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starterartifactId>
- <version>2.7.13version>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
可根据不同的交换机,进行队列,路由的定义。也可以对队列路由进行扩充。目前这里只用到直连类型的交换机。扩充的属性可以看(三)中对应的实体类
- # 通过yaml配置文件,初始化rabbitMQ的交换机-路由-队列。
- # 交换机(exchange)通过路由(routing-key)来确定发送到那个队列(queue)
- # 对于 Direct直连类型交换机。匹配路由的规则为全匹配。可以理解为routing-key与queue一对一
- # 对于 Fanout扇形类型交换机。则直接分发给所有绑定的队列,此时可以忽略routing-key, 多用于广播消息
- # 对于 Topic 主题类型交换机。根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符
- # 在spring cloud多应用中。不同的应用可能需要不同的类型。该配置只是该应用下创建的一个或多个类型。
- rabbit:
- modules:
- - exchange:
- name: default.exchange
- #type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct
- type: DIRECT
- queue:
- name: default.queue
- arguments:
- # 队列中所有消息的最大存活时间。单位毫秒。 1分钟
- x-message-ttl: 60000
- # routing-key可以为空
- routing-key: default.queue.key
YamlAndPropertySourceFactory.class:该类为了解决读取yaml以及yml配置文件无法映射的问题。如果可以映射配置文件,可以忽略这个
- /**
- * 读取rabbitMQ配置中的交换机,队列等信息
- *
- * @author byp
- * @date 2023/7/27 11:30
- */
- @Component
- @ConfigurationProperties(prefix = "rabbit")
- @PropertySource(value = "${property.source.prefix:classpath}:rabbitmq/rabbit.yaml", factory =YamlAndPropertySourceFactory.class)
- @Data
- public class RabbitProperties {
- private List
modules; - }
RabbitModuleInfo对象:
- /**
- * RabbitMQ 队列和交换机机绑定关系实体对象
- *
- * @author byp
- * @date 2023/7/27 11:31
- */
- @Data
- public class RabbitModuleInfo {
-
- /**
- * 路由Key
- */
- private String routingKey;
- /**
- * 队列信息
- */
- private Queue queue;
- /**
- * 交换机信息
- */
- private Exchange exchange;
-
- /**
- * 交换机信息类
- */
- @Data
- public static class Exchange {
- /**
- * 交换机类型
- * 默认直连交换机
- */
- private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;
- /**
- * 交换机名称
- */
- private String name;
- /**
- * 是否持久化
- * 默认true持久化,重启消息不会丢失
- */
- private boolean durable = true;
- /**
- * 当所有队绑定列均不在使用时,是否自动删除交换机
- * 默认false,不自动删除
- */
- private boolean autoDelete = false;
- /**
- * 交换机其他参数
- */
- private Map
arguments; - }
-
- /**
- * 队列信息类
- */
- @Data
- public static class Queue {
- /**
- * 队列名称
- */
- private String name;
- /**
- * 是否持久化
- * 默认true持久化,重启消息不会丢失
- */
- private boolean durable = true;
- /**
- * 是否具有排他性
- * 默认false,可多个消费者消费同一个队列
- */
- private boolean exclusive = false;
- /**
- * 当消费者均断开连接,是否自动删除队列
- * 默认false,不自动删除,避免消费者断开队列丢弃消息
- */
- private boolean autoDelete = false;
- /**
- * 绑定死信队列的交换机名称
- */
- private String deadLetterExchange;
- /**
- * 绑定死信队列的路由key
- */
- private String deadLetterRoutingKey;
- private Map
arguments; - }
-
- }
YamlAndPropertySourceFactory.class
- /**
- * 获取yaml配置
- *
- * @author byp
- * @date 2023/5/22 10:51
- */
- @Configuration
- public class YamlAndPropertySourceFactory extends DefaultPropertySourceFactory {
-
- private static final String YML_RESOURCE_FILE = ".yml";
-
- private static final String YAML_RESOURCE_FILE = ".yaml";
-
- @NotNull
- @Override
- public PropertySource> createPropertySource(String name, EncodedResource resource) throws IOException {
-
- Resource resourceResource = resource.getResource();
- if (Objects.requireNonNull(resourceResource.getFilename()).endsWith(YML_RESOURCE_FILE) || Objects.requireNonNull(resourceResource.getFilename()).endsWith(YAML_RESOURCE_FILE)) {
- List
> sources = new YamlPropertySourceLoader().load(resourceResource.getFilename(), resourceResource); - return sources.get(0);
- }
- return super.createPropertySource(name, resource);
- }
- }
动态参数配置校验(校验可不配置):可根据不同的交换机类型,做校验,这里是对直连交换机的参数做的校验。
- /**
- * rabbitMQ初始化交换机,队列 容器
- *
- * @author byp
- * @date 2023/7/27 11:05
- */
- @Slf4j
- public class RabbitModuleInitializer implements SmartInitializingSingleton {
-
- AmqpAdmin amqpAdmin;
- RabbitProperties rabbitProperties;
-
- public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
- this.amqpAdmin = amqpAdmin;
- this.rabbitProperties = rabbitProperties;
- }
-
- @Override
- public void afterSingletonsInstantiated() {
- log.info("RabbitMQ 动态创建和绑定队列、交换机开始");
- declareRabbitModule();
- log.info("RabbitMQ 动态创建和绑定队列、交换机完成");
- }
-
- /**
- * RabbitMQ 根据配置动态创建和绑定队列、交换机
- */
- private void declareRabbitModule() {
- List
rabbitModuleInfos = rabbitProperties.getModules(); - if (CollectionUtils.isEmpty(rabbitModuleInfos)) {
- return;
- }
- for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
- configParamValidate(rabbitModuleInfo);
- // 队列
- Queue queue = convertQueue(rabbitModuleInfo.getQueue());
- // 交换机
- Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
- // 绑定关系
- String routingKey = rabbitModuleInfo.getRoutingKey();
- String queueName = rabbitModuleInfo.getQueue().getName();
- String exchangeName = rabbitModuleInfo.getExchange().getName();
- Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
- // 创建队列
- amqpAdmin.declareQueue(queue);
- // 创建交换机
- amqpAdmin.declareExchange(exchange);
- // 队列 绑定 交换机
- amqpAdmin.declareBinding(binding);
- }
- }
-
- /**
- * RabbitMQ动态配置参数校验
- *
- * @param rabbitModuleInfo 队列和交换机机绑定关系实体对象
- */
- public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
- String routingKey = rabbitModuleInfo.getRoutingKey();
- Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");
- Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
- Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
- Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
- Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
-
- }
-
- /**
- * 转换生成RabbitMQ队列
- *
- * @param queue 队列
- * @return Queue
- */
- public Queue convertQueue(RabbitModuleInfo.Queue queue) {
- Map
arguments = queue.getArguments(); -
- // 转换ttl的类型为long
- if (arguments != null && arguments.containsKey("x-message-ttl")) {
- arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
- }
- // 是否需要绑定死信队列
- String deadLetterExchange = queue.getDeadLetterExchange();
- String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
- if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {
- if (arguments == null) {
- arguments = new HashMap<>(4);
- }
- arguments.put("x-dead-letter-exchange", deadLetterExchange);
- arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
-
- }
- return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
- }
-
-
- /**
- * 转换生成RabbitMQ交换机
- *
- * @param exchangeInfo 交换机信息
- * @return Exchange
- */
- public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {
-
- AbstractExchange exchange = null;
- RabbitExchangeEnum exchangeType = exchangeInfo.getType();
- String exchangeName = exchangeInfo.getName();
- boolean isDurable = exchangeInfo.isDurable();
- boolean isAutoDelete = exchangeInfo.isAutoDelete();
-
- Map
arguments = exchangeInfo.getArguments(); -
- switch (exchangeType) {
- case DIRECT:
- // 直连交换机
- exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
- break;
- case TOPIC:
- // 主题交换机
- exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
- break;
- case FANOUT:
- //扇形交换机
- exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
- break;
- case HEADERS:
- // 头交换机
- exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
- break;
- }
- return exchange;
- }
-
- }
- /**
- * rabbitMQ配置
- *
- * @author byp
- * @date 2023/7/27 9:14
- */
- @Configuration
- public class RabbitConfig {
-
- /**
- * 通过yaml配置,创建队列、交换机初始化器
- */
- @Bean
- @ConditionalOnMissingBean
- public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
- return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);
- }
-
- /**
- * 动态监听所有队列
- *
- * @param rabbitConsumerManager 用于消息监听的类
- * @param connectionFactory MQ连接
- * @return SimpleMessageListenerContainer
- */
- @Bean
- public SimpleMessageListenerContainer mqMessageContainer(RabbitConsumerManager rabbitConsumerManager,
- ConnectionFactory connectionFactory) {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
- //所有的队列
- container.setQueueNames(Objects.requireNonNull(RabbitQueueEnum.getAllQueues()));
- container.setExposeListenerChannel(true);
- //设置每个消费者获取的最大的消息数量
- container.setPrefetchCount(100);
- //消费者个数
- container.setConcurrentConsumers(1);
- //设置确认模式为手工确认
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- //监听处理类
- container.setMessageListener(rabbitConsumerManager);
- return container;
- }
-
- }
以上五个步骤就完成了根据配置文件初始化rabbitMQ的交换机-路由-队列
- /**
- * rabbit MQ 发送消息
- *
- * @author byp
- * @date 2023/7/27 8:58
- */
- @Component
- @RequiredArgsConstructor
- @Slf4j
- public class RabbitProducerManager {
- private final RabbitTemplate rabbitTemplate;
- private final AmqpAdmin amqpAdmin;
-
- /**
- * 指定队列发送消息
- * 【注】 rabbitQueueEnum 与 rabbitRoutingEnum 的值可以考虑使用相同的,方便发送消息与接受消息。
- *
- * @param message 消息体
- * @param rabbitQueueEnum 队列枚举
- */
- public void sendMessage(RabbitQueueEnum rabbitQueueEnum, RabbitRoutingEnum rabbitRoutingEnum, String message) {
- if (!isExistQueue(rabbitQueueEnum)) {
- //可以改成没有队列的话,自动创建队列。
- throw new BusinessException("发送消息失败,队列为空");
- }
- rabbitTemplate.convertAndSend("default.exchange", rabbitRoutingEnum.getCode(), message);
- log.info("向路由:{}, 发送消息成功:{}", rabbitRoutingEnum.getCode(), message);
- }
-
-
- /**
- * 判断队列是否存在
- *
- * @param rabbitQueueEnum 队列枚举
- * @return boolean true-存在
- */
- public boolean isExistQueue(RabbitQueueEnum rabbitQueueEnum) {
- boolean falg = true;
- if (StringUtils.isBlank(rabbitQueueEnum.getCode())) {
- throw new BusinessException("队列名称为空");
- }
- Properties queueProperties = amqpAdmin.getQueueProperties(rabbitQueueEnum.getCode());
- if (queueProperties == null) {
- falg = false;
- }
- return falg;
- }
-
- }
- /**
- * rabbitMQ 消费者
- * 不统一处理消费,各应用自己创建监听消息消费
- *
- * @author byp
- * @date 2023/7/27 9:39
- */
- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class RabbitConsumerManager implements ChannelAwareMessageListener {
- private final ApplicationEventPublisher publisher;
- @Override
- public void onMessage(Message message, Channel channel) throws IOException {
- MessageProperties messageProperties = message.getMessageProperties();
- //消息唯一标识
- long deliveryTag = messageProperties.getDeliveryTag();
- try {
- //路由
- String routingKey = messageProperties.getReceivedRoutingKey();
- //渠道
- String consumerQueue = messageProperties.getConsumerQueue();
- //消息体
- String msg = new String(message.getBody());
- log.info("RabbitConsumerManager监听到消息标识:{}, 队列:{}, 路由:{}, 的消息:{}", deliveryTag, consumerQueue,
- routingKey, msg);
- //将消息发送到spring的事件监听中
- JSONObject json = new JSONObject();
- json.put("deliveryTag", deliveryTag);
- json.put("routingKey", routingKey);
- json.put("consumerQueue", consumerQueue);
- json.put("msg", msg);
- json.put("channel", channel);
- //这一步可以根据routingKey进行事件分类。在监听中根据不同的类进行事件处理
- publisher.publishEvent(json);
- //这里确认消息消费不合适,这里并非是真实的业务消费场景。
- //channel.basicAck(deliveryTag, true);
- } catch (Exception e) {
- channel.basicReject(deliveryTag, false);
- log.error("获取rabbitMQ消息报错");
- e.printStackTrace();
- }
- }
- }
- /**
- * rabbitMQ consumer事件监听
- *
- * @author byp
- * @date 2023/7/28 11:11
- */
- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class MessageListener {
-
- @EventListener
- public void handleMessage(JSONObject json) throws IOException {
- log.info("接受到消息json:{}", json);
- //标识消息在队列中的顺序
- long deliveryTag = json.getLong("deliveryTag");
- Channel channel = (Channel) json.get("channel");
- try {
- //TODO 业务逻辑
- //标记消费成功, false-只确认当前消息 ture-批量确认,当前标记之前的都确认消费
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- //拒绝消费并放入queue中,这时消息会进入循环,直到消费掉。 第二个参数表示是否放回队列中
- channel.basicReject(deliveryTag, true);
- log.error("消息确认失败,消息标识:{}", deliveryTag);
- }
- }
-
- }