• SpringBoot第十三篇:同时集成华为RC6.5.1安全版kafka和原生kafka,通过配置文件动态控制


    背景

    最近又收到新需求,原本项目已经集成了开源kafka进行功能开发,但是又因为开源的kafka不稳定,也不安全,所以就要求要集成华为RC6.5.1安全版kafka,并且能够跟开源的kafka进行动态切换,想用哪个用哪个。。

    注意:kafka集成是通过bean管理,所以有注册bean的操作

    废话不多说,开搞,我把我经历的踩坑和埋坑经验分享给有需要的人

    解决了哪些问题

    1.如何通过配置文件控制初始化注册哪个版本的kafka的bean

    简单说怎么解让springboot在启动时动态根据配置文件的配置项确定初始化注册指定版本的kafka的bean呢?
    
    • 1

    使用@Bean+@ConditionalOnPoroperty+@ConditionalOnMissingBean做到可通过配置文件进行动态初始化

    示例:

    @Configuration
    public KafkaInitAutoConfigure{
    
    	@Bean
    	@ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="apache")
    	@ConditionalOnMissingBean({KafkaInitTemplate.class,ApacheKafkaTemplate.class})
    	public KafkaInitTemplate apacheKafkaTemplateInit(){
    		//初始化apache开源kafka的逻辑
    	}
    
    	@Bean
    	@ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="huawei")
    	@ConditionalOnMissingBean({KafkaInitTemplate.class,HuaweiKafkaTemplate.class})
    	public HuaweiKafkaTemplate huaweiKafkaTemplateInit(){
    		//初始化华为安全版kafka的逻辑
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    kafka:
    	# 选择使用开源版的kafka
    	type: apache
    	# 选择使用华为安全版的kafka
    	# type: huawei
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.认证文件读取问题

    使用华为安全版RC6.5.1,需要使用krb5.confuser.keytab、以及jass.conf文件
    jass.conf文件可代码生成,也可自己创建填写,内容格式如下:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="src/main/resources/user.keytab"
    principal="developuser"
    useTicketCache=false
    storeKey=true
    debug=true;
    refreshKrb5Config=true;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中keyTabuser.keytab文件的绝对路径,principal是认证用户.
    就如jass.conf文件中keytab的路径要求是一个绝对路径,所以,你的项目如果打成jar包去运行的话,就得考虑把这个认证文件放在一个固定的路径。
    如果你是上k8s,也不用担心,挂载对应的路径去读取就好了。

    3.运行报错:could not login: the client is being …

    如果运行时初始化kafka生产者出现这个错误,一定是你的认证文件不正确,或者jass.conf中的配置信息填写不正确

    • 仔仔细细确认程序读取的那两个认证文件是否正确
    • 仔仔细细确认jass.conf中配置的user.keytab路径是否正确
    • 仔仔细细确认jass.conf中配置的principal是否正确

    4. 运行报错:Clock skew too great (37) - PROCESS_TGS

    原因就是:客户端和服务器系统时间相隔超过5分钟

    确认下两个系统之间的时间之差吧,通过相应的命令修改好即可
    注意:k8s启动的服务是看配置的时区是什么,即timezone,并不是所谓的系统时间。

    5. 运行报错:Server not found in Kerberos database (7) - LOOKING_UP_SERVER

    原因:是因为kafka-clients版本问题

    要使用华为提供的kafka-clients,它兼容开源的kafka-clients,不用担心使用它,就不能切换到开源版的kafka

    完整的【同时集成华为RC6.5.1安全版kafka和原生kafka,通过配置文件动态控制】的代码如下

    kafka自动装配类:KafkaInitAutoConfigure.java
    作用:用于项目启动时,根据指定配置初始化指定类型的kafkaTemple的bean,以便在各个service层使用。同一返回KafkaInitTemplate便于统一使用KafkaInitTemplate去进行kafka的生产和消费,关键在于底层的生产和消费使用不同的版本kafka即可,不需要把所有的类型都引用一遍

    @Configuration
    @EnableConfigurationProperties({KafkaInitProperties.class,HuaweiKafkaProperties.class})
    @AutoConfigureAfter(KafkaAutoConfiguration.class)
    public KafkaInitAutoConfigure{
    	
    	private static final Logger logger = LoggerFactory.getLogger(KafkaInitAutoConfigure.class)
    
    	@Bean
    	@ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="apache")
        @ConditionalOnMissingBean({KafkaInitTemplate.class,ApacheKafkaTemplate.class})
    	public KafkaInitTemplate apacheKafkaTemplateInit(KafkaTemplate kafkaTemplate, ComsumerFactory consumerFactory, KafkaInitProperties kafkaInitProperties) throws Exception{
    		//初始化apache开源kafka的逻辑
    		logger.info("初始化apache的kafka");
    		KafkaInitTemplate kafkaInitTemplate = new KafkaInitTemplate();
    		kafkaInitTemplate.setKafkaInitProperties(kafkaInitProperties);
    		kafkaInitTemplate.setKafkaTemplate(kafkaTemplate);
    		kafkaInitTemplate.setComsumerFactory(consumerFactory);
    		kafkaInitTemplate.afterPropertiesSet();
    		return  kafkaInitTemplate;
    	}
    
    	@Bean
    	@ConditionalOnPoroperty(prefix="kafka", name="type", havingValue="huawei")
    	@ConditionalOnMissingBean({KafkaInitTemplate.class,HuaweiKafkaTemplate.class})
    	public KafkaInitTemplate huaweiKafkaTemplateInit(HuaweiKafkaProperties huaweiKafkaProperties,KafkaInitProperties kafkaInitProperties) throws Exception{
    		//初始化华为安全版kafka的逻辑
    		logger.info("初始化apache的kafka");
    		KafkaInitTemplate kafkaInitTemplate = new KafkaInitTemplate();
    		kafkaInitTemplate.setKafkaInitProperties(kafkaInitProperties);
    		HuaweiKafkaTemplate huaweiKafkaTemplate = new HuaweiKafkaTemplate();
    		kafkaInitTemplate.setHuaweiKafkaTemplate(huaweiKafkaTemplate);
    		kafkaInitTemplate.afterPropertiesSet();
    		return  kafkaInitTemplate;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    统一处理的kafka处理类:KafkaInitTemplate.java
    作用:提供一个各个类型的kafka装饰类,提供各个类型kafka的get和set方法,以及配置文件的get和set。提供send(生产)和recieve(消费)两个方法,recieve(消费)提供一个监听器,你可以通过代码起一个线程异步实时监听获取kafka消息

    public class KafkaInitTemplate implements InitalizingBean{
    	private KafkaInitProperties kafkaInitProperties;
    	private KafkaTemplate<String,String> kafkaTemplate;
    	private ConsumerFactory consumerFactory;
    	private HuaweiKafkaTemplate huaweiKafkaTemplate;
    	
    	@Override
    	public void afterPropertiesSet() throws Exception{
    		switch(kafkaInitProperties.getKafkaType){
    			case "huawei":
    				Assert.state(huaweiKafkaTemplate != null, "huaweiKafkaTemplate未初始化");
    				break;
    			default:
    			    Assert.state(kafkaTemplate != null, "kafkaTemplate未初始化");
    				Assert.state(consumerFactory != null, "consumerFactory未初始化");	
    		}
    	}
    
    	public KafkaTemplate<String,String> getKafkaTemplate(){
    		Assert.state(kafkaTemplate != null, "kafkaTemplate未初始化");
    		return kafkaTemplate;
    	}
    	
    	public void setKafkaTemplate(KafkaTemplate<String,String> kafkaTemplate){
    		this.kafkaTemplate = kafkaTemplate;
    	}
    
    	public KafkaInitProperties getKafkaInitProperties(){
    		return kafkaInitProperties;
    	}
    	
    	public void setKafkaInitProperties(KafkaInitProperties kafkaInitProperties){
    		this.kafkaInitProperties = kafkaInitProperties;
    	}
    
    	public ConsumerFactory getConsumerFactory(){
    		Assert.state(consumerFactory != null, "consumerFactory未初始化");
    		return consumerFactory;
    	}
    	
    	public void setConsumerFactory(ConsumerFactory consumerFactory){
    		this.consumerFactory = consumerFactory;
    	}
    
    	public HuaweiKafkaTemplate getHuaweiKafkaTemplate(){
    		Assert.state(huaweiKafkaTemplate != null, "huaweiKafkaTemplate未初始化");
    		return huaweiKafkaTemplate;
    	}
    	
    	public void setKafkaTemplate(HuaweiKafkaTemplate huaweiKafkaTemplate){
    		this.huaweiKafkaTemplate = huaweiKafkaTemplate;
    	}
    }
    	/**
    	* 往指定topic生产发布消息
    	*/
    	public void send(String topic,String key, Object value){
    		Assert.notNull(topic,"topic不能为空");
    		Assert.notNull(key,"key不能为空");
    		Assert.notNull(value,"value不能为空");
    		switch(this.kafkaInitProperties.getKafkaType){
    			case "huawei":
    				this.huaweiKafkaTemplate.send(topic,key,JSON.toJSONString(value));
    				break;
    			default:
    				this.kafkaTemplate.send(topic,key,JSON.toJSONString(value));	
    		}
    	}
    
    	public AbstractMessageListenerContainer<String,String> receive(String topic, String groupId,MessageListener messageListener){
    		Assert.notNull(topic,"topic不能为空");
    		Assert.notNull(groupId,"groupId不能为空");
    		Assert.notNull(messageListener,"messageListener不能为空");
    		ContainerProperties containerProperties = new ContainerProperties(new String[]{topic});
    		containerProperties.setGroupId(groupId);
    		containerProperties.setMessageListener(messageListener);
    		KafkaMessageListenerContainer<String,String> messageListenerContainer;
    		switch(this.kafkaInitProperties.getKafkaType){
    			case "huawei":
    				messageListenerContainer = new KafkaMessageListenerContainer(this.huaweiKafkaTemplate.createConsumerFactory(), containerProperties);
    				break;
    			default:
    				messageListenerContainer = new KafkaMessageListenerContainer(this.consumerFactory, containerProperties);	
    		}
    		messageListenerContainer.setBeanName(topic + "_" + groupId);
    		messageListenerContainer.start();
    		return messageListenerContainer;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    至于HuaweiKafkaTemplate.java,参照从华为集群下载的kafka客户端示例中,初始化即可,下面就不一一手打了,太累了,提供一些照片给大家看看

    • 这是构造方法里面初始化了一个生产者
      在这里插入图片描述
    • 这是创建一个消费者工厂的方法
      在这里插入图片描述
    • 这是用消费者发送kafka消息的方法
      在这里插入图片描述
    • 这是安全认证的方法
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述

    在这里插入图片描述

    如果你通过自己生成jass.conf文件,就没要调用writeJassFile方法

    以上只是提供一个实现思路,和一些问题的解决方案,大家在实操过程中可以参考,切不可生搬硬套,有问题可以问我,我会及时回复大家

  • 相关阅读:
    Net Core中使用EF Core连接Mysql数据库
    AcWing 800. 数组元素的目标和——算法基础课题解
    力扣每日一题136:只出现一次的数字
    [python] 离线安装包-实践记录
    【仿牛客网笔记】Spring Boot实践,开发社区登录模块-账号设置,检查登录
    New Concept English3 Lesson 3. An unknown Goddess【精讲学习笔记】
    使用WinDbg进行动态调试
    山东大学数字图像处理实验(五)
    Linux:进程间通信
    Flowable工作流中会签节点处理回退并清除审批意见
  • 原文地址:https://blog.csdn.net/zhangtao0417/article/details/125466693