• kafka从零到1的全过程


    最近公司做了技术选型,基于效率的原因最后选定的mq是kafka,之前用过rocketmq,但是没用过kafka,学习一下,记录一下

    Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用Scala 语言编写,目前是Apache 的开源项目。

    1、kafka的架构

    kafka的架构和常规的mq 是一样的,producer,consumer,kafka broker ,简单说明下

    • Producer:消息⽣产者,向 Kafka Broker 发消息的客户端。
    • Consumer:消息消费者,从 Kafka Broker 取消息的客户端。Kafka支持持久化,生产者退出后,未消费的消息仍可被消费。
    • Broker:⼀台 Kafka 机器就是⼀个 Broker。⼀个集群(kafka cluster)由多个 Broker 组成。⼀个 Broker 可以容纳多个 Topic。
    • Message:消息是Kafka中最基本的数据单元,主要由key和value构成;真正有效的消息是value数据,key只作为消息路由分区使用,kafka根据key决定将当前消息存储在哪个分区。

    在Kafka中,客户端和服务器之间的通信是通过简单,高性能,语言无关的TCP协议完成的

    2、kafka重要组件

    topic:消息类别,Kafka 按照topic 来分类消息(即使如此,kafka仍然有点对点和广播发布类型)

    partition:topic 的分区,一个 topic 可以包含多个 partition,topic 消息保存在各个partition 上zookeeper

    Consumer Group:消费者分组,每个Consumer 必须属于一个 group

    zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性

    3、kafka消息流程

    ZooKeeper用于分布式系统的协调,Kafka使用ZooKeeper也是基于相同的原因。ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

    每个分区都有 一个leader以及0个或者多个follower,在创建topic的时,kafka会将不同分区的leader均匀的分配在每个broker上。

    比如主题test有partition1,partition2,partition3共3个分区,其中partition1有3个副本,一个leader,两个follower。

    正常使用kafka是感觉不到leader,follower的存在的,所有的读写操作都是由leader处理,

    而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。所以,可以这样说:

    kafka中的leader负责处理读写操作,而follower只负责副本数据的同步如果leader出现故障,其他follower会被重新选举为leader,follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中。

    4、demo

    说了这么多自己搞一下吧,来点代码

    pom.xml 增加依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    在application.yml中增加配置

    1. spring:
    2. application:
    3. name: transferApp
    4. kafka:
    5. bootstrap-servers: 172.26.1.202:9092
    6. producer: # producer 生产者
    7. retries: 0 # 重试次数
    8. acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
    9. batch-size: 16384 # 批量大小
    10. buffer-memory: 33554432 # 生产端缓冲区大小
    11. key-serializer: org.apache.kafka.common.serialization.StringSerializer

    KafkaProducerService 工具类,可以在其他的地方使用。

    1. @Slf4j
    2. @Service
    3. public class KafkaProducerService {
    4. @Autowired
    5. private KafkaTemplate kafkaTemplate;
    6. public void sendMessageSync(String topic, String key, String message) throws Exception {
    7. //可以指定最长等待时间,也可以不指定
    8. kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    9. log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
    10. //指定key,kafka根据key进行hash,决定存入哪个partition
    11. // kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
    12. //存入指定partition
    13. // kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
    14. }
    15. }

    5、几个学的过程中的困惑点

    1.zooKeeper 的使用

    zookeeper 在开始的时候没懂有什么用,因为自己没有想到集群的问题,有集群一定要有manager,所以zooKeeper 就是扮演这个角色,进行调度管理,

    可以类比rocketmq 中的nameserver,但是据说在新版本中取消了zookeeper,自己重写了一个类似的

    2、Consumer个数与分区数的关系

    topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。

    即分区数决定了同组消费者个数的上限。如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。

    3、Consumer消费Partition的分配策略

    Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。当以下事件发生时,Kafka 将会进行一次分区分配:

    一个 Consumer Group 内新增消费者

    消费者离开当前所属的Consumer Group,包括shuts down 或 crashes

    4、kafka的一些配置参数

    linger.ms

    因为producer发布消息会批量合并,批量会有个队列,如果队列满了就会立即发送出去,如果不满则等待,如果等待时间超过了linger.ms,则立即发送

    bootstrap.servers

    broker集群地址,可以设置一到多个,建议至少设置为2个,若在应用程序启动的时候,一个broker节点宕机,还可以对另一个已提供节点进行连接。

    retries

    生产者重试次数,默认值为0。发送消息可能失败导致broker没有收到,比如网络抖动、leader副本选举等,生产者可以配置retries的值,

    如果重试达到设定次数,生产者才会放弃重试并抛出异常。

    6、Kafka 缺点?

    由于是批量发送,数据并非真正的实时;

    •不支持mqtt协议;

    •不支持物联网传感数据直接接入;

    •仅支持统一分区内消息有序,无法实现全局消息有序;

    •监控不完善,需要安装插件;

    •依赖zookeeper进行元数据管理;

  • 相关阅读:
    Shell 脚本编程
    《持续交付:发布可靠软件的系统方法》- 读书笔记(十五)
    bootstrap 主题
    HBase Shell启动缓慢及操作耗时长的原因分析与解决
    linux环境部署jmeter并执行测试
    护士人文修养复习资料
    校园快餐店网上订餐管理系统(JSP+MySQL+MyEclipse)
    前端状态管理工具pinia:pinia是什么?相较于Vuex,pinia有什么优势,如何手动添加pinia到Vue3项目中
    Vue(1)
    FPGA 多路视频处理:图像缩放+视频拼接显示,HDMI采集,提供2套工程源码和技术支持
  • 原文地址:https://blog.csdn.net/perfect2011/article/details/126544982