• 手拉手springboot整合kafka发送消息


    环境介绍
    技术栈springboot+mybatis-plus+mysql+rocketmq
    软件版本
    mysql8
    IDEAIntelliJ IDEA 2022.2.1
    JDK17
    Spring Boot3.1.7
    kafka2.13-3.7.0

    创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

    springboot加入依赖kafka

    1. org.springframework.kafka
    2. spring-kafka

    加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

    application.yml配置连接kafka

    1. spring:
    2. kafka:
    3. bootstrap-servers: 192.168.68.133:9092

    生产者

    发送消息

    1. @Resource
    2. private KafkaTemplate kafkaTemplate;
    3. @Test
    4. void kafkaSendTest(){
    5. kafkaTemplate.send("kafkamsg01","hello kafka");
    6. }

    消费者

    接收消息

    1. @Component
    2. public class KafkaConsumer {
    3. @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
    4. public void consume(String message){
    5. System.out.println("接收到消息:"+message);
    6. }
    7. }

    若没有配置groupid

    Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

    1. @Component
    2. public class KafkaConsumer {
    3. @KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
    4. public void consume(String message){
    5. System.out.println("接收到消息:"+message);
    6. }
    7. }

    想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

    application.yml需要将auto.offset.reset设置为earliest

    spring:
    kafka:
    bootstrap-servers: 192.168.68.133:9092
    consumer:
    auto-offset-reset: earliest

    Earliest:将偏移量重置为最早的偏移量

    Latest: 将偏移量重置为最新的偏移量

    None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

    Exception: 向消费者抛出异常

    重置消费者组偏移量

    ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

    重置完成

    Spring-kafka生产者发送消息

    .send与sendDefault()方法都返回CompletableFuture>;

    CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

    1. @Resource
    2. private KafkaTemplate kafkaTemplate;
    3. @Test
    4. void kafkaSendTest(){
    5. kafkaTemplate.send("kafkamsg01","hello kafka");
    6. }

    发送Message

    1. @Test
    2. void kafkaSendMessageTest1(){
    3. //通过构建器模式创建Message
    4. Message message = MessageBuilder.withPayload("hello kafka send message")
    5. .setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
    6. .build();
    7. kafkaTemplate.send(message);
    8. }

    SendProducerRecord

    String topic, Integer partition, Long timestamp, K key, V value, Iterable

    headers

    1. @Test
    2. void kafkaSendProducerRecordTest1() {
    3. //参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable
      headers
    4. Headers headers = new RecordHeaders();
    5. headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
    6. ProducerRecord record = new ProducerRecord(
    7. "kafkaTopic01",
    8. 0,
    9. System.currentTimeMillis(),
    10. "key",
    11. "hello kafka send message");
    12. kafkaTemplate.send(record);
    13. }

    默认主题发送消息

    yml配置默认主题

    template:
    default-topic: default-topic

    1. @Test
    2. void kafkaSendDefaultTest01(){
    3. kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
    4. }

    发送Object消息

    序列化默认为String

    1. @Resource
    2. private KafkaTemplate kafkaTemplate1;
    3. @Test
    4. void kafkaSendObject(){
    5. MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
    6. //分区是null,kafka自行决定消息发送到哪个分区
    7. kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
    8. }

    Replica副本

    Replica副本:为实现备份公共,保证集群中的某个节点发生故障时,确保节点上的partition数据不丢失,且kafka仍然能够正常运行。
    Replica副本分为leader Replica和Follower Replica
    leader:每个分区多个副本中的主副本,生产者发送数据以及消费者消费数据都来说leader副本。
    Follower:每个分区多个副本中的从副本,实时从leader副本中同步数据,保持和leader副本数据同步,当leader副本发送故障,节点中的某个Follower副本会变成新的leader副本。

    指定topic分区及副本

    • 通过脚本命令创建topic时指定分区和副本

    --replication-factor需要小于等于节点个数,不能为0,默认为1

    --replication-factor 1表示只有本身

    --replication-factor 2 表示本身+副本

    ./kafka-topics.sh --create --topic testTopic --partitions 3 --replication-factor 2 --bootstrap-server 127.0.0.1:9092
    • 代码指定分区及副本

    配置bean

    1. @Configuration
    2. public class kafkaConfig {
    3. @Bean
    4. public NewTopic newTopic(){
    5. return new NewTopic("topic1", 3, (short) 2);
    6. }
    7. }

    分区策略

    Kafak根据不同策略将数据分配到不同的分区

    • 默认分配策略:BuiltlnPartitioner
    • 轮询分配策略:RoundRobinPartitioner 接口:Partitioner

    1. @Configuration
    2. public class kafkaConfig {
    3. //读取application.yml bootstrap-servers
    4. @Value("${spring.kafka.bootstrap-servers}")
    5. private String bootstrapServers;
    6. //读取application.yml bootstrap-servers
    7. @Value("${spring.kafka.producer.value-serializer}")
    8. private String valueSerializer;
    9. @Value("${spring.kafka.producer.key-serializer}")
    10. private String keySerializer;
    11. @Bean
    12. public KafkaTemplate kafkaTemplate(){
    13. return new KafkaTemplate<>(producerFactory());
    14. }
    15. public Map producerConfigs(){
    16. Map props = new HashMap<>();
    17. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
    18. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
    19. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
    20. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
    21. return props;
    22. }
    23. /**
    24. * 生产者工厂
    25. * @return
    26. */
    27. public ProducerFactory producerFactory(){
    28. return new DefaultKafkaProducerFactory<>(producerConfigs());
    29. }
    30. }

    • 手动指定

    • 自定义策略:自定义类实现Partitioner接口;

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

  • 相关阅读:
    【Java基础】方法重写、修饰符、权限修饰符及final、static关键字
    MySQL主从同步原理
    【shell】shell自动化脚本笔记与注意事项【未完待续...】
    RabbitMQ入门案例之Direct模式
    Milvus 编译环境演进
    【HBZ分享】高并发下如何设计缓存来提升系统性能?
    SpringSecurity基础:授权
    Flask狼书笔记 | 05_数据库
    几个简单的JavaScript面试题
    JS类型判断与转换
  • 原文地址:https://blog.csdn.net/weixin_47268883/article/details/139335518