🎉🎉欢迎光临🎉🎉
🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀
🌟特别推荐给大家我的最新专栏《Spring 狂野之旅:从入门到入魔》 🚀
本专栏带你从Spring入门到入魔!
这是苏泽的个人主页可以看到我其他的内容哦👇👇
当我们谈论 Spring Kafka 时,可以把它想象成一位非常出色的邮递员,但不是运送普通的信件,而是处理大量的有趣和有用的数据。这位邮递员擅长与 Kafka 进行互动,并且以一种高级抽象和易用的方式处理数据。
这位邮递员的任务是将数据从一个地方传送到另一个地方,就像我们寄送包裹一样。他知道如何与 Kafka 进行通信,了解如何与输入和输出主题建立联系。
当有人将数据放入输入主题时,这位邮递员会立即接收到通知,并迅速将数据取出。然后,他会对这些数据进行各种有趣的转换和处理操作,就像是一个巧手的魔术师一样。他可以将数据转换成不同的格式、进行聚合、过滤、连接和分流等操作。
一旦数据处理完毕,这位邮递员会将数据装入一个特殊的包裹,并标上目的地的地址,这个目的地就是输出主题。然后,他会快速地把包裹发送出去,确保数据能够按时到达。
Spring Kafka 就像是这位邮递员的工具箱,提供了许多有用的工具和功能,使他的工作更加轻松。它提供了简单且声明性的 API,让我们可以用一种直观的方式定义数据的处理逻辑和流处理拓扑。
那么正文开始
目录
实现有效的消费者组管理:以下是一些实现有效消费者组管理的关键考虑因素:
首先,在 pom.xml 文件中添加以下 Maven 依赖:
Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。
实时数据流处理对于现代业务来说非常重要。随着互联网的快速发展和数字化转型的加速,企业面临着大量的数据产生和处理的挑战。实时数据流处理能够帮助企业实时地捕获、处理和分析数据,从而使企业能够做出及时的决策、提供个性化的服务和优化业务流程。实时数据流处理还可以帮助企业发现潜在的机会和风险,并迅速采取行动。
在开始学习 Spring Kafka 之前,了解 Apache Kafka 的核心概念和组件是非常重要的。一些核心概念包括:
介绍 Spring Kafka 的基本用法和集成方式:
Spring Kafka 提供了简单而强大的 API,用于在 Spring 应用程序中使用 Kafka。它提供了以下核心功能:
KafkaTemplate
类可以方便地将消息发布到 Kafka 主题。@KafkaListener
注解,可以轻松地创建消息消费者,并处理来自 Kafka 主题的消息。在 Spring Kafka 中发布消息到 Kafka 主题,你可以使用 KafkaTemplate
类的 send()
方法。通过指定要发送的主题和消息内容,可以将消息发送到 Kafka。
要消费 Kafka 主题中的消息,你可以使用 @KafkaListener
注解来创建一个消息监听器。通过指定要监听的主题和消息处理方法,可以在接收到消息时触发相应的逻辑。
- @Autowired
- private KafkaTemplate
kafkaTemplate; -
- public void publishMessage(String topic, String message) {
- kafkaTemplate.send(topic, message);
- }
要消费 Kafka 主题中的消息,你可以使用 @KafkaListener
注解来创建一个消息监听器。通过指定要监听的主题和消息处理方法,可以在接收到消息时触发相应的逻辑。
- @KafkaListener(topics = "myTopic")
- public void consumeMessage(String message) {
- // 处理接收到的消息
- System.out.println("Received message: " + message);
- }
理解消息的序列化和反序列化:
在 Kafka 中,消息的序列化和反序列化是非常重要的概念。当消息被发送到 Kafka 时,它们需要被序列化为字节流。同样地,在消息被消费时,它们需要被反序列化为原始的数据格式。
Spring Kafka 提供了默认的序列化和反序列化机制,可以根据消息的类型自动进行转换。对于常见的数据类型,如字符串、JSON、字节数组等,Spring Kafka 已经提供了相应的序列化和反序列化实现。此外,你也可以自定义序列化和反序列化器来处理特定的消息格式。
例如,你可以使用 StringSerializer 和 StringDeserializer 来序列化和反序列化字符串消息:
- @Configuration
- public class KafkaConfig {
-
- @Bean
- public ProducerFactory
producerFactory() { - Map
config = new HashMap<>(); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(config);
- }
-
- @Bean
- public ConsumerFactory
consumerFactory() { - Map
config = new HashMap<>(); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return new DefaultKafkaConsumerFactory<>(config);
- }
-
- @Bean
- public KafkaTemplate
kafkaTemplate() { - return new KafkaTemplate<>(producerFactory());
- }
-
- @Bean
- public ConcurrentKafkaListenerContainerFactory
kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory());
- return factory;
- }
- }
消费者组是一组具有相同消费者组ID的消费者,它们共同消费一个或多个 Kafka 主题的消息。消费者组的作用是实现消息的并行处理和负载均衡。通过将主题的分区分配给消费者组中的不同消费者,可以实现消息的并行处理,提高处理吞吐量和降低延迟。消费者组还提供了容错性,当某个消费者出现故障时,其他消费者可以接管其分区并继续处理消息。
消费者组ID的选择:为每个消费者组选择一个唯一的ID,确保不同的消费者组之间互不干扰。
分区分配策略:选择适当的分区分配策略,确保分配给消费者的分区负载均衡,并避免某些消费者负载过重或空闲。
动态扩缩容:根据负载情况和处理需求,动态地增加或减少消费者的数量,以实现弹性的消费者组管理。
监控和健康检查:监控消费者组的运行状态,及时发现并处理故障消费者,确保消费者组的稳定运行。
假设有一个在线电商平台,用户可以在平台上购买商品。平台需要处理用户的订单,并将订单信息发送到一个 Kafka 主题中。订单处理包括验证订单、生成发货单、更新库存等操作。
在这个场景中,可以使用消费者组来实现订单处理的并行处理和负载均衡。具体步骤如下:
创建一个名为"order"的 Kafka 主题,用于接收用户的订单信息。
创建一个消费者组,比如名为"order-processing-group"的消费者组。
启动多个消费者实例,加入到"order-processing-group"消费者组中。每个消费者实例都会订阅"order"主题,并独立地消费订单消息。
Kafka 会根据消费者组的配置,将"order"主题的分区均匀地分配给消费者组中的消费者实例。每个消费者实例将独立地处理分配给它的分区上的订单消息。
当有新的订单消息到达"order"主题时,Kafka 会将消息分配给消费者组中的一个消费者实例。消费者实例会处理订单消息,执行验证、生成发货单、更新库存等操作。
具体实现:
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- public class OrderConsumer {
- private static final String TOPIC = "order";
- private static final String GROUP_ID = "order-processing-group";
- private static final String BOOTSTRAP_SERVERS = "localhost:9092";
-
- public static void main(String[] args) {
- // 创建消费者配置
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // 创建 Kafka 消费者
- KafkaConsumer
consumer = new KafkaConsumer<>(props); -
- // 订阅主题
- consumer.subscribe(Collections.singletonList(TOPIC));
-
- // 消费消息
- while (true) {
- ConsumerRecords
records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord
record : records) { - String orderMessage = record.value();
- // 执行订单处理操作,例如验证订单、生成发货单、更新库存等
- processOrder(orderMessage);
- }
- }
- }
-
- private static void processOrder(String orderMessage) {
- // 实现订单处理逻辑
- System.out.println("Processing order: " + orderMessage);
- // TODO: 执行订单处理的具体业务逻辑
- }
- }
- // 创建拓扑建造器
- StreamsBuilder builder = new StreamsBuilder();
-
- // 创建输入流
- KStream
inputStream = builder.stream("input-topic"); -
- // 进行数据转换和处理操作
- KStream
outputStream = inputStream - .mapValues(value -> value.toUpperCase())
- .filter((key, value) -> value.startsWith("A"));
-
- // 将处理结果输出到输出主题
- outputStream.to("output-topic");
-
- // 创建 Kafka Streams 实例
- KafkaStreams streams = new KafkaStreams(builder.build(), props);
-
-
-
org.springframework.kafka -
spring-kafka -
2.8.1 -
-
-
org.springframework.kafka -
spring-kafka-test -
2.8.1 -
test -
-
- import org.apache.kafka.clients.admin.NewTopic;
- import org.apache.kafka.common.serialization.Serdes;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.KafkaHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.messaging.handler.annotation.Payload;
-
- @SpringBootApplication
- @EnableKafka
- public class SpringKafkaApp {
-
- public static void main(String[] args) {
- SpringApplication.run(SpringKafkaApp.class, args);
- }
-
- // 创建输入和输出主题
- @Bean
- public NewTopic inputTopic() {
- return new NewTopic("input-topic", 1, (short) 1);
- }
-
- @Bean
- public NewTopic outputTopic() {
- return new NewTopic("output-topic", 1, (short) 1);
- }
-
- // 定义流处理拓扑
- @KafkaListener(topics = "input-topic")
- public void processInputMessage(@Payload String message,
- @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
- // 在这里进行数据转换和处理操作
- String processedMessage = message.toUpperCase();
-
- // 发送处理结果到输出主题
- kafkaTemplate().send("output-topic", processedMessage);
- }
-
- // 创建 KafkaTemplate 实例
- @Bean
- public KafkaTemplate
kafkaTemplate() { - return new KafkaTemplate<>(producerFactory());
- }
-
- // 创建 ProducerFactory 实例
- @Bean
- public ProducerFactory
producerFactory() { - Map
configProps = new HashMap<>(); - configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(configProps);
- }
- }
通过
@EnableKafka
注解启用 Spring Kafka。通过
@Bean
注解创建了输入主题和输出主题的NewTopic
实例。使用
@KafkaListener
注解的方法作为消息监听器,监听名为 "input-topic" 的输入主题。在
processInputMessage
方法中,我们可以进行数据转换和处理操作。在这个示例中,我们将收到的消息转换为大写。然后,我们使用
KafkaTemplate
将处理结果发送到名为 "output-topic" 的输出主题。通过
@Bean
注解创建了KafkaTemplate
和ProducerFactory
的实例,用于发送消息到 Kafka。