• rabbitmq动态创建监听


    rabbitmq动态创建监听

    目录

    • DynamicContainerFactory: 工厂类
    • DynamicContainerFactoryBean: 创建 SimpleMessageListenerContainer
    • DynamicListenerManager: 管理动态新增的 SimpleMessageListenerContainer
    • MyMessageListener: 业务处理监听
    • DynamicController:Api接口
    DynamicContainerFactoryBean
    @Builder
    public class DynamicContainerFactoryBean implements FactoryBean<SimpleMessageListenerContainer> {
    
    
        @Getter
        private String queue;
        private String exchangeType;
        @Getter
        private String exchange;
        private String binding;
        private ConnectionFactory connectionFactory;
        private AmqpAdmin amqpAdmin;
        private boolean durable = true;
        private String routeKey;
        private boolean autoAck;
        private MessageListener listener;
    
    
        @Override
        public SimpleMessageListenerContainer getObject() throws Exception {
    
            //声名队列,交换机,binding
            amqpAdmin.declareQueue(declareQueue());
            amqpAdmin.declareExchange(declareExchange());
            amqpAdmin.declareBinding(declareBinding(declareQueue(), declareExchange()));
    
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setQueueNames(queue);
            container.setAutoStartup(false);
            container.setMessageListener(listener);
            container.setAmqpAdmin(amqpAdmin);
            container.setAutoDeclare(true);
            container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
            container.setConnectionFactory(connectionFactory);
            return container;
        }
    
        private Binding declareBinding(Queue declareQueue, Exchange declareExchange) {
            return BindingBuilder.bind(declareQueue).to(declareExchange).with(routeKey).noargs();
        }
    
    
        private Exchange declareExchange() {
            switch (exchangeType) {
                case ExchangeTypes.DIRECT:
                    return ExchangeBuilder.directExchange(exchange).durable(true).build();
                case ExchangeTypes.TOPIC:
                    return ExchangeBuilder.topicExchange(exchange).durable(true).build();
                case ExchangeTypes.FANOUT:
                    return ExchangeBuilder.fanoutExchange(exchange).durable(true).build();
                case ExchangeTypes.HEADERS:
                    return ExchangeBuilder.headersExchange(exchange).durable(true).build();
            }
            throw new RuntimeException("不支持的交换器类型");
        }
    
        private Queue declareQueue() {
            return QueueBuilder.durable(queue).build();
        }
    
    
        @Override
        public Class<?> getObjectType() {
            return SimpleMessageListenerContainer.class;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    DynamicListenerManager
    @Component
    public class DynamicListenerManager {
    
    
        private Map<String, SimpleMessageListenerContainer> map = new ConcurrentHashMap<>();
    
        public void add(String name, SimpleMessageListenerContainer container) {
            map.put(name, container);
        }
    
        public void start() {
            for (SimpleMessageListenerContainer container : map.values()) {
                container.start();
            }
        }
    
        public void stop() {
            for (SimpleMessageListenerContainer container : map.values()) {
                container.stop();
            }
        }
    
    
        public void purge() {
            map.clear();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    MyMessageListener
    public class MyMessageListener implements ChannelAwareMessageListener {
    
        private SimpleMessageListenerContainer container;
        private boolean autoAck;
    
        public MyMessageListener(SimpleMessageListenerContainer container) {
            this.container = container;
            this.autoAck = container.getAcknowledgeMode().isAutoAck();
        }
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            if (!autoAck) {
                try {
                    doOnMessage(message, channel);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            } else {
                doOnMessage(message, channel);
            }
        }
    
        private void doOnMessage(Message message, Channel channel) {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("dynamic msg: " + msg);
        }
    
        @Override
        public void onMessage(Message message) {
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    DynamicContainerFactory
    @Component
    public class DynamicContainerFactory {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Autowired
        private ConnectionFactory factory;
    
        @Autowired
        private DynamicListenerManager dynamicListenerManager;
    
    
        private Map<String, DynamicContainerFactoryBean> map = new ConcurrentHashMap<>();
    
        public void dynamicCreate(DynamicContainerDto dynamicContainerDto) {
            try {
                DynamicContainerFactoryBean factoryBean = new DynamicContainerFactoryBean.DynamicContainerFactoryBeanBuilder()
                        .amqpAdmin(rabbitAdmin)
                        .autoAck(dynamicContainerDto.isAutoAck())
                        .durable(dynamicContainerDto.isDurable())
                        .routeKey(dynamicContainerDto.getRouteKey())
                        .connectionFactory(factory)
                        .exchangeType(dynamicContainerDto.getExchangeType())
                        .queue(dynamicContainerDto.getQueue())
                        .exchange(dynamicContainerDto.getExchange())
                        .binding(dynamicContainerDto.getBinding())
                        .build();
                SimpleMessageListenerContainer container = factoryBean.getObject();
                MyMessageListener myMessageListener = new MyMessageListener(container);
                container.setMessageListener(myMessageListener);
                dynamicListenerManager.add(dynamicContainerDto.getContainerName(), container);
                map.put(dynamicContainerDto.getContainerName(), factoryBean);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        public void start() {
            dynamicListenerManager.start();
        }
    
        public void stop() {
            dynamicListenerManager.stop();
        }
    
        public void purge() {
            for (DynamicContainerFactoryBean factoryBean : map.values()) {
                rabbitAdmin.deleteQueue(factoryBean.getQueue());
                rabbitAdmin.deleteExchange(factoryBean.getExchange());
            }
            map.clear();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    DynamicController
    @RestController
    public class DynamicController {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        @Autowired
        private DynamicContainerFactory dynamicContainerFactory;
    
    
        /**
         * 创建队列
         * @param name
         * @return
         */
        @GetMapping("/queue/create/{name}")
        public String createQueue(@PathVariable("name") String name) {
            Queue queue = QueueBuilder.durable(name).build();
            String s = rabbitAdmin.declareQueue(queue);
            if (StringUtils.hasText(s)) {
                System.out.println("create queue: " + s);
                return "success";
            } else {
                return "failed";
            }
        }
    
        /**
         * 创建交换器
         * @param name
         * @return
         */
        @GetMapping("/ex/create/{name}")
        public String createExchange(@PathVariable("name") String name) {
            Exchange exchange = ExchangeBuilder.directExchange(name).durable(true).build();
            rabbitAdmin.declareExchange(exchange);
            System.out.println("create exchange: " + exchange);
            return "success";
        }
    
    
        /**
         * 创建 binding
         * @param bindingDto
         * @return
         */
        @GetMapping("/binding/create")
        public String createBinding(BindingDto bindingDto) {
    
            Binding binding = BindingBuilder.bind(new Queue(bindingDto.getQueue()))
                    .to(new DirectExchange(bindingDto.getExchange()))
                    .with(bindingDto.getRouteKey());
            rabbitAdmin.declareBinding(binding);
            System.out.println("create binding: " + binding);
            return "success";
        }
    
        /**
         * 创建容器监听
         * @param req
         * @return
         */
        @GetMapping("/container/create")
        public String createContainer(DynamicContainerDto req){
            dynamicContainerFactory.dynamicCreate(req);
            return "success";
        }
    
        /**
         * 启动监听
         * @return
         */
        @GetMapping("/container/start")
        public String containerStart(){
            dynamicContainerFactory.start();
            return "success";
        }
    
        /**
         * 关闭监听
         * @return
         */
        @GetMapping("/container/stop")
        public String containerStop(){
            dynamicContainerFactory.stop();
            return "success";
        }
    
        /**
         * 删除容器
         * @return
         */
        @GetMapping("/container/purge")
        public String containerPurge(){
            dynamicContainerFactory.purge();
            return "success";
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    good luck!

  • 相关阅读:
    基于Matlab实现自动泊车(垂直泊车)
    你和工博会观展达人也许只差一篇攻略
    uni-app中返回顶部的方法
    POSIX信号量
    matlab 遗传算法GA优化汽车四分之一车体主动悬架PID
    2023最新SSM计算机毕业设计选题大全(附源码+LW)之java创新实践学分管理系统08a30
    如何使用CSS实现一个带有动画效果的折叠面板(Accordion)?
    腾讯:《智能科技 跨界相变——2024数字科技前沿应用趋势》
    windows安装wsl2以及ubuntu
    n皇后学习
  • 原文地址:https://blog.csdn.net/u013887008/article/details/127649977