目录


实现org.apache.kafka.common.serialization.Serializer
- @Data
- public class User {
- private Integer userId;
- private String username;
- }
-
- public class UserSerializer implements Serializer
{ - @Override
- public void configure(Map
configs, boolean isKey) { - // do nothing
- }
-
- @Override
- public byte[] serialize(String topic, User data) {
- try {
- // 如果数据是null,则返回null
- if (data == null) return null;
- Integer userId = data.getUserId();
- String username = data.getUsername();
- int length = 0;
- byte[] bytes = null;
- if (null != username) {
- bytes = username.getBytes("utf-8");
- length = bytes.length;
- }
- // 第一个4字节存储userId的值
- // 第二个4字节存储username字节数组的长度int值
- // 第三个length长度,存储username序列化之后的字节数组
- ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
- buffer.putInt(userId);
- buffer.putInt(length);
- buffer.put(bytes);
- return buffer.array();
- } catch (UnsupportedEncodingException e) {
- throw new SerializationException("序列化数据异常");
- }
- }
- @Override
- public void close() {
- // do nothing
- }
- }
KafkaProducer.partition();DefaultPartitioner.partition();
实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。
在生产者参数中通过配置partitioner.class指定自定义分区器。
- /**
- * 自定义分区器
- */
- public class MyPartitioner implements Partitioner {
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 此处可以计算分区的数字。
- // 我们直接指定为2
- return 2;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map
configs) { -
- }
- }
在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。
自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:
在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。
- public class Interceptor
implements ProducerInterceptor { - private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
- @Override
- public ProducerRecord
onSend(ProducerRecord record) { - System.out.println("拦截器---go");
- // 此处根据业务需要对相关的数据作修改
- String topic = record.topic();
- Integer partition = record.partition();
- Long timestamp = record.timestamp();
- KEY key = record.key();
- VALUE value = record.value();
- Headers headers = record.headers();
- // 添加消息头
- headers.add("interceptor", "interceptor".getBytes());
- ProducerRecord
newRecord = - new ProducerRecord
(topic, partition, timestamp, key, value, headers); - return newRecord;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- System.out.println("拦截器---back");
- if (exception != null) {
- // 如果发生异常,记录在日志中
- LOGGER.error(exception.getMessage());
- }
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void configure(Map
configs) { - }
- }



以上内容为个人学习理解,如有问题,欢迎在评论区指出。
部分内容截取自网络,如有侵权,联系作者删除。