• linux部署kafka并集成springboot


    简单实现,不涉及集群及更多扩展的kafka知识。

    1. kafka linux部署(单节点)

    kafka官网:http://kafka.apache.org/

    应用版本:kafka_2.12-3.0.0

    前置环境:jdk1.8

    1.1 kafka配置

    文件目录:config/server.properties

    修改项:

    log.dirs=/tmp/kafka-logslog.dirs=/data/kafka-logs

    listeners=PLAINTEXT://0.0.0.0:9092

    advertised.listeners=PLAINTEXT://192.168.62.133:9092 #虚拟机ip

    1.2 zookeeper配置

    文件目录:config/zookeeper.properties

    修改项:dataDir=/tmp/zookeeper → dataDir=/data/zookeeper

    linux tmp文件夹默认有清除机制,所以数据文件不要放在tmp目录下

    2. 启动服务

    2.1 启动zookeeper服务

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties &
    
    • 1

    2.2 启动kafka服务

    ./kafka-server-start.sh -daemon ../config/server.properties &
    
    • 1

    启动顺序要注意,先启动zookeeper,后kafka。

    3.3 jps检查启动是否成功

    [root@template ~]# jps
    7793 Kafka
    24361 QuorumPeerMain
    21049 Jps

    3.4 测试

    创建一个topic

    ./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 8 --topic test
    
    • 1

    查看topic是否创建成功

    ./kaf-topics.sh --bootstrap-server localhost:9092 --list
    
    • 1

    测试发送消息

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    • 1

    Hello World

    启动消费者

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1

    Hello World

    3. SpringBoot集成kafka

    3.1 maven

    <!--Kafka 依赖-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.1.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里spring-kafka没有指定版本,使用 Spring Boot 时(并且您尚未使用 start.spring.io 创建项目),省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本

    3.2 application.properties

    server.port=8077
    spring.application.name=gas-kafka-sync
    
    spring.kafka.bootstrap-servers=192.168.62.133:9092
    spring.kafka.producer.retries=3
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.acks=1
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.group-id=default-group
    spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual_immediate
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.3 消息发送接口

    根据项目场景,一般是对接第三方数据。由于不保证第三方拥有直接对接kafka的能力,这里提供一个公共的消息发送接口。对方只需要传入topic和数据对象(Object)到DataDto即可。

    package com.gsafety.bg.kafka.service;
    
    import cn.hutool.json.JSONUtil;
    import com.gsafety.bg.kafka.model.DataDto;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    /**
     * @author Mr.wanter
     * @time 2022-7-18 0018
     * @description
     */
    
    @Service
    @AllArgsConstructor
    @Slf4j
    public class KafkaProducerImpl implements KafkaProducerService {
    
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        @Override
        public void sendMsg(DataDto dto) {
            //1 构建消息数据
            String msg = JSONUtil.toJsonStr(dto.getData());
            //2.发送消息
            ListenableFuture future = kafkaTemplate.send(dto.getTopic(), msg);
            // 回调函数
            future.addCallback(
                    new ListenableFutureCallback<SendResult<Integer, String>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, dto.getTopic(), msg);
                        }
    
                        @Override
                        public void onSuccess(SendResult<Integer, String> result) {
                            log.info("kafka sendMessage success topic = {}, data = {}", dto.getTopic(), msg);
                        }
                    });
            // 未指定分区发送
            // kafkaTemplate.send(TOPIC_NAME, msg);
            // 指定分区发送
            // kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    3.4 消费接口

    消费接口根据业务场景,对固定topic下的数据进行解析入库。

    package com.gsafety.bg.gas.service.impl;
    
    import cn.hutool.json.JSONUtil;
    import com.gsafety.bg.gas.dao.QuakeAlarmDao;
    import com.gsafety.bg.gas.model.QuakeAlarmEntity;
    import com.gsafety.bg.gas.model.dto.QuakeAlarmDto;
    import com.gsafety.bg.gas.service.mapping.QuakeAlarmMapping;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import javax.transaction.Transactional;
    
    /**
     * @author Mr.wanter
     * @time 2022-7-15 0015
     * @description
     */
    @Component
    @Slf4j
    @AllArgsConstructor
    @Transactional(rollbackOn = Exception.class)
    public class KafkaConsumer {
    
        private final QuakeAlarmDao dao;
        private final QuakeAlarmMapping mapping;
    	//自动提交时Acknowledgment 参数会报错
        @KafkaListener(id = "0", topics = "quake_alarm", groupId = "default-group")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            try {
                String value = record.value();
                QuakeAlarmDto quakeAlarmDto = JSONUtil.toBean(value, QuakeAlarmDto.class);
                QuakeAlarmEntity entity = mapping.convertFrom(quakeAlarmDto);
                QuakeAlarmEntity save = dao.save(entity);
                log.info("kafka mq:{}", record.value());
                log.info("entity:{}", save);
                //消费成功后手动提交offset
                ack.acknowledge();
            } catch (Exception e) {
    			ack.acknowledge();
                log.error("异常原因:{}", e.getCause());
    			//如果对方传入的json数据不符合约定的对象结构,那么这条数据视为垃圾数据,直接消费掉。但是如果此时数据库挂了导致数据没有存入数据库,但是kafka已经被消费怎么办?
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    4. 遇到的问题

    4.1 kafka启动不成功

    检查服务器内存是否够用。

    4.2 boot启动后日志一直连接localhost/172.0.0.1

    kafka没有对外暴漏ip地址

    # 允许外部端口连接                                            
    listeners=PLAINTEXT://0.0.0.0:9092  
    # 外部代理地址                                                
    advertised.listeners=PLAINTEXT://192.168.62.133:9092
    
    • 1
    • 2
    • 3
    • 4

    4.3 消费端改为手动提交offset后报错

    配置文件对应修改:

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual_immediate
    
    • 1
    • 2

    4.4 删除topic及数据

    1. 关闭kafka、zookeeper服务
    2. 删除/data/kafka-logs目录下所有数据
    3. 删除/data/zookeeper/version-2目录下所有数据
    4. 重启

    参考:

    https://blog.csdn.net/qq_41969358/article/details/123032806

    https://www.cnblogs.com/along21/p/10278100.html

    https://blog.csdn.net/qq_41432730/article/details/121924814

    https://docs.spring.io/spring-kafka/reference/html/#getting-started

    https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka

  • 相关阅读:
    微信小程序 按钮颜色
    机器学习-04-分类算法-02贝叶斯算法
    不止于Kubernetes,开发人员应着眼于更多适合云原生应用的范式
    K8s云原生存储Rook详解
    SaaSBase:什么是涂色scrm?
    假设检验3
    JavaSE类和对象
    Retrofit项目 - Android和Java的类型安全的HTTP客户端
    2023年 MOOC《计算机网络》—— 第四章CSMA/CD作业答案解析(手写版)
    SpringBoot中“@SpringBootApplication“自动配置原理《第七课》
  • 原文地址:https://blog.csdn.net/zhanghuaiyu_35/article/details/125884643