• SpringBoot Kafka生产者 多kafka配置


    一、配置文件

    1. xxxxxx:
    2. kafka:
    3. bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    4. producer:
    5. # 设置大于0的值,则客户端会将发送失败的记录重新发送
    6. retries: 3
    7. #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
    8. batch-size: 16384
    9. linger: 1
    10. # 设置生产者内存缓冲区的大小。#32M
    11. buffer-memory: 33554432
    12. # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
    13. # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
    14. # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
    15. acks: 1
    16. # 指定消息key和消息体的编解码方式 值的序列化方式
    17. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    18. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    19. consumer:
    20. poll-timeout: 3000
    21. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    22. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    23. auto-commit: false
    24. offset-reset: earliest
    25. records: 10
    26. session-timeout: 150000
    27. poll-interval: 360000
    28. request-timeout: 60000

    二、KafkaConfig

    1. package com.xxxxxx.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.kafka.clients.consumer.ConsumerConfig;
    4. import org.apache.kafka.clients.producer.ProducerConfig;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import org.springframework.beans.factory.annotation.Value;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import org.springframework.kafka.annotation.EnableKafka;
    10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    11. import org.springframework.kafka.config.KafkaListenerContainerFactory;
    12. import org.springframework.kafka.core.*;
    13. import java.util.HashMap;
    14. import java.util.Map;
    15. @Slf4j
    16. @Configuration
    17. @EnableKafka
    18. public class KafkaConfig {
    19. @Value("${xxxxxx.kafka.bootstrap-servers}")
    20. private String servers;
    21. @Value("${xxxxxx.kafka.producer.retries}")
    22. private int retries;
    23. @Value("${xxxxxx.kafka.producer.batch-size}")
    24. private int batchSize;
    25. @Value("${xxxxxx-afka.producer.linger}")
    26. private int linger;
    27. @Value("${xxxxxx.kafka.producer.buffer-memory}")
    28. private int bufferMemory;
    29. @Value("${xxxxxx.kafka.producer.acks}")
    30. private String acks;
    31. @Value("${xxxxxx.kafka.producer.key-serializer}")
    32. private String keyDeserializer;
    33. @Value("${xxxxxx.kafka.producer.value-serializer}")
    34. private String valueDeserializer;
    35. // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
    36. public Map producerConfigs() {
    37. Map props = new HashMap<>();
    38. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    39. //设置重试次数
    40. props.put(ProducerConfig.RETRIES_CONFIG, retries);
    41. //达到batchSize大小的时候会发送消息
    42. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    43. //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
    44. props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
    45. //缓冲区的值
    46. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    47. //序列化手段
    48. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);
    49. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);
    50. props.put(ProducerConfig.ACKS_CONFIG, acks);
    51. return props;
    52. }
    53. public ProducerFactory producerFactory() {
    54. return new DefaultKafkaProducerFactory<>(producerConfigs());
    55. }
    56. @Bean(name = "xxxxxxKafkaTemplate")
    57. public KafkaTemplate kafkaTemplate() {
    58. return new KafkaTemplate(producerFactory());
    59. }
    60. }

    三、生产者

    1. @Resource(name = "xxxxxxKafkaTemplate")
    2. private KafkaTemplate kafkaTemplate;

    kafkaTemplate.send(topic, message);

  • 相关阅读:
    学习笔记—Grafana监控docker--mysql、redis的实战
    el-form for循环动态校验规则
    传统游戏难产 育碧瞄向Web3
    Java进阶篇--LockSupport
    Java详解:单列 | 双列集合 | Collections类
    Linux知识
    Java日志框架Log4j 2详解
    JavaWeb-CSS
    串口数据包收发
    数据结构——二叉树的操作(1)(C++)
  • 原文地址:https://blog.csdn.net/cndn20120225/article/details/134271528