• 最简单的SpringCloudStream集成Kafka教程


    开发中,服务与服务之间通信通常会用到消息中间件,如果我们使用了某一个MQ,那么消息中间件与我们的系统算是高耦合。将来有一天,要替换成另外的MQ,我们的改动就会比较大。为了解决这个问题,我们可以使用Spring Cloud Stream 来整合我们的消息中间件,降低耦合度,使服务可以更多关注自己的业务逻辑等。

    今天为大家带来一个人人可实操的SpringCloudStream集成Kafka的快速入门示例。

    1 前言

    SpringCloudStream是一个构建高扩展性的事件消息驱动的微服务框架。简单点说就是帮助你操作MQ,可以与底层MQ框架解耦。将来想要替换MQ框架的时候会比较容易。

    Kafka是一个分布式发布 - 订阅消息系统,源于LinkedIn的一个项目,2011年成为开源Apache项目。

    ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册,Kafka的实现同时也依赖于zookeeper。

    2 Windows搭建简单的Kafka

    2.1 启动zookeeper

    使用Kafka首先需要启动zookeeper,windows中搭建zookeeper也很简单。以下几步即可完成:

    1. 下载zookeeper (本文使用3.7.0版本,下载链接在文章末尾。)
    2. 配置基本环境变量:
      1. 将conf文件夹下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目录dataDir。
      2. bin文件夹下面有zkEnv.cmd有zookeeper相关的配置,其中就包括JAVA_HOME,所以系统环境变量需要配置JAVA_HOME,或者直接用Java的路径来替换。
    3. 启动,在bin目录下运行zkServer.cmd脚本启动zookeeper。

    默认启动端口2181为。

    正常启动如下:

    2.2 搭建Kafka

    本地使用kafka同样也是如下的几个步骤:

    1. 下载Kafka(本文使用2.11版本,下载链接见文章末尾)
    2. 环境变量配置:
    3. 查看config文件下面的 server.properties配置文件中的zookeeper的配置zookeeper.connect=localhost:2181
    4. 在bin/windows文件夹下面kafka-run-class.bat文件中有JAVA_HOME的配置,同样也可以直接改成系统的Java路径.
    5. 在kafka根目录下使用如下命令启动kafka,并在zookeeper中注册。# .\bin\windows\kafka-server-start.bat .\config\server.properties
    6. 创建topic,在bin\windows目录下使用如下命令。创建名称为“test”的topickafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    7. 使用windows命令窗口的producer和consumer,在bin\windows目录下使用如下命令#test topic的消息生产者
      kafka-console-producer.bat --broker-list localhost:9092 --topic test#test topic的消息消费者
      kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test#test topic的消息消费者(从头消费)
      kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic

    kafka启动windows界面如下

    3 SpringCloudStream集成Kafka

    3.1 引入依赖

    由于我们直接使用Spring Cloud Stream 集成Kafka,官方也已经有现成的starter。

    1. <dependency>
    2. <groupId>org.springframework.cloud</groupId>
    3. <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    4. <version>2.1.0.RELEASE</version>
    5. </dependency>

    3.2 关于kafka的配置

    1. spring:
    2. application:
    3. name: shop-server
    4. cloud:
    5. stream:
    6. bindings:
    7. #配置自己定义的通道与哪个中间件交互
    8. input: #MessageChannel里InputOutput的值
    9. destination: test #目标主题 相当于kafka的topic
    10. output:
    11. destination: test1 #本例子创建了另外一个topic (test1)用于区分不同的功能区分。
    12. default-binder: kafka #默认的binder是kafka
    13. kafka:
    14. binder:
    15. zk-nodes: localhost:2181
    16. bootstrap-servers: localhost:9092 #kafka服务地址,集群部署的时候需要配置多个,
    17. consumer:
    18. group-id: consumer1
    19. producer:
    20. key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    21. value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    22. client-id: producer1
    23. server:
    24. port: 8100

    3.3 消费者示例

    首先需要定义SubscribableChannel 接口方法使用Input注解。

    1. public interface Sink {
    2. String INPUT = "input";
    3. @Input("input")
    4. SubscribableChannel input();
    5. }

    然后简单的使用 StreamListener 监听某一通道的消息。

    1. @Service
    2. @EnableBinding(Sink.class)
    3. public class MessageSinkHandler {
    4. @StreamListener(Sink.INPUT)
    5. public void handler(Message msg){
    6. System.out.println(" received message : "+msg);
    7. }
    8. }

    cloud stream配置中绑定了对应的Kafka topic,如下

    1. cloud:
    2. stream:
    3. bindings:
    4. #配置自己定义的通道与哪个中间件交互
    5. input: #SubscribableChannel里Input值
    6. destination: test #目标主题

    我们使用Kafka console producer 生产消息。

    kafka-console-producer.bat --broker-list localhost:9092 --topic test
    

    同时启动我们的示例SpringBoot项目,使用producer推送几条消息。

    我们同时启动一个Kafka console consumer

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
    

    消费结果如下:

    Spring Boot 项目消费消息如下: 

    3.4 生产者示例

    首先需要定义生产者MessageChannel,这里会用到Output注解

    1. public interface KafkaSource {
    2. String OUTPUT = "output";
    3. @Output(KafkaSource.OUTPUT)
    4. MessageChannel output();
    5. }

    使用MessageChannel 发送消息。

    1. @Component
    2. public class MessageService {
    3. @Autowired
    4. private KafkaSource source;
    5. public Object sendMessage(Object msg) {
    6. source.output().send(MessageBuilder.withPayload(msg).build());
    7. return msg;
    8. }

    定义一个Rest API 来触发消息发送

    1. @RestController
    2. public class MessageController {
    3. @Autowired
    4. private MessageService messageService;
    5. @GetMapping(value = "/sendMessage/{msg}")
    6. public String sendMessage(@PathVariable("msg") String msg){
    7. messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
    8. return "sent message";
    9. }
    10. }

    配置中关于producer的配置如下

    1. cloud:
    2. stream:
    3. bindings:
    4. input:
    5. destination: test
    6. output:
    7. destination: test1 #目标topic

    启动SpringBoot App, 并触发如下API call

    http://localhost:8100/sendMessage/JavaNorthProducer

    我们同时启动一个Kafka console consumer,这里我们使用另一个test1 topic

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1
    

    console consumer消费消息如下:

    总结

    本章初步介绍了Spring Cloud Stream 集成Kafka的简单示例,实现了简单的发布-订阅功能。但是Spring Cloud Stream肯定还有更多的功能,我们后续还将继续深入学习更多Stream的功能。

  • 相关阅读:
    MySQ之备份与恢复
    GO七天开发挑战:7天实现Web框架-Gee(day 4 分组控制Group)
    网络安全检测与防范 测试题(五)
    Unity之ShaderGraph如何实现无贴图水球效果
    0.96寸OLED屏显示测试和代码详细分析SPI通信
    基于鲲鹏服务器搭建简单的开源论坛系统(LAMP)实践分享
    Pwn 学习 fmt_test_2格式化字符串
    Python+pytest接口自动化之参数关联
    JS EventListener
    golang学习5,glang的web的restful接口
  • 原文地址:https://blog.csdn.net/lt_xiaodou/article/details/126527224