• Springboot中使用kafka


    首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。

    此文背景的环境是windows,linux流程也差不多。

    • 解压在D盘下或者什么地方,注意不要放在桌面等绝对路径太长的地方

    • 打开config中的 zookeeper.properties,自己选择性修改clientPort,不想改也行

    • 修改config中的 server.properties

    1.取消 advertised.listeners 注释,修改kafka地址与端口。如果要集群部署,broker.id不能重复,1号机是0,2号机是1这样的。

    2.修改 zookeeper.connect 为你上面zookeeper.properties中配置的地址

    • 配置好了,尝试开启kafka。

    来到bin/windows,shift右键在此处打开cmd,输入 zookeeper-server-start.bat ../../config/zookeeper.properties,等待其启动。(注意你config的路径不要写错)

    启动完再开一个cmd,输入 kafka-server-start.bat ../../config/server.properties

    如果在此环节出现问题,请查看logs中的日志,面向csdn。

    我遇到的问题是 他显示什么什么路径太长了,问题的原因是我把他放桌面上了。

    • 新建springboot项目,pom中添加依赖

      org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.projectlombok lombok true com.alibaba fastjson 1.2.28
    • 配置application.yml,写启动类,controller,创建User类,创建consumer

    application.yml

    spring:
      application:
        name: kafka
      kafka:
        bootstrap-servers: localhost:9092 #这个是你server.properties中配置的
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group #这个去config/consumer.properties中查看和修改
                                    # 不过好像不一样也不影响?
    server:
      port: 8001
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    controller

    @RestController
    public class ProducerController {
    ?
        @Autowired
        KafkaTemplate kafka;
    ?
        @RequestMapping("register")
        public String register(User user) {
            String message = JSON.toJSONString(user);
            System.out.println("接收到用户信息:" + message);
            kafka.send("register", message);
            //kafka.send(String topic, @Nullable V data) {
            return "OK";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    user

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User implements Serializable {
    ?
        private String id;
    ?
        private String name;
    ?
        private Integer age;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    consumer

    @Configuration
    public class Consumer {
    ?
        @KafkaListener(topics = "register")
        public void consume(String message) {
            System.out.println("接收到消息:" + message);
            User user = JSON.parseObject(message, User.class);
            System.out.println("正在为 " + user.getName() + " 办理注册业务...");
            System.out.println("注册成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    此时启动springboot,然而报错了

    org.springframework.context.ApplicationContextException: 
        Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
    
    nested exception is java.lang.IllegalStateException:
         Topic(s) [register] is/are not present and missingTopicsFatal is true
    
    • 1
    • 2
    • 3
    • 4
    • 5

    为什么呢?

    请检查zookeeper和kafka的cmd上有没有他们启动失败的消息,如果有就重新启动下,记得先开zookeeper再开kafka。

    然后确认你的kafka上有没有"register"这个topic,此时我是没有的,而consumer又加了 @KafkaListener(topics = "register") 注解,于是他就启动失败了。

    有两种解决方式:

    1.比较傻X的方式,先将@KafkaListener注释掉,启动springboot后访问localhost:8001/register,他send的时候就会自行创建topic,再取消注释重新启动就可以了。

    2.cmd方式:输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic register

    然后我们再启动,已经启动成功了。访问 localhost:8001/register?name=JamesBond&age=55

    我们可以看到数据已经成功送到那里了。

    然后来测试一下速度

    @RequestMapping("test")
    public String test() {
        System.out.println("发送开始" + System.currentTimeMillis() % 10000);
        for (int i = 0; i < 1000; i++) {
            kafka.send("test", JSON.toJSONString(new User((1289312+i)+"",
                    "User" + i, (int)(Math.random() * 100), info)));
        }
        System.out.println("发送结束" + System.currentTimeMillis() % 10000);
        return "OK";
    }
    
    @KafkaListener(topics = "test")
    public void test(String message) {
        System.out.println("--" + System.currentTimeMillis() % 10000 + "--");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    console:

    发送开始1267
    --1384--
    --1384--
    ...
    --1715--
    --1715--
    发送结束1715
    --1715--
    --1715--
    ...
    --1734--
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    对比RabbitMQ

    发送开始5692
    --5766--
    --5766--
    ...
    --5973--
    --5974--
    发送结束5976
    --5977--
    --5977--
    ...
    --6456--
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    kafka:

    发送结束 - 发送开始=448ms

    接收结束 - 接收开始=350ms

    整体耗时: 467ms

    rabbit:

    发送结束 - 发送开始=284ms

    接收结束 - 接收开始=690ms

    整体耗时: 764ms

    OK既然我会用了 我就去学一下kafka基本知识了

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    软件测试面试指导,做到有备无患,offer手到擒来
    java框架 Spring之 AOP 面向切面编程 切入点表达式 AOP通知类型 Spring事务
    重磅推荐 | 朱嘉明:元宇宙——创意、思想、意识协作的下一代网络
    CANAPE中加载DBC后,如何在脚本中获取到DBC内的信号量
    CMU DLSys 课程笔记 2 - ML Refresher / Softmax Regression
    android本地分享
    最佳实践-SQL语法校验
    计算机软件著作权评估
    Apache Hop Transforms Samples【持续完善中】
    极智AI | 谈谈昇腾 CANN AIPP
  • 原文地址:https://blog.csdn.net/m0_67394002/article/details/126114372