• Java 生产与消费 Kafka 消息


    maven依赖

    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    

    生产者代码

    package com.kafka.demo;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    /**
     * Demo Kafka producer that publishes a fixed greeting to one topic from a
     * dedicated thread.
     *
     * <p>Record values are the UTF-8 bytes of the greeting; record keys are not set.
     * The run prints a summary and stops once {@link #MESSAGE_LIMIT} messages have
     * been handed to the producer, pausing a random 1-10 seconds between sends.
     */
    public class KafkaProducerTest implements Runnable {

        /** Number of messages after which the summary is printed and the loop stops. */
        private static final int MESSAGE_LIMIT = 2;

        private final KafkaProducer<String, byte[]> producer;
        private final String topic;

        /**
         * Creates a producer bound to the configured bootstrap server.
         *
         * @param topicName topic every record is sent to
         */
        public KafkaProducerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "xxx.xxx.xx.xx:9092");
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", ByteArraySerializer.class);
            this.producer = new KafkaProducer<>(props);
            this.topic = topicName;
        }

        /**
         * Sends the greeting until {@link #MESSAGE_LIMIT} is reached, then closes
         * the producer. The producer is closed on every exit path.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                for (int messageNo = 1; ; messageNo++) {
                    String messageStr = "hello,world";
                    System.out.println("-----------------------"+messageStr);

                    // Explicit charset: the platform default is not guaranteed to be UTF-8.
                    byte[] payload = messageStr.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, payload);
                    // send() is asynchronous; without a callback a failed send goes unnoticed.
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });

                    // Print the summary and stop once the limit is reached.
                    if (messageNo >= MESSAGE_LIMIT) {
                        System.out.println("--------------------------------------------:" + messageNo + "条");
                        System.out.println("发送的信息:" + messageStr);
                        break;
                    }

                    // Wait a random 1-10 seconds before the next message.
                    int num = (int)(Math.random()*10) + 1;
                    System.out.println(num);
                    Thread.sleep(num * 1000L);
                }
            } catch (InterruptedException e) {
                interrupted = true;
                e.printStackTrace();
            } catch (RuntimeException e) {
                e.printStackTrace();
            } finally {
                try {
                    producer.close();
                } finally {
                    // Restore the interrupt flag only after close(), so the shutdown
                    // itself runs with the flag cleared.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /** Starts the producer on its own thread against the {@code kafka-send} topic. */
        public static void main(String[] args) {
            KafkaProducerTest test = new KafkaProducerTest("kafka-send");
            Thread thread = new Thread(test);
            thread.start();
        }
    }
    

    消费者代码

    package com.kafka.demo;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    
    /**
     * Demo Kafka consumer that subscribes to one topic and prints every record
     * value it receives, numbering the messages as they arrive.
     *
     * <p>Keys and values are both deserialized as strings.
     */
    public class KafkaConsumerTest implements Runnable {

        /** Consumer group id; any value works for this demo. */
        private static final String GROUPID = "c_test";

        /** Upper bound on how long a single poll waits for records. */
        private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(1000);

        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        /**
         * Creates the consumer and subscribes it to the given topic.
         *
         * @param topicName topic to subscribe to
         */
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("security.protocol", "PLAINTEXT");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // Where to start when the group has no committed offset: "latest" reads only
            // new messages, "earliest" reads from the beginning, "none" throws an
            // exception. Kafka's default is "latest".
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        /**
         * Polls the topic in a loop and prints each record value. The loop ends when
         * the thread is interrupted during the idle pause or when an exception
         * propagates out of it; the consumer is closed on every exit path.
         */
        @Override
        public void run() {
            int messageNo = 1;
            boolean interrupted = false;
            System.out.println("---------开始消费---------");
            try {
                for (; ; ) {
                    ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                    if (records.isEmpty()) {
                        // Brief pause between empty polls; an interrupt delivered here
                        // surfaces as InterruptedException and ends the loop.
                        Thread.sleep(100);
                        continue;
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        String value = record.value();
                        System.out.println(messageNo+"-----------------------------------------------------"+value);
                        messageNo++;
                    }
                    // commitAsync() covers the whole polled batch, so one call after
                    // the batch has been processed is enough.
                    consumer.commitAsync();
                }
            } catch (InterruptedException e) {
                interrupted = true;
                e.printStackTrace();
            } finally {
                try {
                    consumer.close();
                } finally {
                    // Restore the interrupt flag only after close(), so the shutdown
                    // itself runs with the flag cleared.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /** Starts the consumer on its own thread against the {@code kafka-send} topic. */
        public static void main(String[] args) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("kafka-send");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
    
  • 相关阅读:
    计算机毕业设计Java新冠疫苗预约系统(源码+系统+mysql数据库+Lw文档)
    用C++语言写一个可读的回调函数
    【luogu AT5147】Negative Cycle(差分约束)(DP)
    开发模型的特点对照表
    Docker
    RollBack Rx Professional RMC 安装教程
    05-Nebula Graph 图数据 可视化
    给el-form-item,添加key的场景
    受电诱骗快充取电芯片XSP08:PD+QC+华为+三星多种协议9V12V15V20V
    JVM如何优化
  • 原文地址:https://blog.csdn.net/u010479989/article/details/126779664