msk提供了两种加密方式
创建集群时可以指定加密方式,参数如下
aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3
// encryptioninfo.json
{
"EncryptionAtRest": {
"DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"
},
"EncryptionInTransit": {
"InCluster": true,
"ClientBroker": "TLS"
}
}
查看证书位置
$ pwd
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
$ ls -al ../../../../../../../etc/pki/java/cacerts
lrwxrwxrwx 1 root root 40 May 8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
$ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks
测试tls加密,创建client.properties
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
列出端点
$ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
{
"BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092",
"BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094",
"BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096",
"BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
}
测试tls连接
$ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets
first
# 连接string端点报错
$ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list
[2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)
指定iam的客户端配置,之后连接tls端口会报错
Unauthenticated
用的,并且如果开了iam认证会失败./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
[2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
[2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html
kafka客户端配置,https://kafka.apache.org/documentation/#security_configclients
msk可选的访问控制/加密组合如下,访问控制方式决定了能够选择的加密方式
Authentication | Client-broker encryption options | Broker-broker encryption |
---|---|---|
Unauthenticated | TLS, PLAINTEXT, TLS_PLAINTEXT | Can be on or off |
mTLS | TLS, TLS_PLAINTEXT | Must be on |
SASL/SCRAM | TLS | Must be on |
SASL/IAM | TLS | Must be on |
集群完毕后提供了多种连接终端节点
端口信息,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html
采取Unauthenticated方式,客户端使用PLAINTEXT
查找bootstrap-server端点
(可选)在bin/client.properties
中加入客户端配置
security.protocol=PLAINTEXT
测试连接,不需要特意配置tls连接
./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list
java客户端开启ssl连接
// 开启 tls 连接
properties.put("security.protocol", "SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512");
// 创建kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
msk对kafka的源码进行了修改,允许使用iam进行认证,访问事件会发送到cloudtrail中。注意事项
不适用于zk节点
开启iam认证后,allow.everyone.if.no.acl.found
配置无效
使用iam认证后创建的kafka acl(存储在zk中),对iam认证无效
client和broker之间必须启用tls加密
和连接kafka相关的权限以kafka-cluster
作为前缀,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html
需要使用9098和9198端口
需要在客户端配置如下参数
# config/client.properties
# ssl.truststore.location= # if don't specify a value for ssl.truststore.location, the Java process uses the default certificate.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
awsProfileName="admin";
下载客户端依赖jar到/libs目录下
https://github.com/aws/aws-msk-iam-auth/releases
aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .
- 1
测试连接
./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list
可能出现以下报错
./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics
(kafka.admin.TopicCommand$)
[2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
at java.lang.Thread.run(Thread.java:750)
指定client配置后成功连接
[profile prod]
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --command-config client.properties --list
[2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__amazon_msk_canary
__consumer_offsets
创建topic
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --command-config client.properties --topic first --create --partitions 2 --replication-factor 2
发送消息
./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --producer.config client.properties --topic first
消费信息
./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first
加入依赖
software.amazon.msk aws-msk-iam-auth 1.0.0
- 1
- 2
- 3
- 4
- 5
// 完整配置
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "AWS_MSK_IAM");
properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
properties.put("sasl.client.callback.handler.class",IAMClientCallbackHandler.class.getName());
properties.put("awsProfileName","admin");
相关报错
// 没有导上面包的报错如下
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
... 4 more
// 如果没有找到凭证
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,
目前中国区不可用,需要依赖于Private CA
https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html
使用secret manager保存username和password
创建secret
名称必须以AmazonMSK_
开头
不能使用默认kms加密secret
密钥内容必须为以下格式
{
"username": "alice",
"password": "alice-secret"
}
创建配置文件users_jaas.conf
,导出为环境变量
# KafkaClient首字母大写
cat > /tmp/users_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
EOF
export KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf
bin目录下创建客户端配置文件
cat > client_sasl.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF
链接集群
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096 --command-config client_sasl.properties --list
相关报错
# 密码错误
[2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
java代码连接配置
System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512"); //仅支持SCRAM-SHA-512