• Setting Up a Kafka Cluster on Windows


    I. Setting up a ZooKeeper cluster on Windows

    1. Kafka version: 2.8.1 (Kafka 3.0 deprecates JDK 8 and, with KRaft mode, no longer requires ZooKeeper).

    2. ZooKeeper version: 3.8.0.

    3. Configuration files and environment setup for the three ZooKeeper nodes.

      The three client ports are 2181, 2182 and 2183.

     Create a myid file in each node's data directory; the file has no extension, for example:

     D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data1

    Each file contains only the node's id: 1, 2 and 3 respectively (a cmd sketch follows below).
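
    A minimal cmd sketch for creating the data/log directories and the three myid files, assuming the paths above:

    mkdir D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data1 D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data2 D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data3
    mkdir D:\BigData\apache-zookeeper-3.8.0-bin\data-file\log1 D:\BigData\apache-zookeeper-3.8.0-bin\data-file\log2 D:\BigData\apache-zookeeper-3.8.0-bin\data-file\log3
    REM the parentheses stop cmd from reading "1>" as a stream redirection
    (echo 1)> D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data1\myid
    (echo 2)> D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data2\myid
    (echo 3)> D:\BigData\apache-zookeeper-3.8.0-bin\data-file\data3\myid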

     

     

     

      ① zoo-1.cfg

    # data directory
    dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data1
    # transaction log directory
    dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log1
    # the port at which the clients will connect
    # client listening port
    clientPort=2181
    # cluster members (identical in every config file); format: server.N=host:quorumPort:electionPort
    server.1=192.168.1.6:2881:3881
    server.2=192.168.1.6:2882:3882
    server.3=192.168.1.6:2883:3883
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    ## Metrics Providers
    #
    # https://prometheus.io Metrics Exporter
    #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    #metricsProvider.httpHost=0.0.0.0
    #metricsProvider.httpPort=7000
    #metricsProvider.exportJvmInfo=true

     ② zoo-2.cfg

    # data directory
    dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data2
    # transaction log directory
    dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log2
    # the port at which the clients will connect
    # client listening port
    clientPort=2182
    # cluster members (identical in every config file); format: server.N=host:quorumPort:electionPort
    server.1=192.168.1.6:2881:3881
    server.2=192.168.1.6:2882:3882
    server.3=192.168.1.6:2883:3883
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    ## Metrics Providers
    #
    # https://prometheus.io Metrics Exporter
    #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    #metricsProvider.httpHost=0.0.0.0
    #metricsProvider.httpPort=7000
    #metricsProvider.exportJvmInfo=true

    ③ zoo-3.cfg

    # data directory
    dataDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\data3
    # transaction log directory
    dataLogDir=D:\\BigData\\apache-zookeeper-3.8.0-bin\\data-file\\log3
    # the port at which the clients will connect
    # client listening port
    clientPort=2183
    # cluster members (identical in every config file); format: server.N=host:quorumPort:electionPort
    server.1=192.168.1.6:2881:3881
    server.2=192.168.1.6:2882:3882
    server.3=192.168.1.6:2883:3883
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    ## Metrics Providers
    #
    # https://prometheus.io Metrics Exporter
    #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    #metricsProvider.httpHost=0.0.0.0
    #metricsProvider.httpPort=7000
    #metricsProvider.exportJvmInfo=true

    ④ Create the corresponding data and log directories.

    ⑤ Configure the three startup scripts.

           zkServer1.cmd, zkServer2.cmd and zkServer3.cmd are configured as below; the only difference between them is the ZOOCFG path (zoo-1.cfg, zoo-2.cfg and zoo-3.cfg respectively, see the snippet after the script).

    @echo off
    REM Licensed to the Apache Software Foundation (ASF) under one or more
    REM contributor license agreements. See the NOTICE file distributed with
    REM this work for additional information regarding copyright ownership.
    REM The ASF licenses this file to You under the Apache License, Version 2.0
    REM (the "License"); you may not use this file except in compliance with
    REM the License. You may obtain a copy of the License at
    REM
    REM http://www.apache.org/licenses/LICENSE-2.0
    REM
    REM Unless required by applicable law or agreed to in writing, software
    REM distributed under the License is distributed on an "AS IS" BASIS,
    REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    REM See the License for the specific language governing permissions and
    REM limitations under the License.
    setlocal
    call "%~dp0zkEnv.cmd"
    set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    set ZOOCFG=D:\\BigData\\apache-zookeeper-3.8.0-bin\\conf\\zoo-1.cfg
    set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log
    echo on
    call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
    endlocal
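
    The only line that should differ between the three scripts is the ZOOCFG assignment; zkServer2.cmd would point at zoo-2.cfg (and zkServer3.cmd at zoo-3.cfg), e.g.:

    set ZOOCFG=D:\\BigData\\apache-zookeeper-3.8.0-bin\\conf\\zoo-2.cfg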

      ⑥ Start the ZooKeeper nodes.

     Run the three .cmd startup scripts in turn (a verification sketch follows below).

    First node:

    Second node:

     Third node:
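
    Once all three nodes are up, each one can be checked with the CLI client shipped in the ZooKeeper bin directory (the client ports are the ones configured above):

    zkCli.cmd -server 127.0.0.1:2181
    zkCli.cmd -server 127.0.0.1:2182
    zkCli.cmd -server 127.0.0.1:2183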

     II. Setting up a Kafka cluster on Windows

    1. Directory layout: copy the unpacked Kafka distribution into three broker directories (a copy sketch follows below):

    D:\BigData\Kafka-Cluster\kafka-1
    D:\BigData\Kafka-Cluster\kafka-2
    D:\BigData\Kafka-Cluster\kafka-3
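
    A sketch of the copy step; the source directory D:\BigData\kafka_2.13-2.8.1 is an assumed location of the unpacked Kafka 2.8.1 distribution, adjust it to your own:

    xcopy /E /I D:\BigData\kafka_2.13-2.8.1 D:\BigData\Kafka-Cluster\kafka-1
    xcopy /E /I D:\BigData\kafka_2.13-2.8.1 D:\BigData\Kafka-Cluster\kafka-2
    xcopy /E /I D:\BigData\kafka_2.13-2.8.1 D:\BigData\Kafka-Cluster\kafka-3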

     2. Configuration files.

     First node: server.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    port=9092
    host.name=192.168.1.6
    ############################# Socket Server Settings #############################
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # (backslashes must be escaped in a Java properties file)
    log.dirs=D:\\BigData\\Kafka-Cluster\\kafka-1\\logs
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    #zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    ############################# Group Coordinator Settings #############################
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    # the three ZooKeeper host:port addresses configured above
    zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183
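
    Note: port and host.name are legacy broker settings kept here for simplicity; on Kafka 2.8 the listener-based form is the usual replacement. A sketch of the equivalent for the first broker, using the same address (the second and third brokers would use 9093 and 9094):

    listeners=PLAINTEXT://192.168.1.6:9092
    advertised.listeners=PLAINTEXT://192.168.1.6:9092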

    Second node: server.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1
    port=9093
    host.name=192.168.1.6
    ############################# Socket Server Settings #############################
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # (backslashes must be escaped in a Java properties file)
    log.dirs=D:\\BigData\\Kafka-Cluster\\kafka-2\\logs
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    ############################# Group Coordinator Settings #############################
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    # the three ZooKeeper host:port addresses configured above
    zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183

    Third node: server.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=2
    port=9094
    host.name=192.168.1.6
    ############################# Socket Server Settings #############################
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # (backslashes must be escaped in a Java properties file)
    log.dirs=D:\\BigData\\Kafka-Cluster\\kafka-3\\logs
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=2
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    ############################# Group Coordinator Settings #############################
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    zookeeper.connect=192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183

    3. Startup commands

    Note: pay attention to the paths and to spacing when typing these commands.

    First node:

    kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-1\config\server.properties

    Second node:

    kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-2\config\server.properties

    Third node:

    kafka-server-start.bat D:\BigData\Kafka-Cluster\kafka-3\config\server.properties

    Once all three brokers are up, each startup log reports the same cluster ID, e.g.:

    INFO Cluster ID = -7jJzueoRVav3spVetQ4kw (kafka.server.KafkaServer)

    4. Create a topic

    kafka-topics.bat --create --zookeeper 192.168.1.6:2181 --replication-factor 2 --partitions 3 --topic cluster_test

    This creates a topic with three partitions and two replicas.
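
    The --zookeeper flag is deprecated in Kafka 2.8; the same topic can also be created through any broker, e.g.:

    kafka-topics.bat --create --bootstrap-server 192.168.1.6:9092 --replication-factor 2 --partitions 3 --topic cluster_test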

     5. Produce messages

    kafka-console-producer.bat --broker-list 192.168.1.6:9092,192.168.1.6:9093,192.168.1.6:9094 --topic cluster_test 

    6. Consume messages

    kafka-console-consumer.bat --bootstrap-server 192.168.1.6:9092,192.168.1.6:9093,192.168.1.6:9094 --topic cluster_test
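
    To also read messages that were produced before the consumer started, append --from-beginning:

    kafka-console-consumer.bat --bootstrap-server 192.168.1.6:9092,192.168.1.6:9093,192.168.1.6:9094 --topic cluster_test --from-beginning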

    Problem encountered: the classic Kafka error "Configured broker.id doesn't match stored broker.id".

    Fix: give each node its own log directory. If all three brokers write to the same log directory, only one broker.id can match the id stored there, so only the first broker that started keeps working.
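
    The stored id lives in a meta.properties file inside each broker's log.dirs; with a shared directory, every broker reads the id written by whichever broker started first. With separate directories, kafka-1\logs\meta.properties would look roughly like this (the cluster.id shown is simply the one from the startup log above):

    version=0
    broker.id=0
    cluster.id=-7jJzueoRVav3spVetQ4kw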

     Describe the topic:

    kafka-topics.bat --describe --zookeeper 192.168.1.6:2181 --topic cluster_test

    List all topics:

    kafka-topics.bat --zookeeper 192.168.1.6:2181 --list
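
    Both inspection commands also work against the brokers instead of ZooKeeper, for example:

    kafka-topics.bat --describe --bootstrap-server 192.168.1.6:9092 --topic cluster_test
    kafka-topics.bat --list --bootstrap-server 192.168.1.6:9092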

    Summary: this is a simple ZooKeeper-based Kafka pseudo-cluster on Windows, good enough for local testing. Production environments usually run much larger Kafka clusters, with 10+ or even 20+ brokers, mature cloud monitoring and alerting, and full authentication and authorization. Kafka 3.0 is gradually removing the ZooKeeper dependency, which makes setup even simpler, since the metadata is managed by Kafka itself.

  • Original post: https://blog.csdn.net/HcJsJqJSSM/article/details/126752143