一、配置文件
- xxxxxx:
- kafka:
- bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
- producer:
- # 设置大于0的值,则客户端会将发送失败的记录重新发送
- retries: 3
- #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
- batch-size: 16384
- linger: 1
- # 设置生产者内存缓冲区的大小。#32M
- buffer-memory: 33554432
- # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
- # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
- # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
- acks: 1
- # 指定消息key和消息体的编解码方式 值的序列化方式
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- poll-timeout: 3000
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- auto-commit: false
- offset-reset: earliest
- records: 10
- session-timeout: 150000
- poll-interval: 360000
- request-timeout: 60000
二、KafkaConfig
- package com.xxxxxx.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.*;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Slf4j
- @Configuration
- @EnableKafka
- public class KafkaConfig {
-
- @Value("${xxxxxx.kafka.bootstrap-servers}")
- private String servers;
- @Value("${xxxxxx.kafka.producer.retries}")
- private int retries;
- @Value("${xxxxxx.kafka.producer.batch-size}")
- private int batchSize;
- @Value("${xxxxxx-afka.producer.linger}")
- private int linger;
- @Value("${xxxxxx.kafka.producer.buffer-memory}")
- private int bufferMemory;
- @Value("${xxxxxx.kafka.producer.acks}")
- private String acks;
- @Value("${xxxxxx.kafka.producer.key-serializer}")
- private String keyDeserializer;
- @Value("${xxxxxx.kafka.producer.value-serializer}")
- private String valueDeserializer;
-
- // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
- public Map
producerConfigs() { - Map
props = new HashMap<>(); -
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- //设置重试次数
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- //达到batchSize大小的时候会发送消息
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
- //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
- props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
- //缓冲区的值
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
- //序列化手段
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);
-
- props.put(ProducerConfig.ACKS_CONFIG, acks);
-
- return props;
- }
-
- public ProducerFactory
producerFactory() { - return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- @Bean(name = "xxxxxxKafkaTemplate")
- public KafkaTemplate
kafkaTemplate() { - return new KafkaTemplate
(producerFactory()); - }
-
- }
三、生产者
-
- @Resource(name = "xxxxxxKafkaTemplate")
- private KafkaTemplate kafkaTemplate;
kafkaTemplate.send(topic, message);