Avro 协议可以提升转换效率,本质是kafka就支持
1、json
2、Avro
3、protobuf/grpc
在 confluentinc/cp-schema-registry:6.0.0 的image中有如下命令
[appuser@schema-registry bin]$ ls -ltr kafka*consumer
-rwxr-xr-x 1 root root 1578 Sep 16 2020 kafka-protobuf-console-consumer
-rwxr-xr-x 1 root root 1576 Sep 16 2020 kafka-json-schema-console-consumer
-rwxr-xr-x 1 root root 1553 Sep 16 2020 kafka-avro-console-consumer
java使用可以参考:https://docs.confluent.io/platform/current/streams/developer-guide/datatypes.html#avro-primitive
核心代码
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values
以下代码也是类似的:
public static Serde<EntitySentiment> EntitySentiment(String url, boolean isKey) {
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", url);
Serde<EntitySentiment> serde = new SpecificAvroSerde<>();
serde.configure(serdeConfig, isKey);
return serde;
}
和下面useSchemaRegistry 分支 进行注册使用
// write to the output topic. note: the following code shows how to use
// both a registry-aware Avro Serde and a registryless Avro Serde
if (useSchemaRegistry) {
enriched.to(
"crypto-sentiment",
// registry-aware Avro Serde
Produced.with(
Serdes.ByteArray(), AvroSerdes.EntitySentiment("http://kafka:8081", false)));
} else {
enriched.to(
"crypto-sentiment",
Produced.with(
Serdes.ByteArray(),
// registryless Avro Serde
com.mitchseymour.kafka.serialization.avro.AvroSerdes.get(EntitySentiment.class)));
}
以上代码的else 是非registry模式,使用来自
https://github.com/mitch-seymour/kafka-registryless-avro-serdes的代码包。