• 【kafka专栏】生产者客户端自定义序列化器



    kafka生产者的数据生产流程中,有三个环节是我们可以自定义的,如下图所示。本文为大家介绍如何自定义kafka生产者序列化器。

    在这里插入图片描述

    什么叫序列化与反序列化?说白了就是把对象转成可传输、可存储的格式(json、xml、二进制、甚至自定义格式)叫做序列化。反序列化顾名思义,就是将可传输、可存储的格式转换成对象。

    一、序列化器接口

    kafka客户端生产者序列化接口如下,如果我们需要实现自定义数据格式的序列化,需要自定义一个类实现该接口。

    package org.apache.kafka.common.serialization;
    
    import org.apache.kafka.common.header.Headers;
    
    import java.io.Closeable;
    import java.util.Map;
    
    /**
     * 将对象转成二进制数组的接口序列化实现类
     */
    public interface Serializer<T> extends Closeable {
    
        /**
         * 参数configs会传入生产者配置参数,
         * 序列化器实现类可以根据生产者参数配置影响序列化逻辑
         * isKey布尔型,表示当前序列化的对象是不是消息的key,如果不是key就是value
         */
        default void configure(Map<String, ?> configs, boolean isKey) {
            // intentionally left blank
        }
    
        /**
         * 重要方法将对象data转换为二进制数组
         */
        byte[] serialize(String topic, T data);
    
    
        default byte[] serialize(String topic, Headers headers, T data) {
            return serialize(topic, data);
        }
    
        /**
         * 关闭序列化器
         * 此方法的实现必须是幂等的,因为可能被调多次
         */
        @Override
        default void close() {
            // intentionally left blank
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    二、使用Jackson进行对象序列化

    本文打算使用Jackson做序列化实现,将对象转换成JSON用于网络传输。之所以使用Jackson是因为它是SpringBoot默认的JSON处理框架,本专栏后面会为大家介绍《SpringBoot集成kafka》,二者可以无缝衔接。使用Jackson实现对象序列化,通过maven坐标引入jackson

    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.9.7</version> 
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义一个类,作为我们的对象序列化的目标类。

    public class User {
        private String firstName;
        private String lastName;
        private int age;
    
        @Override
        public String toString() {
            return "User{" +
                    "firstName='" + firstName + '\'' +
                    ", lastName='" + lastName + '\'' +
                    ", age=" + age +
                    '}';
        }
    
        //省略若干get、set方法
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    下面我们实现序列化器接口Serializer,实现方式很简单,就是将kakfa生产者发送的消息对象User,通过ObjectMapper的writeValueAsBytes转换JSON格式,进而转换成二进制。

    public class MyProducerSerializer implements Serializer<User> {
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }
    
        @Override
        public byte[] serialize(String topic, User data) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }
    
        @Override
        public void close() { }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    三、指定生产者序列化器

    kafka生产者消息只能选择一种格式,不能上一条数据是JSON,下一条数据是XML。所以序列化器也只能配置一个,配置方法如下:

    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyProducerSerializer.class.getName());
    
    • 1

    测试方法,使用下面的测试用例完成测试。在MyProducerSerializer的serialize方法中下断点,断点断下证明自定义的序列化器生效。后续文章我们还会结合消费者实现反序列化器,进行进一步的验证。

    @Test
    void testSerializer() throws ExecutionException, InterruptedException {
        User user = new User();
        user.setAge(21);
        user.setFirstName("stephen");
        user.setLastName("curry");
        //将user发往zimug-test这个topic
        KafkaProducer<String, User> producer = new KafkaProducer<>(props);
        RecordMetadata metadata =
                producer.send(new ProducerRecord<>("json-test",user)).get();
        //消息发送成功之后,会获得消息的偏移量信息,成功打印证明消息发送成功
        System.out.println(metadata.offset());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    HTML5提供的文件API
    【JavaScript】Date对象(创建时间对象、常用Date方法总结)
    Pinia(二)了解和使用Store
    30天Python入门(第十七天:深入了解Python中的异常处理)
    CMake中file的使用
    Smart-tools 产品介绍
    ADRC Ardupilot代码分析
    爬虫 — Js 逆向案例四网易云音乐评论
    基于WEB在线音乐工厂的设计与实现
    Python判断循环语法
  • 原文地址:https://blog.csdn.net/hanxiaotongtong/article/details/125561460