• rabbit使用动态队列


    在使用rabbitmq的过程中,因业务需要,队列名需要动态生成,本应用只消费这个动态生成的队列数据,而绑定队列则由其它应用进行绑定,

    配置

    使用springboot进行配置,集成看其它文章

    spring:
      rabbitmq:
        host: 192.168.178.100
        port: 5672
        virtual-host: xxxx
        username: xxxx
        password: xxxx
    

    创建消费者

    这么没有使用rabbitmq的@RabbitListener,因为这里需要动态指定消费队列,在下面会指定

    @Slf4j
    @Component
    public class RawDataConsumer implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
             String messageBody=null;
            try {
                log.info("收到数据:{}",new String(message.getBody()));
                messageBody= new String(message.getBody(), StandardCharsets.UTF_8.name());
    			//业务处理
    			.....
            } catch (Exception e) {
                log.error("处理数据失败:{},{}",messageBody,e);
            }
    	}
    }
    
    

    动态指定队列

    在这里动态指定上面的消费者消费的队列名,把队列名存入数据库,供其它应用读取并绑定

    @Slf4j
    @Configuration
    public class RabbitMQConfig {
    
        private ConnectionFactory connectionFactory;
    
        private RawDataConsumer rawDataConsumer;
    
        @Bean
        public Queue ipQueue(){
            return new Queue(IPUtils.getQueue(), true); //使用IP做队列名
        }
    
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer(){
            String queueName=IPUtils.getQueue();
            ForwardQueues forwardQueues=new ForwardQueues();
            forwardQueues.setIp(IPUtils.getIPAndPort());
            forwardQueues.setQueueName(queueName);
            Boolean flag=iForwardQueuesService.create(forwardQueues);
            if(flag){
                SimpleMessageListenerContainer simpleMessageListenerContainer=new SimpleMessageListenerContainer(connectionFactory);
                simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
                simpleMessageListenerContainer.addQueueNames(queueName);
                simpleMessageListenerContainer.setMessageListener(rawDataConsumer);
                simpleMessageListenerContainer.setConcurrency("3");
                simpleMessageListenerContainer.setMaxConcurrentConsumers(12);
                return simpleMessageListenerContainer;
            }else{
                log.error("写入队列信息表失败{},{}",IPUtils.getIPAndPort(),queueName);
                return null;
            }
        }
    
        @Autowired
        public void setRawDataConsumer(RawDataConsumer rawDataConsumer) {
            this.rawDataConsumer = rawDataConsumer;
        }
        @Autowired
        public void setConnectionFactory(ConnectionFactory connectionFactory) {
            this.connectionFactory = connectionFactory;
        }
    

    绑定

    其它绑定交换机和队列绑定

        @PostMapping("/v1")
        public HttpResult create(@Valid @RequestBody final LinkRequestBean linkRequestBean) {
                Queue queue=new Queue(linkRequestBean.getForwardQueue(), true);
                Exchange exchange=SpringUtil.getBean("evForwardRawDataChange");
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("forwardRoutingKey_"+linkRequestBean.getForwardServer()).noargs());
                return DefaultHttpResultFactory.success("创建平台链路成功。", Boolean.TRUE);
        }
    
  • 相关阅读:
    C笔记:引用调用,通过指针传递
    新一代蒸馏算法
    简单2D几何求交点
    第二十四章·策略模式
    毕业设计——基于OpenCV和数字图像处理的图像识别项目——停车场车位识别
    【youcans 的 OpenCV 例程 300篇】244. 特征检测之 BRIEF 特征描述
    记一次 处理MIUI 13 疯狂杀进程问题
    Unity3D下如何实现跨平台低延迟的RTMP、RTSP播放
    QT在scrollArea中添加按钮,可滚动
    前端工作小结80-title写活
  • 原文地址:https://blog.csdn.net/u010833154/article/details/127119843