• Java代码操作Kafka


    1、消息的发送与接收

    • 生产者主要的对象有: KafkaProducer ProducerRecord
    • 其中KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。 

    KafkaProducer 的创建需要指定的参数和含义:

    参数说明
    bootstrap.servers配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。
    key.serializer要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
    value.serializer要发送消息的alue数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
    acks

    默认值:all。

    acks=0

    • 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
    • 该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。

    acks=1

    • 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。
    • 在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。

    acks=all

    • 首领分区会等待所有的ISR副本分区确认记录。
    • 该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
    • 这是Kafka最强的可靠性保证,等效于acks=-1
    retriesretries重试次数
    • 当消息发送出现错误的时候,系统会重发消息。
    • 跟客户端收到错误时重发一样。
    • 如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
    • 否则在重试此失败消息的时候,其他的消息可能发送成功了

    其他参数可以从org.apache.kafka.clients.producer.ProducerConfig 中找到。

    • 消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。
    • 同步确认效率低,异步确认效率高,但是需要设置回调对象。

    生产者:

    1. package com.example.producer;
    2. import org.apache.kafka.clients.producer.Callback;
    3. import org.apache.kafka.clients.producer.KafkaProducer;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import org.apache.kafka.clients.producer.RecordMetadata;
    6. import org.apache.kafka.common.header.Header;
    7. import org.apache.kafka.common.header.internals.RecordHeader;
    8. import org.apache.kafka.common.serialization.IntegerSerializer;
    9. import org.apache.kafka.common.serialization.StringSerializer;
    10. import java.util.ArrayList;
    11. import java.util.HashMap;
    12. import java.util.List;
    13. import java.util.Map;
    14. import java.util.concurrent.ExecutionException;
    15. import java.util.concurrent.Future;
    16. public class MyProducer1 {
    17. public static void main(String[] args) throws ExecutionException, InterruptedException {
    18. Map configs = new HashMap<>();
    19. // 指定初始连接用到的broker地址,如果是集群,则可以通过此初始连接发现集群中的其他broker
    20. configs.put("bootstrap.servers", "192.168.80.121:9092");
    21. // 指定key的序列化类
    22. configs.put("key.serializer", IntegerSerializer.class);
    23. // 指定value的序列化类
    24. configs.put("value.serializer", StringSerializer.class);
    25. /* configs.put("acks", "all");
    26. configs.put("retries", 3); */
    27. // 创建kafkaProducer对象
    28. KafkaProducer producer = new KafkaProducer(configs);
    29. // 用于设置用户自定义的消息头字段
    30. List
      headers = new ArrayList<>();
    31. headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
    32. // 封装消息
    33. ProducerRecord record = new ProducerRecord(
    34. "topic_1", // 主题名称
    35. 0, // 分区编号,现在只有一个分区,所以是0
    36. 0, // 数字作为key
    37. "hello world", // 字符串作为value
    38. headers
    39. );
    40. // 消息的同步确认
    41. /* Future future = producer.send(record);
    42. RecordMetadata metadata = future.get();
    43. System.out.println("消息的主题:" + metadata.topic());
    44. System.out.println("消息的分区:" + metadata.partition());
    45. System.out.println("消息的偏移量:" + metadata.offset()); */
    46. // 消息的异步确认
    47. producer.send(record, new Callback() {
    48. @Override
    49. public void onCompletion(RecordMetadata metadata, Exception e) {
    50. if (e==null) {
    51. System.out.println("消息的主题:" + metadata.topic());
    52. System.out.println("消息的分区:" + metadata.partition());
    53. System.out.println("消息的偏移量:" + metadata.offset());
    54. }else{
    55. System.out.println("异常消息:"+e.getMessage());
    56. }
    57. }
    58. });
    59. // 关闭生产者
    60. producer.close();
    61. }
    62. }

    消费者:

    1. package com.example.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.serialization.IntegerDeserializer;
    7. import org.apache.kafka.common.serialization.StringDeserializer;
    8. import java.util.Arrays;
    9. import java.util.HashMap;
    10. import java.util.Map;
    11. import java.util.function.Consumer;
    12. public class MyConsumer {
    13. public static void main(String[] args) {
    14. Map configs = new HashMap<>();
    15. // 指定初始连接用到的broker地址
    16. // configs.put("bootstrap.servers", "192.168.80.121:9092");
    17. // 上面方式如果怕写错,可以尝试下面这种方法
    18. configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.121:9092");
    19. // 指定key的反序列化类
    20. configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    21. // 指定value的反序列化类
    22. configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    23. // 配置消费组id
    24. configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo");
    25. // earliest:如果找不到当前消费者的有效偏移量,自自动重置到最开始
    26. // latest:表示直接重置到消息偏移量的最后一个
    27. configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    28. KafkaConsumer consumer = new KafkaConsumer(configs);
    29. // 先订阅,在消费
    30. consumer.subscribe(Arrays.asList("topic_1"));
    31. /* while (true) {
    32. ConsumerRecords consumerRecords = consumer.poll(3_000);
    33. } */
    34. // 如果主题中没有可消费的消息,则该方法可以放到while循环中,每过3秒重新拉取一次
    35. // 如果还没有拉取到,过三秒再次拉取,防止while循环过于密集的poll调用
    36. // 批量从主题的分区拉取消息
    37. ConsumerRecords consumerRecords = consumer.poll(3_000); // 指定拉取消息的时间间隔
    38. // 遍历本次从主题分区拉取的批量消息
    39. consumerRecords.forEach(new Consumer>() {
    40. @Override
    41. public void accept(ConsumerRecord record) {
    42. System.out.println("========================================");
    43. System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
    44. System.out.println("消息的key:" + record.key());
    45. System.out.println("消息的偏移量:" + record.offset());
    46. System.out.println("消息的分区号:" + record.partition());
    47. System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
    48. System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
    49. System.out.println("消息的时间戳:" + record.timestamp());
    50. System.out.println("消息的时间戳类型:" + record.timestampType());
    51. System.out.println("消息的主题:" + record.topic());
    52. System.out.println("消息的值:" + record.value());
    53. }
    54. });
    55. consumer.close();
    56. }
    57. }

    2、SpringBoot Kafka

    (1)pom.xml文件

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0modelVersion>
    5. <parent>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-parentartifactId>
    8. <version>2.2.8.RELEASEversion>
    9. <relativePath/>
    10. parent>
    11. <groupId>com.examplegroupId>
    12. <artifactId>demo_02_springboot-kafkaartifactId>
    13. <version>0.0.1-SNAPSHOTversion>
    14. <name>demo_02_springboot-kafkaname>
    15. <description>demo_02_springboot-kafkadescription>
    16. <properties>
    17. <java.version>8java.version>
    18. properties>
    19. <dependencies>
    20. <dependency>
    21. <groupId>org.springframework.bootgroupId>
    22. <artifactId>spring-boot-starter-webartifactId>
    23. dependency>
    24. <dependency>
    25. <groupId>org.springframework.kafkagroupId>
    26. <artifactId>spring-kafkaartifactId>
    27. dependency>
    28. <dependency>
    29. <groupId>org.springframework.bootgroupId>
    30. <artifactId>spring-boot-starter-testartifactId>
    31. <scope>testscope>
    32. <exclusions>
    33. <exclusion>
    34. <groupId>org.junit.vintagegroupId>
    35. <artifactId>junit-vintage-engineartifactId>
    36. exclusion>
    37. exclusions>
    38. dependency>
    39. <dependency>
    40. <groupId>org.springframework.kafkagroupId>
    41. <artifactId>spring-kafka-testartifactId>
    42. <scope>testscope>
    43. dependency>
    44. dependencies>
    45. <build>
    46. <plugins>
    47. <plugin>
    48. <groupId>org.springframework.bootgroupId>
    49. <artifactId>spring-boot-maven-pluginartifactId>
    50. plugin>
    51. plugins>
    52. build>
    53. project>

    (2)application.properties

    1. spring.application.name=springboot-kafka-02
    2. server.port=8080
    3. # 用于建立初始连接的broker地址
    4. spring.kafka.bootstrap-servers=192.168.80.121:9092
    5. # producer用到的key和value的序列化类
    6. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
    7. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    8. # 默认的批处理记录数
    9. spring.kafka.producer.batch-size=16384
    10. # 32MB的总发送缓存
    11. spring.kafka.producer.buffer-memory=33554432
    12. # consumer用到的key和value的反序列化类
    13. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
    14. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    15. # consumer的消费组id
    16. spring.kafka.consumer.group-id=springboot-consumer02
    17. # 是否自动提交消费者偏移量
    18. spring.kafka.consumer.enable-auto-commit=true
    19. # 每隔100ms向broker提交一次偏移量
    20. spring.kafka.consumer.auto-commit-interval=100
    21. # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
    22. spring.kafka.consumer.auto-offset-reset=earliest

    (3)启动类

    1. package com.example.demo;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class Demo02SpringbootKafkaApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);
    8. }
    9. }

    (4)KafkaConfig(在这里可以进行主题的创建、自定义了kafkaAdmin对象等一系列配置,也可以省略,如果kafka在连接主题时,发现没有,KafkaAdmin这个类会自动帮我们创建)

    1. package com.example.demo.config;
    2. import org.apache.kafka.clients.admin.NewTopic;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class KafkaConfig {
    7. // 创建主题
    8. @Bean
    9. public NewTopic topic1() {
    10. return new NewTopic("nptc-01",3, (short) 1);
    11. }
    12. @Bean
    13. public NewTopic topic2() {
    14. return new NewTopic("nptc-02",5, (short) 1);
    15. }
    16. }

    (5)生产者

    同步方式 

    1. package com.example.demo.controlller;
    2. import org.apache.kafka.clients.producer.RecordMetadata;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.kafka.core.KafkaTemplate;
    5. import org.springframework.kafka.support.SendResult;
    6. import org.springframework.util.concurrent.ListenableFuture;
    7. import org.springframework.web.bind.annotation.PathVariable;
    8. import org.springframework.web.bind.annotation.RequestMapping;
    9. import org.springframework.web.bind.annotation.RestController;
    10. import java.util.concurrent.CompletableFuture;
    11. import java.util.concurrent.ExecutionException;
    12. @RestController
    13. public class KafkaSyncProducerController {
    14. @Autowired
    15. private KafkaTemplate template;
    16. @RequestMapping("send/sync/{message}")
    17. public String send(@PathVariable String message) throws ExecutionException, InterruptedException {
    18. ListenableFuture> future = template.send("topic-spring-01", 0, 0, message);
    19. // 同步发送消息
    20. SendResult sendResult = future.get();
    21. RecordMetadata metadata = sendResult.getRecordMetadata();
    22. System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
    23. return "success";
    24. }
    25. }

    异步回调方式

    1. package com.example.demo.controlller;
    2. import org.apache.kafka.clients.producer.RecordMetadata;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.kafka.core.KafkaTemplate;
    5. import org.springframework.kafka.support.SendResult;
    6. import org.springframework.util.concurrent.ListenableFuture;
    7. import org.springframework.util.concurrent.ListenableFutureCallback;
    8. import org.springframework.web.bind.annotation.PathVariable;
    9. import org.springframework.web.bind.annotation.RequestMapping;
    10. import org.springframework.web.bind.annotation.RestController;
    11. @RestController
    12. public class KafkaAsyncProducerController {
    13. @Autowired
    14. private KafkaTemplate template;
    15. @RequestMapping("send/async/{message}")
    16. public String send(@PathVariable String message) {
    17. ListenableFuture> future = (ListenableFuture>) template.send("topic-spring-01", 0, 1, message);
    18. future.addCallback(new ListenableFutureCallback>() {
    19. @Override
    20. public void onFailure(Throwable ex) {
    21. System.out.println("发送消息失败:" + ex.getMessage());
    22. }
    23. @Override
    24. public void onSuccess(SendResult result) {
    25. RecordMetadata metadata = result.getRecordMetadata();
    26. System.out.println("发送消息成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
    27. }
    28. });
    29. return "success";
    30. }
    31. }

    (6)消费者

    1. package com.example.demo.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.springframework.kafka.annotation.KafkaListener;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class MyConsumer {
    7. @KafkaListener(topics = "topic-spring-01")
    8. public void onMessage(ConsumerRecord record) {
    9. System.out.println("消费者收到的消息:" + record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
    10. }
    11. }
  • 相关阅读:
    【git】取消git代理
    大数据技术基础实验十三:Kafka实验——订阅推送示例
    使用pdf.js实现pdf的预览与打印
    Java基础static关键字
    精读A Study of Face Obfuscation in ImageNet
    浅谈系统安全测试
    学生如何利用假期提升个人能力?
    综合布线系统计算机辅助设计nVisual
    js基础算法
    远程管理SSH服务
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/128173630