• CVE-2023-34040 Kafka 反序列化RCE


    漏洞描述

    Spring Kafka 是 Spring Framework 生态系统中的一个模块,用于简化在 Spring 应用程序中集成 Apache Kafka 的过程,记录 (record) 指 Kafka 消息中的一条记录。

    受影响版本中默认未对记录配置 ErrorHandlingDeserializer,当用户将容器属性 checkDeserExWhenKeyNull 或 checkDeserExWhenValueNull 设置为 true(默认为 false),并且允许不受信任的源发布到 Kafka 主题中时,攻击者可将恶意 payload 注入到 Kafka 主题中,当反序列化记录头时远程执行任意代码。

    影响版本

    2.8.1 <= Spring-Kafka <= 2.9.10 3.0.0 <= Spring-Kafka <= 3.0.9

    漏洞复现

    这一个漏洞所影响的组件其实是 Spring-Kafka,严格意义上来说并不算是 kafka 的漏洞,应该算是 Spring 的漏洞。

    漏洞前置知识

    先来看一看 SpringBoot 和 Kafka 是怎么完成通讯/消费的

    图片

    工作流程如下

    1、生产者将消息发送到 Kafka 集群中的某个 Broker(也可以是多个) 2、Kafka 集群将消息存储在一个或多个分区中,并为每个分区维护一个偏移量 3、消费者订阅一个或多个主题,并从 Kafka 集群中读取消息。4、消费者按顺序读取每个分区中的消息,并跟踪每个分区的偏移量。

    • • ErrorHandlingDeserializer:是 Kafka中的一种反序列化器(Deserializer),它可以在反序列化过程中处理异常和错误。

    • • checkDeserExWhenKeyNull && checkDeserExWhenValueNull:是 Kafka 中的一种序列化器(Serializer),它可以在序列化过程中检查键(key/value)是否为 null,并在发现值为 null 时抛出异常。

    再简单整理一下漏洞条件

    在受到影响的版本中,默认未对记录配置 ErrorHandlingDeserializer 容器属性 checkDeserExWhenKeyNull 或 checkDeserExWhenValueNull 设置为 true

    环境搭建

    其中需要我们起一个 Kafka 的服务,用来接收消息,本机上起比较麻烦,可以在 vps 上用 docker 迅速搭建,且需注意,Kafka 要能够接受外连,docker-compose.yml 如下

    1. version: '2'
    2. services:
    3.   zookeeper:
    4.     image: zookeeper
    5.     restart: always
    6.     ports:
    7.       - "2181:2181"
    8.     container_name: zookeeper
    9.   kafka:
    10.     image: wurstmeister/kafka
    11.     restart: always
    12.     ports:
    13.       - "9092:9092"
    14.       - "9094:9094"
    15.     depends_on:
    16.       - zookeeper
    17.     environment:
    18.       KAFKA_ADVERTISED_HOST_NAME: 124.222.21.138
    19.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    20.       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9094
    21.       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://124.222.21.138:9092,SSL://124.222.21.138:9094
    22.       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL
    23.       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    24.     container_name: kafka

    Spring Kafka 的生产者和消费者可以通过使用 Spring Kafka 提供的 KafkaTemplate 和 ``@KafkaListener` 注解来编写。

    生产者可以使用 KafkaTemplate 来发送消息到 Kafka 集群:

    1. package com.drunkbaby.springkafkatest.controller;  
    2.   
    3. import com.drunkbaby.springkafkatest.common.KafkaInfo;  
    4. import org.springframework.beans.factory.annotation.Autowired;  
    5. import org.springframework.kafka.core.KafkaTemplate;  
    6. import org.springframework.kafka.support.SendResult;  
    7. import org.springframework.util.concurrent.ListenableFuture;  
    8. import org.springframework.web.bind.annotation.PostMapping;  
    9. import org.springframework.web.bind.annotation.RequestMapping;  
    10. import org.springframework.web.bind.annotation.RestController;  
    11.   
    12. import java.time.LocalDateTime;  
    13. import java.util.concurrent.ExecutionException;  
    14.   
    15. @RestController  
    16. @RequestMapping("/producer")  
    17. public class ProducerController {  
    18.     @Autowired  
    19.     private KafkaTemplate kafkaTemplate;  
    20.   
    21.     @PostMapping("/fireAndForget")  
    22.     public String fireAndForget() {  
    23.         kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now());  
    24.         return "success";  
    25.     }  
    26. }

    消费者可以使用 @KafkaListener 注解来监听 Kafka 集群中的消息:

    1. package com.drunkbaby.springkafkatest.consumer;  
    2.   
    3. import com.drunkbaby.springkafkatest.common.KafkaInfo;  
    4. import org.springframework.kafka.annotation.KafkaListener;  
    5. import org.springframework.messaging.MessageHeaders;  
    6. import org.springframework.messaging.handler.annotation.Headers;  
    7. import org.springframework.messaging.handler.annotation.Payload;  
    8. import org.springframework.stereotype.Component;  
    9.   
    10. @Component  
    11. public class Consumer {  
    12.     @KafkaListener(topics = KafkaInfo.TOPIC_WELCOME)  
    13.     public String consumer2(@Payload String message, @Headers MessageHeaders headers) {  
    14.         System.out.println("消费者(注解方式):收到消息==> ");  
    15.         System.out.println("  message:" + message);  
    16.         System.out.println("  headers:");  
    17.         headers.keySet().forEach(key -> System.out.println("    " + key + ":" + headers.get(key)));  
    18.         return "success";  
    19.     }

    连接成功

    图片

    访问 http://localhost:8083/producer/sync 发送一条记录

    图片

    构造 payload

    实际影响到的是 Consumer,且 Consumer 要设置 checkDeserExWhenKeyNull 或 checkDeserExWhenValueNull 为 true

    1. ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();  
    2. factory.getContainerProperties().setCheckDeserExWhenValueNull(true);  
    3. factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);

    payload 参考 https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040

    漏洞分析

    主要是来看反序列化的部分

    断点会先走到 org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader 方法,它这里面会获取到 PoC 中的 KEY_DESERIALIZER_EXCEPTION_HEADER,并将其作为 headers

    图片

    往下跟进 byteArrayToDeserializationException() 方法,这里就直接到反序列化的部分了,而在反序列化之前做了一次 resolveClass() 的校验。

    图片

    而这里的 resolveClass() 校验是一次性的,这就代表我们可以构造其他的 Payload,如 CC 链等,证实是可以打通的

    图片

    之后便会进入到对应类的 readObject() 方法

    漏洞修复

    https://github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9d

    相当于把这里的 header 头加黑了

  • 相关阅读:
    Spring Boot项目中使用jdbctemplate 操作MYSQL数据库
    AWS SAA C003 S3 Type
    rem布局
    Linux添加root权限:is not in the sudoers file解决方法
    【AtomicReference、AtomicStampedReference】常规用法
    信奥中的数学:集合与子集
    艾美捷热转移稳定性检测试剂盒参数&文献参考
    uni-app小程序使用DCloud(插件市场)流程
    Mysql(数据库)知识详解终章~{日志,Mycat}
    代码随想录算法训练营第三十八天【动态规划part01】 | 动态规划理论基础、509. 斐波那契数、70. 爬楼梯、746. 使用最小花费爬楼梯
  • 原文地址:https://blog.csdn.net/YJ_12340/article/details/134251002