• 【kafka】序列化器与反序列化器


    序列化器

    Serializer接口

    在配置kafka生产者客户端参数时,必须要指定keyvalue的序列化方式

    kafka客户端序列化的接口是org.apache.kafka.common.serialization.Serializer,并且kafka为我们提供了String,ByteArray,ByteBuffer等等序列化器:

    img

    Serializer接口源码如下:

    public interface Serializer<T> extends Closeable {
    
        default void configure(Map<String, ?> configs, boolean isKey) {
            // intentionally left blank
        }
    
        byte[] serialize(String topic, T data);
    
        default byte[] serialize(String topic, Headers headers, T data) {
            return serialize(topic, data);
        }
    
        @Override
        default void close() {
            // intentionally left blank
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    主要包括三个方法:

    (1)configure() 方法用来配置当前类

    (2)serialize()方法用来执行序列化操作

    (3)close()方法用来关闭当前序列化器

    我们可以结合StringSerialize来理解一下:

    public class StringSerializer implements Serializer<String> {
        private String encoding = StandardCharsets.UTF_8.name();
        // configure方法主要用来确定编码方式,如果没有特意的配置,默认的UTF_8
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
            Object encodingValue = configs.get(propertyName);
            if (encodingValue == null)
                encodingValue = configs.get("serializer.encoding");
            if (encodingValue instanceof String)
                encoding = (String) encodingValue;
        }
    
        // serialize则把String类型转换为byte[]
        @Override
        public byte[] serialize(String topic, String data) {
            try {
                if (data == null)
                    return null;
                else
                    return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    自定义序列化器

    如果kafka自带的序列化器不能满足我们的需求,则我们可以实现org.apache.kafka.common.serialization.Serializer接口来自定义序列化器,下面是一个自定义类序列化器的例子:

    假设我们要发送的是User对象:

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User {
    
        private String name;
        private Integer age;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们可以自定义UserSerializer如下:

    public class UserSerializer implements Serializer<User> {
        
        @Override
        public byte[] serialize(String topic, User data) {
            if (data==null){
                return null;
            }
            String json = JSON.toJSONString(data);
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我们自定义把User对象转换为JSON字符串,然后把JSON字符串转换为字节数组返回

    要使用自定义的序列化器,配置value.serializer即可

    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
    
    • 1

    反序列化器

    和序列化器对应的,是反序列化器,消费者需要使用反序列化器把从kafka中收到的字节数组转换为相应的对象,也就是说序列化器和反序列化器要对应起来才能转换成功。

    反序列化器的接口是org.apache.kafka.common.serialization.Deserializer,同样的kafka也提供了一些常用的反序列化器:

    img

    我们也可以自定义反序列化器,以上面的User为例,自定义反序列化器将byte数组转换为User对象:

    public class UserDeserializer implements Deserializer<User> {
        @Override
        public User deserialize(String topic, byte[] data) {
            if (data==null){
                return null;
            }
            String json = new String(data, StandardCharsets.UTF_8);
            return JSON.parseObject(json, User.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后配置消费者客户端的value.deserializer参数:

    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
    
    • 1

    需要注意的是,上面只是配置了value的序列化与反序列化,key还是StringSerializer。同样的key也是可以自定义配置的

  • 相关阅读:
    npm ERR! code ERESOLVE错误解决
    Kafka3.0.0版本——消费者(Sticky分区分配策略以及再平衡)
    CSS 常用样式background背景属性
    uniapp-轮播图点击预览功能
    Compose原理-视图和数据双向绑定的原理
    大数据:Flume安装部署和配置
    工业网关它的功能是什么
    猿创征文|【算法】一文吃透常见排序算法
    隧道代理 vs 普通代理:哪种更适合您的爬虫应用?
    好用的数据恢复软件EasyRecovery2023最新版
  • 原文地址:https://blog.csdn.net/qq_43460095/article/details/126307315