• java kafka客户端何时设置的kafka消费者默认值


    为什么 kafka 的有些属性没有配置却能正常工作?那是因为 kafka-clients 为部分消费者配置项设置了默认值,具体可以看 ConsumerConfig 类的静态代码块,如下所示:

    1. 为什么 kafka 的有些属性没有配置却能正常工作?那是因为 kafka-clients 为部分消费者配置项设置了默认值,具体可以看 ConsumerConfig 类的静态代码块,如下所示:
    2. static {
    3. CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
    4. Type.LIST,
    5. Collections.emptyList(),
    6. new ConfigDef.NonNullValidator(),
    7. Importance.HIGH,
    8. CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
    9. .define(CLIENT_DNS_LOOKUP_CONFIG,
    10. Type.STRING,
    11. ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
    12. in(ClientDnsLookup.DEFAULT.toString(),
    13. ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
    14. ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
    15. Importance.MEDIUM,
    16. CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
    17. .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
    18. .define(GROUP_INSTANCE_ID_CONFIG,
    19. Type.STRING,
    20. null,
    21. Importance.MEDIUM,
    22. GROUP_INSTANCE_ID_DOC)
    23. .define(SESSION_TIMEOUT_MS_CONFIG,
    24. Type.INT,
    25. 10000,
    26. Importance.HIGH,
    27. SESSION_TIMEOUT_MS_DOC)
    28. .define(HEARTBEAT_INTERVAL_MS_CONFIG,
    29. Type.INT,
    30. 3000,
    31. Importance.HIGH,
    32. HEARTBEAT_INTERVAL_MS_DOC)
    33. .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    34. Type.LIST,
    35. Collections.singletonList(RangeAssignor.class),
    36. new ConfigDef.NonNullValidator(),
    37. Importance.MEDIUM,
    38. PARTITION_ASSIGNMENT_STRATEGY_DOC)
    39. .define(METADATA_MAX_AGE_CONFIG,
    40. Type.LONG,
    41. 5 * 60 * 1000,
    42. atLeast(0),
    43. Importance.LOW,
    44. CommonClientConfigs.METADATA_MAX_AGE_DOC)
    45. .define(ENABLE_AUTO_COMMIT_CONFIG,
    46. Type.BOOLEAN,
    47. true,
    48. Importance.MEDIUM,
    49. ENABLE_AUTO_COMMIT_DOC)
    50. .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
    51. Type.INT,
    52. 5000,
    53. atLeast(0),
    54. Importance.LOW,
    55. AUTO_COMMIT_INTERVAL_MS_DOC)
    56. .define(CLIENT_ID_CONFIG,
    57. Type.STRING,
    58. "",
    59. Importance.LOW,
    60. CommonClientConfigs.CLIENT_ID_DOC)
    61. .define(CLIENT_RACK_CONFIG,
    62. Type.STRING,
    63. "",
    64. Importance.LOW,
    65. CommonClientConfigs.CLIENT_RACK_DOC)
    66. .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
    67. Type.INT,
    68. DEFAULT_MAX_PARTITION_FETCH_BYTES,
    69. atLeast(0),
    70. Importance.HIGH,
    71. MAX_PARTITION_FETCH_BYTES_DOC)
    72. .define(SEND_BUFFER_CONFIG,
    73. Type.INT,
    74. 128 * 1024,
    75. atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
    76. Importance.MEDIUM,
    77. CommonClientConfigs.SEND_BUFFER_DOC)
    78. .define(RECEIVE_BUFFER_CONFIG,
    79. Type.INT,
    80. 64 * 1024,
    81. atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
    82. Importance.MEDIUM,
    83. CommonClientConfigs.RECEIVE_BUFFER_DOC)
    84. .define(FETCH_MIN_BYTES_CONFIG,
    85. Type.INT,
    86. 1,
    87. atLeast(0),
    88. Importance.HIGH,
    89. FETCH_MIN_BYTES_DOC)
    90. .define(FETCH_MAX_BYTES_CONFIG,
    91. Type.INT,
    92. DEFAULT_FETCH_MAX_BYTES,
    93. atLeast(0),
    94. Importance.MEDIUM,
    95. FETCH_MAX_BYTES_DOC)
    96. .define(FETCH_MAX_WAIT_MS_CONFIG,
    97. Type.INT,
    98. 500,
    99. atLeast(0),
    100. Importance.LOW,
    101. FETCH_MAX_WAIT_MS_DOC)
    102. .define(RECONNECT_BACKOFF_MS_CONFIG,
    103. Type.LONG,
    104. 50L,
    105. atLeast(0L),
    106. Importance.LOW,
    107. CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
    108. .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
    109. Type.LONG,
    110. 1000L,
    111. atLeast(0L),
    112. Importance.LOW,
    113. CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
    114. .define(RETRY_BACKOFF_MS_CONFIG,
    115. Type.LONG,
    116. 100L,
    117. atLeast(0L),
    118. Importance.LOW,
    119. CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
    120. .define(AUTO_OFFSET_RESET_CONFIG,
    121. Type.STRING,
    122. "latest", //
    123. in("latest", "earliest", "none"),
    124. Importance.MEDIUM,
    125. AUTO_OFFSET_RESET_DOC)
    126. .define(CHECK_CRCS_CONFIG,
    127. Type.BOOLEAN,
    128. true,
    129. Importance.LOW,
    130. CHECK_CRCS_DOC)
    131. .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
    132. Type.LONG,
    133. 30000,
    134. atLeast(0),
    135. Importance.LOW,
    136. CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
    137. .define(METRICS_NUM_SAMPLES_CONFIG,
    138. Type.INT,
    139. 2,
    140. atLeast(1),
    141. Importance.LOW,
    142. CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
    143. .define(METRICS_RECORDING_LEVEL_CONFIG,
    144. Type.STRING,
    145. Sensor.RecordingLevel.INFO.toString(),
    146. in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
    147. Importance.LOW,
    148. CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
    149. .define(METRIC_REPORTER_CLASSES_CONFIG,
    150. Type.LIST,
    151. Collections.emptyList(),
    152. new ConfigDef.NonNullValidator(),
    153. Importance.LOW,
    154. CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
    155. .define(KEY_DESERIALIZER_CLASS_CONFIG,
    156. Type.CLASS,
    157. Importance.HIGH,
    158. KEY_DESERIALIZER_CLASS_DOC)
    159. .define(VALUE_DESERIALIZER_CLASS_CONFIG,
    160. Type.CLASS,
    161. Importance.HIGH,
    162. VALUE_DESERIALIZER_CLASS_DOC)
    163. .define(REQUEST_TIMEOUT_MS_CONFIG,
    164. Type.INT,
    165. 30000,
    166. atLeast(0),
    167. Importance.MEDIUM,
    168. REQUEST_TIMEOUT_MS_DOC)
    169. .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
    170. Type.INT,
    171. 60 * 1000,
    172. atLeast(0),
    173. Importance.MEDIUM,
    174. CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
    175. .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
    176. Type.LONG,
    177. CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
    178. Importance.MEDIUM,
    179. CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
    180. .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
    181. Type.LONG,
    182. CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
    183. Importance.MEDIUM,
    184. CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
    185. /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
    186. .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
    187. Type.LONG,
    188. 9 * 60 * 1000,
    189. Importance.MEDIUM,
    190. CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
    191. .define(INTERCEPTOR_CLASSES_CONFIG,
    192. Type.LIST,
    193. Collections.emptyList(),
    194. new ConfigDef.NonNullValidator(),
    195. Importance.LOW,
    196. INTERCEPTOR_CLASSES_DOC)
    197. .define(MAX_POLL_RECORDS_CONFIG,
    198. Type.INT,
    199. 500,
    200. atLeast(1),
    201. Importance.MEDIUM,
    202. MAX_POLL_RECORDS_DOC)
    203. .define(MAX_POLL_INTERVAL_MS_CONFIG,
    204. Type.INT,
    205. 300000,
    206. atLeast(1),
    207. Importance.MEDIUM,
    208. MAX_POLL_INTERVAL_MS_DOC)
    209. .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
    210. Type.BOOLEAN,
    211. DEFAULT_EXCLUDE_INTERNAL_TOPICS,
    212. Importance.MEDIUM,
    213. EXCLUDE_INTERNAL_TOPICS_DOC)
    214. .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
    215. Type.BOOLEAN,
    216. true,
    217. Importance.LOW)
    218. .defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
    219. Type.BOOLEAN,
    220. false,
    221. Importance.LOW)
    222. .define(ISOLATION_LEVEL_CONFIG,
    223. Type.STRING,
    224. DEFAULT_ISOLATION_LEVEL,
    225. in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
    226. Importance.MEDIUM,
    227. ISOLATION_LEVEL_DOC)
    228. .define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
    229. Type.BOOLEAN,
    230. DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
    231. Importance.MEDIUM,
    232. ALLOW_AUTO_CREATE_TOPICS_DOC)
    233. // security support
    234. .define(SECURITY_PROVIDERS_CONFIG,
    235. Type.STRING,
    236. null,
    237. Importance.LOW,
    238. SECURITY_PROVIDERS_DOC)
    239. .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
    240. Type.STRING,
    241. CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
    242. Importance.MEDIUM,
    243. CommonClientConfigs.SECURITY_PROTOCOL_DOC)
    244. .withClientSslSupport()
    245. .withClientSaslSupport();
    246. }

    例如,auto.offset.reset 这个配置的默认值就是 latest。接着再看下 ConsumerConfig 的几个构造方法:

    1. public ConsumerConfig(Properties props) {
    2. super(CONFIG, props);
    3. }
    4. public ConsumerConfig(Map props) {
    5. super(CONFIG, props);
    6. }

    是的,所有的 ConsumerConfig 构造方法都通过 super(CONFIG, props) 将上面的默认配置 CONFIG 传给了父类构造方法。接下来的处理就是:如果显式配置了对应的配置项,就使用显式配置的值;没有配置则使用 CONFIG 里面的默认值。

    PS:

    上面的 CONFIG 定义中,除了各配置项的默认值之外,一些枚举类型的配置项还定义了可选值,比如

    auto.offset.reset 的可选项为 latest、earliest、none(对应上面定义中的 in("latest", "earliest", "none"))。

  • 相关阅读:
    stm32的ADC采样率如何通过Time定时器进行控制
    树莓派4B安装ROS的方法总结
    Java IO Stream
    7.Gin 路由详解 - 路由分组 - 路由文件抽离
    Django ORM查询之聚合函数、聚合查询(aggregate)、分组查询(annotate)
    shell脚本适用场景
    Thread线程启动的多种方式
    WebGL-Vue3-TS-Threejs:基础练习 / Javascript 3D library / demo
    计算机组原,主机组成和指令流程,主机组成和指令流程,冯诺依曼体系,现代计算机框图,编程举例,控制器、存储器、运算器的构成,取数指令执行流程
    【软件测试】如何用python连接Linux服务器
  • 原文地址:https://blog.csdn.net/john1337/article/details/136715491