• kafka消费消息并对消息进行RSA公钥解密


    SpringBoot版本2.x.x 具体是几,忘记了,是支持application.yml或者是application.properties配置的。当然也可以使用Java配置类。

    以下是使用Java配置类来配置的。

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig  {
        //指定kafka 代理集群地址,多个地址用英文逗号隔开
        private String bootstrapServers="x.x.x.x:xxxx,x.x.x.x:xxxx";
        
        //指定默认消费者group id,消费者监听到的也是这个
        private String groupId="xxx";
        
        //消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读
        private String autoOffsetReset="earliest";
        
        //是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效
        private boolean  enableAutoCommit=true;
        
        //自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)
        private String autoCommitInterval="1000";
        
        //指定消息key和消息体的编解码方式
        private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer";
        
        private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer";
        
        //批量消费每次最多消费多少条信息 可以自己根据业务来配置
        //private String maxPollRecords="50";
        
        //协议类型,为SASL类型
        private String securityProtocol="SASL_PLAINTEXT";
        
        //协议
        private String saslMechanism="SCRAM-SHA-512";
        
        //用户名密码配置
        private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxx\";";
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            //factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
            //props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
            props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas);
    
            return new DefaultKafkaConsumerFactory<>(props);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    消息的代码

    @Slf4j
    @Service
    public class KafkaConsumerService {
    
    	//单条消费
        @KafkaListener(topics = "xxxx", groupId = "xxxx")
        public void consume(ConsumerRecord<String, String> record) {
            String value = record.value();
    		//业务逻辑
    		...
        }
    
    
    	//批量消费
    	@KafkaListener(topics = "xxxx", groupId = "xxxx")
        public void consume(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
            //...
            //db.batchSave(consumerRecords);//批量插入或者批量更新数据
    		
    		//手动提交
            ack.acknowledge();
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    注意点:spring-kafka,和 kafka-clients 的版本兼容问题。

    RSA公钥解密:

    	
    	 /**
         * 加密算法RSA
         */
        public static final String KEY_ALGORITHM = "RSA";
    
        /**
         * RSA最大解密密文大小
         */
        private static final int MAX_DECRYPT_BLOCK = 128;
    
    	/**
         * RSA公钥解密
         *
         * @param encryptedData 已加密数据(base64编码)
         * @param publicKey     公钥(BASE64编码)
         * @return
         * @throws Exception
         */
        public byte[] decryptByPublicKey(byte[] encryptedData, String publicKey)
                throws Exception {
            byte[] keyBytes = Base64Utils.decode(publicKey.getBytes());
            X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes);
            KeyFactory keyFactory = KeyFactory.getInstance(KEY_ALGORITHM);
            Key publicK = keyFactory.generatePublic(x509KeySpec);
            Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm());
            cipher.init(Cipher.DECRYPT_MODE, publicK);
            int inputLen = encryptedData.length;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            int offSet = 0;
            byte[] cache;
            int i = 0;
            // 对数据分段解密
            while (inputLen - offSet > 0) {
                if (inputLen - offSet > MAX_DECRYPT_BLOCK) {
                    cache = cipher.doFinal(encryptedData, offSet, MAX_DECRYPT_BLOCK);
                } else {
                    cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet);
                }
                out.write(cache, 0, cache.length);
                i++;
                offSet = i * MAX_DECRYPT_BLOCK;
            }
            byte[] decryptedData = out.toByteArray();
            out.close();
            return decryptedData;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    下面是批量消费的实例,可以自己学习一下:
    https://cloud.tencent.com/developer/article/2223134?areaSource=102001.4&traceId=KErHitYK-asG0MQLNTqos

  • 相关阅读:
    Android ImageView视图的七种图片缩放类型
    【面试题-004】ArrayList 和 LinkList区别
    【MTK】【WIFI】手机和综测仪器误差在5db左右误差
    centos安装神通数据库
    栩栩如生,音色克隆,Bert-vits2文字转语音打造鬼畜视频实践(Python3.10)
    Spring Boot系列之条件注解
    ROS机器人应用(3)——程序修改编译与SublimeText 简析
    【STM32】学习笔记-SPI通信
    Java基础面试题【3】
    Hadoop
  • 原文地址:https://blog.csdn.net/hello_cmy/article/details/136254970