# 2. Kafka Advanced: Producers

## Required configuration parameters

1. bootstrap.servers: the list of addresses used to connect to the Kafka cluster. Configure several broker addresses, so that if one node goes down the client can still connect through another.

2. key.serializer and value.serializer: messages must be serialized before they are sent to Kafka. Kafka uses the key to compute which partition a message goes to; the value is the message body.

3. client.id: the id of this producer client; if it is not set, Kafka generates a non-empty string automatically.

## ProducerConfig provides constants for the configuration keys, so developers do not have to remember the exact key strings

## KafkaProducer is thread-safe
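Because KafkaProducer is thread-safe, a single instance can be shared by many threads, which also lets records from different threads be batched together. A minimal sketch (the broker address and topic name reuse the ones from the examples below; the thread pool is only for illustration):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // One KafkaProducer instance shared by all threads: it is thread-safe.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int threadNo = t;
            pool.submit(() -> producer.send(
                    new ProducerRecord<>("topic-normal", "message from thread " + threadNo)));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}
```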

     

     

Kafka producer code:

```java
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class NormalProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "normal-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Kafka's retry mechanism: RETRIES_CONFIG defaults to 0
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Retriable exceptions: the send is retried up to the configured number of times;
        // if it still fails when the retry limit is reached, the exception is thrown.
        //   NetworkException
        //   LeaderNotAvailableException
        // Non-retriable exceptions:
        //   RecordTooLargeException
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        User user = new User("001", "xiao xiao");
        // Const.TOPIC_NORMAL is a new topic; by default Kafka creates missing topics automatically.
        // Automatic topic creation must be disabled in production.
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(Const.TOPIC_NORMAL, JSON.toJSONString(user));
        /**
         * The actual partition is computed from the key; records are stored per partition.
         * ProducerRecord(
         *     topic=topic_normal,
         *     partition=null,
         *     headers=RecordHeaders(headers = [], isReadOnly = false),
         *     key=null,
         *     value={"id":"001","name":"xiao xiao"},
         *     timestamp=null)
         */
        System.err.println("Newly created record: " + producerRecord);

        // The one-argument send method is also asynchronous and returns a Future,
        // which can be used to block for a synchronous send:
        /**
        Future<RecordMetadata> future = producer.send(producerRecord);
        RecordMetadata recordMetadata = future.get();
        System.err.println(String.format("partition: %s, offset: %s, timestamp: %s",
                recordMetadata.partition(),
                recordMetadata.offset(),
                recordMetadata.timestamp()));
        */

        // The two-argument send method is fully asynchronous: the result of the send
        // is delivered to the Callback.
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (null != exception) {
                    exception.printStackTrace();
                    return;
                }
                System.err.println(String.format("partition: %s, offset: %s, timestamp: %s",
                        recordMetadata.partition(),
                        recordMetadata.offset(),
                        recordMetadata.timestamp()));
            }
        });
        producer.close();
    }
}
```

Kafka consumer code:

```java
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class NormalConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "normal-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(Const.TOPIC_NORMAL));
        System.err.println("normal consumer started...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                value, offset, commitOffset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

## Important Kafka producer parameters in detail

acks: specifies how many replicas on the broker side must have received a message before the producer considers the send successful. The default is acks=1 (only the leader needs to have written the message).

acks=0: the producer does not wait for any response from the server after sending.

acks=-1 (equivalent to acks=all): the producer only receives a success response from the server once all replicas in the ISR have successfully written the message.

Even acks=-1/all does not guarantee that every message is delivered, because some replicas may be lagging behind in the OSR. To get close to 100% delivery you also need min.insync.replicas=2 (or higher), which can hurt performance, and you have to accept that messages may be delivered more than once (at-least-once). A configuration sketch follows below.
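A rough sketch of these delivery settings (the broker address and topic name reuse the ones from the examples above; min.insync.replicas itself is a broker/topic-level setting, shown here only as a CLI comment):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ReliableProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: wait until every replica in the ISR has written the record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry transient failures instead of silently dropping the record.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        // min.insync.replicas is configured on the broker or per topic, e.g.:
        // bin/kafka-configs.sh --bootstrap-server 192.168.110.130:9092 --entity-type topics \
        //   --entity-name topic-normal --alter --add-config min.insync.replicas=2

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic-normal", "reliable message"));
        }
    }
}
```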

max.request.size: limits the maximum size of a request (and therefore of a single message) the producer client can send; the default is 1 MB.

retries: the number of retries for retriable send failures; the default is 0.

retry.backoff.ms: the interval between retries; the default is 100 ms.

compression.type: the compression codec used for message batches; the default is "none", with gzip, snappy and lz4 available.

connections.max.idle.ms: how long a connection may stay idle before it is closed; the default is 540000 ms, i.e. 9 minutes.
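As a fragment (assuming the same Properties/ProducerConfig imports as the producer examples above), these settings map onto ProducerConfig keys as follows; the concrete values are only illustrative:

```java
Properties props = new Properties();
// Largest request the producer may send (default 1048576 bytes = 1 MB).
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// Retry retriable failures a few times (default 0).
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// Wait 100 ms between retries (the default).
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// Compress batches with lz4 (default is "none").
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// Close connections idle for 9 minutes (the default).
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);
```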

    ---------------------------------------------------------------------------------------------------------

The following parameters control batched sending:

linger.ms: how long the producer waits for more records to join a ProducerBatch before sending it; the default is 0. Think of it as how long the bus waits for more passengers before departing.

batch.size: the size (in bytes) a batch may grow to before it is sent, i.e. the bus departs as soon as it is full; the default is 16 KB.

buffer.memory: the total memory the producer uses to buffer records waiting to be sent; the default is 32 MB.
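A small sketch of the batching knobs (same assumption about imports as above; the values are illustrative, not recommendations):

```java
Properties props = new Properties();
// Wait up to 10 ms for more records to fill the batch (default 0).
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Send a batch once it reaches 16 KB (the default batch size).
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// Total buffer for records waiting to be sent: 32 MB (the default).
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
```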

    ---------------------------------------------------------------------------------------------------------

receive.buffer.bytes: the size of the TCP socket receive buffer; the default is 32 KB.

send.buffer.bytes: the size of the TCP socket send buffer; the default is 128 KB.

    ---------------------------------------------------------------------------------------------------------

request.timeout.ms: the maximum time the producer waits for the response to a request; the default is 30000 ms.
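The corresponding ProducerConfig keys for the socket buffers and the request timeout (values shown are the defaults mentioned above; imports assumed as before):

```java
Properties props = new Properties();
// Socket receive buffer: 32 KB (default).
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);
// Socket send buffer: 128 KB (default).
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);
// Wait at most 30 s for a broker response (default).
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
```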

## Kafka producer interceptors

Producer interceptor code (the shared Const interface with the topic names is listed first):

```java
public interface Const {
    String TOPIC_QUICKSTART = "topic-quickstart";
    String TOPIC_NORMAL = "topic-normal";
    String TOPIC_INTERCEPTOR = "topic-interceptor";
    String TOPIC_SERIAL = "topic-serial";
    String TOPIC_PARTITION = "topic-partition";
    String TOPIC_MODULE = "topic-module";
    String TOPIC_CORE = "topic-core";
    String TOPIC_REBALANCE = "topic-rebalance";
    String TOPIC_MT1 = "topic-mt1";
    String TOPIC_MT2 = "topic-mt2";
}
```
```java
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class InterceptorProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "interceptor-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the producer interceptor; more than one interceptor can be configured.
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(Const.TOPIC_INTERCEPTOR,
                            JSON.toJSONString(user));
            producer.send(record);
        }
        producer.close();
    }
}
```
```java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
    private volatile long success = 0;
    private volatile long failure = 0;

    @Override
    public void configure(Map<String, ?> configs) {
    }

    // Intercepts every record before it is sent.
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.err.println("----------- producer pre-send interceptor ----------");
        String modifyValue = "prefix-" + record.value();
        return new ProducerRecord<String, String>(record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                modifyValue,
                record.headers());
    }

    // Intercepts the acknowledgement after the record has been sent.
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.err.println("----------- producer post-send interceptor ----------");
        if (null == exception) {
            success++;
        } else {
            failure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) success / (success + failure);
        System.err.println(String.format("Producer closed, send success rate: %s %%", successRatio * 100));
    }
}
```

Consumer interceptor code:

```java
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class InterceptorConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "interceptor-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // Register the consumer-side interceptor.
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        try (consumer) {
            consumer.subscribe(Collections.singletonList(Const.TOPIC_INTERCEPTOR));
            System.err.println("interceptor consumer started...");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                value, offset, commitOffset));
                    }
                }
            }
        }
    }
}
```
```java
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
    }

    // onConsume: called before the consumer processes the polled records.
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.err.println("------ consumer pre-processing interceptor, records received --------");
        return records;
    }

    // onCommit: called after offsets have been committed.
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) ->
                System.err.println("Consumer finished, partition: " + tp + ", offset: " + offset));
    }

    @Override
    public void close() {
    }
}
```

## Kafka serialization and deserialization

Producer serializer code:

```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String id;
    private String name;
}
```
```java
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, User user) {
        if (null == user) {
            return null;
        }
        byte[] idBytes, nameBytes;
        try {
            String id = user.getId();
            String name = user.getName();
            if (id != null) {
                idBytes = id.getBytes("UTF-8");
            } else {
                idBytes = new byte[0];
            }
            if (name != null) {
                nameBytes = name.getBytes("UTF-8");
            } else {
                nameBytes = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + idBytes.length + nameBytes.length);
            // 4 bytes (one int): putInt stores the actual length of idBytes
            buffer.putInt(idBytes.length);
            // put(byte[]) stores the actual content of idBytes
            buffer.put(idBytes);
            buffer.putInt(nameBytes.length);
            buffer.put(nameBytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            // handle exception...
        }
        return new byte[0];
    }

    @Override
    public void close() {
    }
}
```
```java
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SerializerProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.221:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "serial-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Use the custom value serializer instead of StringSerializer
        // properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
        KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            User user = new User("00" + i, "张三");
            ProducerRecord<String, User> record =
                    new ProducerRecord<String, User>(Const.TOPIC_SERIAL, user);
            producer.send(record);
        }
        producer.close();
    }
}
```

Consumer deserializer code:

```java
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("size is wrong, must be data.length >= 8");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // actual length of the idBytes array
        int idLen = buffer.getInt();
        byte[] idBytes = new byte[idLen];
        buffer.get(idBytes);
        // actual length of the nameBytes array
        int nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        String id, name;
        try {
            id = new String(idBytes, "UTF-8");
            name = new String(nameBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("deserializing error! ", e);
        }
        return new User(id, name);
    }

    @Override
    public void close() {
    }
}
```
```java
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class DeserializerConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the custom value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "serial-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(Const.TOPIC_SERIAL));
        System.err.println("serial consumer started...");
        try {
            while (true) {
                ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, User>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, User> consumerRecord = partitionRecords.get(i);
                        User user = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                user.getName(), offset, commitOffset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

## Kafka partitioners

     

     
    

The supplier ID is used as a routing key to decide which partition a message should go to.

If all data were placed into partitions at random, the data would become disordered: because messages within a partition are consumed in order (data in a topic partition is first-in, first-out), the business data belonging to some consumers can end up delayed or blocked. The best approach is to partition by business: different business data goes into its own partition, one kind of business data per partition, with the business data itself determining the partition (see the keyed-record sketch below).
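For example, a hypothetical supplierId can be used as the record key, so that the key-hashing branch of the partitioner sends all of one supplier's messages to the same partition and preserves their order. This fragment assumes a producer, User and Const configured as in the PartitionProducer example below:

```java
// Hypothetical business key: all messages of one supplier land in one partition.
String supplierId = "supplier-001";
User user = new User("001", "张三");
ProducerRecord<String, String> record = new ProducerRecord<>(
        Const.TOPIC_PARTITION,    // topic
        supplierId,               // key used for partition routing
        JSON.toJSONString(user)   // value
);
producer.send(record);
```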

Partitioner code:

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartitioner implements Partitioner {
    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionList = cluster.partitionsForTopic(topic);
        int numOfPartition = partitionList.size();
        System.err.println("---- custom partitioner, current number of partitions: " + numOfPartition);
        if (null == keyBytes) {
            // No key: distribute records round-robin
            return counter.getAndIncrement() % numOfPartition;
        } else {
            // Key present: hash the key to a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numOfPartition;
        }
    }

    @Override
    public void close() {
    }
}
```
```java
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "partition-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(Const.TOPIC_PARTITION,
                            JSON.toJSONString(user));
            producer.send(record);
        }
        producer.close();
    }
}
```
```java
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class PartitionConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(Const.TOPIC_PARTITION));
        System.err.println("partition consumer started...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                value, offset, commitOffset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Original article: https://blog.csdn.net/Xx13624558575/article/details/126775372