• kafka系列七、kafka核心配置(转)


    一、producer核心配置

    1、acks :发送应答(默认值:1)

    生产者在考虑完成请求之前要求leader收到的确认的数量。这控制了发送的记录的持久性。允许以下设置:

    • acks=0:设置为0,则生产者将完全不等待来自服务器的任何确认。记录将立即添加到socket缓冲区,并被认为已发送。在这种情况下,不能保证服务器已经收到记录,重试配置将不会生效(因为客户机通常不会知道任何失败)。每个记录返回的偏移量总是-1。
    • acks=1:leader会将记录写到本地日志中,但不会等待所有follower的完全确认。在这种情况下,如果leader在记录失败后立即失败,但在追随者复制记录之前失败,那么记录就会丢失。
    • acks=all / -1:leader将等待完整的同步副本来确认记录。这保证了只要至少有一个同步副本仍然存在,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。

    2、batch.size:批量发送大小(默认:16384,16K)

    缓存到本地内存,批量发送大小,意思每次发送16K到broke。当多个记录被发送到同一个分区时,生产者将尝试将记录批处理成更少的请求。这有助于客户机和服务器上的性能。此配置以字节为单位控制默认批处理大小。

    3、bootstrap.servers:服务器地址

    broke服务器地址,多个用逗号割开。

    4、buffer.memory:生产者最大可用缓存 (默认:33554432,32M)

    生产者可以用来缓冲等待发送到服务器的记录的总内存字节。如果记录被发送的速度超过了它们可以被发送到服务器的速度,那么生产者将阻塞max.block。然后它会抛出一个异常。

    该设置应该大致与生成器将使用的总内存相对应,但不是硬绑定,因为生成器使用的并非所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护飞行中的请求。

    生产者产生的消息缓存到本地,每次批量发送batch.size大小到服务器。

    5、client.id:生产者ID(默认“”)

    请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称,从而能够跟踪ip/端口之外的请求源。

    6、compression.type:压缩类型(默认值:producer)

    指定给定主题的最终压缩类型。此配置接受标准压缩编解码器(“gzip”、“snappy”、“lz4”、“zstd”)。它还接受“未压缩”,相当于没有压缩;以及“生产者”,即保留生产者设置的原始压缩编解码器。

    “gzip”:压缩效率高,适合高内存、CPU

    “snappy”:适合带宽敏感性,压缩力度大

    7、retries:失败重试次数(默认:2147483647)

    异常是RetriableException类型或者TransactionManager允许重试;

    transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。
    在这里插入图片描述
    允许重试,但不需要设置max.in.flight.requests.per.connection(单个连接上发送的未确认请求的最大数量)。连接到1可能会改变记录的顺序,因为如果将两个批发送到单个分区,第一个批处理失败并重试,但是第二个批处理成功,那么第二个批处理中的记录可能先出现。

    通过delivery.timeout.ms也可以控制重试次数,如果重试次数没有用尽,传输超时也会停止。

    retry.backoff.ms:重试阻塞时间(默认:100)

    这避免了在某些失败场景下以紧密循环的方式重复发送请求。

    8、delivery.timeout.ms:传输时间(默认:120000,2分钟)

    生产者发送完请求接受服务器ACk的时间,该时间允许重试 ,该配置应该大于request.timeout.ms + linger.ms。

    9、connections.max.idle.ms:关闭空闲连接时间(默认:540000)

    在此配置指定的毫秒数之后关闭空闲连接。

    10、enable.idempotence:开启幂等(默认:false)

    当设置为“true”时,生产者将确保在流中准确地写入每个消息的副本。如果“false”,则由于代理失败而导致生产者重试,等等,可能会在流中写入重试消息的副本。请注意,启用幂等需要使用max.in.flight.requests.per.connection,连接小于或等于5,重试大于0且ack必须为“all”。如果用户没有显式地设置这些值,将选择合适的值。如果设置了不兼容的值,就会抛出ConfigException。

    11、max.in.flight.requests.per.connection:单个连接上发送的未确认请求的最大数量(默认:5)

    阻塞前客户端在单个连接上发送的未确认请求的最大数量。请注意,如果该设置设置为大于1,并且发送失败,则有由于重试(即,如果启用重试)。

    12、interceptor.classes:拦截器(默认:无)

    用作拦截器的类的列表。实现接口:org.apache.kafka.clients.producer。ProducerInterceptor接口允许将生产者接收到的记录发布到Kafka集群之前拦截它们(可能还会发生突变)。默认情况下,没有拦截器。

    13、key.serializer:key序列化器(默认无)

    实现org.apache.kafka.common. serialize .Serializer接口的key的序列化器类。String可配置:class org.apache.kafka.common.serialization.StringSerializer。

    14、value.serializer:value序列化器(默认无)

    序列化器类的值,该值实现org.apache.kafka.common. serialize .Serializer接口。String可配置:class org.apache.kafka.common.serialization.StringSerializer

    15、linger.ms:发送延迟时间(默认:0)

    为减少负载和客户端的请求数量,生产者不会一条一条发送,而是会逗留一段时间批量发送。batch.size和linger.ms满足任何一个条件都会发送

    16、max.block.ms:阻塞时间(默认:60000,一分钟)

    配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()阻塞的时间。由于缓冲区已满或元数据不可用,也会阻塞。用户提供的序列化器或分区程序中的阻塞将不计入此超时。

    17、max.request.size:最大请求字节大小(默认:1048576,1M)

    请求的最大字节大小。此设置将限制生产者在单个请求中发送记录批的数量,以避免发送大量请求。这也有效地限制了最大记录批大小。注意,服务器对记录批处理大小有自己的上限,这可能与此不同。

    18、metric.reporters:自定义指标报告器

    用作指标报告器的类的列表。metricsreporter接口实现了org.apache.kafka.common.metrics.MetricsReporter接口,该接口允许插入将在创建新度量时得到通知的类。JmxReporter始终包含在注册JMX统计信息中。

    19、partitioner.class:自定义分区策略

    实现接口 org.apache.kafka.clients.producer.Partitioner,默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

    20、request.timeout.ms:请求超时时间(默认:30000)

    配置控制客户机等待请求响应的最长时间。如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。这个应该大于replica.lag.time.max。ms(代理配置),以减少由于不必要的生产者重试而导致消息重复的可能性。

  • 相关阅读:
    springboot+高校失物招领系统 毕业设计-附源码121441
    算法D31 | 贪心算法1 | 455.分发饼干 376. 摆动序列 53. 最大子序和
    php代码调用python程序代码
    mysql第二次作业
    数据结构课程笔记总结1 - 排序算法
    【2022年玄武云科技AI算法岗秋招面试记录】
    视图相关知识的汇总
    u盘启动出现蓝屏怎么办
    gcc和makfile
    利用QGIS采集卫星图上的建筑并转成矢量数据
  • 原文地址:https://blog.csdn.net/qq_24095055/article/details/125544899