• Spring cloud stream实现Kafka的消息收发


    用Spring cloud stream可以很方便的实现对Kafka消息的收发,以下是我按照Spring官网的例子实现的一个Kafka的应用。

    这个例子是实现一个电信公司收集用户消费电信服务,并计算费用的场景。包括了三个应用程序。

    1.记录用户使用电信服务时长:

    这个应用将模拟生成用户的话单,包括了用户ID,语音呼叫时长,数据业务流量的信息,并把话单信息发送到Kafka。

    用Spring boot创建一个名为usage-detail-sender-kafka的应用,依赖里面选择Spring for Apache Kafka, Spring cloud stream, Acutator

    新建一个名为UsageDetail的类,代码如下:

    1. package cn.roygao.usagedetailsender;
    2. public class UsageDetail {
    3. private String userId;
    4. private long duration;
    5. private long data;
    6. public String getUserId() {
    7. return this.userId;
    8. }
    9. public void setUserId(String userId) {
    10. this.userId = userId;
    11. }
    12. public long getDuration() {
    13. return this.duration;
    14. }
    15. public void setDuration(long duration) {
    16. this.duration = duration;
    17. }
    18. public long getData() {
    19. return this.data;
    20. }
    21. public void setData(long data) {
    22. this.data = data;
    23. }
    24. }

    新建一个名为UsageDetailSender的类,实现发送Kafka消息的功能,代码如下:

    1. package cn.roygao.usagedetailsender;
    2. import java.util.Random;
    3. import java.util.function.Supplier;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class UsageDetailSender {
    8. private String[] users = {"user1", "user2", "user3", "user4", "user5"};
    9. @Bean
    10. public Supplier sendEvents() {
    11. return () -> {
    12. UsageDetail usageDetail = new UsageDetail();
    13. usageDetail.setUserId(this.users[new Random().nextInt(5)]);
    14. usageDetail.setDuration(new Random().nextInt(300));
    15. usageDetail.setData(new Random().nextInt(700));
    16. return usageDetail;
    17. };
    18. }
    19. }

    这个代码里面使用了函数式编程的模式,Supplier实现了一个发送Kafka消息的功能。

    在application.properties里面添加以下配置:

    1. spring.cloud.stream.function.bindings.sendEvents-out-0=output
    2. spring.cloud.stream.bindings.output.destination=usage-detail
    3. # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
    4. server.port=0

    这个配置的第一行的意思是给这个Kafka的输出取一个别名,因为默认的这个Supplier的输出是绑定到spring.cloud.stream.function.bindings.xxxx-out-0的,其中xxxx代表这个输出函数的名字,在这里是sendEvents,因此我们用一个名为output的来代替xxxx-out-0这个名字。

    在第二行配置就是设定这个输出的目的地消息主题是usage-detail

    然后运行./mvnw clean package来编译。

    2. 计算用户使用电信服务的费用:

    这个应用将订阅Kafka的相关消息主题,接收应用1生成的用户的话单,然后计算相关的费用,并把费用发送到另外一个Kafka的消息主题。

    用Spring boot创建一个名为usage-cost-processor-kafka的应用,依赖里面选择Spring for Apache Kafka, Spring cloud stream, Acutator

    新建一个名为UsageDetail的类,如以上应用1的代码。

    新建一个名为UsageCostDetail的类,代码如下:

    1. package cn.roygao.usagecostprocessor;
    2. public class UsageCostDetail {
    3. private String userId;
    4. private double callCost;
    5. private double dataCost;
    6. public String getUserId() {
    7. return this.userId;
    8. }
    9. public void setUserId(String userId) {
    10. this.userId = userId;
    11. }
    12. public double getCallCost() {
    13. return this.callCost;
    14. }
    15. public void setCallCost(double callCost) {
    16. this.callCost = callCost;
    17. }
    18. public double getDataCost() {
    19. return this.dataCost;
    20. }
    21. public void setDataCost(double dataCost) {
    22. this.dataCost = dataCost;
    23. }
    24. @Override
    25. public String toString() {
    26. return "{" +
    27. " userId='" + getUserId() + "'" +
    28. ", callCost='" + getCallCost() + "'" +
    29. ", dataCost='" + getDataCost() + "'" +
    30. "}";
    31. }
    32. }

    新建一个UsageCostProcessor的类,用于接收应用1发出的消息,计算费用之后再发消息到Kafka,代码如下:

    1. package cn.roygao.usagecostprocessor;
    2. import java.util.function.Function;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class UsageCostProcessor {
    7. private double ratePerSecond = 0.1;
    8. private double ratePerMB = 0.05;
    9. @Bean
    10. public Function processUsageCost() {
    11. return usageDetail -> {
    12. UsageCostDetail usageCostDetail = new UsageCostDetail();
    13. usageCostDetail.setUserId(usageDetail.getUserId());
    14. usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
    15. usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
    16. return usageCostDetail;
    17. };
    18. }
    19. }

    这里使用了Function来定义一个Kafka的接收和发送,其中第一个参数表示接收的消息,第二个参数表示发送的消息。

    在application.properties里面添加以下配置:

    1. spring.cloud.stream.function.bindings.processUsageCost-in-0=input
    2. spring.cloud.stream.function.bindings.processUsageCost-out-0=output
    3. spring.cloud.stream.bindings.input.destination=usage-detail
    4. spring.cloud.stream.bindings.output.destination=usage-cost
    5. # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
    6. server.port=0

    3. 接收用户电信费用:

    这个应用将订阅Kafka的主题,接收应用2发送的费用信息并在日志中打印出来。

    新建一个名为usage-cost-logger-kafka的应用,依赖里面选择Spring for Apache Kafka, Spring cloud stream, Acutator

    新建一个名为UsageCostDetail的类,如以上应用2的代码。

    新建一个名为UsageCostLogger的类,代码如下:

    1. package cn.roygao.usagecostlogger;
    2. import java.util.function.Consumer;
    3. import org.slf4j.Logger;
    4. import org.slf4j.LoggerFactory;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. @Configuration
    8. public class UsageCostLogger {
    9. private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerKafkaApplication.class);
    10. @Bean
    11. public Consumer process() {
    12. return usageCostDetail -> {
    13. logger.info(usageCostDetail.toString());
    14. };
    15. }
    16. }

    这里使用了Consumer来实现对Kafka消息的消费。

    在application.properties里面添加以下的配置:

    1. spring.cloud.stream.function.bindings.process-in-0=input
    2. spring.cloud.stream.bindings.input.destination=usage-cost
    3. # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
    4. server.port=0

    部署测试

    以上三个应用编译好之后,我们就可以进行测试了。首先启动Kafka,这里我采用运行Docker的方式来启动Kafka,下载以下的docker-compose文件:

    1. curl --silent --output docker-compose.yml \
    2. https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.0-post/cp-all-in-one/docker-compose.yml

    然后运行docker compose up启动,之后在浏览器访问localhost:9021即可查看Kafka控制台的信息。

    分别运行以上编译好的应用,然后在Kafka的控制台里面可以看到有usage-detail, usage-cost这两个主题的信息,并且在第三个应用输出的日志中可以看到打印出来的用户费用的信息。

    以上就是一个简单的应用Spring cloud stream编写Kafka应用的例子。

  • 相关阅读:
    Ubuntu终端自动补全
    RabbitMQ 安装使用
    C++ Qt开发:QFileSystemModel文件管理组件
    this is incompatible with sql_mode=only_full_group_by
    Solon 1.8.0 发布,云原生微服务开发框架
    Protobuf: 高效数据传输的秘密武器
    GO语言开山篇(一):学习方向
    OpenHarmony其他工具类—lua
    YOLOv8轻量化模型:DCNV3结合c2f | CVPR2023
    还记得这首是什么歌吗?
  • 原文地址:https://blog.csdn.net/gzroy/article/details/127738006