• Spring Boot 2.7.3 - (7) Integrating Kafka


    I. Common MQ options in Java: for an overview of message queues, see the CSDN post "盘点常见MQ: 消息队列总览" by Java学术趴.

    II. Kafka depends on Zookeeper, so install Zookeeper first

    1) Official download page: Apache Downloads

    2) Extract it to your install directory, copy /conf/zoo_sample.cfg to zoo.cfg, and edit the configuration:

    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    # Customize: point this at a local disk path, ideally under the Zookeeper install directory
    dataDir=F:/qihh/install/apache-zookeeper-3.7.1-bin/data
    # the port at which the clients will connect
    clientPort=2181
    # Append this line at the end; without it startup reports an audit "disabled" error
    audit.enable=true

    3) Double-click /bin/zkServer.cmd to start the server.
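
    To confirm Zookeeper is up, you can connect with the CLI client that ships in the same bin directory (assuming the default clientPort 2181 configured above); a successful connection drops you into an interactive zk shell prompt:

    .\bin\zkCli.cmd -server localhost:2181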

     

    III. Install Kafka

    1) Official download page: Apache Kafka

    2) Extract it to your install directory and edit the configuration file /config/server.properties:

    # A comma separated list of directories under which to store log files
    log.dirs=F:/qihh/install/kafka_2.13-3.2.3/kafka-logs
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181

    3) Start the broker with this configuration file

    - Start Zookeeper first

    - Open a cmd window, change to the Kafka install directory, and run the following command:

    .\bin\windows\kafka-server-start.bat .\config\server.properties
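
    By default the broker auto-creates topics on first use (auto.create.topics.enable defaults to true), but you can create the test1 topic used by the example code below explicitly, which makes the partition count deliberate; kafka-topics.bat ships in the same windows directory:

    .\bin\windows\kafka-topics.bat --create --topic test1 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092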

    IV. Download the Kafka GUI tool (Kafka Tool / Offset Explorer)

    1) Download page: Offset Explorer

    For a quick-start guide, see the 51CTO post "Kafka - 可视化工具(Kafka Tool)快速入门".

    V. Using Kafka in the project

    1) Add the dependency to pom.xml

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    2) Add the following configuration to application.yml, nested under the spring key:

    spring:
      # =========================================================================
      kafka:
        bootstrap-servers: localhost:9092
        # Producer configuration
        producer:
          # Number of retries
          retries: 0
          # Ack level: how many partition replicas must confirm the write before the
          # broker acknowledges to the producer (options: 0, 1, all/-1)
          acks: 1
          # Batch size in bytes
          batch-size: 65536
          # Linger time: the producer sends a batch to Kafka once it has accumulated
          # batch-size bytes or linger.ms has elapsed, whichever comes first
          properties.linger.ms: 0
          # Producer buffer size in bytes
          buffer-memory: 524288
          # Kafka serializer classes
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        # Consumer configuration
        consumer:
          # Whether offsets are committed automatically
          enable-auto-commit: true
          # Auto-commit interval in ms (how long after receiving a message the offset is committed)
          auto-commit-interval: 1000
          # Default consumer group; can be overridden in code
          group-id: test
          # earliest: if a committed offset exists, resume from it; otherwise consume from the beginning
          # latest: if a committed offset exists, resume from it; otherwise consume only newly produced records
          # none: if every partition has a committed offset, resume after it; if any partition lacks one, throw an exception
          auto-offset-reset: latest
          properties:
            # Session timeout (if the consumer sends no heartbeat within this window, a rebalance is triggered)
            session.timeout.ms: 120000
            # Request timeout
            request.timeout.ms: 180000
          # Deserializer classes
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # ===========================================================================
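
    Spring Boot's auto-configuration binds these properties to the KafkaTemplate and listener container factory for you, so no extra code is required. For reference only, a minimal Java-config sketch of the same producer settings (a hypothetical alternative to the yml above, using the standard spring-kafka API, not part of the original project) would look roughly like this:

    package com.qi.study.springboot.config; // hypothetical package for this sketch

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaProducerConfig {

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            // Same values as the yml configuration above
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 524288L);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            // Overrides the auto-configured template; only needed if you skip the yml approach
            return new KafkaTemplate<>(producerFactory());
        }
    }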

    3) Add the test code: a producer controller and a consumer component, shown below.

    Producing messages:

    package com.qi.study.springboot.controller;

    import javax.annotation.Resource;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    import com.qi.study.springboot.result.JsonResult;
    import com.qi.study.springboot.util.JsonResultBuilder;

    @RestController
    @RequestMapping("/demo")
    public class KafkaProviderController {

        private final static Logger LOGGER = LoggerFactory.getLogger(KafkaProviderController.class);

        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;

        @RequestMapping(value = "/kafka/sendMsg", method = RequestMethod.POST)
        public JsonResult sendMsg(@RequestParam String message) {
            LOGGER.info("KafkaController - sending Kafka message: {}", message);
            try {
                // Send to topic "test1" with key "HelloWorld"; with a constant key, the
                // default partitioner routes every message to the same partition
                ListenableFuture<SendResult<String, String>> listenableFuture =
                        kafkaTemplate.send("test1", "HelloWorld", message);
                listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                    @Override
                    public void onSuccess(SendResult<String, String> result) {
                        LOGGER.info("KafkaController - Kafka message sent successfully: {}", result);
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        LOGGER.error("KafkaController - failed to send Kafka message: {}", ex.getMessage());
                    }
                });
                return JsonResultBuilder.ok();
            } catch (Exception e) {
                LOGGER.error("KafkaController - exception while sending Kafka message: {}", e.getMessage());
                return JsonResultBuilder.error();
            }
        }
    }

    Consuming messages:

    package com.qi.study.springboot.component;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaConsumer {

        private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

        @KafkaListener(topics = "test1")
        public void onMessage(ConsumerRecord<String, String> record) {
            String topic = record.topic();
            String msg = record.value();
            LOGGER.info("Consumer received message: topic --> {}, msg --> {}", topic, msg);
        }
    }
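
    As the yml comments note, the consumer group can also be overridden in code: @KafkaListener has a groupId attribute that takes precedence over spring.kafka.consumer.group-id. A hypothetical second listener method added to the KafkaConsumer class above, placed in its own group, would independently receive every message on the topic:

    // Hypothetical additional listener: its own groupId means it consumes test1
    // independently of the default "test" group configured in application.yml
    @KafkaListener(topics = "test1", groupId = "test-audit")
    public void onMessageAudit(ConsumerRecord<String, String> record) {
        LOGGER.info("Audit consumer received: topic --> {}, msg --> {}", record.topic(), record.value());
    }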

    4) Run the test

    - Start Zookeeper

    - Start Kafka

    - Start the Spring Boot application
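
    With all three processes up, you can exercise the producer endpoint with any HTTP client; for example, assuming the application runs on the default port 8080:

    curl -X POST "http://localhost:8080/demo/kafka/sendMsg" -d "message=HelloKafka"

    The controller logs the send, the callback logs the broker acknowledgement, and the @KafkaListener logs the consumed message.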

     

     

    VI. Source code download (includes the installers used above): https://download.csdn.net/download/MyNoteBlog/86729096

  • Original article: https://blog.csdn.net/MyNoteBlog/article/details/127117902