vim /etc/profile
在最后面添加:
export JAVA_HOME=/app/jdk-12.0.2
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
这样可以使配置文件立即生效
https://kafka.apache.org/downloads
Kafka依赖于ZooKeeper,所以您需要先启动一个ZooKeeper服务器。如果没有安装,您可以使用随Kafka一起打包的便捷脚本来获取一个快速但是比较粗糙的单节点ZooKeeper实例。
vim /data/kafka_2.12-2.8.0/config/zookeeper.properties
nohup /data/kafka_2.12-2.8.0/bin/zookeeper-server-start.sh /data/kafka_2.12-2.8.0/config/zookeeper.properties &
#查看进程
ps -ef | grep zookeeper
#查看端口
lsof -i:2181
netstat -antp | grep 2181
tail -100f /data/kafka_2.12-2.8.0/logs/server.log
vim /data/kafka_2.12-2.8.0/config/server.properties
# 去掉注释
listeners=PLAINTEXT://:9092
# 去掉注释并修改地址
advertised.listeners=PLAINTEXT://127.0.0.1:9092
nohup /data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties &
/data/kafka_2.12-2.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.12-2.8.0/config/server.properties
#查看进程
ps -ef | grep kafka
#查看端口
lsof -i:9092
netstat -antp | grep 9092
tail -100f /data/kafka_2.12-2.8.0/logs/kafkaServer.out
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 返回上面创建的 test
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
/data/kafka_2.12-2.8.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
/data/kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
{"id":"1"}
/data/kafka_2.12-2.8.0/bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic ProjectBaseInfo --zookeeper localhost:2181
要想查询消费数据,必须要指定组。那么线上运行的kafka有哪些组呢?使用以下命令:
/data/kafka_2.12-2.8.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
/data/kafka_2.12-2.8.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
参数解释:
注意: --group指定的组必须存在才行!可以用上面的–list命令来查看
TOPIC
|
PARTITION
|
CURRENT-OFFSET
|
LOG-END-OFFSET
|
LAG
|
CONSUMER-ID
|
HOST
|
CLIENT-ID
|
---|---|---|---|---|---|---|---|
topic名字 | 分区id | 当前已消费的条数 | 总条数 | 未消费的条数 | 消费id | 主机ip | 客户端id |
将$KAFKA_HOME/config/server.properties文件中的socket.request.max.bytes值重置为超过数据包大小,然后重新启动kafka服务器。将socket.request.max.bytes翻倍。
vim /data/kafka_2.12-2.8.0/config/server.properties
vim /data/kafka_2.12-2.8.0/config/server.properties
message.max.bytes=2073741824
对于已创建的topic,调整max.message.bytes参数方法:
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --config max.message.bytes=2073741824
但是需要注意的是,在这里配置的值应该小于服务端配置的最大值,否则报如下错误
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
max.request.size必须小于message.max.bytes
kafka是由scala和java编写的。所以需要调一些jvm的参数。java的内存分为堆内内存和堆外内存。
-XX:MaxDirectMemorySize加大,该参数默认是64M,可以根据需求调大,
如果没找到,就在KAFKA_JVM_PERFORMANCE_OPTS添加,
并且检查JVM参数里面有无:-XX:+DisableExplicitGC,如果有就去掉。
因为kafka的网络IO使用了java的nio中的DirectMemory的方式,而这个申请的是堆外内存。
bin目录下的kafka-run-class.sh中需要配置的参数,找到 Memory options处,默认设置是256M,将其修改为如下值:
修改前:
vim /data/kafka_2.12-2.8.0/bin/kafka-run-class.sh
-Xms2048m -Xmx2048m -XX:MaxDirectMemorySize=5120m
修改后: