• Spring for Apache Kafka概述和简单入门


    一、概述

    Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。

    二、准备工作

    注意:进行工作开始之前至少要有一个 Apache Kafka 环境

    2.1、依赖

    • 使用 Spring Boot
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    使用 Spring Boot 时,省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本

    • 使用 Spring
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.7.14</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用Spring 时必须要申明使用的版本。

    2.2、版本兼容性

    • Apache Kafka 客户端 2.7.0 或 2.8.0

    • Spring 框架 5.3.x 或 Spring Boot 2.7.x

    • 最低 Java 版本:8

    在 Spring Boot 应用程序中使用 Apache Kafka 时, Apache Kafka 依赖项版本由 Spring Boot 的依赖项管理确定。
    想要使用不同于Spring Boot版本的 Apache Kafka时需要覆盖所有关联的依赖项。
    尤其在使用嵌入式 Kafka 代理时特别要注意。

    2.3、依赖覆盖

    在 Spring Boot 应用程序中使用 Apache Kafka 时, Apache Kafka 依赖项版本由 Spring Boot 的依赖项管理确定。
    如果要使用kafka-clients或的不同版本kafka-streams(例如 2.x), 则需要覆盖所有关联的依赖项。
    尤其是在 spring-kafka-test 中使用嵌入式 Kafka 代理时。

    并非所有的 Spring Boot都会向下兼容

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.7.14</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>2.7.14</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>{kafka-version}</version>
    </dependency>
    
    <!-- 可选  仅在使用 kafka 流时需要 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>{kafka-version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>{kafka-version}</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>{kafka-version}</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>{kafka-version}</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    三、Spring Boot消费者

    3.1、应用程序

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1")
                    .partitions(10)
                    .replicas(1)
                    .build();
        }
    
        @KafkaListener(id = "myId", topics = "topic1")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2、配置项

    spring.kafka.consumer.auto-offset-reset=earliest //从提交的offset开始消费;无提交的offset时,从头开始消费
    
    • 1

    四、Srping Boot生产者

    4.1、应用程序

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1")
                    .partitions(10)
                    .replicas(1)
                    .build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("topic1", "test");
            };
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    五、不使用 Spring Boot

    在不使用 Spring Boot 时必须定义几个基础的Bean。

    @Configuration
    @EnableKafka
    public class Config {
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                            kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerProps());
        }
    
        private Map<String, Object> consumerProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // ...
            return props;
        }
    
        @Bean
        public Sender sender(KafkaTemplate<Integer, String> template) {
            return new Sender(template);
        }
    
        @Bean
        public Listener listener() {
            return new Listener();
        }
    
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(senderProps());
        }
    
        private Map<String, Object> senderProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            //...
            return props;
        }
    
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
            return new KafkaTemplate<Integer, String>(producerFactory);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    在 Spring 上下文之外创建侦听器容器,则必须满足容器实现的所有接口,否则有些功能会出现异常工作。

    不包括 Spring Boot的完整示例如下:

    public class Sender {
    
    	public static void main(String[] args) {
    		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
    		context.getBean(Sender.class).send("test", 42);
    	}
    
    	private final KafkaTemplate<Integer, String> template;
    
    	public Sender(KafkaTemplate<Integer, String> template) {
    		this.template = template;
    	}
    
    	public void send(String toSend, int key) {
    		this.template.send("topic1", key, toSend);
    	}
    
    }
    
    public class Listener {
    
        @KafkaListener(id = "listen1", topics = "topic1")
        public void listen1(String in) {
            System.out.println(in);
        }
    
    }
    
    @Configuration
    @EnableKafka
    public class Config {
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                            kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerProps());
        }
    
        private Map<String, Object> consumerProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // ...
            return props;
        }
    
        @Bean
        public Sender sender(KafkaTemplate<Integer, String> template) {
            return new Sender(template);
        }
    
        @Bean
        public Listener listener() {
            return new Listener();
        }
    
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(senderProps());
        }
    
        private Map<String, Object> senderProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            //...
            return props;
        }
    
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
            return new KafkaTemplate<Integer, String>(producerFactory);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
  • 相关阅读:
    【Word】页眉编辑小技巧
    商城小程序代客下单程序开发演示
    Redis - 分布式锁和事务
    LuatOS-SOC接口文档(air780E)-- io - io操作(扩展)
    Java19虚拟线程都来了,我正在写的线程代码会被淘汰掉吗?
    jvm 各个版本支持的参数
    python之 flask 框架(2)项目拆分的 执行逻辑
    空闲空间管理和文件系统结构的优化策略
    量化交易 - 策略回测
    【AAC文件数据解析和ADTS frame】
  • 原文地址:https://blog.csdn.net/qq_35241329/article/details/132760160