• kafka生产者


    1.原理

    2.普通异步发送

    引入pom:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.kafka</groupId>
    4. <artifactId>kafka-clients</artifactId>
    5. <version>3.0.0</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.slf4j</groupId>
    9. <artifactId>slf4j-log4j12</artifactId>
    10. <version>1.7.25</version>
    11. </dependency>
    12. </dependencies>
    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import java.util.Properties;
    7. public class CustomProducer {
    8. public static void main(String[] args) {
    9. // 1. 创建 kafka 生产者的配置对象
    10. Properties properties = new Properties();
    11. // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
    12. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    13. "hadoop100:9092");
    14. // key,value 序列化(必须):key.serializer,value.serializer
    15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    16. "org.apache.kafka.common.serialization.StringSerializer");
    17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    18. "org.apache.kafka.common.serialization.StringSerializer");
    19. // 3. 创建 kafka 生产者对象
    20. KafkaProducer<String, String> kafkaProducer = new
    21. KafkaProducer<String, String>(properties);
    22. // 4. 调用 send 方法,发送消息
    23. for (int i = 0; i < 5; i++) {
    24. kafkaProducer.send(new
    25. ProducerRecord<>("first","atguigu " + i));
    26. }
    27. // 5. 关闭资源
    28. kafkaProducer.close();
    29. }
    30. }

    测试效果:

    3.带回调的异步发送

    回调的信息实际是从队列返回的

     

    4.同步发送 

    只需在异步发送的基础上,再调用一下 get()方法即可。

    5.分区

    6.分区策略 

    指定key的值:对key的hashcode做分配

    希望将订单表的数据全部发到kafka的一个分区上,怎么处理?

    将该表的名称作为key值然后发送即可 
    如:

    7.自定义分区 (脏数据的处理)

    如果研发人员可以根据企业需求,自己重新实现分区器。 1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区, 不包含 atguigu,就发往 1 号分区。 

    自定义分区器:

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.Partitioner;
    3. import org.apache.kafka.common.Cluster;
    4. import java.util.Map;
    5. public class MyPartitioner implements Partitioner {
    6. @Override
    7. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    8. String s = value.toString();
    9. int partion;
    10. if(s.contains("atguigu")) {
    11. partion = 1;
    12. }else {
    13. partion=0;
    14. }
    15. return partion;
    16. }
    17. @Override
    18. public void close() {
    19. }
    20. @Override
    21. public void configure(Map<String, ?> map) {
    22. }
    23. }

     拷贝全类名,产生关联

    测试结论:

    8.如何让提高生产者的吞吐量

     

    1. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
    2. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
    3. properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
    4. // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
    5. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

  • 相关阅读:
    java毕业设计艾灸减肥管理网站Mybatis+系统+数据库+调试部署
    DAMA-第三章(数据治理)
    windows的redis配置sentinel
    【雷达通信】合成孔径雷达地面运动目标检测技术研究(Matlab代码实现)
    C++多态
    EEG 情绪标签 - 简介
    FastAPI 学习之路(九)请求体有多个参数如何处理?
    Vue路由重复点击报错解决
    【2010NOIP普及组】T4. 三国游戏 试题解析
    红黑树实现
  • 原文地址:https://blog.csdn.net/aaaa1234561/article/details/136242395