• kafka学习-基本概念与简单实战


    目录

    1、核心概念

    消息和批次

    Topic和Partition

    Replicas

    Offset

    broker和集群

    生产者和消费者

    2、开发实战

    2.1、消息发送

    介绍

    代码实现

    2.2、消息消费

    介绍

    代码实现

    2.3、SpringBoot Kafka

    pom

    application.yaml

    KafkaConfig

    producer

    consumer


    1、核心概念

    消息和批次

            kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。

            批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。

    Topic和Partition

            topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。

            partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)

            主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。

            主题的分区策略有如下几种:

    1. 直接指定分区;
    2. 根据消息的key散列取模得出分区;
    3. 轮询指定分区。

    Replicas

    1. 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
    2. 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
    3. 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
    4. 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。

    Offset

            偏移量。

            生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。

            消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。

    broker和集群

    broker为一个独立的kafka服务器;一个kafka集群里有多个broker。

            broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。

            集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。

            每个集群都有一个broker是集群控制器,作用如下:

    1. 将分区分配给首领分区的broker;
    2. 监控broker,首领分区切换

    生产者和消费者

            生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:

    1. 直接指定分区;
    2. 根据消息的key散列取模得出分区;
    3. 轮询指定分区。

            消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。

    2、开发实战

    2.1、消息发送

    介绍

    • 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
    • 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。        

    代码实现

    1. public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    2. Map configs = new HashMap<>();
    3. // 设置连接Kafka的初始连接⽤到的服务器地址
    4. // 如果是集群,则可以通过此初始连接发现集群中的其他broker
    5. configs.put("bootstrap.servers", "node1:9092");
    6. // 设置key和value的序列化器
    7. configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    8. configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    9. configs.put("acks", "1");
    10. KafkaProducer producer = new KafkaProducer(configs);
    11. // 用于封装Producer的消息
    12. ProducerRecord record = new ProducerRecord(
    13. "topic_1", // 主题名称
    14. 0, // 分区编号,现在只有⼀个分区,所以是0
    15. 0, // 数字作为key
    16. "message 0" // 字符串作为value
    17. );
    18. // 发送消息,同步等待消息的确认
    19. // producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
    20. // 使用回调异步等待消息的确认
    21. producer.send(record, new Callback() {
    22. @Override
    23. public void onCompletion(RecordMetadata metadata, Exception exception) {
    24. if (exception == null) {
    25. System.out.println(
    26. "主题:" + metadata.topic() + "\n"
    27. + "分区:" + metadata.partition() + "\n"
    28. + "偏移量:" + metadata.offset() + "\n"
    29. + "序列化的key字节:" + metadata.serializedKeySize() + "\n"
    30. + "序列化的value字节:" + metadata.serializedValueSize() + "\n"
    31. + "时间戳:" + metadata.timestamp()
    32. );
    33. } else {
    34. System.out.println("有异常:" + exception.getMessage());
    35. }
    36. }
    37. });
    38. // 关闭连接
    39. producer.close();
    40. }

    2.2、消息消费

    介绍

            消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

    代码实现

    1. public static void main(String[] args) {
    2. Map configs = new HashMap<>();
    3. // 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
    4. // 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
    5. configs.put("bootstrap.servers", "node1:9092");
    6. // key和value的反序列化器
    7. configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    8. configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    9. configs.put("group.id", "consumer.demo");
    10. // 创建消费者对象
    11. KafkaConsumer consumer = new KafkaConsumer(configs);
    12. final Pattern pattern = Pattern.compile("topic_[0-9]");
    13. // 消费者订阅主题或分区
    14. // consumer.subscribe(pattern);
    15. // consumer.subscribe(pattern, new ConsumerRebalanceListener() {
    16. final List topics = Arrays.asList("topic_1");
    17. consumer.subscribe(topics, new ConsumerRebalanceListener() {
    18. @Override
    19. public void onPartitionsRevoked(Collection partitions) {
    20. partitions.forEach(tp -> {
    21. System.out.println("剥夺的分区:" + tp.partition());
    22. });
    23. }
    24. @Override
    25. public void onPartitionsAssigned(Collection partitions) {
    26. partitions.forEach(tp -> {
    27. System.out.println(tp.partition());
    28. });
    29. }
    30. });
    31. // 拉取订阅主题的消息
    32. final ConsumerRecords records = consumer.poll(3_000);
    33. // 获取topic_1主题的消息
    34. final Iterable> topic1Iterable = records.records("topic_1");
    35. // 遍历topic_1主题的消息
    36. topic1Iterable.forEach(record -> {
    37. System.out.println("========================================");
    38. System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
    39. System.out.println("消息的key:" + record.key());
    40. System.out.println("消息的值:" + record.value());
    41. System.out.println("消息的主题:" + record.topic());
    42. System.out.println("消息的分区号:" + record.partition());
    43. System.out.println("消息的偏移量:" + record.offset());
    44. });
    45. // 关闭消费者
    46. consumer.close();
    47. }

    2.3、SpringBoot Kafka

    pom

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.kafkagroupId>
    8. <artifactId>spring-kafkaartifactId>
    9. dependency>
    10. dependencies>

    application.yaml

    1. spring:
    2. kafka:
    3. bootstrap-servers: node1:9092 # 用于建立初始连接的broker地址
    4. producer:
    5. key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
    6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    7. batch-size: 16384 # 默认的批处理记录数
    8. buffer-memory: 33554432 # 32MB的总发送缓存
    9. consumer:
    10. key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    11. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    12. group-id: spring-kafka-02-consumer # consumer的消费组id
    13. enable-auto-commit: true # 是否自动提交消费者偏移量
    14. auto-commit-interval: 100 # 每隔100ms向broker提交一次偏移量
    15. auto-offset-reset: earliest # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量

    KafkaConfig

    1. @Configuration
    2. public class KafkaConfig {
    3. @Bean
    4. public NewTopic topic1() {
    5. return new NewTopic("ntp-01", 5, (short) 1);
    6. }
    7. @Bean
    8. public NewTopic topic2() {
    9. return new NewTopic("ntp-02", 3, (short) 1);
    10. }
    11. }

    producer

    1. @RestController
    2. public class KafkaSyncProducerController {
    3. @Autowired
    4. private KafkaTemplate template;
    5. @RequestMapping("send/sync/{message}")
    6. public String sendSync(@PathVariable String message) {
    7. ListenableFuture future = template.send(new ProducerRecord("topic-spring-02", 0, 1, message));
    8. try {
    9. // 同步等待broker的响应
    10. Object o = future.get();
    11. SendResult result = (SendResult) o;
    12. System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. } catch (ExecutionException e) {
    16. e.printStackTrace();
    17. }
    18. return "success";
    19. }
    20. }
    21. @RestController
    22. public class KafkaAsyncProducerController {
    23. @Autowired
    24. private KafkaTemplate template;
    25. @RequestMapping("send/async/{message}")
    26. public String asyncSend(@PathVariable String message) {
    27. ProducerRecord record = new ProducerRecord("topic-spring-02", 0, 3, message);
    28. ListenableFuture> future = template.send(record);
    29. // 添加回调,异步等待响应
    30. future.addCallback(new ListenableFutureCallback>(){
    31. @Override
    32. public void onFailure(Throwable throwable) {
    33. System.out.println("发送失败: " + throwable.getMessage());
    34. }
    35. @Override
    36. public void onSuccess(SendResult result) {
    37. System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
    38. }
    39. });
    40. return "success";
    41. }
    42. }

    consumer

    1. @Component
    2. public class MyConsumer {
    3. @KafkaListener(topics = "topic-spring-02")
    4. public void onMessage(ConsumerRecord record) {
    5. Optional> optional = Optional.ofNullable(record);
    6. if (optional.isPresent()) {
    7. System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
    8. }
    9. }
    10. }

    以上内容为个人学习理解,如有问题,欢迎在评论区指出。

    部分内容截取自网络,如有侵权,联系作者删除。

  • 相关阅读:
    【C#学习】backgroundWorker控件
    Python 全栈系列183 元数据的规划与命名
    JAVA整理学习实例(四)数据结构
    不要给我发土味情话啦!🐷
    Python中ndarray对象和list(列表)的相互转换
    如何批量将长视频分割成短视频?详细操作步骤一目了然
    第7章 C语言的函数指针数组 (四)
    Mac配置JDK安装Jmeter
    开源社区赋能,Walrus 用户体验再升级
    【Javascript】数组的基本操作
  • 原文地址:https://blog.csdn.net/weixin_37672801/article/details/132702555