1、集群版本:2.8.1,3.0版本不再支持JDK8,不在需要Zookeeper.
2、Zookeeper版本:3.8.0.
3、Zookeeper三个节点配置文件以及环境搭建.
三个端口分别是 :2181、2182、2183.

依次建立三个保存id的文件myid文件,不需要任何后缀.
D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data1
分别是数字1、2、3


①、zoo-1.cfg
- # 存放数据
- dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data1
-
- #存放日志
- dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log1
-
- # the port at which the clients will connect
- # 监听端口ifconf
- clientPort=2181
-
- # 配置集群服务,这里每个文件都一样
- server.1=192.168.1.6:2881:3881
- server.2=192.168.1.6:2882:3882
- server.3=192.168.1.6:2883:3883
- # The number of milliseconds of each tick
- tickTime=2000
- # The number of ticks that the initial
- # synchronization phase can take
- initLimit=10
- # The number of ticks that can pass between
- # sending a request and getting an acknowledgement
- syncLimit=5
- #
- # Be sure to read the maintenance section of the
- # administrator guide before turning on autopurge.
- #
- # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- #
- # The number of snapshots to retain in dataDir
- #autopurge.snapRetainCount=3
- # Purge task interval in hours
- # Set to "0" to disable auto purge feature
- #autopurge.purgeInterval=1
-
- ## Metrics Providers
- #
- # https://prometheus.io Metrics Exporter
- #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
- #metricsProvider.httpHost=0.0.0.0
- #metricsProvider.httpPort=7000
- #metricsProvider.exportJvmInfo=true
②、zoo-2.cfg
- # 存放数据
- dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data2
-
- #存放日志
- dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log2
-
- # the port at which the clients will connect
- # 监听端口ifconf
- clientPort=2182
-
- # 配置集群服务,这里每个文件都一样
- server.1=192.168.1.6:2881:3881
- server.2=192.168.1.6:2882:3882
- server.3=192.168.1.6:2883:3883
- # The number of milliseconds of each tick
- tickTime=2000
- # The number of ticks that the initial
- # synchronization phase can take
- initLimit=10
- # The number of ticks that can pass between
- # sending a request and getting an acknowledgement
- syncLimit=5
- #
- # Be sure to read the maintenance section of the
- # administrator guide before turning on autopurge.
- #
- # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- #
- # The number of snapshots to retain in dataDir
- #autopurge.snapRetainCount=3
- # Purge task interval in hours
- # Set to "0" to disable auto purge feature
- #autopurge.purgeInterval=1
-
- ## Metrics Providers
- #
- # https://prometheus.io Metrics Exporter
- #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
- #metricsProvider.httpHost=0.0.0.0
- #metricsProvider.httpPort=7000
- #metricsProvider.exportJvmInfo=true
③、zoo-3.cfg
- # 存放数据
- dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data3
-
- #存放日志
- dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log3
-
- # the port at which the clients will connect
- # 监听端口ifconf
- clientPort=2183
-
- # 配置集群服务,这里每个文件都一样
- server.1=192.168.1.6:2881:3881
- server.2=192.168.1.6:2882:3882
- server.3=192.168.1.6:2883:3883
- # The number of milliseconds of each tick
- tickTime=2000
- # The number of ticks that the initial
- # synchronization phase can take
- initLimit=10
- # The number of ticks that can pass between
- # sending a request and getting an acknowledgement
- syncLimit=5
- #
- # Be sure to read the maintenance section of the
- # administrator guide before turning on autopurge.
- #
- # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- #
- # The number of snapshots to retain in dataDir
- #autopurge.snapRetainCount=3
- # Purge task interval in hours
- # Set to "0" to disable auto purge feature
- #autopurge.purgeInterval=1
-
- ## Metrics Providers
- #
- # https://prometheus.io Metrics Exporter
- #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
- #metricsProvider.httpHost=0.0.0.0
- #metricsProvider.httpPort=7000
- #metricsProvider.exportJvmInfo=true
④、对应数据目录和日志目录 文件

⑤、配置三个启动脚本.
zkServer1.cmd和zkServer2.cmd和zkServer3.cmd配置如下
- @echo off
- REM Licensed to the Apache Software Foundation (ASF) under one or more
- REM contributor license agreements. See the NOTICE file distributed with
- REM this work for additional information regarding copyright ownership.
- REM The ASF licenses this file to You under the Apache License, Version 2.0
- REM (the "License"); you may not use this file except in compliance with
- REM the License. You may obtain a copy of the License at
- REM
- REM http://www.apache.org/licenses/LICENSE-2.0
- REM
- REM Unless required by applicable law or agreed to in writing, software
- REM distributed under the License is distributed on an "AS IS" BASIS,
- REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- REM See the License for the specific language governing permissions and
- REM limitations under the License.
-
- setlocal
- call "%~dp0zkEnv.cmd"
- set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
- set ZOOCFG=D:\\BigData\\apache-zookeeper-3.8.0-bin\\conf\\zoo-1.cfg
- set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log
-
- echo on
- call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
-
- endlocal

⑥、启动Zookeeper节点.
依次点击三个启动cmd命令即可.
第一个节点.

第二个节点.

第三个节点.

1、复制搭建目录以及目录规划.
D:\BigData\Kafka-Cluster\kafka-1

2、配置文件.
第一个节点:server.properties
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- # see kafka.server.KafkaConfig for additional details and defaults
-
- ############################# Server Basics #############################
-
- # The id of the broker. This must be set to a unique integer for each broker.
- broker.id=0
-
- port=9092
- host.name=192.168.1.6
- ############################# Socket Server Settings #############################
-
- # The address the socket server listens on. It will get the value returned from
- # java.net.InetAddress.getCanonicalHostName() if not configured.
- # FORMAT:
- # listeners = listener_name://host_name:port
- # EXAMPLE:
- # listeners = PLAINTEXT://your.host.name:9092
- #listeners=PLAINTEXT://:9092
-
- # Hostname and port the broker will advertise to producers and consumers. If not set,
- # it uses the value for "listeners" if configured. Otherwise, it will use the value
- # returned from java.net.InetAddress.getCanonicalHostName().
- #advertised.listeners=PLAINTEXT://your.host.name:9092
-
- # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
- #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
- # The number of threads that the server uses for receiving requests from the network and sending responses to the network
- num.network.threads=3
-
- # The number of threads that the server uses for processing requests, which may include disk I/O
- num.io.threads=8
-
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
-
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
-
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
-
-
- ############################# Log Basics #############################
-
- # A comma separated list of directories under which to store log files
- log.dirs=D:\BigData\Kafka-Cluster\kafka-1\logs
-
- # The default number of log partitions per topic. More partitions allow greater
- # parallelism for consumption, but this will also result in more files across
- # the brokers.
- num.partitions=2
-
- # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
- # This value is recommended to be increased for installations with data dirs located in RAID array.
- num.recovery.threads.per.data.dir=1
-
- ############################# Internal Topic Settings #############################
- # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
- # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
-
- ############################# Log Flush Policy #############################
-
- # Messages are immediately written to the filesystem but by default we only fsync() to sync
- # the OS cache lazily. The following configurations control the flush of data to disk.
- # There are a few important trade-offs here:
- # 1. Durability: Unflushed data may be lost if you are not using replication.
- # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
- # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
- # The settings below allow one to configure the flush policy to flush data after a period of time or
- # every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
- # The number of messages to accept before forcing a flush of data to disk
- #log.flush.interval.messages=10000
-
- # The maximum amount of time a message can sit in a log before we force a flush
- #log.flush.interval.ms=1000
-
- ############################# Log Retention Policy #############################
-
- # The following configurations control the disposal of log segments. The policy can
- # be set to delete segments after a period of time, or after a given size has accumulated.
- # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
- # from the end of the log.
-
- # The minimum age of a log file to be eligible for deletion due to age
- log.retention.hours=168
-
- # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
- # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
- #log.retention.bytes=1073741824
-
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
-
- # The interval at which log segments are checked to see if they can be deleted according
- # to the retention policies
- log.retention.check.interval.ms=300000
-
- ############################# Zookeeper #############################
-
- # Zookeeper connection string (see zookeeper docs for details).
- # This is a comma separated host:port pairs, each corresponding to a zk
- # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
- # You can also append an optional chroot string to the urls to specify the
- # root directory for all kafka znodes.
- #zookeeper.connect=localhost:2181
-
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=18000
-
-
- ############################# Group Coordinator Settings #############################
-
- # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
- # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
- # The default value for this is 3 seconds.
- # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
- # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
- group.initial.rebalance.delay.ms=0
- #对应着刚刚配置的zookeeper的三个ip与端口地址
- zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183
第二个节点:server.properties
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- # see kafka.server.KafkaConfig for additional details and defaults
-
- ############################# Server Basics #############################
-
- # The id of the broker. This must be set to a unique integer for each broker.
- broker.id=1
-
- port=9093
- host.name=192.168.1.6
- ############################# Socket Server Settings #############################
-
- # The address the socket server listens on. It will get the value returned from
- # java.net.InetAddress.getCanonicalHostName() if not configured.
- # FORMAT:
- # listeners = listener_name://host_name:port
- # EXAMPLE:
- # listeners = PLAINTEXT://your.host.name:9092
- #listeners=PLAINTEXT://:9092
-
- # Hostname and port the broker will advertise to producers and consumers. If not set,
- # it uses the value for "listeners" if configured. Otherwise, it will use the value
- # returned from java.net.InetAddress.getCanonicalHostName().
- #advertised.listeners=PLAINTEXT://your.host.name:9092
-
- # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
- #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
- # The number of threads that the server uses for receiving requests from the network and sending responses to the network
- num.network.threads=3
-
- # The number of threads that the server uses for processing requests, which may include disk I/O
- num.io.threads=8
-
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
-
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
-
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
-
-
- ############################# Log Basics #############################
-
- # A comma separated list of directories under which to store log files
- log.dirs=D:\BigData\Kafka-Cluster\kafka-2\logs
-
- # The default number of log partitions per topic. More partitions allow greater
- # parallelism for consumption, but this will also result in more files across
- # the brokers.
- num.partitions=2
-
- # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
- # This value is recommended to be increased for installations with data dirs located in RAID array.
- num.recovery.threads.per.data.dir=1
-
- ############################# Internal Topic Settings #############################
- # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
- # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
-
- ############################# Log Flush Policy #############################
-
- # Messages are immediately written to the filesystem but by default we only fsync() to sync
- # the OS cache lazily. The following configurations control the flush of data to disk.
- # There are a few important trade-offs here:
- # 1. Durability: Unflushed data may be lost if you are not using replication.
- # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
- # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
- # The settings below allow one to configure the flush policy to flush data after a period of time or
- # every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
- # The number of messages to accept before forcing a flush of data to disk
- #log.flush.interval.messages=10000
-
- # The maximum amount of time a message can sit in a log before we force a flush
- #log.flush.interval.ms=1000
-
- ############################# Log Retention Policy #############################
-
- # The following configurations control the disposal of log segments. The policy can
- # be set to delete segments after a period of time, or after a given size has accumulated.
- # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
- # from the end of the log.
-
- # The minimum age of a log file to be eligible for deletion due to age
- log.retention.hours=168
-
- # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
- # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
- #log.retention.bytes=1073741824
-
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
-
- # The interval at which log segments are checked to see if they can be deleted according
- # to the retention policies
- log.retention.check.interval.ms=300000
-
- ############################# Zookeeper #############################
-
- # Zookeeper connection string (see zookeeper docs for details).
- # This is a comma separated host:port pairs, each corresponding to a zk
- # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
- # You can also append an optional chroot string to the urls to specify the
- # root directory for all kafka znodes.
- # zookeeper.connect=localhost:2181
-
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=18000
-
-
- ############################# Group Coordinator Settings #############################
-
- # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
- # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
- # The default value for this is 3 seconds.
- # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
- # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
- group.initial.rebalance.delay.ms=0
- #对应着刚刚配置的zookeeper的三个ip与端口地址
- zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183
第三个节点:server.properties
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- # see kafka.server.KafkaConfig for additional details and defaults
-
- ############################# Server Basics #############################
-
- # The id of the broker. This must be set to a unique integer for each broker.
- broker.id=2
-
- port=9094
- host.name=192.168.1.6
- ############################# Socket Server Settings #############################
-
- # The address the socket server listens on. It will get the value returned from
- # java.net.InetAddress.getCanonicalHostName() if not configured.
- # FORMAT:
- # listeners = listener_name://host_name:port
- # EXAMPLE:
- # listeners = PLAINTEXT://your.host.name:9092
- #listeners=PLAINTEXT://:9092
-
- # Hostname and port the broker will advertise to producers and consumers. If not set,
- # it uses the value for "listeners" if configured. Otherwise, it will use the value
- # returned from java.net.InetAddress.getCanonicalHostName().
- #advertised.listeners=PLAINTEXT://your.host.name:9092
-
- # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
- #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
- # The number of threads that the server uses for receiving requests from the network and sending responses to the network
- num.network.threads=3
-
- # The number of threads that the server uses for processing requests, which may include disk I/O
- num.io.threads=8
-
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
-
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
-
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
-
-
- ############################# Log Basics #############################
-
- # A comma separated list of directories under which to store log files
- log.dirs=D:\BigData\Kafka-Cluster\kafka-3\logs
-
- # The default number of log partitions per topic. More partitions allow greater
- # parallelism for consumption, but this will also result in more files across
- # the brokers.
- num.partitions=2
-
- # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
- # This value is recommended to be increased for installations with data dirs located in RAID array.
- num.recovery.threads.per.data.dir=1
-
- ############################# Internal Topic Settings #############################
- # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
- # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
-
- ############################# Log Flush Policy #############################
-
- # Messages are immediately written to the filesystem but by default we only fsync() to sync
- # the OS cache lazily. The following configurations control the flush of data to disk.
- # There are a few important trade-offs here:
- # 1. Durability: Unflushed data may be lost if you are not using replication.
- # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
- # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
- # The settings below allow one to configure the flush policy to flush data after a period of time or
- # every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
- # The number of messages to accept before forcing a flush of data to disk
- #log.flush.interval.messages=10000
-
- # The maximum amount of time a message can sit in a log before we force a flush
- #log.flush.interval.ms=1000
-
- ############################# Log Retention Policy #############################
-
- # The following configurations control the disposal of log segments. The policy can
- # be set to delete segments after a period of time, or after a given size has accumulated.
- # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
- # from the end of the log.
-
- # The minimum age of a log file to be eligible for deletion due to age
- log.retention.hours=168
-
- # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
- # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
- #log.retention.bytes=1073741824
-
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
-
- # The interval at which log segments are checked to see if they can be deleted according
- # to the retention policies
- log.retention.check.interval.ms=300000
-
- ############################# Zookeeper #############################
-
- # Zookeeper connection string (see zookeeper docs for details).
- # This is a comma separated host:port pairs, each corresponding to a zk
- # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
- # You can also append an optional chroot string to the urls to specify the
- # root directory for all kafka znodes.
- # zookeeper.connect=localhost:2181
-
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=18000
-
-
- ############################# Group Coordinator Settings #############################
-
- # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
- # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
- # The default value for this is 3 seconds.
- # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
- # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
- group.initial.rebalance.delay.ms=0
- zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183
3、启动命令
说明:注意路径和写法空格之类的.
kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-1\config\server.properties
第一个节点:

第二个节点:
kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-2\config\server.properties

第三个节点
kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-3\config\server.properties

INFO Cluster ID = -7jJzueoRVav3spVetQ4kw (kafka.server.KafkaServer)
4、创建Topic
kafka-topics.bat --create --zookeeper 192.168.1.6:2181 --replication-factor 2 --partitions 3 --topic cluster_test【三个分区两个副本的Topic】

5、生产者发送消息
kafka-console-producer.bat --broker-list 192.168.1.6:9092,192.168.1.6:9093,192.168.1.6:9094 --topic cluster_test

6、消费者消费消息
kafka-console-consumer.bat --bootstrap-server 192.168.1.6:9092,192.168.1.6:9093,192.168.1.6:9094 --topic cluster_test

遇到问题:kafka经典报错:broker.id doesn‘t match stored broker.id.
解决问题方式:每个节点的日志文件都单独配置目录即可,三个节点都写一个日志文件只能有一个匹配,当然是只有第一个生效了哟.

查询命令:
kafka-topics.bat --describe --zookeeper 192.168.1.6:2181 --topic cluster_test
查看所有Topic:
kafka-topics.bat --zookeeper 192.168.1.6:2181 --list
总结:简单搭建了一下基于Zookeeper的KafkaWindows下的伪集群,本地测试使用即可,一般生产环境都是大型的Kafka集群,broker都在10+,甚至是20+等,有晚上的云监控和告警机制的,完整的鉴权,Kafka3.0逐渐移除了Zookeeper了,搭建更加简单了,元数据有自身管理了.