This walkthrough targets a microservices environment: Spring Boot 2.5.2, Spring Cloud 2020.0.3, Spring Cloud Alibaba 2021.1.
This installment integrates Kafka inside a single microservice rather than declaring the dependencies in the parent POM for shared use (the parent POM carries the internal shared integration; this build uses the ordinary public setup).
Spring Boot / Spring Cloud versions referenced by the project:
- Spring Boot: 2.5.2
- Spring Cloud: 2020.0.3
- Spring Cloud Alibaba: 2021.1
Kafka dependencies:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
</dependency>
```

Note: the consumer configuration below references io.confluent.kafka.serializers.KafkaAvroDeserializer, which is in neither artifact above; it ships in io.confluent:kafka-avro-serializer from Confluent's Maven repository (https://packages.confluent.io/maven/).
application.yml is as follows (the kafka node sits under spring):
These properties are loaded automatically by Spring Boot; any of them can also be overridden with custom parameters through the configuration-class beans shown later.
```yaml
spring:
  ###########【Kafka cluster】###########
  # For a Kafka cluster, list multiple addresses here, separated by ","
  kafka:
    ###########【Producer】###########
    producer:
      bootstrap-servers: 192.168.4.113:30092
      # number of retries
      retries: 3
      # Ack level: how many partition replicas must confirm receipt before the producer gets an ack (0, 1, all/-1)
      # 0 = no confirmation at all; 1 = ack once the leader has received it; all/-1 = ack once leader and followers have received it
      acks: 0
      # batch size in bytes
      batch-size: 16384
      # send delay
      properties:
        # linger.ms = 0 hands every record to Kafka as it arrives, which effectively disables batch-size;
        # otherwise the producer flushes once batch-size bytes accumulate or linger.ms elapses
        linger:
          ms: 0
      # producer-side buffer size
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    ############ Consumer #######################
    consumer:
      bootstrap-servers: 192.168.4.113:30092
      group-id: xihai-test
      enable-auto-commit: false
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      # delegate deserializers behind ErrorHandlingDeserializer; required to deserialize entity classes
      properties:
        deserializer:
          key:
            delegate:
              class: org.apache.kafka.common.serialization.StringDeserializer
          value:
            delegate:
              ## io.confluent.kafka.serializers.KafkaAvroDeserializer
              class: io.confluent.kafka.serializers.KafkaAvroDeserializer
    listener:
      # number of threads in the listener container, usually machines * partitions
      concurrency: 4
      # auto-commit is off, so records must be acknowledged manually
      ack-mode: manual_immediate
      # by default a listener throws if its topic does not exist; false ignores the error
      missing-topics-fatal: false
      # how long poll() blocks inside the listener container, in ms
      # (note: this is NOT max.poll.interval.ms, the 5-minute rebalance threshold between polls)
      poll-timeout: 600000
```
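As a quick check that the YAML alone works: with no custom @Configuration class at all, Spring Boot auto-configures a KafkaTemplate from spring.kafka.*. A minimal smoke-test sketch (controller name, endpoint path, and the "test" topic are illustrative assumptions, not from the original post):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Smoke test for the YAML-driven auto-configuration: the template below is
// built by Spring Boot from the spring.kafka.* properties, no extra beans needed.
@RestController
public class KafkaSmokeController {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaSmokeController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/kafka/ping")
    public String ping() {
        kafkaTemplate.send("test", "ping"); // topic "test" is an example
        return "sent";
    }
}
```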
Producer configuration class:

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProviderConfig {

    /**
     * XX base 192.168.xxx.194:9092
     * XX K8s  192.168.xxx.113:30092
     */
    static final String KAFKA_HOST = "192.168.4.xxx";
    static final String KAFKA_PORT = "30092";
    static final String KAFKA_SERVER = KAFKA_HOST + ":" + KAFKA_PORT;

    static Map<String, Object> initProducerOneConfig() {
        // producer configuration
        Map<String, Object> props = new HashMap<>();
        // broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        // retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // send delay (linger.ms)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        // producer-side buffer
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // serializer for record keys
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // serializer for record values (JsonSerializer)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean("producerOneFactory")
    public ProducerFactory<String, Object> producerOneFactory() {
        return new DefaultKafkaProducerFactory<>(initProducerOneConfig());
    }

    @Bean(name = "ProducerOne")
    public KafkaTemplate<String, Object> getKafkaTemplateOne() {
        return new KafkaTemplate<>(producerOneFactory());
    }
}
```
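One detail worth knowing: because this class registers its own KafkaTemplate bean, Spring Boot's auto-configured template backs off (the auto-configuration is conditional on no KafkaTemplate bean being present), so the plain by-type @Resource injection in the next snippet resolves to ProducerOne.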
Use one of the send-callback styles to receive the result of each send.
```java
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;

@PostMapping("/send-obj")
public void sendMessageObj(@RequestBody HeiHeMessageDto dto) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("id", "id-00001");
    jsonObject.put("applyId", "applyId0001");
    jsonObject.put("sfzh", "610428198912125191");
    jsonObject.put("spDeptId", 5839);

    HeiHeMessage heiHeMessage = new HeiHeMessage();
    // send to XXX
    heiHeMessage.setUserIds(Arrays.asList("1628xxx8844xxx").toString());
    heiHeMessage.setMessage(dto.getMessage());
    heiHeMessage.setMessageParentType(dto.getMessageParentType());
    heiHeMessage.setMessageSonType(dto.getMessageSonType());
    heiHeMessage.setSupplyData(jsonObject.toJSONString());
    heiHeMessage.setSendTime(DateUtils.getDateTime());
    heiHeMessageService.save(heiHeMessage);

    dto.setCreatedAt(DateUtils.getDateTime());
    dto.setMessageParentType(ConstantKey.ShenPiZhongXin);
    dto.setMessageSonType(ConstantKey.xinXiBianGeng);
    dto.setMessage(ConstantKey.MSG_UPDATE_APPLY);

    kafkaTemplate.send(KafkaTopicConstant.TOPIC, dto)
            // callback
            .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(@NonNull Throwable throwable) {
                    log.error("Failure! topic [{}] message [{}] could not be sent",
                            KafkaTopicConstant.TOPIC, dto, throwable);
                }

                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    log.info("Success! topic [{}] message [{}] sent, result: {}",
                            KafkaTopicConstant.TOPIC, dto, result);
                }
            });
    log.info("KafkaProducerController-sendMessageObj-END");
}
```
Consumer configuration class (disabled here via the commented-out @Configuration, so the YAML-driven auto-configuration stays in effect):

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

//@Configuration
@Slf4j
public class KafkaMyConsumerConfig {

    /**
     * HaXX base 192.168.104.xxx:9092
     * XiXX K8s  192.168.4.xxx:30092
     */
    static final String KAFKA_HOST = "192.168.4.xxx";
    static final String KAFKA_PORT = "30092";
    static final String KAFKA_SERVER = KAFKA_HOST + ":" + KAFKA_PORT;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    public Map<String, Object> myConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Whether to auto-commit offsets (default true). Set false and commit manually
        // to avoid duplicate or lost records.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Auto-commit interval; only effective when auto-commit is on.
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        // What to do when a partition has no committed offset or the offset is invalid:
        // earliest: resume from the committed offset; if none, read the partition from the beginning
        // latest:   resume from the committed offset; if none, read only records produced after startup
        // none:     resume from committed offsets; throw if any partition lacks one
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
        // How long the broker waits without a heartbeat before triggering a rebalance (default 10s).
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // Deserialization (JSON is recommended: entity classes travel without extra wiring).
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // Delegates behind ErrorHandlingDeserializer: StringDeserializer / JsonDeserializer.
        propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
        // propsMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
        // propsMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
        propsMap.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // e.g. com.xahengpin.qzx.rygl.api.model
        propsMap.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return propsMap;
    }

    @Bean("myConsumerFactory")
    public ConsumerFactory<Object, Object> myConsumerFactory() {
        // Build the consumer factory from the property map above.
        return new DefaultKafkaConsumerFactory<>(myConsumerConfigs());
    }
}
```
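The @KafkaListener shown later refers to its container factory by name ("kafkaListenerContainerFactory"), but that bean is cut off in the post. Here is a minimal sketch of what it could look like as an additional bean in the class above, mirroring the listener settings from the YAML; the SeekToCurrentErrorHandler/FixedBackOff retry policy is an assumption suggested by the class's imports:

```java
@Bean("kafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> myConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(myConsumerFactory);
    factory.setConcurrency(4); // threads in the container, usually machines * partitions
    ContainerProperties props = factory.getContainerProperties();
    props.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // manual acknowledgment
    props.setPollTimeout(600000); // how long poll() blocks, in ms
    props.setMissingTopicsFatal(false); // don't fail when a listened topic is missing
    // Retry a failed record twice at 1s intervals before giving up
    // (assumed policy; the FixedBackOff values are illustrative).
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
```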
When consumption throws an exception, the record cannot be acknowledged normally; the message is routed to a dead-letter topic instead.
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message,
                              @NonNull ListenerExecutionFailedException exception) {
        return new Object();
    }

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message,
                              @NonNull ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        System.out.println("Message: " + message);
        System.out.println("Exception: " + exception);
        System.out.println("Consumer group: " + consumer.groupMetadata());
        System.out.println("Subscribed topics: " + consumer.listTopics());
        //return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
        // Resolve the topic the failed record came from, then publish
        // the payload to that topic's dead-letter counterpart.
        String sourceTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class);
        String dltTopic = sourceTopic + ".DLT";
        kafkaTemplate.send(dltTopic, message.getPayload());
        return message;
    }
}
```
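Spring Kafka 2.7 also ships this dead-letter pattern out of the box: a DeadLetterPublishingRecoverer publishes the failed record to &lt;originalTopic&gt;.DLT (same partition by default) with no hand-written handler. A minimal sketch, not what this post uses; the bean wiring and back-off values are assumptions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Built-in alternative: retry twice at 1s intervals, then publish the failed
// record to <topic>.DLT via the template. Register it on the container
// factory with factory.setErrorHandler(...).
@Bean
public SeekToCurrentErrorHandler dltErrorHandler(KafkaTemplate<String, Object> template) {
    return new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template),
            new FixedBackOff(1000L, 2L));
}
```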
The consumer listener (manual ack, custom error handler and container factory):

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = {"xa1hp2_oct1upus", "test"},
            errorHandler = "myKafkaListenerErrorHandler",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumerTestTopic(ConsumerRecord<Object, Object> record, Acknowledgment ack) {
        log.info("Consumed: topic={}, partition={}, offset={}, value={}",
                record.topic(), record.partition(), record.offset(), record.value());
        // ack-mode is manual_immediate, so confirm each record explicitly
        ack.acknowledge();
    }
}
```
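To verify the dead-letter path end to end, a second listener method in the same class can drain a .DLT topic. A sketch listening on "test.DLT" as an example (with the header-derived naming in the error handler, each source topic gets its own .DLT; the group id below is an assumption):

```java
// Drains the dead-letter topic written by MyKafkaListenerErrorHandler.
// "dlt-group" is an illustrative group id, not from the original post.
@KafkaListener(topics = "test.DLT", groupId = "dlt-group",
        containerFactory = "kafkaListenerContainerFactory")
public void consumeDlt(ConsumerRecord<Object, Object> record, Acknowledgment ack) {
    log.warn("DLT record: partition={}, offset={}, value={}",
            record.partition(), record.offset(), record.value());
    ack.acknowledge(); // manual ack, matching ack-mode: manual_immediate
}
```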