• java生产消费kafka消息


    maven依赖

    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    

    生产者代码

    package com.kafka.demo;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer demo.
     *
     * <p>Runs on its own thread and sends the UTF-8 bytes of {@code "hello,world"} to the
     * configured topic, pausing a random 1-10 seconds between sends. With the stop
     * condition currently in {@link #run()} the loop ends right after the second message.
     */
    public class KafkaProducerTest implements Runnable {

        /** Producer with String keys (unused here) and raw byte[] values. */
        private final KafkaProducer<String, byte[]> producer;
        private final String topic;

        /**
         * Builds a producer bound to the given topic.
         *
         * @param topicName name of the topic every record is sent to
         */
        public KafkaProducerTest(String topicName) {
            Properties props = new Properties();
            // Placeholder address: replace with the real broker host before running.
            props.put("bootstrap.servers", "xxx.xxx.xx.xx:9092");
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", ByteArraySerializer.class);
            this.producer = new KafkaProducer<>(props);
            this.topic = topicName;
        }

        /**
         * Sends messages in a loop until a stop condition is met, then closes the producer.
         * Any exception (including an interrupt during the sleep) is printed and ends the loop.
         */
        @Override
        public void run() {
            int messageNo = 1;
            try {
                for (; ; ) {
                    String messageStr = "hello,world";
                    System.out.println("-----------------------"+messageStr);
                    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                    byte[] payload = messageStr.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, payload);
                    producer.send(record);
                    // Print a summary and stop as soon as the message number is even,
                    // i.e. after the second message has been sent.
                    if (messageNo % 2 == 0) {
                        System.out.println("--------------------------------------------:" + messageNo + "条");
                        System.out.println("发送的信息:" + messageStr);
                        break;
                    }
                    // Upper bound of 1,000,000 messages. While the check above breaks at
                    // message 2 this branch is never reached; it only matters if that
                    // condition is relaxed.
                    if (messageNo / 1000000 == 1) {
                        System.out.println("成功发送了" + messageNo + "条");
                        break;
                    }
                    // Wait a random whole number of seconds in [1, 10] before the next send.
                    int num = (int)(Math.random()*10) + 1;
                    System.out.println(num);
                    Thread.sleep(num * 1000L);
                    messageNo++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }

        }

        /** Starts the demo producer on a new thread, sending to topic "kafka-send". */
        public static void main(String[] args) {
            KafkaProducerTest test = new KafkaProducerTest("kafka-send");
            Thread thread = new Thread(test);
            thread.start();
        }
    }
    

    消费者代码

    package com.kafka.demo;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    
    /**
     * Minimal Kafka consumer demo.
     *
     * <p>Subscribes to one topic, polls it in an endless loop on its own thread and prints
     * every record value prefixed with a running counter.
     */
    public class KafkaConsumerTest implements Runnable {

        /** Consumer with String keys and String values (see the deserializers below). */
        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        private static final String GROUPID = "c_test"; // consumer group id; any name works

        /**
         * Builds a consumer for the given topic and subscribes to it.
         *
         * @param topicName name of the topic to consume from
         */
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("security.protocol", "PLAINTEXT");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // Where to start when the group has no committed offset: "latest" reads only new
            // messages, "earliest" reads from the beginning, "none" throws an exception.
            // The client default is "latest".
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        /**
         * Polls the topic forever and prints each record value. The loop only ends when an
         * exception is thrown (for example an interrupt during the idle sleep); the consumer
         * is closed in that case.
         */
        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------开始消费---------");
            try {
                for (; ; ) {
                    // poll(Duration) replaces the deprecated poll(long) overload.
                    ConsumerRecords<String, String> records =
                            consumer.poll(java.time.Duration.ofMillis(1000));
                    if (records.isEmpty()) {
                        // Nothing arrived within the poll timeout; pause briefly before retrying.
                        Thread.sleep(100);
                        continue;
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        String value = record.value();
                        System.out.println(messageNo+"-----------------------------------------------------"+value);
                        messageNo++;
                    }
                    // The no-arg commitAsync() commits the offsets of the whole last poll, so one
                    // call per batch is enough. Auto-commit is also enabled above, so this
                    // explicit commit is an extra safeguard kept from the original demo.
                    consumer.commitAsync();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }

        /** Starts the demo consumer on a new thread, reading from topic "kafka-send". */
        public static void main(String[] args) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("kafka-send");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
    
  • 相关阅读:
    算法入门(二):简单排序算法
    【面向对象程序设计】Java大作业 汽车租赁管理系统V4.0
    使用sonar-scanner扫描代码
    一个接口有多个实现类时,调用接口时,如何判定调用的哪个实现类?
    JavaScript基础之八JavaScript面向对象
    基于 Github 平台的 .NET 开源项目模板. 嘎嘎实用!
    appium ios webview
    让您了解GPS北斗卫星时间同步系统(NTP时钟同步)重要性
    冷门CSS属性你了解多少?
    计算机毕业设计(附源码)python幼儿影视节目智能推荐系统
  • 原文地址:https://blog.csdn.net/u010479989/article/details/126779664