• Java Spring Cloud XIX 之 Kafka II


    Java Spring Cloud XIX 之 Kafka II

    1.Kafka使用Demo

    不要关闭Zookeeper和Kafka的dos窗口

    我们再csmall项目中编写一个简单的Demo学习Kafka的使用

    在csmall-cart-webapi模块中

    添加依赖

    <dependency>
        <groupId>com.google.code.gsongroupId>
        <artifactId>gsonartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    修改yml文件配置

    spring:
      kafka:
        # 定义kafka的位置
        bootstrap-servers: localhost:9092
        # 话题的分组名称,是必须配置的
        # 为了区分当前项目和其他项目使用的,防止了不同项目相同话题的干扰或错乱
        # 本质是在话题名称前添加项目名称为前缀来防止的
        consumer:
          group-id: csmall
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    SpringBoot启动类中添加注解

    @SpringBootApplication
    @EnableDubbo
    // 启动kafka的功能
    @EnableKafka
    // 为了测试kafka,我们可以周期性的发送消息到消息队列
    // 使用SpringBoot自带的调度工具即可
    @EnableScheduling
    public class CsmallCartWebapiApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(CsmallCartWebapiApplication.class, args);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    下面我们就可以实现周期性的向kafka发送消息并接收的操作了

    编写消息的发送

    cart-webapi包下创建kafka包

    包中创建Producer类来发送消息

    // 我们需要周期性的向Kafka发送消息
    // 需要将具备SpringBoot调度功能的类保存到Spring容器才行
    @Component
    public class Producer {
    
        // 能够实现将消息发送到Kafka的对象
        // 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
        // KafkaTemplate<[话题名称的类型],[传递消息的类型]>
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        // 每隔10秒向Kafka发送信息
        int i=1;
        // fixedRate是周期运行,单位毫秒 10000ms就是10秒
        @Scheduled(fixedRate = 10000)
        // 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
        public void sendMessage(){
            // 实例化一个Cart类型对象,用于发送消息
            Cart cart=new Cart();
            cart.setId(i++);
            cart.setCommodityCode("PC100");
            cart.setPrice(RandomUtils.nextInt(100)+200);
            cart.setCount(RandomUtils.nextInt(5)+1);
            cart.setUserId("UU100");
            // 将cart对象转换为json格式字符串
            Gson gson=new Gson();
            // 执行转换
            String json=gson.toJson(cart);
            System.out.println("本次发送的消息为:"+json);
            // 执行发送
            // send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型
            kafkaTemplate.send("myCart",json);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    创建一个叫Consumer的类来接收消息

    // 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0
    @Component
    public class Consumer {
    
        // SpringKafka框架接收Kafka中的消息使用监听机制
        // SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
        // 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
        @KafkaListener(topics = "myCart")
        // 上面注解和下面方法关联,方法的参数就是接收到的消息
        public void received(ConsumerRecord<String,String> record){
            // 方法参数类型必须是ConsumerRecord
            // ConsumerRecord<[话题名称类型],[消息类型]>
            // 获取消息内容
            String json=record.value();
            // 要想在java中使用,需要转换为java对象
            Gson gson=new Gson();
            // 将json转换为java对象,需要提供转换目标类型的反射
            Cart cart=gson.fromJson(json,Cart.class);
            System.out.println("接收到对象为:"+cart);
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    我是将军;我一直都在,。!

  • 相关阅读:
    8 张图 | 剖析 Eureka 的首次同步注册表
    3D打印CLI文件格式的读取
    TensorFlow简介
    Arcgis提取玉米种植地分布,并以此为掩膜提取遥感影像
    时序预测 | MATLAB实现CNN-GRU卷积门控循环单元时间序列预测(风电功率预测)
    Sentinel实现服务降级并与api解耦
    R语言作图——Heatmap(热图)
    STM32——SDIO的学习(驱动SD卡)(实战篇)
    基于Python的IMDB电影评论文本分类
    使用vnc远程centos桌面
  • 原文地址:https://blog.csdn.net/letterljhx/article/details/126985642