什么叫序列化与反序列化?说白了就是把对象转成可传输、可存储的格式(json、xml、二进制、甚至自定义格式)叫做序列化。反序列化顾名思义,就是将可传输、可存储的格式转换成对象。
kafka客户端生产者序列化接口如下,如果我们需要实现自定义数据格式的序列化,需要自定义一个类实现该接口。
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.header.Headers;
import java.io.Closeable;
import java.util.Map;
/**
* 将对象转成二进制数组的接口序列化实现类
*/
public interface Serializer<T> extends Closeable {
/**
* 参数configs会传入生产者配置参数,
* 序列化器实现类可以根据生产者参数配置影响序列化逻辑
* isKey布尔型,表示当前序列化的对象是不是消息的key,如果不是key就是value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* 重要方法将对象data转换为二进制数组
*/
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
/**
* 关闭序列化器
* 此方法的实现必须是幂等的,因为可能被调多次
*/
@Override
default void close() {
// intentionally left blank
}
}
本文打算使用Jackson做序列化实现,将对象转换成JSON用于网络传输。之所以使用Jackson是因为它是SpringBoot默认的JSON处理框架,本专栏后面会为大家介绍《SpringBoot集成kafka》,二者可以无缝衔接。使用Jackson实现对象序列化,通过maven坐标引入jackson
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
定义一个类,作为我们的对象序列化的目标类。
public class User {
private String firstName;
private String lastName;
private int age;
@Override
public String toString() {
return "User{" +
"firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
", age=" + age +
'}';
}
//省略若干get、set方法
}
下面我们实现序列化器接口Serializer,实现方式很简单,就是将kakfa生产者发送的消息对象User,通过ObjectMapper的writeValueAsBytes转换JSON格式,进而转换成二进制。
public class MyProducerSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) { }
@Override
public byte[] serialize(String topic, User data) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
@Override
public void close() { }
}
kafka生产者消息只能选择一种格式,不能上一条数据是JSON,下一条数据是XML。所以序列化器也只能配置一个,配置方法如下:
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyProducerSerializer.class.getName());
测试方法,使用下面的测试用例完成测试。在MyProducerSerializer的serialize方法中下断点,断点断下证明自定义的序列化器生效。后续文章我们还会结合消费者实现反序列化器,进行进一步的验证。
@Test
void testSerializer() throws ExecutionException, InterruptedException {
User user = new User();
user.setAge(21);
user.setFirstName("stephen");
user.setLastName("curry");
//将user发往zimug-test这个topic
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
RecordMetadata metadata =
producer.send(new ProducerRecord<>("json-test",user)).get();
//消息发送成功之后,会获得消息的偏移量信息,成功打印证明消息发送成功
System.out.println(metadata.offset());
}