• Spring Boot与运行在Kubernetes上的ksqlDB集成教程


    在本文中,您将学习如何在 Kubernetes 上运行 ksqlDB 并将其与 Spring Boot 一起使用。您还将了解如何基于 Strimzi 运算符在 Kubernetes 上运行 Kafka。

    为了将 Spring Boot 与 ksqlDB 服务器集成,我们将利用 ksqlDB 提供的轻量级 Java 客户端。此客户端支持拉取和推送查询。它还提供了用于插入行和创建表或流的 API。 您可以在此处 的 ksqlDB 文档中阅读有关它的更多信息。

    我们的示例 Spring Boot 应用程序非常简单。我们将使用 Spring Cloud Stream Supplierbean 生成事件并将其发送到 Kafka 主题。有关使用 Spring Cloud Stream 的 Kafka 的更多信息,请参阅以下 文章 。

    另一方面,我们的应用程序使用 kSQL 查询从 Kafka 主题获取数据。它还KTable在启动时创建。

    源代码

    如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库 。然后进入transactions-service目录。之后,您应该按照我的指示进行操作。让我们开始。

    先决条件

    我们将使用几种工具。你需要有:

    • Kubernetes 集群——它可能是一个单节点的本地集群,例如 Minikube 或 Kind。就个人而言,我在 Docker 桌面上使用 Kubernetes
    • kubectlCLI – 与集群交互
    • Helm——我们将使用它在 Kubernetes 上安装 ksqlDB 服务器。如果您没有 Helm,则必须安装它

    使用 Strimzi 在 Kubernetes 上运行 Kafka

    当然,我们需要一个 Kafka 实例来执行我​们的练习。有几种方法可以在 Kubernetes 上运行 Kafka。我将向您展示如何使用基于运算符的方法来实现。第一步,您需要在集群上安装 OLM(Operator Lifecycle Manager)。为此,您只需在 Kubernetes 上下文中执行以下命令:

    1. $ curl -L https:<font><i>//github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh</i></font><font>
    2. $ chmod +x install.sh
    3. $ ./install.sh v0.21.2
    4. </font>

    然后,您可以继续安装 Strimzi 操作员。这只是一个命令。

    1. $ kubectl create -f https:<font><i>//operatorhub.io/install/stable/strimzi-kafka-operator.yaml</i></font><font>
    2. </font>

    现在,我们可以在 Kubernetes 上创建一个 Kafka 集群。让我们从练习的专用命名空间开始:

    $ kubectl create ns kafka

    我假设你有一个单节点 Kubernetes 集群,所以我们还创建了一个单节点 Kafka。Kafka这是带有CRD的 YAML 清单。您可以在路径下的存储库中找到它k8s/cluster.yaml。

    1. apiVersion: kafka.strimzi.io/v1beta2
    2. kind: Kafka
    3. metadata:
    4. name: my-cluster
    5. spec:
    6. entityOperator:
    7. topicOperator: {}
    8. userOperator: {}
    9. kafka:
    10. config:
    11. <b>default</b>.replication.factor: 1
    12. inter.broker.protocol.version: <font>"3.2"</font><font>
    13. min.insync.replicas: 1
    14. offsets.topic.replication.factor: 1
    15. transaction.state.log.min.isr: 1
    16. transaction.state.log.replication.factor: 1
    17. listeners:
    18. - name: plain
    19. port: 9092
    20. tls: false
    21. type: internal
    22. - name: tls
    23. port: 9093
    24. tls: <b>true</b>
    25. type: internal
    26. replicas: 1
    27. storage:
    28. type: jbod
    29. volumes:
    30. - id: 0
    31. type: persistent-claim
    32. size: 30Gi
    33. deleteClaim: <b>true</b>
    34. version: 3.2.0
    35. zookeeper:
    36. replicas: 1
    37. storage:
    38. type: persistent-claim
    39. size: 10Gi
    40. deleteClaim: <b>true</b>
    41. </font>

    让我们将它应用到命名空间中的 Kubernetes kafka:

    $ kubectl apply -f k8s/cluster.yaml -n kafka

    您应该会看到一个 Kafka 实例和一个 Zookeeper 实例。如果 pod 正在运行,则意味着您在 Kubernetes 上安装了 Kafka。

    1. $ kubectl get pod -n kafka
    2. NAME READY STATUS RESTARTS AGE
    3. my-cluster-entity-<b>operator</b>-68cc6bc4d9-qs88p 3/3 Running 0 46m
    4. my-cluster-kafka-0 1/1 Running 0 48m
    5. my-cluster-zookeeper-0 1/1 Running 0 48m

    Kafka 在集群内以
    namemy-cluster-kafka-bootstrap和 port可用9092。

    1. kubectl get svc -n kafka
    2. NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
    3. my-cluster-kafka-bootstrap ClusterIP 10.108.109.255 <none> 9091/TCP,9092/TCP,9093/TCP 47m
    4. my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 47m
    5. my-cluster-zookeeper-client ClusterIP 10.102.10.251 <none> 2181/TCP 47m
    6. my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 47m

    在 Kubernetes 上运行 KsqlDB 服务器

    KsqlDB 服务器是 Confluent 平台的一部分。由于我们不是在 Kubernetes 上安装整个 Confluent Platform,而只是一个开源的 Kafka 集群,我们需要单独安装 KsqlDB Server。让我们用 Helm 来做。KSQL 服务器没有“官方”Helm 图表。因此,我们应该直接去 GitHub 上的 Confluent Helm 仓库:

    1. $ git clone https:<font><i>//github.com/confluentinc/cp-helm-charts.git</i></font><font>
    2. $ cd cp-helm-charts
    3. </font>

    在这个存储库中,您可以为每个单独的 Confluent 组件找到单独的 Helm 图表,包括控制中心或 KSQL Server。我们的图表在存储库中的位置是charts/cp-ksql-server. 我们需要在安装过程中覆盖一些默认设置。首先,我们必须禁用无头模式。在无头模式下,KSQL Server 不公开 HTTP 端点并从输入脚本加载查询。我们的 Spring Boot 应用程序将通过 HTTP 连接到服务器。在下一步中,我们应该覆盖 Kafka 集群的默认地址和仍然6.1.0存在的 KSQL Server 的默认版本。我们将使用最新版本7.1.1。这是helm您应该在 Kubernetes 集群上运行的命令:

    1. $ helm install cp-ksql-server \
    2. --set ksql.headless=false \
    3. --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    4. --set imageTag=7.1.1 \
    5. charts/cp-ksql-server -n kafka

    让我们验证 KSQL 是否在集群上运行:

    1. $ kubectl get pod -n kafka | grep ksql
    2. cp-ksql-server-679fc98889-hldfv 2/2 Running 0 2m11s

    HTTP 端点可用于 namecp-ksql-server和 port下的其他应用程序8088:

    1. $ kubectl get svc -n kafka | grep ksql
    2. cp-ksql-server ClusterIP 10.109.189.36 <none> 8088/TCP,5556/TCP 3m25s

    现在,我们的 Kubernetes 集群上运行着所需的全部人员。因此,我们可以继续进行 Spring Boot 应用程序的实现。

    将 Spring Boot 与 ksqlDB 集成

    我没有发现 Spring Boot 和 ksqlDB 之间有任何开箱即用的集成。因此,我们将ksqldb-api-client直接使用。首先,我们需要包含 ksqlDB Maven 存储库和一些依赖项:

    1. <dependencies>
    2. ...
    3. <dependency>
    4. <groupId>io.confluent.ksql</groupId>
    5. <artifactId>ksqldb-api-client</artifactId>
    6. <version>0.26.0</version>
    7. </dependency>
    8. <dependency>
    9. <groupId>io.confluent.ksql</groupId>
    10. <artifactId>ksqldb-udf</artifactId>
    11. <version>0.26.0</version>
    12. </dependency>
    13. <dependency>
    14. <groupId>io.confluent.ksql</groupId>
    15. <artifactId>ksqldb-common</artifactId>
    16. <version>0.26.0</version>
    17. </dependency>
    18. </dependencies>
    19. <repositories>
    20. <repository>
    21. <id>ksqlDB</id>
    22. <name>ksqlDB</name>
    23. <url>https:<font><i>//ksqldb-maven.s3.amazonaws.com/maven/</url></i></font><font>
    24. </repository>
    25. </repositories>
    26. </font>

    之后,我们可以定义一个@Bean返回 ksqlDBClient实现的 Spring。由于我们将在与 KSQL Server 相同的命名空间中运行我们的应用程序,因此我们需要提供 Kubernetes 服务名称作为主机名。

    1. @Configuration
    2. <b>public</b> <b>class</b> KSQLClientProducer {
    3. @Bean
    4. Client ksqlClient() {
    5. ClientOptions options = ClientOptions.create()
    6. .setHost(<font>"cp-ksql-server"</font><font>)
    7. .setPort(8088);
    8. <b>return</b> Client.create(options);
    9. }
    10. }
    11. </font>

    我们的应用程序通过 HTTP 端点与 KSQL Server 交互。KTable它在启动时创建一个单曲。为此,我们需要调用executeStatementKSQL Clientbean 实例上的方法。我们正在创建 SOURCE 表以启用对其运行 拉取查询 。该表从transactions主题中获取数据。它期望传入事件中的 JSON 格式。

    1. <b>public</b> <b>class</b> KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {
    2. <b>private</b> <b>static</b> <b>final</b> Logger LOG = LoggerFactory.getLogger(KTableCreateListener.<b>class</b>);
    3. <b>private</b> Client ksqlClient;
    4. <b>public</b> KTableCreateListener(Client ksqlClient) {
    5. <b>this</b>.ksqlClient = ksqlClient;
    6. }
    7. @Override
    8. <b>public</b> <b>void</b> onApplicationEvent(ContextRefreshedEvent event) {
    9. <b>try</b> {
    10. String sql = <font>""</font><font></font><font>"
    11. CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
    12. id BIGINT PRIMARY KEY,
    13. sourceAccountId BIGINT,
    14. targetAccountId BIGINT,
    15. amount INT
    16. ) WITH (
    17. kafka_topic='transactions',
    18. value_format='JSON'
    19. );
    20. </font><font>""</font><font></font><font>";
    21. ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
    22. LOG.info(</font><font>"Result: {}"</font><font>, result.queryId().orElse(<b>null</b>));
    23. } <b>catch</b> (ExecutionException | InterruptedException e) {
    24. LOG.error(</font><font>"Error: "</font><font>, e);
    25. }
    26. }
    27. }
    28. </font>

    创建表后,我们可以对其运行一些查询。有非常简单的查询。我们正在尝试查找与特定帐户相关的所有交易和所有交易。

    1. @RestController
    2. @RequestMapping(<font>"/transactions"</font><font>)
    3. <b>public</b> <b>class</b> TransactionResource {
    4. <b>private</b> <b>static</b> <b>final</b> Logger LOG = LoggerFactory.getLogger(TransactionResource.<b>class</b>);
    5. Client ksqlClient;
    6. <b>public</b> TransactionResource(Client ksqlClient) {
    7. <b>this</b>.ksqlClient = ksqlClient;
    8. }
    9. @GetMapping
    10. <b>public</b> List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
    11. StreamedQueryResult sqr = ksqlClient
    12. .streamQuery(</font><font>"SELECT * FROM transactions_view;"</font><font>)
    13. .get();
    14. Row row;
    15. List<Transaction> l = <b>new</b> ArrayList<>();
    16. <b>while</b> ((row = sqr.poll()) != <b>null</b>) {
    17. l.add(mapRowToTransaction(row));
    18. }
    19. <b>return</b> l;
    20. }
    21. @GetMapping(</font><font>"/target/{accountId}"</font><font>)
    22. <b>public</b> List<Transaction> getTransactionsByTargetAccountId(@PathVariable(</font><font>"accountId"</font><font>) Long accountId)
    23. throws ExecutionException, InterruptedException {
    24. StreamedQueryResult sqr = ksqlClient
    25. .streamQuery(</font><font>"SELECT * FROM transactions_view WHERE sourceAccountId="</font><font> + accountId + </font><font>";"</font><font>)
    26. .get();
    27. Row row;
    28. List<Transaction> l = <b>new</b> ArrayList<>();
    29. <b>while</b> ((row = sqr.poll()) != <b>null</b>) {
    30. l.add(mapRowToTransaction(row));
    31. }
    32. <b>return</b> l;
    33. }
    34. <b>private</b> Transaction mapRowToTransaction(Row row) {
    35. Transaction t = <b>new</b> Transaction();
    36. t.setId(row.getLong(</font><font>"ID"</font><font>));
    37. t.setSourceAccountId(row.getLong(</font><font>"SOURCEACCOUNTID"</font><font>));
    38. t.setTargetAccountId(row.getLong(</font><font>"TARGETACCOUNTID"</font><font>));
    39. t.setAmount(row.getInteger(</font><font>"AMOUNT"</font><font>));
    40. <b>return</b> t;
    41. }
    42. }
    43. </font>

    使用 Spring Cloud Stream 向主题发送事件

    最后,我们可以进行练习的最后一部分。我们需要生成测试数据并将其发送到 Kafkatransactions主题。实现它的最简单方法是使用 Spring Cloud Stream Kafka 模块。首先,让我们添加以下 Maven 依赖项:

    1. <dependency>
    2. <groupId>org.springframework.cloud</groupId>
    3. <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    4. </dependency>

    然后,我们可以创建一个基于 Spring Supplierbean 的生产者。Supplierbean 不断生成新事件并将其发送到目标通道。默认情况下,它每秒重复一次该操作。

    1. @Configuration
    2. <b>public</b> <b>class</b> KafkaEventProducer {
    3. <b>private</b> <b>static</b> <b>long</b> transactionId = 0;
    4. <b>private</b> <b>static</b> <b>final</b> Random r = <b>new</b> Random();
    5. @Bean
    6. <b>public</b> Supplier<Message<Transaction>> transactionsSupplier() {
    7. <b>return</b> () -> {
    8. Transaction t = <b>new</b> Transaction();
    9. t.setId(++transactionId);
    10. t.setSourceAccountId(r.nextLong(1, 100));
    11. t.setTargetAccountId(r.nextLong(1, 100));
    12. t.setAmount(r.nextInt(1, 10000));
    13. Message<Transaction> o = MessageBuilder
    14. .withPayload(t)
    15. .setHeader(KafkaHeaders.MESSAGE_KEY, <b>new</b> TransactionKey(t.getId()))
    16. .build();
    17. <b>return</b> o;
    18. };
    19. }
    20. }

    当然,我们还需要提供我们的 Kafka 集群的地址和频道的目标主题名称。Kafka的地址是在部署阶段注入的。

    1. spring.kafka.bootstrap-servers = ${KAFKA_URL}
    2. spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

    最后,让我们在 Kubernetes 上部署我们的 Spring Boot。这是包含 KubernetesDeployment和Service定义的 YAML 清单:

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: transactions
    5. spec:
    6. selector:
    7. matchLabels:
    8. app: transactions
    9. template:
    10. metadata:
    11. labels:
    12. app: transactions
    13. spec:
    14. containers:
    15. - name: transactions
    16. image: piomin/transactions-service
    17. env:
    18. - name: KAFKA_URL
    19. value: my-cluster-kafka-bootstrap:9092
    20. ports:
    21. - containerPort: 8080
    22. ---
    23. apiVersion: v1
    24. kind: Service
    25. metadata:
    26. name: transactions
    27. spec:
    28. type: ClusterIP
    29. selector:
    30. app: transactions
    31. ports:
    32. - port: 8080

    让我们在命名空间中部署应用程序kafka:

    $ kubectl apply -f k8s/deployment.yaml -n kafka

    在 Kubernetes 上测试 ksqlDB

    在 Kubernetes 上部署应用程序后,让我们启用port-forward在本地端口上对其进行测试:

    $ kubectl port-forward service/transactions 8080:8080

    现在,我们可以测试我们的两个 HTTP 端点。让我们从搜索所有事务的端点开始:

    $ curl http://localhost:8080/transactions

    然后,您可以调用端点来搜索与 相关的所有事务targetAccountId,例如:

    $ curl http://localhost:8080/transactions/target/10

    最后的想法

    在本文中,我想展示如何在 Kubernetes 上使用 ksqlDB。我们使用 Spring Boot 和 Spring Cloud Stream 等框架与 Kafka 和 ksqlDB 进行交互。您可以了解如何使用 Strimzi 运算符在 Kubernetes 上运行 Kafka 集群,或者如何直接从 Helm 存储库部署 KSQL Server。

     

     

  • 相关阅读:
    Web前端:Vue有哪些优缺点?
    已解决java.lang.ClassCircularityError: 类循环依赖错误的正确解决方法,亲测有效!!!
    使用微信公众号搭建免费查券返利机器人来赚佣金详细教程分享
    Linus Torvalds接受来自微软的Linux Hyper-V升级
    130、★LeetCode-115.不同的子序列
    论文笔记:DETR: End-to-End Object Detection with Transformers (from 李沐老师and朱老师)
    DRM系列(4)之drmModePageFlip
    在Mac M1笔记本上跑大语言模型llama3的4个步骤?(install、pull、run、ask)
    【java爬虫】使用vue+element-plus编写一个简单的管理页面
    【数学知识】—— 快速幂 / 扩展欧几里得算法
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/125458549