• Producing and Consuming Kafka Messages


    Original articles:
    Kafka Tutorial: Creating a Kafka Producer in Java (cloudurable.com)
    Kafka Tutorial: Creating a Kafka Consumer in Java (cloudurable.com)

    Following on from the previous post, where we set up Kafka, we will now use Java to send messages to Kafka and consume them.

    First we need to add the Kafka dependency.
    Adding kafka-clients is enough; the other jars it needs will be downloaded automatically.

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.2</version>
            </dependency>


    The first step is to create a Producer. To create one, we need to know the bootstrap servers of the Kafka cluster we are connecting to and the topic we will send messages to. Here we assume the cluster has three brokers: "localhost:9092,localhost:9093,localhost:9094"

    We will mock up a topic: "my-example-topic"
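    For reference, the snippets below assume these two constants hold the values above (the names BOOTSTRAP_SERVERS and TOPIC match the identifiers used in the code that follows):

    private static final String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    private static final String TOPIC = "my-example-topic";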

    Below is the code that creates the KafkaProducer.

    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // Point the producer at the Kafka cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
        // Keys are Longs and values are Strings, so pick the matching serializers.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }


    Serialization comes into play here: Kafka transmits and stores records as raw bytes, so both the key and the value must be serialized before they can be sent and persisted to disk. The serializer classes we use are LongSerializer (for the key) and StringSerializer (for the value).
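    To make that step concrete, here is a minimal sketch of what a key serializer does. The class name MyLongSerializer is hypothetical, but the Serializer interface it implements is the real kafka-clients one, and the built-in LongSerializer behaves essentially like this:

    import org.apache.kafka.common.serialization.Serializer;
    import java.nio.ByteBuffer;
    import java.util.Map;

    // Hypothetical example class; use the built-in LongSerializer in practice.
    public class MyLongSerializer implements Serializer<Long> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, Long data) {
            if (data == null) return null;
            // Turn the typed key into the 8 raw bytes Kafka transmits and stores.
            return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
        }

        @Override
        public void close() { }
    }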

    The second step is to start sending messages to the topic, using the code below:

    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello World " + index);
                // send() is asynchronous; get() blocks until the broker acknowledges.
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d%n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
                LOGGER.info(String.format("sent record(key=%d, value=%s), meta(partition=%d, offset=%d, time=%d)",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime));
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }


    Then run it from a main method.
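    A minimal sketch of such a main method, assuming the runProducer above (the default count of 5 is an arbitrary choice for illustration):

    public static void main(String... args) throws Exception {
        if (args.length == 0) {
            runProducer(5);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }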


    Next, let's write the consumer side.

    Step one: likewise, we need a KafkaConsumer, defined as follows:

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Consumers sharing a group id split the topic's partitions among themselves.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }


    Mirroring the producer side, here we use LongDeserializer and StringDeserializer to turn the raw bytes back into the key and the value.
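    As the mirror image of the serializer sketch above, this is roughly what the key deserializer does; again, the class name MyLongDeserializer is hypothetical, while the Deserializer interface is the real kafka-clients one:

    import org.apache.kafka.common.serialization.Deserializer;
    import java.nio.ByteBuffer;
    import java.util.Map;

    // Hypothetical example class; use the built-in LongDeserializer in practice.
    public class MyLongDeserializer implements Deserializer<Long> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public Long deserialize(String topic, byte[] data) {
            if (data == null) return null;
            // Turn the raw bytes Kafka delivered back into a typed key.
            return ByteBuffer.wrap(data).getLong();
        }

        @Override
        public void close() { }
    }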

    Step two: with a KafkaConsumer in hand, we can start consuming messages.

    static void runConsumer() {
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;
        while (true) {
            // Block for up to one second waiting for records.
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record ->
                    LOGGER.info(String.format("Consumer Record:(%d, %s, %d, %d)",
                            record.key(), record.value(), record.partition(), record.offset())));
            // Mark the records from this poll as processed.
            consumer.commitSync();
        }
        consumer.close();
        System.out.println("Done.");
    }

    With a producer, a consumer, and a Kafka server in place, we can run an end-to-end (round-trip) test.
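    A minimal sketch of a consumer entry point for that test, assuming the runConsumer above; start it first, then run the producer main in another process and watch the records arrive:

    public static void main(String... args) {
        runConsumer();
    }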




  • Source: https://blog.csdn.net/xiaofeixia22222/article/details/126562781