• Kafka 安装教程和基本操作


    一、简介

    Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。

    应用特性

    • 分布式存储:数据被自动分区并分布在集群的节点中。
    • 消息有序性Kafka 能确保从生产者传到消费者的记录都是有序的。
    • 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
    • 高吞吐量Kafka 支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。
    • 易扩展性:支持集群热扩展。
    • 高并发:支持数千个客户端同时读写。
    • 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
    • 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。

    应用场景

    Kafka 本质是 支持分布式的消息系统/消息中间件 。分析 Kafka 的应用场景等同于分析 消息中级件 的应用场景。通常,使用 消息系统 的 发布/订阅模型 功能来连接 生产者消费者。实现以下三大功能:

    • 生产者和消费者的解耦
    • 消息持久化 / 消息冗余
    • 消息缓冲 / 流量消峰

    具体应用场景有:

    • 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
    • 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
    • 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
    • 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
    • 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
    • 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。

    设计目标

    • 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
    • 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
    • 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
    • 横向扩展:支持在线水平热扩展

    二、kafka安装和配置

    1. zookeeper安装配置

    需要说明一下, 为了支持 Kafka 的集群功能, Zookeeper 必须使用集群模式部署。
    本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南

    2. kafka安装配置

    下载链接:Kafka Downloads

    下载页面中包含两种下载方式

    • : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译

    a) 安装

    [root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
    [root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
    [root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka
    

    b) 配置实例

    配置第一个 Kafka 实例

    # broker 编号,集群内必须唯一
    broker.id=1
    # 监听所有ip的9091端口,PLAINTEXT表示明文传输
    listeners=PLAINTEXT://:9091
    # 相当于listeners=PLAINTEXT://0.0.0.0:9091
    # 消息日志存放地址
    log.dirs=/usr/local/kafka/logs
    # ZooKeeper 地址,多个用,分隔   /kafka指定在zk上的目录
    zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
    

    配置第二个 Kafka 实例

    # broker 编号,集群内必须唯一
    broker.id=1
    # 监听所有ip的9092端口,PLAINTEXT表示明文传输
    listeners=PLAINTEXT://:9092
    # 消息日志存放地址
    log.dirs=/opt/kafka/logs
    # ZooKeeper 地址,多个用,分隔
    zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
    

    注:两个客户端的listeners中的port不能一样

    4) 服务管理

    # 启动服务 -daemon 表示后台启动
    $KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties
    
    
    # 查看服务
    jps -l
    	43330 org.apache.zookeeper.server.quorum.QuorumPeerMain
    	14356 org.elasticsearch.bootstrap.Elasticsearch
    	14583 org.logstash.Logstash
    	45976 kafka.Kafka  # kafka服务进程
    	
    netstat -anlpt | grep 9091
    	tcp6       0      0 :::9091                 :::*                    LISTEN      45976/java
    	tcp6       0      0 192.168.18.128:9091     192.168.18.128:49356    TIME_WAIT   -
    
    # 关闭服务
    $KAFKA_HOME/bin/kafka-server-stop.sh
    

    3. 常用操作

    1) 创建topic

     #两条命令效果一样
    bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
    bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu 
    

    在kafka1上创建一个topic,会自动同步到其他客户端

    • --create表示创建操作
    • --zookeeper 指定了 Kafka 连接的 ZooKeeper
    • --partitions 表示每个主题4个分区
    • --replication-factor 表示创建每个分区创建2个副本(副本因子)
    • --topic 表示主题名称。
      注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.

    2) 查看topic

    # 查看topic列表     #两条命令效果一样
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka 
    	__consumer_offsets
    	topic-demo
    	yumu
    
    # 查看topic详细信息   #两条命令效果一样
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu 
    	Topic: yumu	PartitionCount: 2	ReplicationFactor: 2	Configs:
    		Topic: yumu	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
    		Topic: yumu	Partition: 1	Leader: 1	Replicas: 2,1	Isr: 1,2
    

    3) 测试通信

    # 窗口1,启动生产者,向yumu主题发送消息
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
    
    # 窗口2,启动消费者,订阅yumu主题
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
    
    # 窗口3,启动消费者,订阅yumu主题
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
    
    =====结果=====
    # 生产者
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
    >hello, kafka!
    >once again.
    >
    # 消费者1
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
    hello, kafka!
    once again.
    
    # 消费者2
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
    hello, kafka!
    once again.
    
    # 查看所有消息
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning
    
    # 删除topic
    bin/kafka-topics.sh --delete --bootstrap-server localhost:9091  --topic yumu
    

    三、遇到的问题

    1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:

    [2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
    [2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
            at kafka.server.KafkaServer.startup(KafkaServer.scala:235)
            at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
            at kafka.Kafka$.main(Kafka.scala:82)
            at kafka.Kafka.main(Kafka.scala)
    [2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
    [2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
    [2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
    [2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
    [2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
    [2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
    [2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)
    

    原因:
    kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件

    cat meta.properties
    #
    #Sat Nov 07 21:43:51 CST 2020
    broker.id=1
    version=0
    cluster.id=MChFWWMBT9GJClVEriND5A
    

    第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)

    推荐阅读:

    下一篇:Kafka消息系统原理

    在这里插入图片描述

  • 相关阅读:
    阿里云云安全中心有必要买吗? 这些功能值得吗?
    零售业变革下,数智化供应链系统精细化库存管理,构建企业数字化供应链体系
    基于eNSP的IPv6校园网络规划与设计_综合实验
    C# 播放音频文件(播放提示音)
    Jmeter之测试元件-JSR223 PreProcessor
    Android使用高德地图实现运动轨迹绘制和轨迹回放
    java.lang.ClassNotFoundException: freemarker.cache.TemplateLoader
    Linux搜索查找命令【详细整理】
    简谈设计模式之建造者模式
    JCMsuite应用:光子晶体谐振腔光子晶体谐振腔
  • 原文地址:https://blog.csdn.net/wengjianhong2099/article/details/139136752