• SpringBoot集成Kafka——如此简单


    今天下午本打算整理一下服务器资源,给每个服务器打个标签,标明部署了哪些服务,方便查找。

    发现之前搭建kafka的服务器到期没续费,被回收了。缺一套kafka的环境,闲来无事,搭一个吧,顺便踩踩坑。

    话不多说,开干。

     

    安装kafka

    官网:kafka.apache.org/downloads

    目前最新是3.2版本,也是稳定版,scala的版本可选2.12和2.13。推荐2.13,不太重要。

    下载,搞到服务器上。如下:

     

    解压,改名,同时建一个文件夹存放数据。简单修改一下配置,不搞那么复杂。

    vi config/server.properties
    

    主要修改数据存放目录,listener地址。

    1. ## 内网地址
    2. listeners=PLAINTEXT://13:9092
    3. ## 公网地址
    4. advertised.listeners=PLAINTEXT://3.13:9092
    5. ## 上面算是个坑,不过在本地安装就不需要管了,默认就行。
    6. log.dirs=/data/opt/kafka/logs

    接下来启动就好了,这台服务器已经装好了zk,就不用kafka自带的了,如果没有,需要先启动zk。

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    再启动kafka

    1. bin/kafka-server-start.sh config/server.properties
    2. nohup bin/kafka-server-start.sh config/server.properties & ## 后台

    jps 看一下是否启动成功

    创建topic

    建个topic玩一下,

    1. bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 127.0.0.1:9092
    2. ## 注意最后的IP地址,要和listener配置的一样。

    打开生产者,我们可以在这发消息,然后再消费者查看是否能收到。

    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 127.0.0.1:9092
    

     

    打开消费者,新开一个终端。  

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.31.96:9092

    ok,收到消息了,么得问题。

    SpringBoot集成

    随便找个以前的工程,加入pom。偷个懒。

    1. <dependency>
    2.            <groupId>org.springframework.kafka</groupId>
    3.            <artifactId>spring-kafka</artifactId>
    4.            <version>2.5.0.RELEASE</version>
    5.        </dependency>

    配置kafka server地址:

    1. spring:
    2.   kafka:
    3.   bootstrap-servers: 101.43.138.173:9092
    4.   producer:
    5.     key-serializer: org.apache.kafka.common.serialization.StringSerializer
    6.     value-serializer: org.apache.kafka.common.serialization.StringSerializer
    7.   listener:
    8.     ack-mode: manual_immediate
    9.     missing-topics-fatal: false

    搞个controller测试一下

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.kafka.core.KafkaTemplate;
    3. import org.springframework.kafka.support.SendResult;
    4. import org.springframework.util.concurrent.ListenableFuture;
    5. import org.springframework.web.bind.annotation.GetMapping;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RequestParam;
    8. import org.springframework.web.bind.annotation.RestController;
    9. import java.util.concurrent.ExecutionException;
    10. @RestController
    11. @RequestMapping("/kafka")
    12. @Slf4j
    13. public class KafkaController {
    14.    private final KafkaTemplate<String,String> kafkaTemplate;
    15.    public KafkaController(KafkaTemplate<String,String> kafkaTemplate) {
    16.        this.kafkaTemplate = kafkaTemplate;
    17.   }
    18.    @GetMapping("/send")
    19.    public void send(@RequestParam(defaultValue = "hi")String msg) throws ExecutionException, InterruptedException {
    20.        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("quickstart-events", msg);
    21.        String res = listenableFuture.get().toString();
    22.        log.info("producer send result:[{}]",res);
    23.   }
    24. }

    测试一下

    1. producer send result:[SendResult [producerRecord=ProducerRecord(topic=quickstart-events, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = b3, value = [97, 57, 51, 56, 50, 99, 54, 102, 102, 101, 48, 54, 99, 100, 55, 57, 45, 57, 52, 52, 57, 100, 51, 100, 48, 99, 49, 49, 49, 56, 51, 54, 49, 45, 49])], isReadOnly = true), key=null, value=hi, timestamp=null), recordMetadata=quickstart-events-0@6]]

    看看消费者终端也收到了,么得问题,搞定。

    最后

    工作时间:1小时

    产出:一篇文章

    获赞:?

    做核酸去了,下期见。

     

     

  • 相关阅读:
    ElasticSearch 增删改查操作
    JavaScript基础(14)_in、hasOwnProperty、instanceof的用法、垃圾回收
    Tomcat下载与安装
    【小航的算法日记】数组
    3D人体姿态估计(教程+代码)
    XHbuilder 需要的 ipa 签名,超详细的教程,你不看吃亏的是自己!
    初识 Jenkins 持续集成
    模糊神经网络:基于模糊神经网络(Fuzzy Neural Networks,FNN)的数据分类(提供MATLAB代码)
    杰理之MIC 免电容方案需要设置【篇】
    理解循环神经网络
  • 原文地址:https://blog.csdn.net/m0_67698950/article/details/125428357