• Java整合Kafka实现生产及消费


    前提条件

    项目环境

    1. 创建maven项目。
    2. pom.xml文件中引入kafka依赖。
    <dependencies>
            <dependency>
                <groupId>org.apache.kafkagroupId>
                <artifactId>kafka_2.11artifactId>
                <version>2.1.0version>
            dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    创建Topic

    创建topic命名为testtopic并指定2个分区。

    ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2
    
    • 1

    生产消息

    public class Producer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 生产参数配置
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            int i=0;
            while (true) {
                //生产消息
                Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", "key"+i, "value"+i));
                //获取生产的数据信息
                RecordMetadata recordMetadata = future.get();
                System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());
                Thread.sleep(1000);
                i+=1;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    生产者参数配置

    // ACK机制,默认为1 (0,1,-1)
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "");
    // Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
    properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, ""); 
    // Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
    properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, ""); 
    // 生产者客户端发送消息的最大值,默认1M
    properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ""); 
    // 发送消息异常时重试次数,默认为0
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "");   
    // 重试间隔时间,默认100
    properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "");    
    // 生产消息自定义分区策略类
    properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "");
    // 开启幂等 ,默认true
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "");
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    更多配置信息查看ProducerConfig类

    生产自定义分区策略

    1. 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。
    public class PartitionPolicy implements Partitioner {
    
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
    
        private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
    
            return counter.getAndIncrement();
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    1. 参数配置。
    properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionPolicy.class.getName());
    
    • 1

    生产到指定分区

    ProducerRecord有指定分区的构造方法,设置分区号
    public ProducerRecord(String topic, Integer partition, K key, V value)

    Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", 1, "key"+i, "value"+i));
    
    • 1

    消费消息

    public class Consumer {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            //约定的编解码
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
            //默认为自动提交
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            //当设置为自动提交时,默认5秒自动提交
            //properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
            //
            //properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
            //订阅topic
            kafkaConsumer.subscribe(Arrays.asList("testtopic"));
            Set<TopicPartition> assignment = kafkaConsumer.assignment();
            ConsumerRecords<String, String> records = null;
            while (assignment.size() == 0) {
                records = kafkaConsumer.poll(Duration.ofMillis(100));
                assignment = kafkaConsumer.assignment();
            }
            /*//1.根据时间戳获取 offset,设置 offset
            Map offsetsForTimes=new HashMap<>();
            for (TopicPartition topicPartition : assignment) {
                offsetsForTimes.put(topicPartition,1669972273941L);
            }
            Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);
            offsetAndTimestampMap.forEach((tp,offsettime)->{
                kafkaConsumer.seek(tp,offsettime.offset());
            });*/
            /*//2.指定从头开始消费
            kafkaConsumer.seekToBeginning(assignment);*/
            /*//3.指定从某offset开始消费
            kafkaConsumer.seek(tp,0);*/
            while (true) {
                if (records.isEmpty()) {
                    Thread.sleep(3000);
                } else {
                    System.out.printf("records count:" + records.count());
                    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                    while (iterator.hasNext()) {
                        ConsumerRecord<String, String> record = iterator.next();
                        System.out.println(" time:" + record.timestamp() + " key:" + record.key() + " value:" + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
                    }
                    kafkaConsumer.commitSync();
                }
                records = kafkaConsumer.poll(Duration.ofMillis(0));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    消费参数配置

    // 消费者必须指定一个消费组
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
    // 消费者每次最多POLL的数量
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
    // 消费者POLL的时间间隔
    properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC, "");
    // 设置是否自动提交,默认为true
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");  
    // 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
    properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");   
    // 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    更多配置信息查看ConsumerConfig类

    offset设置方式

    如代码所示,设置offset的几种方式:

    • 指定 offset,需要自己维护 offset,方便重试。
    • 指定从头开始消费。
    • 指定 offset 为最近可用的 offset (默认)。
    • 根据时间戳获取 offset,设置 offset。

    代码仓库

    https://gitee.com/codeWBG/learn_kafka

  • 相关阅读:
    IM即时通讯开发iOS多设备字体适配方案
    R语言 | 数据框
    回溯算法笔记
    JavaScript 运算符【‘算术运算符’,‘赋值运算符’,‘一元运算符’,‘比较运算符’,‘逻辑运算符’,‘运算符优先级’】 详解和配案例 开发当中很重要
    uniapp(uncloud) 使用生态开发接口详情5(云公共模块)
    Davinci 集成NvM协议栈的步骤
    python 异步Web框架sanic
    【nvm】
    【LeetCode】94. 二叉树的中序遍历 [ 左子树 根结点 右子树 ]
    Java StringBuffer.reverse具有什么功能呢?
  • 原文地址:https://blog.csdn.net/qq_28314431/article/details/128135288