• kafka学习-生产者


    目录

    1、消息生产流程

    2、生产者常见参数配置

    3、序列化器

    基本概念

    自定义序列化器

    4、分区器

    默认分区规则

    自定义分区器

    5、生产者拦截器

    作用

    自定义拦截器

    6、生产者原理解析


    1、消息生产流程

    2、生产者常见参数配置

    3、序列化

    基本概念

    • 在Kafka中保存的数据都是字节数组。
    • 消息发送前,需要将消息序列化为字节数组进行发送。
    • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
    • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
    • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

    自定义序列化器

    实现org.apache.kafka.common.serialization.Serializer接口,并实现其中的serializer方法。

    1. @Data
    2. public class User {
    3. private Integer userId;
    4. private String username;
    5. }
    6. public class UserSerializer implements Serializer {
    7. @Override
    8. public void configure(Map configs, boolean isKey) {
    9. // do nothing
    10. }
    11. @Override
    12. public byte[] serialize(String topic, User data) {
    13. try {
    14. // 如果数据是null,则返回null
    15. if (data == null) return null;
    16. Integer userId = data.getUserId();
    17. String username = data.getUsername();
    18. int length = 0;
    19. byte[] bytes = null;
    20. if (null != username) {
    21. bytes = username.getBytes("utf-8");
    22. length = bytes.length;
    23. }
    24. // 第一个4字节存储userId的值
    25. // 第二个4字节存储username字节数组的长度int值
    26. // 第三个length长度,存储username序列化之后的字节数组
    27. ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
    28. buffer.putInt(userId);
    29. buffer.putInt(length);
    30. buffer.put(bytes);
    31. return buffer.array();
    32. } catch (UnsupportedEncodingException e) {
    33. throw new SerializationException("序列化数据异常");
    34. }
    35. }
    36. @Override
    37. public void close() {
    38. // do nothing
    39. }
    40. }

    4、分区器

    默认分区规则

    KafkaProducer.partition();DefaultPartitioner.partition();

    1. 如果record提供了分区号,则使⽤record提供的分区号
    2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
    3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

    自定义分区器

    实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

    在生产者参数中通过配置partitioner.class指定自定义分区器。

    1. /**
    2. * 自定义分区器
    3. */
    4. public class MyPartitioner implements Partitioner {
    5. @Override
    6. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    7. // 此处可以计算分区的数字。
    8. // 我们直接指定为2
    9. return 2;
    10. }
    11. @Override
    12. public void close() {
    13. }
    14. @Override
    15. public void configure(Map configs) {
    16. }
    17. }

    5、生产者拦截器

    作用

            在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

    自定义拦截器

            自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

    • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
    • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
    • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

            在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

    1. public class Interceptor implements ProducerInterceptor {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
    3. @Override
    4. public ProducerRecord onSend(ProducerRecord record) {
    5. System.out.println("拦截器---go");
    6. // 此处根据业务需要对相关的数据作修改
    7. String topic = record.topic();
    8. Integer partition = record.partition();
    9. Long timestamp = record.timestamp();
    10. KEY key = record.key();
    11. VALUE value = record.value();
    12. Headers headers = record.headers();
    13. // 添加消息头
    14. headers.add("interceptor", "interceptor".getBytes());
    15. ProducerRecord newRecord =
    16. new ProducerRecord(topic, partition, timestamp, key, value, headers);
    17. return newRecord;
    18. }
    19. @Override
    20. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    21. System.out.println("拦截器---back");
    22. if (exception != null) {
    23. // 如果发生异常,记录在日志中
    24. LOGGER.error(exception.getMessage());
    25. }
    26. }
    27. @Override
    28. public void close() {
    29. }
    30. @Override
    31. public void configure(Map configs) {
    32. }
    33. }

    6、生产者原理解析

    以上内容为个人学习理解,如有问题,欢迎在评论区指出。

    部分内容截取自网络,如有侵权,联系作者删除。

  • 相关阅读:
    yolov8 c++进行部署
    Git学习
    [python基础] 面向对象——封装,继承
    linux中如何编写脚本来实现一个可以进行cmd命令及shell脚本调用的UI交互页面
    计算机相关术语科普之什么叫网关(Gateway)
    vim 从嫌弃到依赖(23)——最后的闲扯
    PTA题目 谁先倒
    《架构整洁之道》读书笔记(下)
    SpringBoot-依赖管理和自动配置
    【微机原理及接口技术】中断系统
  • 原文地址:https://blog.csdn.net/weixin_37672801/article/details/132783088