在页面查看:点击具有Kafka服务的主机,然后点击组件
在服务器查看
如图2.11是Scala版本,2.2.1是Kafka版本
如果不知道CDH装哪,就把/opt/cloudera换成/
find /opt/cloudera -name \*kafka_\* | grep '\.jar'
kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 副本数 \
--partitions 分区数 \
--topic 主题名称
注:
若使用--bootstrap-server hadoop105:9092创建,则消费者偏移量保存在Kafka中
若使用--zookeeper hadoop105:2181/kafka创建,则消费者偏移量保存在ZooKeeper中
kafka-topics --list --bootstrap-server hadoop105:9092
或
kafka-topics --list --zookeeper hadoop105:2181/kafka
kafka-topics --describe \
--zookeeper hadoop105:2181/kafka \
--topic 主题名称
生产
kafka-console-producer \
--broker-list Kafka地址:9092 \
--topic 主题名称
消费(--from-beginning会把主题以往所有数据都读取出来)
kafka-console-consumer \
--bootstrap-server Kafka地址:9092 \
--topic 主题名称 \
--from-beginning \
--group 消费者组名称
kafka-topics --create \
--replication-factor 1 \
--partitions 1 \
--bootstrap-server hadoop105:9092 \
--topic StressTesting
| 生产者参数 | 描述 | 备注 |
|---|---|---|
--throughput | 最大消息吞吐量限制,-1不限制 | messages/sec |
--print-metrics | 打印指标 | 默认不打印 |
--num-records | 生产多少条消息 | |
--record-size | 每条消息的大小 | bytes |
--producer-props | 生产者相关配置,如服务器地址 | 优先级高于--producer.config |
--producer.config | 生产者配置文件路径 |
kafka-producer-perf-test \
--throughput -1 \
--print-metrics \
--num-records 1000000 \
--record-size 1024 \
--producer-props bootstrap.servers=hadoop105:9092 \
--print-metrics \
--topic StressTesting
某写测试结果
| records sent | records/sec | MB/sec | avg latency | max latency |
|---|---|---|---|---|
| 1000000 | 24396.789383 | 23.82 | 1232.27 ms | 2096.00 ms |
| 消费者参数 | 描述 |
|---|---|
--messages | 消费的消息数量 |
kafka-consumer-perf-test \
--print-metrics \
--messages 1000000 \
--broker-list hadoop105:9092 \
--topic StressTesting
某读测试结果
| data.consumed.in.MB | MB.sec | data.consumed.in.nMsg | nMsg.sec | rebalance.time.ms | fetch.time.ms | fetch.MB.sec | fetch.nMsg.sec |
|---|---|---|---|---|---|---|---|
| 976.5625 | 56.8695 | 1000000 | 58234.3350 | 3087 | 14085 | 69.3335 | 70997.5151 |
broker_max_heap_size
建议给4~6G每个Broker
log.dirs
default.replication.factor
建议>1
log.retention
log.retention.ms:日志保留时长,设-1时无限制,建议3天
log.retention.bytes:每个主题分区保留在日志中的数据量,设-1时无限制
log.retention.check.interval.ms:如图示,每5分钟,日志清理程序 会根据日志保留策略 来清理符合删除条件的日志
可接收的单个消息的最大值(
message.max.bytes)
replica.fetch.max.bytes应大于message.max.bytes
按需调大
| 配置 | 说明 | 备注 |
|---|---|---|
delete.topic.enable | 是否允许删除主题 | CDH的Kafka默认启用,不用改 |
broker.id | broker 编号,唯一的,每台机都不同 | 通常不用改 |
num.network.threads | 处理网络请求的线程数 | |
num.io.threads | 处理网络请求的线程数 | |
num.partitions | 每个主题的默认分区数 | 默认1不用改 |
log.segment.bytes | 日志片段最大字节数 | 必须大于message.max.bytes |
