kafka为什么有些属性没有配置却能正常工作,那是因为kafka-clients对有些消费者设置了默认值,具体看下ConsumerConfig类的静态模块,具体如下所示:
- kafka为什么有些属性没有配置却能正常工作,那是因为kafka-clients对有些消费者设置了默认值,具体看下ConsumerConfig类的静态模块,具体如下所示:
-
- static {
- CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
- Type.LIST,
- Collections.emptyList(),
- new ConfigDef.NonNullValidator(),
- Importance.HIGH,
- CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
- .define(CLIENT_DNS_LOOKUP_CONFIG,
- Type.STRING,
- ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
- in(ClientDnsLookup.DEFAULT.toString(),
- ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
- ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
- .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
- .define(GROUP_INSTANCE_ID_CONFIG,
- Type.STRING,
- null,
- Importance.MEDIUM,
- GROUP_INSTANCE_ID_DOC)
- .define(SESSION_TIMEOUT_MS_CONFIG,
- Type.INT,
- 10000,
- Importance.HIGH,
- SESSION_TIMEOUT_MS_DOC)
- .define(HEARTBEAT_INTERVAL_MS_CONFIG,
- Type.INT,
- 3000,
- Importance.HIGH,
- HEARTBEAT_INTERVAL_MS_DOC)
- .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
- Type.LIST,
- Collections.singletonList(RangeAssignor.class),
- new ConfigDef.NonNullValidator(),
- Importance.MEDIUM,
- PARTITION_ASSIGNMENT_STRATEGY_DOC)
- .define(METADATA_MAX_AGE_CONFIG,
- Type.LONG,
- 5 * 60 * 1000,
- atLeast(0),
- Importance.LOW,
- CommonClientConfigs.METADATA_MAX_AGE_DOC)
- .define(ENABLE_AUTO_COMMIT_CONFIG,
- Type.BOOLEAN,
- true,
- Importance.MEDIUM,
- ENABLE_AUTO_COMMIT_DOC)
- .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
- Type.INT,
- 5000,
- atLeast(0),
- Importance.LOW,
- AUTO_COMMIT_INTERVAL_MS_DOC)
- .define(CLIENT_ID_CONFIG,
- Type.STRING,
- "",
- Importance.LOW,
- CommonClientConfigs.CLIENT_ID_DOC)
- .define(CLIENT_RACK_CONFIG,
- Type.STRING,
- "",
- Importance.LOW,
- CommonClientConfigs.CLIENT_RACK_DOC)
- .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
- Type.INT,
- DEFAULT_MAX_PARTITION_FETCH_BYTES,
- atLeast(0),
- Importance.HIGH,
- MAX_PARTITION_FETCH_BYTES_DOC)
- .define(SEND_BUFFER_CONFIG,
- Type.INT,
- 128 * 1024,
- atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
- Importance.MEDIUM,
- CommonClientConfigs.SEND_BUFFER_DOC)
- .define(RECEIVE_BUFFER_CONFIG,
- Type.INT,
- 64 * 1024,
- atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
- Importance.MEDIUM,
- CommonClientConfigs.RECEIVE_BUFFER_DOC)
- .define(FETCH_MIN_BYTES_CONFIG,
- Type.INT,
- 1,
- atLeast(0),
- Importance.HIGH,
- FETCH_MIN_BYTES_DOC)
- .define(FETCH_MAX_BYTES_CONFIG,
- Type.INT,
- DEFAULT_FETCH_MAX_BYTES,
- atLeast(0),
- Importance.MEDIUM,
- FETCH_MAX_BYTES_DOC)
- .define(FETCH_MAX_WAIT_MS_CONFIG,
- Type.INT,
- 500,
- atLeast(0),
- Importance.LOW,
- FETCH_MAX_WAIT_MS_DOC)
- .define(RECONNECT_BACKOFF_MS_CONFIG,
- Type.LONG,
- 50L,
- atLeast(0L),
- Importance.LOW,
- CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
- .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
- Type.LONG,
- 1000L,
- atLeast(0L),
- Importance.LOW,
- CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
- .define(RETRY_BACKOFF_MS_CONFIG,
- Type.LONG,
- 100L,
- atLeast(0L),
- Importance.LOW,
- CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-
- .define(AUTO_OFFSET_RESET_CONFIG,
- Type.STRING,
- "latest", //
- in("latest", "earliest", "none"),
- Importance.MEDIUM,
- AUTO_OFFSET_RESET_DOC)
- .define(CHECK_CRCS_CONFIG,
- Type.BOOLEAN,
- true,
- Importance.LOW,
- CHECK_CRCS_DOC)
- .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
- Type.LONG,
- 30000,
- atLeast(0),
- Importance.LOW,
- CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG,
- Type.INT,
- 2,
- atLeast(1),
- Importance.LOW,
- CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
- .define(METRICS_RECORDING_LEVEL_CONFIG,
- Type.STRING,
- Sensor.RecordingLevel.INFO.toString(),
- in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
- Importance.LOW,
- CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG,
- Type.LIST,
- Collections.emptyList(),
- new ConfigDef.NonNullValidator(),
- Importance.LOW,
- CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- VALUE_DESERIALIZER_CLASS_DOC)
- .define(REQUEST_TIMEOUT_MS_CONFIG,
- Type.INT,
- 30000,
- atLeast(0),
- Importance.MEDIUM,
- REQUEST_TIMEOUT_MS_DOC)
- .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
- Type.INT,
- 60 * 1000,
- atLeast(0),
- Importance.MEDIUM,
- CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
- .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
- Type.LONG,
- CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
- Importance.MEDIUM,
- CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
- .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
- Type.LONG,
- CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
- Importance.MEDIUM,
- CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
- /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
- .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
- Type.LONG,
- 9 * 60 * 1000,
- Importance.MEDIUM,
- CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
- .define(INTERCEPTOR_CLASSES_CONFIG,
- Type.LIST,
- Collections.emptyList(),
- new ConfigDef.NonNullValidator(),
- Importance.LOW,
- INTERCEPTOR_CLASSES_DOC)
- .define(MAX_POLL_RECORDS_CONFIG,
- Type.INT,
- 500,
- atLeast(1),
- Importance.MEDIUM,
- MAX_POLL_RECORDS_DOC)
- .define(MAX_POLL_INTERVAL_MS_CONFIG,
- Type.INT,
- 300000,
- atLeast(1),
- Importance.MEDIUM,
- MAX_POLL_INTERVAL_MS_DOC)
- .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
- Type.BOOLEAN,
- DEFAULT_EXCLUDE_INTERNAL_TOPICS,
- Importance.MEDIUM,
- EXCLUDE_INTERNAL_TOPICS_DOC)
- .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
- Type.BOOLEAN,
- true,
- Importance.LOW)
- .defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
- Type.BOOLEAN,
- false,
- Importance.LOW)
- .define(ISOLATION_LEVEL_CONFIG,
- Type.STRING,
- DEFAULT_ISOLATION_LEVEL,
- in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
- Importance.MEDIUM,
- ISOLATION_LEVEL_DOC)
- .define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
- Type.BOOLEAN,
- DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
- Importance.MEDIUM,
- ALLOW_AUTO_CREATE_TOPICS_DOC)
- // security support
- .define(SECURITY_PROVIDERS_CONFIG,
- Type.STRING,
- null,
- Importance.LOW,
- SECURITY_PROVIDERS_DOC)
- .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
- Type.STRING,
- CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- Importance.MEDIUM,
- CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .withClientSslSupport()
- .withClientSaslSupport();
- }
像auto.offset.reset这个配置默认值为latest一样,再看下ConsumerConfig的几个构造方法
- public ConsumerConfig(Properties props) {
- super(CONFIG, props);
- }
-
- public ConsumerConfig(Map
props) { - super(CONFIG, props);
- }
是的,所有的ConsumerConfig构造方法都将上面的默认配置CONFIG传入了构造方法,将下来的处理就是如果显式配置了对应的配置项就使用显式配置数据,没有则使用CONFIG里面的默认配置。
PS:
上面的默认配置除了有一些配置的默认配置,一些枚举属性还有其可选值,比如
auto.offset.reset的可选项