• Kafka在企业级应用中的实践


    alt

    前言

    前面说了很多Kafka的性能优点,有些童鞋要说了,这Kafka在企业开发或者企业级应用中要怎么用呢?今天咱们就来简单探究一下。

    1、 使用 Kafka 进行消息的异步处理

    Kafka 提供了一个可靠的消息传递机制,使得企业能够将不同组件之间的通信解耦,实现高效的异步处理。在企业级应用中,可以通过以下步骤来使用 Kafka 进行消息的异步处理:

    1. 创建一个或多个主题(topic)用于存储消息。主题可以按照业务逻辑进行划分,每个主题可以有多个分区(partition)。
    2. 生产者(Producer)将消息发送到指定的主题中。
    3. 消费者(Consumer)从主题订阅消息,并将其处理逻辑与生产者解耦。消费者可以根据需求选择不同的消费模式,如订阅所有消息或只订阅特定分区的消息。
    4. 消费者可以将处理结果发送到其他系统,或者将消息转发到其他 Kafka 主题中进行进一步处理。

    通过使用 Kafka 进行消息的异步处理,企业可以实现高效、可伸缩的系统架构,并且降低各个组件之间的耦合程度。

    2、 Kafka 的消息转发和备份机制

    Kafka 借助其分布式的架构和复制机制,实现了消息的转发和备份,确保数据的可靠性和持久性:

    1. 消息转发:Kafka 通过将消息分发到多个分区来实现消息的转发,每个分区可以由多个消费者订阅。分区之间的消息转发通过消费者群组协调器(Consumer Group Coordinator)来实现,协调器负责将消息均匀地分发给消费者。
    2. 备份机制:Kafka 将每个分区的消息进行副本(Replica)备份,并将副本分布在不同的 Broker 节点上。如果某个 Broker 节点发生故障,可以通过副本在其他节点上进行数据的恢复,确保数据的可靠性和持久性。

    通过消息转发和备份机制,Kafka 实现了高可用性和数据冗余,保证了数据流的可靠性和持久性。

    3、 Kafka Connect 和 Kafka Streams 的用途和特性

    1. Kafka Connect:是 Kafka 提供的一个工具,用于将外部系统和 Kafka 进行连接。通过 Kafka Connect,企业可以轻松地实现数据的导入和导出,与各种数据源(如数据库、文件系统)进行集成,并且可以自定义开发 Connectors,与特定的数据源进行交互。Kafka Connect 实现了高性能、可伸缩的数据传输,并且提供了故障恢复和数据转换等功能。

    使用 Kafka Connect 在 Java 中有两种方式:Standalone 模式和分布式模式。

    1. Standalone 模式:
    import org.apache.kafka.connect.runtime.ConnectorConfig;
    import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
    import org.apache.kafka.connect.runtime.Connect;
    import java.util.Properties;

    public class KafkaConnectStandaloneApp {
        public static void main(String[] args) throws InterruptedException {
            // 创建配置
            Properties props = new Properties();
            props.setProperty(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
            props.setProperty(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
            
            // 创建 Standalone 模式的 Kafka Connect
            Connect connect = new Connect(new StandaloneConfig(props));
            connect.start(); // 启动 Kafka Connect
            Thread.sleep(5000); // 等待一段时间
            
            // 停止 Kafka Connect
            connect.stop();
        }
    }
    • 1
    1. 分布式模式:
    import org.apache.kafka.connect.runtime.ConnectorConfig;
    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
    import org.apache.kafka.connect.runtime.Connect;
    import java.util.Properties;

    public class KafkaConnectDistributedApp {
        public static void main(String[] args) throws InterruptedException {
            // 创建配置
            Properties props = new Properties();
            props.setProperty(DistributedConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            
            // 创建分布式模式的 Kafka Connect
            Connect connect = new Connect(new DistributedConfig(props));
            connect.start(); // 启动 Kafka Connect
            Thread.sleep(5000); // 等待一段时间
            
            // 停止 Kafka Connect
            connect.stop();
        }
    }
    • 1

    注意:上述示例代码中的配置项可以根据实际需要进行调整,例如连接到的 Kafka 服务器地址,序列化器等。 2. Kafka Streams:是一个轻量级的流处理库,用于对 Kafka 主题的数据进行实时处理和转换。通过 Kafka Streams,企业可以构建实时的数据处理应用程序,实现数据的实时计算、流合并、按键分组和聚合等功能。Kafka Streams 提供了高性能的流处理和事件驱动的架构,并且与 Kafka 生态系统的其他组件无缝集成,提供了可扩展、容错的流处理解。 引入jar包

    <dependencies>
        <dependency>
            <groupId>org.apache.kafkagroupId>
            <artifactId>kafka-streamsartifactId>
            <version>2.8.0version>
        dependency>
    dependencies>
    • 1
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Printed;
    import org.apache.kafka.streams.kstream.Produced;

    import java.util.Properties;

    public class KafkaStreamsApp {
        public static void main(String[] args) {
            // 创建配置
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // 创建流构建器
            StreamsBuilder builder = new StreamsBuilder();

            // 从输入主题接收数据
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                    .peek((k, v) -> System.out.println("Received: key=" + k + ", value=" + v))
                    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

            // 创建 Kafka Streams 应用程序
            KafkaStreams streams = new KafkaStreams(builder.build(), props);

            // 启动应用程序
            streams.start();

            // 添加关闭钩子以优雅地关闭应用程序
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
    • 1

    顶尖架构师栈

    关注回复关键字

    【C01】超10G后端学习面试资源

    【IDEA】最新IDEA激活工具和码及教程

    【JetBrains软件名】 最新软件激活工具和码及教程

    工具&码&教程

    本文由 mdnice 多平台发布

  • 相关阅读:
    数据结构(7-2广度~~7-15)所有代码
    【PyTorch深度学习项目实战100例】—— 基于ViT(Vision_Transformer)识别七龙珠超级赛亚人 | 第52例
    https域名下 请求http图片链接 被自动变成https请求
    票据传递攻击
    《围城》笔记
    Vue2 零基础入门 Vue2 零基础入门第五天 5.3 自定义指令
    P1441 砝码称重 (状压
    Java手写最长公共子序列算法算法和最长公共子序列算法应用拓展案例
    测试人生 | 阿里实习 90 天:从实习生的视角谈谈个人成长
    c++ 区分接口继承和实现继承
  • 原文地址:https://blog.csdn.net/Dc253061007/article/details/133673680