

## 必须要配置的参数
bootstrap.servers : 连接 Kafka 集群的地址清单,建议多配置几个节点地址,这样某个节点宕机时还可以连接其他地址。
key.serializer 和 value.serializer :往kafka发送消息之前必须要序列化,kafka通过key计算出消息应该到哪一个分区。value 消息体内容。
client.id : kafka 对应生产者id,不设置会生成一个非空的字符串
## ProducerConfig:简化Key配置,方便开发时不用去记配置Key到底是什么
## kafkaProducer 是线程安全的



Kafka生产者代码:
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- public class NormalProducer {
-
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "normal-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // kafka 消息的重试机制: RETRIES_CONFIG该参数默认是0:
- properties.put(ProducerConfig.RETRIES_CONFIG, 3);
-
- // 可重试异常, 意思是执行指定的重试次数 如果到达重试次数上限还没有发送成功, 也会抛出异常信息
- // NetworkException
- // LeaderNotAvailableException
-
- // 不可重试异常
- // RecordTooLargeException
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
- User user = new User("001", "xiao xiao");
- // Const.TOPIC_NORMAL 是一个新的主题,kafka默认是可以在没有主题的情况下创建的
- // 自动创建主题的特性,在生产环境中一定是禁用的
- ProducerRecord<String, String> producerRecord =
- new ProducerRecord<String, String>(Const.TOPIC_NORMAL, JSON.toJSONString(user));
- /**
- * //一条消息 必须通过key 去计算出来实际的partition, 按照partition去存储的
- * ProducerRecord(
- * topic=topic_normal,
- * partition=null,
- * headers=RecordHeaders(headers = [], isReadOnly = false),
- * key=null,
- * value={"id":"001","name":"xiao xiao"},
- * timestamp=null)
- */
- System.err.println("新创建消息:" + producerRecord);
-
-
- // 一个参数的send方法 本质上也是异步的 返回的是一个future对象; 可以实现同步阻塞方式
- /**
- Future<RecordMetadata> future = producer.send(producerRecord);
- RecordMetadata recordMetadata = future.get();
- System.err.println(String.format("分区:%s, 偏移量: %s, 时间戳: %s",
- recordMetadata.partition(),
- recordMetadata.offset(),
- recordMetadata.timestamp()));
- */
-
- // 带有两个参数的send方法 是完全异步化的。在回调Callback方法中得到发送消息的结果
- producer.send(producerRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
- if(null != exception) {
- exception.printStackTrace();
- return;
- }
- System.err.println(String.format("分区:%s, 偏移量: %s, 时间戳: %s",
- recordMetadata.partition(),
- recordMetadata.offset(),
- recordMetadata.timestamp()));
- }
- });
-
- producer.close();
-
- }
-
- }
Kafka消费者代码:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class NormalConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "normal-group");
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- consumer.subscribe(Collections.singletonList(Const.TOPIC_NORMAL));
- System.err.println("normal consumer started...");
-
- try {
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic,topicPartition.partition(), size));
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- String value = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffser = offset + 1;
- System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", value, offset, commitOffser));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
Kafka生产者重要参数详解
acks:指定发送消息后,broker端至少要有多少个副本接收到消息;默认为acks=1。
acks=0:生产者发送消息之后,不需要等待任何服务端的响应。
acks=-1 acks=all:生产者在消息发送之后,需要等待isr中的所有副本都成功写入消息之后,才能够收到来自服务端的成功响应。
并不是设置了acks=-1(all)消息就一定会被投递成功,因为有的副本可能在OSR(未同步副本集合)中。
想要100%投递成功,还要配置:
-min.insync.replicas=2,容易影响性能。
允许消息多发。
max.request.size:该参数用来限制生产者客户端能发送的消息的最大值,默认为1M。
retries 重试次数,默认为0。
retry.backoff.ms:两次重试之间的时间间隔,默认100毫秒
compression.type:指定压缩方式,默认为“none”,可选gzip,snappy,lz4
connections.max.idle.ms:这个参数用来指定链接空闲多久之后关闭,默认540000ms,即9分钟
---------------------------------------------------------------------------------------------------------
下面是批量发送部分:
linger.ms:指定生产者发送ProducerBatch之前等待更多的消息加入producerBatch的时间,默认值为0,就像是等人上车的时间。
batch.size:单个ProducerBatch的大小上限(以字节为单位,而不是消息条数),批次写满后就会发送,就是满多少人即发车的意思。
buffer.memory:提升缓存性能参数,默认32M。
---------------------------------------------------------------------------------------------------------
receive.buffer.bytes:设置socket接收消息缓冲区 默认32KB。
send.buffer.bytes:设置socket发送消息缓冲区 默认128KB。
---------------------------------------------------------------------------------------------------------
request.timeout.ms:配置producer等待请求响应的最长时间,默认30000ms。
Kafka生产者拦截器

生产者拦截器代码:
/** Topic names shared by the example producers and consumers. */
public interface Const {
    String TOPIC_QUICKSTART = "topic-quickstart";
    String TOPIC_NORMAL = "topic-normal";
    String TOPIC_INTERCEPTOR = "topic-interceptor";
    String TOPIC_SERIAL = "topic-serial";
    String TOPIC_PARTITION = "topic-partition";
    String TOPIC_MODULE = "topic-module";
    String TOPIC_CORE = "topic-core";
    String TOPIC_REBALANCE = "topic-rebalance";
    String TOPIC_MT1 = "topic-mt1";
    String TOPIC_MT2 = "topic-mt2";
}
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class InterceptorProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "interceptor-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- // 添加生产者拦截器属性, 生产者拦截器可以配置多个
- properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
- for(int i = 0; i <10; i ++) {
- User user = new User("00" + i, "张三");
- ProducerRecord<String, String> record =
- new ProducerRecord<String, String>(Const.TOPIC_INTERCEPTOR,
- JSON.toJSONString(user));
- producer.send(record);
- }
- producer.close();
- }
- }
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
- import java.util.Map;
-
- public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
-
- private volatile long success = 0;
-
- private volatile long failure = 0;
-
- @Override
- public void configure(Map<String, ?> configs) {
- }
-
- // 发送消息之前的切面拦截
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- System.err.println("----------- 生产者发送消息前置拦截器 ----------");
- String modifyValue = "prefix-" + record.value();
- return new ProducerRecord<String, String>(record.topic(),
- record.partition(),
- record.timestamp(),
- record.key(),
- modifyValue,
- record.headers());
- }
-
- // 发送消息之后的切面拦截
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- System.err.println("----------- 生产者发送消息后置拦截器 ----------");
- if(null == exception) {
- success ++;
- } else {
- failure ++;
- }
- }
-
- @Override
- public void close() {
- double successRatio = (double)success/(success + failure);
- System.err.println(String.format("生产者关闭,发送消息的成功率为:%s %%", successRatio * 100));
- }
-
- }
消费者拦截器代码:
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class InterceptorConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "interceptor-group");
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
- // 添加消费端拦截器属性
- properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- try (consumer) {
- consumer.subscribe(Collections.singletonList(Const.TOPIC_INTERCEPTOR));
- System.err.println("interceptor consumer started...");
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic, topicPartition.partition(), size));
- for (int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- String value = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffset = offset + 1;
- System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", value, offset, commitOffset));
- }
- }
- }
- }
- }
- }
- import org.apache.kafka.clients.consumer.ConsumerInterceptor;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.OffsetAndMetadata;
- import org.apache.kafka.common.TopicPartition;
-
- import java.util.Map;
-
- public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
-
- @Override
- public void configure(Map<String, ?> configs) {
- // TODO Auto-generated method stub
-
- }
-
- // onConsume:消费者接到消息处理之前的拦截器
- @Override
- public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
- System.err.println("------ 消费者前置处理器,接收消息 --------");
- return records;
- }
-
- @Override
- public void onCommit(Map
offsets ) { - offsets.forEach((tp, offset) -> {
- System.err.println("消费者处理完成," + "分区:" + tp + ", 偏移量:" + offset);
- });
- }
-
- @Override
- public void close() {
- }
-
- }
Kafka之序列化和反序列化:

生产者序列化代码:
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public class User {
-
- private String id;
-
- private String name;
-
- }
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.common.serialization.Serializer;
-
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.util.Map;
-
- public class UserSerializer implements Serializer<User> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- }
-
- @Override
- public byte[] serialize(String topic, User user) {
- if(null == user) {
- return null;
- }
- byte[] idBytes, nameBytes;
- try {
- String id = user.getId();
- String name = user.getName();
- if(id != null) {
- idBytes = id.getBytes("UTF-8");
- } else {
- idBytes = new byte[0];
- }
- if(name != null) {
- nameBytes = name.getBytes("UTF-8");
- } else {
- nameBytes = new byte[0];
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + idBytes.length + nameBytes.length);
- // 4个字节 也就是一个 int类型 : putInt 盛放 idBytes的实际真实长度
- buffer.putInt(idBytes.length);
- // put bytes[] 实际盛放的是idBytes真实的字节数组,也就是内容
- buffer.put(idBytes);
- buffer.putInt(nameBytes.length);
- buffer.put(nameBytes);
- return buffer.array();
-
- } catch (UnsupportedEncodingException e) {
- // handle exption...
- }
- return new byte[0];
- }
-
- @Override
- public void close() {
- }
-
- }
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class SerializerProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.221:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "serial-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 添加序列化 value
- // properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
-
- KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
- for(int i = 0; i <10; i ++) {
- User user = new User("00" + i, "张三");
- ProducerRecord<String, User> record =
- new ProducerRecord<String, User>(Const.TOPIC_SERIAL, user);
- producer.send(record);
- }
- producer.close();
- }
- }
消费者反序列化代码:
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.common.errors.SerializationException;
- import org.apache.kafka.common.serialization.Deserializer;
-
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.util.Map;
-
- public class UserDeserializer implements Deserializer
{ -
- @Override
- public void configure(Map
configs, boolean isKey) { -
- }
-
- @Override
- public User deserialize(String topic, byte[] data) {
- if(data == null) {
- return null;
- }
- if(data.length < 8) {
- throw new SerializationException("size is wrong, must be data.length >= 8");
- }
- ByteBuffer buffer = ByteBuffer.wrap(data);
- // idBytes 字节数组的真实长度
- int idLen = buffer.getInt();
- byte[] idBytes = new byte[idLen];
- buffer.get(idBytes);
-
- // nameBytes 字节数组的真实长度
- int nameLen = buffer.getInt();
- byte[] nameBytes = new byte[nameLen];
- buffer.get(nameBytes);
- String id ,name;
- try {
- id = new String(idBytes, "UTF-8");
- name = new String(nameBytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new SerializationException("deserializing error! ", e);
- }
- return new User(id, name);
- }
-
- @Override
- public void close() {
-
- }
-
- }
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class DeserializerConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // 反序列化属性参数配置
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "serial-group");
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
-
- KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties);
- consumer.subscribe(Collections.singletonList(Const.TOPIC_SERIAL));
- System.err.println("serial consumer started...");
- try {
- while(true) {
- ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, User>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic, topicPartition.partition(), size));
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, User> consumerRecord = partitionRecords.get(i);
- User user = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffser = offset + 1;
- System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", user.getName(), offset, commitOffser));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
Kafka之分区器

通过供应商的ID做一个路由,去决定消息应该放到哪个分区中。
如果把所有数据随机地放到某个partition中,就会造成数据混乱:因为消息队列是顺序消费的(topic中的数据是先进先出),部分消费端所对应的业务数据会被延迟或者阻塞消费。最好的方法是根据业务进行分区,不同的业务数据单独放到对应的partition中,一个业务数据对应一个partition,也就是通过业务数据来分区。

分区器代码:
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
- import org.apache.kafka.common.utils.Utils;
-
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
-
- public class CustomPartitioner implements Partitioner {
-
- private AtomicInteger counter = new AtomicInteger(0);
-
- @Override
- public void configure(Map
configs) { - }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes,
- Object value, byte[] valueBytes, Cluster cluster) {
- List
partitionList = cluster.partitionsForTopic(topic); - int numOfPartition = partitionList.size();
- System.err.println("---- 进入自定义分区器,当前分区个数:" + numOfPartition);
- if(null == keyBytes) {
- return counter.getAndIncrement() % numOfPartition;
- } else {
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numOfPartition;
- }
- }
-
- @Override
- public void close() {
- }
-
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class PartitionProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "partition-producer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 添加配置属性,自定义分区器
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
- for(int i = 0; i <10; i ++) {
- User user = new User("00" + i, "张三");
- ProducerRecord<String, String> record =
- new ProducerRecord<String, String>(Const.TOPIC_PARTITION,
- JSON.toJSONString(user));
- producer.send(record);
- }
- producer.close();
- }
- }
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class PartitionConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-group");
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- consumer.subscribe(Collections.singletonList(Const.TOPIC_PARTITION));
- System.err.println("partition consumer started...");
- try {
- while(true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for(TopicPartition topicPartition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- String topic = topicPartition.topic();
- int size = partitionRecords.size();
- System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic, topicPartition.partition(), size));
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- String value = consumerRecord.value();
- long offset = consumerRecord.offset();
- long commitOffser = offset + 1;
- System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", value, offset, commitOffser));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
/** Topic names shared by the example producers and consumers. */
public interface Const {
    String TOPIC_QUICKSTART = "topic-quickstart";
    String TOPIC_NORMAL = "topic-normal";
    String TOPIC_INTERCEPTOR = "topic-interceptor";
    String TOPIC_SERIAL = "topic-serial";
    String TOPIC_PARTITION = "topic-partition";
    String TOPIC_MODULE = "topic-module";
    String TOPIC_CORE = "topic-core";
    String TOPIC_REBALANCE = "topic-rebalance";
    String TOPIC_MT1 = "topic-mt1";
    String TOPIC_MT2 = "topic-mt2";
}