一、准备
确保服务器上已经搭建完成JDK,zookeeper服务;
如果未搭建完成,请移步参考以下文章:
zookeeper官网地址:Apache ZooKeeper
下载安装方式
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
由于要安装在远程服务器上,故采用第一种方式安装;
注意:要下载源码包,否则启动客户端是启动失败;后缀带-bin.tar.gz
启动报错如下:
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... FAILED TO START
看下zookeeper日志文件具体报错信息:
错误: 找不到或无法加载主类 org.apache.zookeeper.ZooKeeperMain
创建和解压
tar -zxvf apache-zookeeper-3.5.9.tar.gz
重命名:mv apache-zookeeper-3.5.9 zookeeper
创建数据存储目录与日志目录
进入zookeeper解压缩后的目录,新建数据文件夹dataDir和日志文件夹dataLogDir
命令:mkdir dataDir和mkdir dataLogDir
conf配置文件
进入配置目录,赋值拷贝样本文件
命令:cp zoo_sample.cfg zoo.cfg
修改 zoo.cfg文件内容
1.修改数据存储文件地址,按照上面建立的目录,小编的如下/opt/software/zookeeper/dataLogDir
- [root@localhost kafka-logs]# cat /opt/zk/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
- # The number of milliseconds of each tick
- tickTime=2000 #服务心跳时间
- # The number of ticks that the initial
- # synchronization phase can take
- initLimit=10 #投票选举新leader的初始化时间
- # The number of ticks that can pass between
- # sending a request and getting an acknowledgement
- syncLimit=5 #leader与follower心跳检测最大容忍时间
- # the directory where the snapshot is stored.
- # do not use /tmp for storage, /tmp here is just
- # example sakes.
- dataDir=/opt/zk/apache-zookeeper-3.8.0-bin/data #数据目录
- dataLogDir=/opt/zk/apache-zookeeper-3.8.0-bin/log #日志目录
- pidfile=/var/run/zookeeper.pid
- # the port at which the clients will connect
- clientPort=2181 #zk端口
- # the maximum number of client connections.
- # increase this if you need to handle more clients
- #maxClientCnxns=60
- #
- # Be sure to read the maintenance section of the
- # administrator guide before turning on autopurge.
- #
- # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- #
- # The number of snapshots to retain in dataDir
- #autopurge.snapRetainCount=3
- # Purge task interval in hours
- # Set to "0" to disable auto purge feature
- #autopurge.purgeInterval=1
-
- ## Metrics Providers
- #
- # https://prometheus.io Metrics Exporter
- #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
- #metricsProvider.httpHost=0.0.0.0
- #metricsProvider.httpPort=7000
- #metricsProvider.exportJvmInfo=true
配置环境变量
vim /etc/profile- #ZOOKEEPER_HOME
- export ZOOKEEPER_HOME=/opt/software/zookeeper
- export PATH=$ZOOKEEPER_HOME/bin:$PATH
3.重新加载配置:source /etc/profile
./zkServer.sh start./zkServer.sh stop./zkServer.sh status
kafka官网下载:Apache Download Mirrors
解压缩:tar -zxvf kafka_2.11-2.2.0.tgz
重命名:mv kafka_2.11-2.2.0 kafka
- [root@localhost kafa]# ls -al
- 总用量 164020
- drwxr-xr-x. 4 root root 95 8月 16 08:26 .
- drwxr-xr-x. 8 root root 82 5月 18 16:09 ..
- drwxr-xr-x. 8 root root 119 8月 16 08:33 kafka
- -rw-r--r--. 1 root root 63999924 8月 15 17:22 kafka_2.11-2.2.0.tgz
- -rw-r--r--. 1 root root 103956099 8月 15 16:03 kafka_2.13-3.2.1.tgz
创建logs文件,用于存储kafka日志:
在kafka安装目录下创建:mkdir kafka-logs
/opt/kafka/kafka-logs
修改server.properties配置文件
- [root@localhost kafka]# cat config/server.properties | grep -v "#" | grep -v "^$"
- broker.id=1
- listeners=PLAINTEXT://0.0.0.0:9092
- advertised.listeners=PLAINTEXT://192.168.1.36:9092
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/opt/kafa/kafka/kafka-logs
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
- zookeeper.connect=localhost:2181
- zookeeper.connection.timeout.ms=6000
- group.initial.rebalance.delay.ms=0
解释:
- #一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
- broker.id=1
-
- #listeners=PLAINTEXT://:9092
-
- #advertised.listeners=PLAINTEXT://your.host.name:9092
-
- #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS
-
- # broker 处理消息的最大线程数,一般情况下不需要去修改
- num.network.threads=3
-
- # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
- num.io.threads=8
-
- # socket的发送缓冲区(SO_SNDBUF)
- socket.send.buffer.bytes=102400
-
- # socket的接收缓冲区 (SO_RCVBUF)
- socket.receive.buffer.bytes=102400
-
- # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
- socket.request.max.bytes=104857600
-
- #kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
- log.dirs=/opt/software/kafka/kafka-logs
- # 每个topic的分区个数,更多的partition会产生更多的segment file
- num.partitions=1
-
- num.recovery.threads.per.data.dir=1
-
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
-
- # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
- #log.flush.interval.messages=10000
-
- # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
- #log.flush.interval.ms=1000
-
- # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
- log.retention.hours=168
-
- #log.retention.bytes=1073741824
-
- # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
- log.segment.bytes=1073741824
-
- # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
- log.retention.check.interval.ms=300000
-
- # Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:prot
- zookeeper.connect=localhost:2181
-
- # 连接zk的超时时间
- zookeeper.connection.timeout.ms=6000
-
- # ZooKeeper集群中leader和follower之间的同步实际
- group.initial.rebalance.delay.ms=0
四、运行
由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;
如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:
使用安装包中的脚本启动单节点Zookeeper 实例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
注意:以下命令,小编均选择在kafka安装根目录下执行;
以下方式任选其一
kafka目录启动
使用kafka-server-start.sh 启动kafka 服务:
bin/kafka-server-start.sh config/server.properties
这种命令执行并不是后台进程运行,故使用以下命令
bin/kafka-server-start.sh -daemon config/server.properties
命令需要切换到kafka的bin目录下,
./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties
- [root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic wubo
- Created topic wubo.
- [root@localhost kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic wubo
- Topic:wubo PartitionCount:5 ReplicationFactor:1 Configs:
- Topic: wubo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
- Topic: wubo Partition: 1 Leader: 1 Replicas: 1 Isr: 1
- Topic: wubo Partition: 2 Leader: 1 Replicas: 1 Isr: 1
- Topic: wubo Partition: 3 Leader: 1 Replicas: 1 Isr: 1
- Topic: wubo Partition: 4 Leader: 1 Replicas: 1 Isr: 1
说明:
第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。
- [root@localhost kafka]# kafka-topics.sh --list --zookeeper localhost:2181
- __consumer_offsets
- jettech
- test
- wubo
- [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic wubo
- >"jettech hello world"
- >^C[root@localhost kafka]#
使用Ctrl+C退出生成消息;
使用kafka-console-consumer.sh 接收消息并在终端打印
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)
- [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wubo --from-beginning
- "jettech hello world"
kafka-topics.sh --delete --zookeeper localhost:2181 --topic wubo