• Kafka


    消息中间件对比:
    1、吞吐、可靠性、性能在这里插入图片描述

    Kafka安装

    Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

    • Docker安装zookeeper

    下载镜像:

    docker pull zookeeper:3.4.14
    
    • 1

    创建容器

    docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    
    • 1
    • Docker安装kafka

    下载镜像:

    docker pull wurstmeister/kafka:2.12-2.3.1
    
    • 1

    创建容器

    docker run -d --name kafka \
    --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
    --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
    --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
    --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    --net=host wurstmeister/kafka:2.12-2.3.1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    kafka入门

    • 生产者发送消息,多个消费者只能有一个消费者接收到消息
    • 生产者发送消息,多个消费者都可以接收到消息

    (1)创建kafka-demo项目,导入依赖

    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-clientsartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    (2)生产者发送消息

    package com.heima.kafka.sample;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * 生产者
     */
    public class ProducerQuickStart {
    
        public static void main(String[] args) {
            //1.kafka的配置信息
            Properties properties = new Properties();
            //kafka的连接地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
            //发送失败,失败的重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,5);
            //消息key的序列化器
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            //消息value的序列化器
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    
            //2.生产者对象
            KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    
            //封装发送的消息
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
    
            //3.发送消息
            producer.send(record);
    
            //4.关闭消息通道,必须关闭,否则消息发送不成功
            producer.close();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    (3)消费者接收消息

    package com.heima.kafka.sample;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * 消费者
     */
    public class ConsumerQuickStart {
    
        public static void main(String[] args) {
            //1.添加kafka的配置信息
            Properties properties = new Properties();
            //kafka的连接地址
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
            //消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
            //消息的反序列化器
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            //2.消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    
            //3.订阅主题
            consumer.subscribe(Collections.singletonList("itheima-topic"));
    
            //当前线程一直处于监听状态
            while (true) {
                //4.获取消息
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                }
            }
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    kafka高可用设计

    1、设计集群模式:

    Kafka的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了,另外一个机器就会替补上.在这里插入图片描述

    2、备份机制:

    Kafka定义了两类副本

    1. 领导者副本(Leader Replica)
    2. 追随者副本 (Follower Replica)
      追随者副本分为两类:
      1、一种是ISR副本,同步保存
      2、普通的副本,异步保存
      出现主节点宕机,会先选ISR副本中的一个成为新的主节点,保证数据一致性,没有ISR节点,再从普通节点中挑选
      针对全部节点宕机的情况,有两种策略:
      1、等待第一个ISR副本,保证了数据的尽可能一致
      2、等待一个复活的追随者,无论是ISR还是普通,提高系统的高可用性。

    kafka生产者详解

    1发送类型

    • 同步发送

      使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

    RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
    System.out.println(recordMetadata.offset());
    
    • 1
    • 2
    • 异步发送

      调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

    //异步消息发送
    producer.send(kvProducerRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if(e != null){
                System.out.println("记录异常信息到日志表中");
            }
            System.out.println(recordMetadata.offset());
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2参数详解

    • ack

    代码的配置方式:

    //ack配置  消息确认机制
    prop.put(ProducerConfig.ACKS_CONFIG,"all");
    
    • 1
    • 2

    参数的选择说明

    确认机制说明
    acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
    acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
    acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    • retries

    生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

    代码中配置方式:

    //重试次数
    prop.put(ProducerConfig.RETRIES_CONFIG,10);
    
    • 1
    • 2
    • 消息压缩

    默认情况下, 消息发送时不会被压缩。

    代码中配置方式:

    //数据压缩
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
    
    • 1
    • 2
    压缩算法说明
    snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
    lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
    gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

    使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

    kafka消费者

    消息的有序性

    方法:一个topic分区能保证自己的数据是按照先后消费的,但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区,在单分区种,消息可以保证严格顺序消费

    提交和偏移量在这里插入图片描述

    自动提交:
    当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去,这样只是记录了规定时间内的最大偏移量,其实与数据提交的偏移量存在偏差,因此可能会出现数据的重复提交或者丢失
    手动提交
    当enableauto.commit被设置为false可以有以下三种提交方式

    • 提交当前偏移量(同步提交)
    • 异步提交
    • 同步和异步组合提交

    同步提交:commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
            try {
                consumer.commitSync();//同步提交当前最新的偏移量
            }catch (CommitFailedException e){
                System.out.println("记录提交失败的异常:"+e);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    异步提交:手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。消息没有重试机制

    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if(e!=null){
                    System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
                }
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    同步和异步组合提交

    异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖

    举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

    try {
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
                System.out.println(record.key());
            }
            consumer.commitAsync();
        }
    }catch (Exception e){+
        e.printStackTrace();
        System.out.println("记录错误信息:"+e);
    }finally {
        try {
            consumer.commitSync();
        }finally {
            consumer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    springboot整合kafka

    1、在父类中的pop文件中导入依赖包

    ```xml
    
    
        org.springframework.kafka
        spring-kafka
    
    
        org.apache.kafka
        kafka-clients
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置

    spring:
      kafka:
        bootstrap-servers: 192.168.200.130:9092
        producer:
          retries: 10
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    spring:
      kafka:
        bootstrap-servers: 192.168.200.130:9092
        consumer:
          group-id: ${spring.application.name}
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    传递消息为对象

    目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

    方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

    方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

    • 发送消息
    @GetMapping("/hello")
    public String hello(){
        User user = new User();
        user.setUsername("xiaowang");
        user.setAge(18);
    
        kafkaTemplate.send("user-topic", JSON.toJSONString(user));
    
        return "ok";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 接收消息
    package com.heima.kafka.listener;
    
    import com.alibaba.fastjson.JSON;
    import com.heima.kafka.pojo.User;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    @Component
    public class HelloListener {
    
        @KafkaListener(topics = "user-topic")
        public void onMessage(String message){
            if(!StringUtils.isEmpty(message)){
                User user = JSON.parseObject(message, User.class);
                System.out.println(user);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    Windows 注册表简介
    【TIDB】TiDB认证考试PTCA 练习题 题库
    中间件漏洞 | Apache-路径穿越升级版
    查找 - 顺序、二分和哈希查找
    c++介绍与入门基础(详细总结)
    K3wise 常用表及视图
    TensorFlow识别4种天气状态(CNN,模型ACC:93.78%)
    1018 Public Bike Management (30) & 1030 Travel Plan (30)
    grpc和protobuf在一起
    聚乙烯基吡啶阳离子功能化聚苯乙烯微球/二氧化锆/聚苯乙烯阳离子微球研究步骤
  • 原文地址:https://blog.csdn.net/qq_52302919/article/details/134406219