博主最近在做安全治理,遇到了相当多的坑,现在我进行整理,提供一个可行的kafka的SSL访问
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
这一步会生成一个有效期为365天,别名localhost,存有你的私钥和证书的文件
接着调用
keytool -importkeystore -srckeystore server.keystore.jks -destkeystore server.keystore.jks -deststoretype pkcs12
这个主要是为了转为pkcs12格式,便于后续转为X.509格式,不然之后windows平台的java无法导入证书
博主跳过了ca签名证书的步骤,有需要可自己添加,并将ca对应添加到truststore中(这一步可以不做,但是安全性会降低),详情自行搜索。
在server.properties中添加如下配置
ssl.keystore.location=/home/wusipeng/kafka_2.13-3.2.0/server.keystore.jks //生成文件的路径
ssl.keystore.password=123456 //文件密码
ssl.key.password=123456 //文件内部的key密码
ssl.endpoint.identification.algorithm= //忽略主机信息认证
ssl.client.auth=none //忽略客户端认证
listeners=PLAINTEXT://172.21.63.219:9091,SSL://172.21.63.219:9092 //开启SSL访问端口
advertised.listeners=PLAINTEXT://172.21.63.219:9091,SSL://172.21.63.219:9092
配置好后启动(先启动zookeeper)
首先转为X.509格式
sudo openssl pkcs12 -clcerts -nokeys -out abb.pem -in server.keystore.jks
sudo openssl x509 -outform der -in abb.pem -out abb.der
在kafka服务端生成了名称为abb.der的文件
拷贝到客户端主机
然后导入cacerts文件中
keytool.exe -import -alias loclahost -file C:\Users\****\Desktop\abb.der -keystore C:\Users、****\.jdks\azul-1.8.0_302\jre\lib\security\cacerts
注意这里提示
输入的密码不是你keystore的密码,请输入“changeit”,这是固定的
到这一步配置就差不多了。
public class KafkaTest implements Serializable {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.63.219:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test123");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
Map<String, String> map = new HashMap<>();
map.put("name", "abb");
kafkaProducer.send(new ProducerRecord<String, String>("wusipeng_test_abcd", "boy", "abc"));
kafkaProducer.flush();
kafkaProducer.close();
}
}
这两个是关键
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
一个是使用SSL安全协议,一个是跳过主机信息认证。必须加