在配置kafka生产者客户端参数时,必须要指定key
和value
的序列化方式
kafka客户端序列化的接口是org.apache.kafka.common.serialization.Serializer
,并且kafka为我们提供了String,ByteArray,ByteBuffer等等序列化器:
Serializer
接口源码如下:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
@Override
default void close() {
// intentionally left blank
}
}
主要包括三个方法:
(1)configure()
方法用来配置当前类
(2)serialize()
方法用来执行序列化操作
(3)close()
方法用来关闭当前序列化器
我们可以结合StringSerialize
来理解一下:
public class StringSerializer implements Serializer<String> {
private String encoding = StandardCharsets.UTF_8.name();
// configure方法主要用来确定编码方式,如果没有特意的配置,默认的UTF_8
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
// serialize则把String类型转换为byte[]
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
}
如果kafka自带的序列化器不能满足我们的需求,则我们可以实现org.apache.kafka.common.serialization.Serializer
接口来自定义序列化器,下面是一个自定义类序列化器的例子:
假设我们要发送的是User
对象:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String name;
private Integer age;
}
我们可以自定义UserSerializer
如下:
public class UserSerializer implements Serializer<User> {
@Override
public byte[] serialize(String topic, User data) {
if (data==null){
return null;
}
String json = JSON.toJSONString(data);
return json.getBytes(StandardCharsets.UTF_8);
}
}
我们自定义把User
对象转换为JSON字符串,然后把JSON字符串转换为字节数组返回
要使用自定义的序列化器,配置value.serializer
即可
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
和序列化器对应的,是反序列化器,消费者需要使用反序列化器把从kafka中收到的字节数组转换为相应的对象,也就是说序列化器和反序列化器要对应起来才能转换成功。
反序列化器的接口是org.apache.kafka.common.serialization.Deserializer
,同样的kafka也提供了一些常用的反序列化器:
我们也可以自定义反序列化器,以上面的User
为例,自定义反序列化器将byte数组转换为User对象:
public class UserDeserializer implements Deserializer<User> {
@Override
public User deserialize(String topic, byte[] data) {
if (data==null){
return null;
}
String json = new String(data, StandardCharsets.UTF_8);
return JSON.parseObject(json, User.class);
}
}
然后配置消费者客户端的value.deserializer
参数:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
需要注意的是,上面只是配置了value的序列化与反序列化,key还是StringSerializer。同样的key也是可以自定义配置的