• AWS MSK encryption options, access control, and connection methods


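    MSK encryption

    MSK offers two kinds of encryption:

    • Encryption at rest
    • Encryption in transit

    Encryption can be specified when the cluster is created, using the following parameters: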

    aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3
    
    // encryptioninfo.json
    {
       "EncryptionAtRest": {
           "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"
        },
       "EncryptionInTransit": {
            "InCluster": true,
            "ClientBroker": "TLS"
        }
    }
    

    Locate the JVM certificate store and copy it for use as the client truststore:

    $ pwd
    /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
    $ ls -al ../../../../../../../etc/pki/java/cacerts
    lrwxrwxrwx 1 root root 40 May  8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
    $ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks
    

    To test TLS encryption, create client.properties:

    security.protocol=SSL
    ssl.truststore.location=/tmp/kafka.client.truststore.jks
    

    List the bootstrap endpoints:

    $ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
    {
        "BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092",
        "BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094",
        "BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096",
        "BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
    }
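To pick one endpoint list out of this response from Java, a small helper along the following lines may be enough. This is only a sketch: `BootstrapBrokersSketch` and `field` are hypothetical names, the host names in `main` are placeholders, and the regex relies on the response being a flat JSON object of string values, as it is above. A real JSON library is the safer choice in production code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of any AWS SDK.
class BootstrapBrokersSketch {

    // Returns the string value stored under `key`, or null if the key is absent.
    // The closing quote after the key keeps "BootstrapBrokerString" from
    // matching "BootstrapBrokerStringTls".
    static String field(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Placeholder response with made-up host names.
        String json = "{"
            + "\"BootstrapBrokerString\": \"b-1.example:9092,b-2.example:9092\","
            + "\"BootstrapBrokerStringTls\": \"b-1.example:9094,b-2.example:9094\""
            + "}";
        System.out.println(field(json, "BootstrapBrokerStringTls"));
    }
}
```

On the command line, the AWS CLI's global `--query BootstrapBrokerStringTls --output text` options return the same value without any parsing.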
    

    Test the TLS connection:

    $ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
    __amazon_msk_canary
    __consumer_offsets
    first
    
    # Connecting to the plaintext endpoint (BootstrapBrokerString, port 9092) with the SSL client configuration fails
    $ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list
    
    [2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)
    

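    With the IAM client configuration in place, connecting to the TLS port (9094) fails:

    • This shows that the TLS broker endpoint has no SASL mechanisms enabled: it serves Unauthenticated TLS clients, and a client configured for IAM authentication fails against it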
    ./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
    
    [2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
    [2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
    

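    MSK access control

    https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html

    Kafka client configuration: https://kafka.apache.org/documentation/#security_configclients

    The authentication/encryption combinations that MSK allows are listed below. The authentication method determines which encryption options can be selected.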

    Authentication     Client-broker encryption options     Broker-broker encryption
    Unauthenticated    TLS, PLAINTEXT, TLS_PLAINTEXT        Can be on or off
    mTLS               TLS, TLS_PLAINTEXT                   Must be on
    SASL/SCRAM         TLS                                  Must be on
    SASL/IAM           TLS                                  Must be on
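Read together with the get-bootstrap-brokers output shown earlier, the table can be turned into a small lookup from authentication mode to broker port and client settings. The following is a minimal sketch: `MskEndpointSketch` and its members are hypothetical names, the ports are the in-VPC ones that appear in that output, and mTLS is left out.

```java
import java.util.Properties;

// Hypothetical helper, not part of any AWS or Kafka library.
class MskEndpointSketch {

    enum Auth { UNAUTHENTICATED_PLAINTEXT, UNAUTHENTICATED_TLS, SASL_SCRAM, SASL_IAM }

    // Broker port for each mode, as listed by get-bootstrap-brokers.
    static int port(Auth auth) {
        switch (auth) {
            case UNAUTHENTICATED_PLAINTEXT: return 9092;
            case UNAUTHENTICATED_TLS:       return 9094;
            case SASL_SCRAM:                return 9096;
            case SASL_IAM:                  return 9098;
            default: throw new IllegalArgumentException("unknown mode: " + auth);
        }
    }

    // Kafka client settings that go with each mode.
    static Properties clientProperties(Auth auth) {
        Properties p = new Properties();
        switch (auth) {
            case UNAUTHENTICATED_PLAINTEXT:
                p.setProperty("security.protocol", "PLAINTEXT");
                break;
            case UNAUTHENTICATED_TLS:
                p.setProperty("security.protocol", "SSL");
                break;
            case SASL_SCRAM:
                p.setProperty("security.protocol", "SASL_SSL");
                p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
                break;
            case SASL_IAM:
                p.setProperty("security.protocol", "SASL_SSL");
                p.setProperty("sasl.mechanism", "AWS_MSK_IAM");
                break;
            default:
                throw new IllegalArgumentException("unknown mode: " + auth);
        }
        return p;
    }

    public static void main(String[] args) {
        for (Auth auth : Auth.values()) {
            System.out.println(auth + ": port " + port(auth) + ", " + clientProperties(auth));
        }
    }
}
```

Mixing a port from one row with the settings of another is what produces the authentication errors shown in this post.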

    Once the cluster has been created, it exposes several connection endpoints.

    Port information: https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html


    PLAINTEXT

    With Unauthenticated access, the client uses PLAINTEXT.


    Look up the bootstrap-server endpoints.


    (Optional) Add the client configuration to bin/client.properties:

    security.protocol=PLAINTEXT
    

    Test the connection. No TLS configuration is needed:

    ./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list
    

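    Enabling SSL in a Java client

    // Enable TLS. sasl.mechanism is only read with SASL_PLAINTEXT/SASL_SSL, so it is not set here.
    // `properties` is assumed to already hold bootstrap.servers and the serializers.
    properties.put("security.protocol", "SSL");
    properties.put("ssl.truststore.location", "/tmp/kafka.client.truststore.jks");
    
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);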
    

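    IAM authentication

    MSK modified the Kafka source code so that clients can authenticate through IAM; access events are sent to CloudTrail. Points to note:

    • It does not apply to ZooKeeper nodes

    • Once IAM authentication is enabled, the allow.everyone.if.no.acl.found setting has no effect

    • Kafka ACLs (stored in ZooKeeper) created on a cluster that uses IAM authentication have no effect on IAM-authenticated access

    • TLS encryption must be enabled between client and broker

    • Permissions for connecting to Kafka use the kafka-cluster prefix: https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html

    • Ports 9098 (access from within AWS) and 9198 (public access) are used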
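As an illustration of the kafka-cluster permission prefix, the sketch below prints an IAM policy that would let a client connect, create and describe a topic, and read and write data. It is a sketch under stated assumptions: the class and method names are hypothetical, the action names are the ones I recall from the MSK IAM access control documentation linked above, and the topic ARN layout (topic/cluster-name/cluster-uuid/topic-name) should be checked against that page before use.

```java
// Hypothetical helper, not part of any AWS SDK.
class MskIamPolicySketch {

    // Derives a topic ARN from a cluster ARN by swapping the resource type
    // and appending the topic name (assumed layout, see the lead-in).
    static String topicArn(String clusterArn, String topic) {
        String marker = ":cluster/";
        int idx = clusterArn.indexOf(marker);
        if (idx < 0) {
            throw new IllegalArgumentException("not a cluster ARN: " + clusterArn);
        }
        return clusterArn.substring(0, idx) + ":topic/"
            + clusterArn.substring(idx + marker.length()) + "/" + topic;
    }

    // Builds the policy document as plain text.
    static String policy(String clusterArn, String topic) {
        return String.join("\n",
            "{",
            "  \"Version\": \"2012-10-17\",",
            "  \"Statement\": [",
            "    {",
            "      \"Effect\": \"Allow\",",
            "      \"Action\": [\"kafka-cluster:Connect\", \"kafka-cluster:DescribeCluster\"],",
            "      \"Resource\": \"" + clusterArn + "\"",
            "    },",
            "    {",
            "      \"Effect\": \"Allow\",",
            "      \"Action\": [\"kafka-cluster:CreateTopic\", \"kafka-cluster:DescribeTopic\", \"kafka-cluster:WriteData\", \"kafka-cluster:ReadData\"],",
            "      \"Resource\": \"" + topicArn(clusterArn, topic) + "\"",
            "    }",
            "  ]",
            "}");
    }

    public static void main(String[] args) {
        String clusterArn = "arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3";
        System.out.println(policy(clusterArn, "first"));
    }
}
```

A consumer that joins a consumer group would likely need group-level permissions as well, which this sketch leaves out.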

    Connecting from the shell

    The client needs the following settings:

    # config/client.properties
    # ssl.truststore.location=  # if no value is given, the Java process uses its default truststore
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    # awsProfileName is a JAAS option and belongs inside sasl.jaas.config;
    # as a standalone property Kafka reports it as an unknown config and ignores it
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="admin";
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    

    Download the client dependency jar into the /libs directory:

    https://github.com/aws/aws-msk-iam-auth/releases

    aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .
    

    Test the connection:

    ./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list
    

    The following error may appear. No --command-config is passed here, so the likely cause is that the client speaks PLAINTEXT to a SASL_SSL port:

    ./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list
    Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    [2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics
     (kafka.admin.TopicCommand$)
    [2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
    java.lang.OutOfMemoryError: Java heap space
            at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
            at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
            at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
            at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
            at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
            at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
            at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
            at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
            at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
            at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
            at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
            at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
            at java.lang.Thread.run(Thread.java:750)
    

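    With the client configuration specified, the connection succeeds:

    • In .aws/config, a named profile must be declared as [profile prod]
    • The WARN in the output below appears when awsProfileName is supplied as a standalone property; Kafka ignores it there, so the profile only takes effect as an option inside sasl.jaas.config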
    ./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --list
    [2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
    __amazon_msk_canary
    __consumer_offsets
    

    Create a topic:

    ./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --topic first --create --partitions 2 --replication-factor 2
    

    Send messages:

    ./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --producer.config client.properties --topic first
    

    Consume messages:

    ./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first
    

    Connecting from Java code

    Add the dependency:

    
    <dependency>
        <groupId>software.amazon.msk</groupId>
        <artifactId>aws-msk-iam-auth</artifactId>
        <version>1.0.0</version>
    </dependency>
    
    
    // Full configuration (requires: import software.amazon.msk.auth.iam.IAMClientCallbackHandler;)
    properties.put("security.protocol", "SASL_SSL");
    properties.put("sasl.mechanism", "AWS_MSK_IAM");
    // awsProfileName is a JAAS option; as a separate client property it is ignored
    properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"admin\";");
    properties.put("sasl.client.callback.handler.class", IAMClientCallbackHandler.class.getName());
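The profile name is an option of the login module, so it has to be part of the sasl.jaas.config value. A minimal sketch of assembling the IAM client settings with an optional profile follows. `IamClientConfigSketch` and its methods are hypothetical names, and the code uses only java.util so it runs without the Kafka or aws-msk-iam-auth jars; the resulting Properties would be passed to a KafkaProducer or AdminClient along with serializers.

```java
import java.util.Properties;

// Hypothetical helper, not part of aws-msk-iam-auth.
class IamClientConfigSketch {

    // Builds the JAAS line. Without a profile the option is omitted and the
    // library falls back to its default credential provider chain.
    static String jaasConfig(String awsProfileName) {
        String options = (awsProfileName == null || awsProfileName.isEmpty())
            ? ""
            : " awsProfileName=\"" + awsProfileName + "\"";
        return "software.amazon.msk.auth.iam.IAMLoginModule required" + options + ";";
    }

    // Collects the client settings for the SASL/IAM port (9098).
    static Properties iamProperties(String bootstrapServers, String awsProfileName) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        p.setProperty("sasl.jaas.config", jaasConfig(awsProfileName));
        p.setProperty("sasl.client.callback.handler.class",
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return p;
    }

    public static void main(String[] args) {
        Properties p = iamProperties(
            "b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098", "admin");
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```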
    

    Related errors

    // Error when the dependency above is not on the classpath
    Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule
            at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
            at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
            at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
            at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
            at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
            at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
            ... 4 more
    
    // Error when no credentials are found
    [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
    [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,
    

    mTLS

    Not currently available in the China regions, because it depends on AWS Private CA.

    SASL/SCRAM

    https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html

    The username and password are stored in AWS Secrets Manager.


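    Create the secret:

    • The name must start with AmazonMSK_

    • The secret cannot be encrypted with the default KMS key; a customer managed key is required

    • The secret value must have the following format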

      {
        "username": "alice",
        "password": "alice-secret"
      }
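The naming rule and value format above can be checked before the secret is created. The following is a small sketch with hypothetical names (`MskScramSecretSketch`, `hasValidName`, `secretValue`); it only validates the prefix and builds the JSON text, and does not call Secrets Manager.

```java
// Hypothetical helper, not part of any AWS SDK.
class MskScramSecretSketch {

    static final String REQUIRED_PREFIX = "AmazonMSK_";

    // MSK only accepts secrets whose name starts with AmazonMSK_.
    static boolean hasValidName(String secretName) {
        return secretName != null && secretName.startsWith(REQUIRED_PREFIX);
    }

    // Escapes backslashes and double quotes so the value stays valid JSON.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the secret value in the username/password format MSK expects.
    static String secretValue(String username, String password) {
        return "{\"username\": \"" + escape(username)
            + "\", \"password\": \"" + escape(password) + "\"}";
    }

    public static void main(String[] args) {
        String name = "AmazonMSK_alice"; // hypothetical secret name
        System.out.println(name + " valid: " + hasValidName(name));
        System.out.println(secretValue("alice", "alice-secret"));
    }
}
```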
      

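    Connecting from the shell

    Create the configuration file users_jaas.conf and point the JVM at it through the KAFKA_OPTS environment variable:

    # The section name KafkaClient is case-sensitive (capital K and C)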
    cat > /tmp/users_jaas.conf << EOF
    KafkaClient {
       org.apache.kafka.common.security.scram.ScramLoginModule required
       username="alice"
       password="alice-secret";
    };
    EOF
    
    export KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf
    

    Create the client configuration file in the bin directory:

    cat > client_sasl.properties << EOF
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    ssl.truststore.location=/tmp/kafka.client.truststore.jks
    EOF
    

    Connect to the cluster:

    ./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096  --command-config client_sasl.properties --list
    

    Related errors

    # Wrong password
    [2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
    

    Connecting from Java code

    Configuration for the Java client:

    System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
    properties.put("security.protocol", "SASL_SSL");
    properties.put("sasl.mechanism", "SCRAM-SHA-512"); // MSK supports only SCRAM-SHA-512
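As an alternative to a separate JAAS file, the Kafka client also accepts the login module inline through the sasl.jaas.config property, which keeps the credentials next to the rest of the settings. Here is a minimal sketch: `ScramClientConfigSketch` and `scramProperties` are hypothetical names, the code uses only java.util, and it assumes the username and password contain no double quotes.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
class ScramClientConfigSketch {

    // Collects the client settings for the SASL/SCRAM port (9096).
    static Properties scramProperties(String bootstrapServers, String username,
                                      String password, String truststorePath) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        p.setProperty("ssl.truststore.location", truststorePath);
        // Inline equivalent of the KafkaClient section in users_jaas.conf.
        p.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";");
        return p;
    }

    public static void main(String[] args) {
        Properties p = scramProperties(
            "b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096",
            "alice", "alice-secret", "/tmp/kafka.client.truststore.jks");
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```

With this form, the System.setProperty call for java.security.auth.login.config is no longer needed.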
    
  • Original article: https://blog.csdn.net/sinat_41567654/article/details/139204791