• centos7系统安装kafka


    一、准备
    确保服务器上已经搭建完成JDK,zookeeper服务;
    如果未搭建完成,请移步参考以下文章:


    安装zookeeper:

    1.准备

    zookeeper官网地址:Apache ZooKeeper

    下载安装方式

    • 使用wget命令行下载
      wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

    • 采用下载安装包的方式:Index of /zookeeper/stable

    由于要安装在远程服务器上,故采用第一种方式安装;

    注意:要下载源码包,否则启动客户端是启动失败;后缀带-bin.tar.gz
    启动报错如下:
    ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... FAILED TO START
    看下zookeeper日志文件具体报错信息:
    错误: 找不到或无法加载主类 org.apache.zookeeper.ZooKeeperMain

    2、安装与配置

    创建和解压
    tar -zxvf apache-zookeeper-3.5.9.tar.gz
    重命名:mv apache-zookeeper-3.5.9 zookeeper

    创建数据存储目录与日志目录
    进入zookeeper解压缩后的目录,新建数据文件夹dataDir和日志文件夹dataLogDir
    命令:mkdir dataDir和mkdir dataLogDir

    conf配置文件
    进入配置目录,赋值拷贝样本文件
    命令:cp zoo_sample.cfg zoo.cfg

    修改 zoo.cfg文件内容
    1.修改数据存储文件地址,按照上面建立的目录,小编的如下/opt/software/zookeeper/dataLogDir
     

    1. [root@localhost kafka-logs]# cat /opt/zk/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
    2. # The number of milliseconds of each tick
    3. tickTime=2000 #服务心跳时间
    4. # The number of ticks that the initial
    5. # synchronization phase can take
    6. initLimit=10 #投票选举新leader的初始化时间
    7. # The number of ticks that can pass between
    8. # sending a request and getting an acknowledgement
    9. syncLimit=5 #leader与follower心跳检测最大容忍时间
    10. # the directory where the snapshot is stored.
    11. # do not use /tmp for storage, /tmp here is just
    12. # example sakes.
    13. dataDir=/opt/zk/apache-zookeeper-3.8.0-bin/data #数据目录
    14. dataLogDir=/opt/zk/apache-zookeeper-3.8.0-bin/log #日志目录
    15. pidfile=/var/run/zookeeper.pid
    16. # the port at which the clients will connect
    17. clientPort=2181 #zk端口
    18. # the maximum number of client connections.
    19. # increase this if you need to handle more clients
    20. #maxClientCnxns=60
    21. #
    22. # Be sure to read the maintenance section of the
    23. # administrator guide before turning on autopurge.
    24. #
    25. # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    26. #
    27. # The number of snapshots to retain in dataDir
    28. #autopurge.snapRetainCount=3
    29. # Purge task interval in hours
    30. # Set to "0" to disable auto purge feature
    31. #autopurge.purgeInterval=1
    32. ## Metrics Providers
    33. #
    34. # https://prometheus.io Metrics Exporter
    35. #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    36. #metricsProvider.httpHost=0.0.0.0
    37. #metricsProvider.httpPort=7000
    38. #metricsProvider.exportJvmInfo=true
    • 配置环境变量

    1. 打开/etc/profile 文件:vim /etc/profile
    2. 新增以下配置:
    1. #ZOOKEEPER_HOME
    2. export ZOOKEEPER_HOME=/opt/software/zookeeper
    3. export PATH=$ZOOKEEPER_HOME/bin:$PATH

    3.重新加载配置:source /etc/profile

    3、运行

    • 启动命令:./zkServer.sh start
    • 停止命令:./zkServer.sh stop
    • 查看运行状态:./zkServer.sh status

    kafka:

    kafka官网下载:Apache Download Mirrors

    Index of /dist/kafka/2.2.2

    二、安装与配置

    • 解压缩tar -zxvf kafka_2.11-2.2.0.tgz

    • 重命名mv kafka_2.11-2.2.0 kafka

    1. [root@localhost kafa]# ls -al
    2. 总用量 164020
    3. drwxr-xr-x. 4 root root 95 816 08:26 .
    4. drwxr-xr-x. 8 root root 82 518 16:09 ..
    5. drwxr-xr-x. 8 root root 119 816 08:33 kafka
    6. -rw-r--r--. 1 root root 63999924 8月 15 17:22 kafka_2.11-2.2.0.tgz
    7. -rw-r--r--. 1 root root 103956099 8月 15 16:03 kafka_2.13-3.2.1.tgz
    • 创建logs文件,用于存储kafka日志:
      在kafka安装目录下创建:mkdir kafka-logs
      /opt/kafka/kafka-logs

    • 修改server.properties配置文件

      1. [root@localhost kafka]# cat config/server.properties | grep -v "#" | grep -v "^$"
      2. broker.id=1
      3. listeners=PLAINTEXT://0.0.0.0:9092
      4. advertised.listeners=PLAINTEXT://192.168.1.36:9092
      5. num.network.threads=3
      6. num.io.threads=8
      7. socket.send.buffer.bytes=102400
      8. socket.receive.buffer.bytes=102400
      9. socket.request.max.bytes=104857600
      10. log.dirs=/opt/kafa/kafka/kafka-logs
      11. num.partitions=1
      12. num.recovery.threads.per.data.dir=1
      13. offsets.topic.replication.factor=1
      14. transaction.state.log.replication.factor=1
      15. transaction.state.log.min.isr=1
      16. log.retention.hours=168
      17. log.segment.bytes=1073741824
      18. log.retention.check.interval.ms=300000
      19. zookeeper.connect=localhost:2181
      20. zookeeper.connection.timeout.ms=6000
      21. group.initial.rebalance.delay.ms=0

      解释:

      1. #一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
      2. broker.id=1
      3. #listeners=PLAINTEXT://:9092
      4. #advertised.listeners=PLAINTEXT://your.host.name:9092
      5. #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS
      6. # broker 处理消息的最大线程数,一般情况下不需要去修改
      7. num.network.threads=3
      8. # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
      9. num.io.threads=8
      10. # socket的发送缓冲区(SO_SNDBUF)
      11. socket.send.buffer.bytes=102400
      12. # socket的接收缓冲区 (SO_RCVBUF)
      13. socket.receive.buffer.bytes=102400
      14. # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
      15. socket.request.max.bytes=104857600
      16. #kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
      17. log.dirs=/opt/software/kafka/kafka-logs
      18. # 每个topic的分区个数,更多的partition会产生更多的segment file
      19. num.partitions=1
      20. num.recovery.threads.per.data.dir=1
      21. offsets.topic.replication.factor=1
      22. transaction.state.log.replication.factor=1
      23. transaction.state.log.min.isr=1
      24. # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
      25. #log.flush.interval.messages=10000
      26. # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
      27. #log.flush.interval.ms=1000
      28. # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
      29. log.retention.hours=168
      30. #log.retention.bytes=1073741824
      31. # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
      32. log.segment.bytes=1073741824
      33. # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
      34. log.retention.check.interval.ms=300000
      35. # Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:prot
      36. zookeeper.connect=localhost:2181
      37. # 连接zk的超时时间
      38. zookeeper.connection.timeout.ms=6000
      39. # ZooKeeper集群中leader和follower之间的同步实际
      40. group.initial.rebalance.delay.ms=0

    四、运行


    1.启动zookeeper服务


    由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;
    如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:
    使用安装包中的脚本启动单节点Zookeeper 实例:
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    注意:以下命令,小编均选择在kafka安装根目录下执行;

    2.启动kafka服务


    以下方式任选其一

    kafka目录启动
    使用kafka-server-start.sh 启动kafka 服务:

    bin/kafka-server-start.sh config/server.properties


    这种命令执行并不是后台进程运行,故使用以下命令

    bin/kafka-server-start.sh -daemon config/server.properties


    命令需要切换到kafka的bin目录下,

    ./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties

    3.创建topic。只有一个broker,所以这里一个topic,5个分区,1个副本(不能大于broker数量)

    1. [root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic wubo
    2. Created topic wubo.

    4 查看描述 Topic 信息 

    1. [root@localhost kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic wubo
    2. Topic:wubo PartitionCount:5 ReplicationFactor:1 Configs:
    3. Topic: wubo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    4. Topic: wubo Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    5. Topic: wubo Partition: 2 Leader: 1 Replicas: 1 Isr: 1
    6. Topic: wubo Partition: 3 Leader: 1 Replicas: 1 Isr: 1
    7. Topic: wubo Partition: 4 Leader: 1 Replicas: 1 Isr: 1

    说明:
    第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
    Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
    Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
    Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。
     

    5.查看topic

    1. [root@localhost kafka]# kafka-topics.sh --list --zookeeper localhost:2181
    2. __consumer_offsets
    3. jettech
    4. test
    5. wubo

    6 .产生消息:kafka-console-producer.sh

    1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic wubo
    2. >"jettech hello world"
    3. >^C[root@localhost kafka]#

    使用Ctrl+C退出生成消息;

    7. 消费消息:kafka-console-consumer.sh

    使用kafka-console-consumer.sh 接收消息并在终端打印
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)
     

    1. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wubo --from-beginning
    2. "jettech hello world"

    8.删除topic

    kafka-topics.sh --delete --zookeeper localhost:2181 --topic wubo

  • 相关阅读:
    聚焦关键要素|打造智慧园区综合运营决策平台解决方案
    D Mocha and Railgun
    codeforces (C++ Morning)
    人工智能 迁移学习
    SpringCloud
    ArrayList与顺序表
    无胁科技-TVD每日漏洞情报-2022-11-22
    LeetCode简单题之最好的扑克手牌
    Numpy计算中的@、*、mutiply、dot的区分(含Python代码)
    Unity5 脚本的生命周期、执行顺序
  • 原文地址:https://blog.csdn.net/Michaelwubo/article/details/126358950