• kafka安装和使用的入门教程


    这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。

    一、安装kafka

    访问kafka官网Apache Kafka,然后点击快速开始

    紧接着,点击Download

    最后点击下载链接下载安装包

    如果下载缓慢,博主已经把安装包上传到百度网盘:

    链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
    提取码:3aoh
    --来自百度网盘超级会员V3的分享

    二、启动kafka

    经过上一步下载完成后,按照页面的提示启动kafka

    1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录

    2、切换到usr目录,解压kafka_2.13-3.6.0.tgz

    1. cd /usr
    2. tar -zxzf kafka_2.13-3.6.0.tgz

    3、启动zookeeper

    修改配置文件confg/zookeeper.properties,修改一下数据目录

    dataDir=/usr/local/zookeeper

    然后通过以下命令启动kafka自带的zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    4、启动kafka

    修改配置文件confg/server.properties,修改一下kafka保存日志的目录

    log.dirs=/usr/local/kafka/logs

    然后新开一个连接窗口,通过以下命令启动kafka

    bin/kafka-server-start.sh config/server.properties

    三、kafka发送、接收消息

    创建topic

    bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

    生产消息

    往刚刚创建的topic里发送消息,可以一次性发送多条消息,点击Ctrl+C完成发送

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

    消费消息

    消费最新的消息

    新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

    消费之前的消息
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

    指定偏移量消费

     指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

    消息的分组消费
    每个消费者都可以指定一个消费者组, kafka 中的同一条消息,只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。
    通过以下命令在启动消费者时设置分组:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

    四、Java中使用kafka

    通过maven官网搜索kafka的maven依赖版本

    https://central.sonatype.com/search?q=kafkaicon-default.png?t=N7T8https://central.sonatype.com/search?q=kafka然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <modelVersion>4.0.0modelVersion>
    6. <groupId>org.examplegroupId>
    7. <artifactId>kafkaartifactId>
    8. <version>1.0-SNAPSHOTversion>
    9. <properties>
    10. <maven.compiler.source>8maven.compiler.source>
    11. <maven.compiler.target>8maven.compiler.target>
    12. properties>
    13. <dependencies>
    14. <dependency>
    15. <groupId>org.apache.kafkagroupId>
    16. <artifactId>kafka_2.12artifactId>
    17. <version>3.6.0version>
    18. dependency>
    19. dependencies>
    20. project>

    创建消息生产者

    生产者工厂类
    1. package producer;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.Producer;
    4. import org.apache.kafka.clients.producer.ProducerConfig;
    5. import java.util.Properties;
    6. /**
    7. * 消息生产者工厂类
    8. * @author heyunlin
    9. * @version 1.0
    10. */
    11. public class MessageProducerFactory {
    12. private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
    13. public static Producer getProducer() {
    14. //PART1:设置发送者相关属性
    15. Properties props = new Properties();
    16. // 此处配置的是kafka的端口
    17. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    18. // 配置key的序列化类
    19. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    20. // 配置value的序列化类
    21. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    22. return new KafkaProducer<>(props);
    23. }
    24. }

    测试发送消息
    1. package producer;
    2. import org.apache.kafka.clients.producer.Callback;
    3. import org.apache.kafka.clients.producer.Producer;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import org.apache.kafka.clients.producer.RecordMetadata;
    6. /**
    7. * @author heyunlin
    8. * @version 1.0
    9. */
    10. public class MessageProducer {
    11. private static final String TOPIC = "hello";
    12. public static void main(String[] args) {
    13. ProducerRecord record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");
    14. Producer producer = MessageProducerFactory.getProducer();
    15. // 同步发送消息
    16. producer.send(record);
    17. // 异步发送消息
    18. producer.send(record, new Callback() {
    19. @Override
    20. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    21. String topic = recordMetadata.topic();
    22. long offset = recordMetadata.offset();
    23. int partition = recordMetadata.partition();
    24. String message = recordMetadata.toString();
    25. System.out.println("topic = " + topic);
    26. System.out.println("offset = " + offset);
    27. System.out.println("message = " + message);
    28. System.out.println("partition = " + partition);
    29. }
    30. });
    31. // 加上这行代码才会发送消息
    32. producer.close();
    33. }
    34. }

    创建消息消费者

    消费者工厂类
    1. package consumer;
    2. import org.apache.kafka.clients.consumer.Consumer;
    3. import org.apache.kafka.clients.consumer.ConsumerConfig;
    4. import org.apache.kafka.clients.consumer.KafkaConsumer;
    5. import java.util.Properties;
    6. /**
    7. * 消息生产者工厂类
    8. * @author heyunlin
    9. * @version 1.0
    10. */
    11. public class MessageConsumerFactory {
    12. private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";
    13. public static Consumer getConsumer() {
    14. //PART1:设置发送者相关属性
    15. Properties props = new Properties();
    16. //kafka地址
    17. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    18. //每个消费者要指定一个group
    19. props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");
    20. //key序列化类
    21. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    22. //value序列化类
    23. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    24. return new KafkaConsumer<>(props);
    25. }
    26. }

    测试消费消息
    1. package consumer;
    2. import org.apache.kafka.clients.consumer.Consumer;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import java.time.Duration;
    6. import java.util.Collections;
    7. /**
    8. * @author heyunlin
    9. * @version 1.0
    10. */
    11. public class MessageConsumer {
    12. private static final String TOPIC = "hello";
    13. public static void main(String[] args) {
    14. Consumer consumer = MessageConsumerFactory.getConsumer();
    15. consumer.subscribe(Collections.singletonList(TOPIC));
    16. while (true) {
    17. ConsumerRecords records = consumer.poll(Duration.ofNanos(100));
    18. for (ConsumerRecord record : records) {
    19. System.out.println(record.key() + ": " + record.value());
    20. }
    21. // 提交偏移量,避免消息重复推送
    22. consumer.commitSync(); // 同步提交
    23. // consumer.commitAsync(); // 异步提交
    24. }
    25. }
    26. }

    五、springboot整合kafka

    开始前的准备工作

    然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    然后修改application.yml,添加kafka相关配置

    1. spring:
    2. kafka:
    3. bootstrap-servers: 192.168.254.128:9092
    4. producer:
    5. acks: 1
    6. retries: 3
    7. batch-size: 16384
    8. properties:
    9. linger:
    10. ms: 0
    11. buffer-memory: 33554432
    12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    14. consumer:
    15. group-id: helloGroup
    16. enable-auto-commit: false
    17. auto-commit-interval: 1000
    18. auto-offset-reset: latest
    19. properties:
    20. request:
    21. timeout:
    22. ms: 18000
    23. session:
    24. timeout:
    25. ms: 12000
    26. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    27. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    创建消息生产者

    1. package com.example.springboot.kafka.producer;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.kafka.core.KafkaTemplate;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RequestMethod;
    6. import org.springframework.web.bind.annotation.RestController;
    7. /**
    8. * @author heyunlin
    9. * @version 1.0
    10. */
    11. @RestController
    12. @RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
    13. public class KafkaProducer {
    14. private final KafkaTemplate kafkaTemplate;
    15. @Autowired
    16. public KafkaProducer(KafkaTemplate kafkaTemplate) {
    17. this.kafkaTemplate = kafkaTemplate;
    18. }
    19. @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
    20. public String sendMessage(String message) {
    21. kafkaTemplate.send("hello", message);
    22. return "发送成功~";
    23. }
    24. }

    创建消息消费者

    1. package com.example.springboot.kafka.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.springframework.kafka.annotation.KafkaListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * @author heyunlin
    7. * @version 1.0
    8. */
    9. @Component
    10. public class KafkaConsumer {
    11. @KafkaListener(topics = "hello")
    12. public void receiveMessage(ConsumerRecord record) {
    13. String topic = record.topic();
    14. long offset = record.offset();
    15. int partition = record.partition();
    16. System.out.println("topic = " + topic);
    17. System.out.println("offset = " + offset);
    18. System.out.println("partition = " + partition);
    19. }
    20. }

    然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。

     

    文章涉及的项目已经上传到gitee,按需获取~

    Java中操作kafka的基本项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/kafka.git

    springboot整合kafka案例项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/springboot-kafka.git

  • 相关阅读:
    Spring Boot进阶(94):从入门到精通:Spring Boot和Prometheus监控系统的完美结合
    numpy对行操作总结
    PostgreSQL数据库----pgAdmin客户端工具的使用
    Flutter Set存储自定义对象时 如何保证唯一
    html5 语义化标签实用指南
    Linux每日智囊
    背包问题 [动态规划] (01,完全,多重,混合)
    什么是模型
    java基础---01
    java毕业设计基于javaweb+mysql数据库实现的在线学习网站|在线课堂含论文+开题报告
  • 原文地址:https://blog.csdn.net/heyl163_/article/details/133841851