• Kafka Connect的内部结构和故障处理


    探索Connect的内部结构

    为了理解分布式模式下的Connect是如何承受故障的,首先应该理解它是如何通过联合使用内部主题和消费者组成员来存储其状态的。其次,应该熟悉Connect所使用的再平衡协议,用于将任务分散到各个worker上,并检测worker的故障。

    内部主题(Internal Topic)

    如第2章所述,Connect自动创建并使用3个主题:

    • 配置主题,通过config.storage.topic指定

    • 偏移量主题,通过offsets.storage.topic来指定

    • 状态主题,通过status.storage.topic来指定。

    在配置主题中,Connect存储用户已启动的所有连接器和任务的配置。每次用户更新连接器的配置或当连接器请求重新配置时(例如,它检测到可以启动更多任务),都会向该主题发送一条记录。这个主题是压缩主题,所以它总是为每个实体保留最后的状态,同时确保它不会使用大量的存储空间。

    在偏移量主题中,Connect存储source connectors的偏移量。出于同样的原因,这个主题也是压缩主题的。默认情况下,Connect会用几个分区来创建这个主题,因为每个source task会会定期使用它来写入其位置。sink connectors的偏移量是使用常规的Kafka消费者组来存储的。

    在状态主题中,Connect存储连接器和任务的当前状态。REST接口用户查询的数据以本主题为中心。它允许用户查询任何worker,并且仍然可以得到所有运行中的插件的状态。它也是经过压缩的,而且应该有多个分区。

    在启动时,如果这些主题不存在,Connect会自动创建它们。Connect集群中的所有worker必须使用相同的主题,但如果运行多个Connect集群,每个集群都需要有自己单独的主题。所有这3个主题中的数据存储格式都是JSON,因此可以使用普通的消费者来查看数据。

    例如,使用kafka-console-consumer.sh工具,下面是查看状态主题内容的方法:

    1. ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    2.                                --topic connect-status \
    3.                                --from-beginning \
    4.                                --property print.key=true
    5. status-connector-file-source {"state":"RUNNING","trace":null, "worker_id":"192.168.1.12:8083","generation":5}

    在这个例子中,运行时将 status.storage.topic 配置为 connect-status,一个名为 file-source 的连接器,并通过这个名称来推导出这个连接器的记录的键 status-connector-file-source

    消费者组成员

    除了主题,Connect还广泛使用了Kafka的消费者组成员API。

    首先,对于每个sink连接器,Connect运行时运行一个常规的消费者组,从Kafka中提取记录。这些组是以连接器的名字命名的,例如对于一个名为file-sink的连接器,组名是connect-file-sink。组中的每个消费者都在向一个任务提供记录。这些组和它们的偏移量可以使用常规的消费者组工具来检索,比如kafka-consumer-groups.sh

    此外,Connect使用组成员API将任务分配给worker,并确保每个分区只被消费一次。在启动时,Connect使用其配置中的group.id值创建一个组。这个组并不直接被消费组工具所看到,因为它不是一个典型的消费组,但它的工作方式基本相同。这就是为什么所有具有相同group.id值的worker都会成为同一个Connect集群的一部分。

    要成为一个群组的成员,worker,就像普通的消费者一样,必须定期发送心跳。一个心跳只是一个请求,它包含了组名、成员ID和一些其他的字段来识别发送者。它由所有worker以固定的时间间隔(由heartbeat.interval.ms指定,默认为3秒)发送给作为其组协调器的broker。如果一个worker停止发送心跳,协调器将检测到它,将该worker从组中移除,并触发一次再平衡。在再平衡期间,使用再平衡协议将任务分配给worker,这样worker就接管了工作。

    Connect故障处理

    现在已经了解了Connect是如何管理其状态的,下面来看看最常见的故障类型,看看如何处理它们。

    为了建立一个有弹性的管道,关键是要了解系统中的所有组件是如何处理故障的。在本节中,我们将重点关注Connect以及它是如何处理故障的,而忽略其他组件(如操作系统、执行和部署环境或硬件)。

    我们将涵盖以下故障:

    • Worker故障

    • 连接器/任务失败

    • Kafka/外部系统故障

    我们还将讨论如何使用死信队列来处理无法处理的记录。

    Worker故障

    在分布式模式下,Connect可以跨多个worker运行,建议用户至少使用2个worker,以便对单个worker的故障有弹性。

    例如,如果我们有3个workers 运行2个连接器(C1和C2),不同的任务可以像图3-7那样分布。

    图3-7 带有3个worker的Connect集群示例。连接器C1有三个任务(T1、T2、T3), C2有两个任务(T1和T2)

    在这种情况下,如果worker2因为崩溃或维护而离线,Kafka就不会再收到它的心跳,在很短的时间内,它会自动把worker2踢出组。这将迫使Connect将所有正在运行的任务重新平衡到剩余的worker身上。

    再平衡之后,任务分配可能看起来像图3-8:

    图3-8 Connect已将所有任务重新分配给剩余的worker

    当重新平衡发生时,原本在worker2上的任务就不会运行。这种机制在几分钟内就会触发并完成。它主要取决于以下配置:

    • session.timeout.ms

      workers连续两次心跳之间的最大持续时间

    • rebalance.timeout.ms

      当重新平衡发生时,workers重新加入组的最大持续时间

    • scheduled.rebalance.max.delay.ms

      调度再平衡的最大延迟时间

    当worker没有干净地停止时,它有可能没有为它正在处理的所有记录提交偏移量。因此在重新启动时,一些任务可能会重新处理一些记录。我们将在本章后面讨论这个问题以及它是如何影响交付语义的。

    所以,为了让Connect能够处理worker的故障,需要确保有足够的容量来容纳这些worker上的任务。Connect没有任何机制来限制在重新平衡期间可以分配给worker的任务数量。如果一个worker被分配了太多的任务,它的性能就会下降,最终任务就不会有任何进展。至少,应该在任何时候都有足够的能力来处理单个worker,以可靠地处理这些worker的滚动重启。

    连接器/任务失败

    另一种常见的故障类型是连接器或其任务之一崩溃。到目前为止,我们已经简化了Connect运行连接器时的具体情况。实际上,它必须运行一个连接器的实例和一个或多个任务的实例。Connect跟踪这两个实例的健康状况,并将它们与一个状态联系起来,这个状态可以是:

    • UNASSIGNED

      连接器或任务还没有被分配给worker。

    • RUNNING

      连接器或任务在worker上正确运行。

    • PAUSED

      连接器或任务已经被用户通过REST API停止。

    • FAILED

      连接器或任务遇到错误并崩溃。

    • DESTROYED

      一个连接器或任务刚刚被用户通过REST API删除,并且正在关闭。这种状态永远不会向最终用户公开。

    • RESTARTING

      连接器或任务在暂停后刚刚由用户通过REST API重新启动。

    连接器和任务的状态可以通过REST API检索。图3-9描述了不同状态之间可能的转换。

     图3-9. 连接器和任务的状态转换

    TIP

    Connect会发出详细的指标,跟踪每个连接器在每个状态下花费的时间。关于如何检索和监控指标的细节,请参见第11章。

    运行时使用实现Connector接口的连接器类来配置和生成任务。如果它的任何方法失败并抛出Exception,Connect运行时将在短时间补偿间隔后自动重试调用它。有些连接器可能执行额外的逻辑,例如连接到目标系统以发现资源,因此这种重试机制允许处理连接问题。当这种情况发生时,连接器将处于RUNNING状态,但没有一个任务会被创建。一个例外是start()方法,与其他方法相反,如果它抛出一个异常,会立即将连接器置于FAILED状态。

    每个任务也可能遇到错误。默认情况下,Connect会让任务崩溃,将其标记为FAILED,并且不会试图自动重启它。Connect会发出任务状态的指标,管理员必须对其进行监控以快速识别故障。一个任务的失败不会触发重新平衡。

    在一次性故障的情况下,管理员可以通过REST API重新启动FAILED任务。REST API还可以用于检索导致任务崩溃的Exception及其堆栈跟踪。在系统故障(例如无法处理的记录)的情况下,Connect提供了跳过它的可能性,并有选择地发出详细的日志消息,而不是使任务失败。这可以使用errors.tolerance配置对每个连接器进行配置。

    Kafka/外部系统故障

    由于Connect在Kafka和外部系统之间流动数据,任何一个系统的故障都会影响Connect。

    正如第2章所详述的,Kafka可以被配置为一个非常有弹性的系统。对于生产用例,Kafka集群必须有多个brokers,并被配置为提供最大的可用性。此外,Connect必须被配置为用多个副本来创建它的主题,这样它就不会因为单个broker故障而受到负面影响。这包括作为连接器的source 或 sink的主题,内部Connect主题和__consumer_offsets主题。在这种情况下,Connect会自动重新连接到Kafka,继续运行。

    另一方面,外部系统的故障必须由连接器来处理。根据系统和连接器的实现,它可能会被自动处理,也可能会使任务崩溃,需要人工干预来恢复。

    在构建管道之前,务必阅读连接器文档并了解外部系统的故障模式,以衡量Connect管道的弹性。请记住,有时对于相同的连接器有多个社区实现,需要选择一个满足需求的实现。然后,进行弹性测试,以验证连接器是否为用例提供所需的弹性。最后,监测外部系统和连接器的适当指标和日志是很重要的。

    死信队列

    在处理无法处理的记录时,对于sink连接器,Connect可以使用死信队列。死信队列(通常缩写为DLQ)是传统消息系统的一个概念,它基本上是一个存储无法处理或交付的记录的地方,而不是简单地忽略这些记录。在Connect中,死信队列是一个写入不可处理记录的主题(通过连接器配置中的errors.deadletterqueue.topic.name指定)。然而,Connect并没有为source连接器提供类似的机制,因为它不能将来自外部系统的记录转换为Kafka记录。

    注意

    Kafka 2.6中通过KIP-610添加了对sink连接器的死信队列的支持。

    让我们看一个使用死信队列的例子。当运行S3 sink连接器时,Connect运行时在将记录传递给连接器之前从Kafka主题读取记录。由于预计该主题将包含Avro记录,因此我们将连接器配置为一个Avro转换器。然而,如果主题中的一条记录不是Avro格式,例如应用程序发出了一条JSON记录,连接器将无法处理这条记录。与其让连接器失败或丢失这条记录,Connect可以把它转发到一个死信队列,并继续处理主题中的其他记录。连接器的配置将包含以下设置:

    1. {
    2.      "connector.class":  "io.confluent.connect.s3.S3SinkConnector",
    3.      "value.converter":  "io.confluent.connect.avro.AvroConverter",
    4.      "errors.tolerance": "all",
    5.      "errors.deadletterqueue.topic.name": "my-dlq"
    6. }

    这允许死信队列主题的内容由另一个机制处理,例如另一个连接器或一个消费者应用程序。

    图3-10显示了一个使用死信队列的例子。

    图3-10 无法处理的记录可以被送到一个死信队列中,由另一个机制来处理。

    流程开始时,S3 sink连接器配置了Avro,从输入主题接收记录(1)。Avro记录被正确处理并发送到S3(2)。如果一个记录不能被处理,它会被发送到为连接器配置的死信队列(3)。在这个例子中,另一个应用程序从死信队列(4)接收记录,处理它们并报告错误(5)。

  • 相关阅读:
    redis(其它操作、管道)、django中使用redis(通用方案、 第三方模块)、django缓存、celery介绍(celery的快速使用)
    图像处理中常见的几种插值方法:最近邻插值、双线性插值、双三次插值(附Pytorch测试代码)
    MySQL 入门教程
    一些 Docker 基础指令
    break pad源码编译--参考大佬博客的总结
    centos 8 yum源不能使用问题
    AI推介-大语言模型LLMs论文速览(arXiv方向):2024.03.05-2024.03.10—(2)
    轻量高效、灵活可扩展!了解下Alibaba QLExpress规则引擎的魅力
    如何获取standard cell各端口的作用
    Iptables官方教程-学习笔记5
  • 原文地址:https://blog.csdn.net/daydaylearn/article/details/126072516