• Spring Kafka生产者实现


    需求

    我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

    pom.xml

    <dependency>
    	<groupId>org.springframework.kafkagroupId>
    	<artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。

    application.yml

    spring:
      kafka:
        producer:
    	  # kafka集群地址
          bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
    	  client-id: producer-dev
          # SASL_PLAINTEXT 接入方式
          security:
            protocol: SASL_PLAINTEXT
          # 序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            # SASL 采用 SCRAM-SHA-256 方式
            sasl:
              mechanism: SCRAM-SHA-256
        # jaas配置
        jaas:
          options:
            username: kafkauser
            password: kafkapwd
          enabled: true
          login-module: org.apache.kafka.common.security.scram.ScramLoginModule
          control-flag: required
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    以上,是关于Spring Kafka的全部配置。下面摘要出来的配置,是可以单独配置在配置中心的:

    topic:
      # 接收消息的主题配置
      save: hello_kafka_topic
    spring:
      kafka:
        producer:
          client-id: producer-dev
          # kafka集群地址
          bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
        # jaas配置
        jaas:
          options:
            username: kafkauser
            password: kafkapwd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Java

    KafkaProducerService.java

    
    public interface KafkaProducerService {
    
        /**
         * 转发消息到kafka
         */
        void sendToKafka(String msg);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    KafkaProducerServiceImpl.java

    
    
    import cn.com.xxx.service.KafkaProducerService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.kafka.core.KafkaProducerException;
    import org.springframework.kafka.core.KafkaSendCallback;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import javax.annotation.Resource;
    
    /**
     * 转发消息到kafka
     */
    @RefreshScope
    @Slf4j
    @Service
    public class KafkaProducerServiceImpl implements KafkaProducerService {
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        /**
         * kafka接收消息的主题
         */
        @Value("${topic.save}")
        private String topic;
    
    
        @Override
        public void sendToKafka(String msg) {
            log.info(String.format("$$$$ => Producing message: %s", msg));
    
            ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);
    
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);
            future.addCallback(new KafkaSendCallback<String, String>() {
    
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("成功发消息:{}给Kafka:{}", msg, result);
                }
    
                @Override
                public void onFailure(KafkaProducerException ex) {
                    log.error("发消息:{}给Kafka:{}", msg, recordKafka, ex);
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。

    总结

    这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本,所以,这里的使用的回调类是ListenableFuture类。

    参考:

  • 相关阅读:
    K8s安全一
    Sqlalchemy异步操作不完全指北
    golang 库之「依赖注入」
    第十四届蓝桥杯(web应用开发)模拟赛2期 -大学组
    机器学习之scikit-learn基础教程
    c语言编程实例
    ArcGIS Pro 转换Smart3D生成的倾斜3D模型数据osgb——创建集成网格场景图层包
    施磊老师 C++ 课程笔记--自己记录用
    【编程题 】抄送列表(详细注释 易懂)
    【100个 Unity实用技能】☀️ | UGUI Text中加入超链接文本,可直接点击跳转
  • 原文地址:https://blog.csdn.net/fxtxz2/article/details/134051100