1. Kafka Use Cases in Practice
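2. Kafka Basic Concepts
Kafka broker: the Kafka server. Producer: the client that publishes messages. Consumer: the client that reads them.
A topic has one or more partitions (a one-to-many relationship). An offset uniquely identifies a message within its partition, so a specific message is located by its partition plus its offset.
Partition: can be viewed as an append-only log file.
Messages are ordered within a partition, but a topic as a whole has no ordering across its partitions.
The partition count chosen when a topic is created can be changed later, which helps with scaling. It can only be increased, and adding partitions changes which partition a given key maps to.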
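To make the partition and offset vocabulary above concrete, here is a toy in-memory model in Java. It is a hypothetical sketch, not Kafka's storage engine. `ToyTopic` is an invented class, each partition is just an append-only list, and an offset is simply the index within that list. It shows two things: offsets are counted per partition, so a message is addressed by (partition, offset), and order only exists inside a single partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Kafka's on-disk format: each partition is an
// append-only list, and a message's offset is its index within its own partition.
class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a message to the end of one partition and returns its offset.
    long append(int partition, String value) {
        List<String> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    // A message is addressed by (partition, offset); the offset alone is ambiguous.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    int partitionCount() {
        return partitions.size();
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        System.out.println(topic.append(0, "a")); // 0
        System.out.println(topic.append(1, "b")); // 0, because offsets are counted per partition
        System.out.println(topic.append(0, "c")); // 1
        System.out.println(topic.read(0, 1));     // c
    }
}
```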
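ISR (In-Sync Replicas) explained
The ISR of a partition is the set of its replicas, leader included, that are currently caught up with the leader.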
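Below is a simplified model of how ISR membership drives the high watermark (HW), which is the offset up to which consumers may read. Every part of it is a simplification:

* `IsrSketch` and `Replica` are hypothetical names.
* A follower is treated as in sync while it has caught up with the leader within `replica.lag.time.max.ms`.
* The HW is taken as the smallest log end offset (LEO) in the ISR.

The real broker logic handles more cases, such as `min.insync.replicas` and leader changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of ISR membership and the high watermark (HW).
// Assumption: a follower stays in the ISR while it has been fully caught up with the
// leader within replicaLagTimeMaxMs (the broker setting replica.lag.time.max.ms).
final class IsrSketch {

    static final class Replica {
        final int brokerId;
        final long logEndOffset;   // LEO: offset the replica will assign to its next message
        final long lastCaughtUpMs; // last time the replica had everything the leader had

        Replica(int brokerId, long logEndOffset, long lastCaughtUpMs) {
            this.brokerId = brokerId;
            this.logEndOffset = logEndOffset;
            this.lastCaughtUpMs = lastCaughtUpMs;
        }
    }

    // The leader is always in the ISR; a follower is in it if it caught up recently enough.
    static List<Replica> isr(Replica leader, List<Replica> followers, long nowMs, long replicaLagTimeMaxMs) {
        List<Replica> inSync = new ArrayList<>();
        inSync.add(leader);
        for (Replica follower : followers) {
            if (nowMs - follower.lastCaughtUpMs <= replicaLagTimeMaxMs) {
                inSync.add(follower);
            }
        }
        return inSync;
    }

    // HW = smallest LEO in the ISR; consumers only see messages below the HW.
    static long highWatermark(List<Replica> inSync) {
        long hw = Long.MAX_VALUE;
        for (Replica replica : inSync) {
            hw = Math.min(hw, replica.logEndOffset);
        }
        return hw;
    }

    public static void main(String[] args) {
        long now = 100_000;
        Replica leader = new Replica(1, 10, now);
        Replica fresh = new Replica(2, 8, now - 1_000);  // slightly behind, still in sync
        Replica stale = new Replica(3, 3, now - 60_000); // not caught up for 60 s
        List<Replica> inSync = isr(leader, Arrays.asList(fresh, stale), now, 30_000);
        System.out.println("ISR size = " + inSync.size() + ", HW = " + highWatermark(inSync)); // ISR size = 2, HW = 8
    }
}
```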
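3. Setting Up a ZooKeeper Cluster
3.1 Edit /etc/hostname (give each node its own hostname)
vim /etc/hostname
3.2 Edit /etc/hosts so that every node's hostname resolves to its IP (for example 192.168.110.130 centos130)
vim /etc/hosts
3.3 Make sure the firewall is off (on CentOS 7: systemctl stop firewalld, then systemctl disable firewalld), or at least open ports 2181, 2888 and 3888
3.4 Upload the apache-zookeeper-3.8.0-bin.tar.gz package to /home/software/8-apache-zookeeper-3.8.0/ on the server.
Download page: Index of /zookeeper/zookeeper-3.8.0
Note: download apache-zookeeper-3.8.0-bin.tar.gz (the binary package), not the source package apache-zookeeper-3.8.0.tar.gz.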
cd /home/software/8-apache-zookeeper-3.8.0/
Extract:
tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz
Rename:
mv apache-zookeeper-3.8.0-bin zookeeper
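3.5 Set the environment variables:
vim /etc/profile
Add the ZooKeeper variables (keep the existing $PATH, otherwise system commands can no longer be found):
- # zookeeper environment
- export ZOOKEEPER_HOME=/home/software/8-apache-zookeeper-3.8.0/zookeeper
- export PATH=.:$ZOOKEEPER_HOME/bin:$PATH
Reload the environment variables:
source /etc/profile
Full contents of /etc/profile for reference (the custom Java, Erlang, RabbitMQ and ZooKeeper settings are at the end):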
- # /etc/profile
-
- # System wide environment and startup programs, for login setup
- # Functions and aliases go in /etc/bashrc
-
- # It's NOT a good idea to change this file unless you know what you
- # are doing. It's much better to create a custom.sh shell script in
- # /etc/profile.d/ to make custom changes to your environment, as this
- # will prevent the need for merging in future updates.
-
- pathmunge () {
- case ":${PATH}:" in
- *:"$1":*)
- ;;
- *)
- if [ "$2" = "after" ] ; then
- PATH=$PATH:$1
- else
- PATH=$1:$PATH
- fi
- esac
- }
-
-
- if [ -x /usr/bin/id ]; then
- if [ -z "$EUID" ]; then
- # ksh workaround
- EUID=`/usr/bin/id -u`
- UID=`/usr/bin/id -ru`
- fi
- USER="`/usr/bin/id -un`"
- LOGNAME=$USER
- MAIL="/var/spool/mail/$USER"
- fi
-
- # Path manipulation
- if [ "$EUID" = "0" ]; then
- pathmunge /usr/sbin
- pathmunge /usr/local/sbin
- else
- pathmunge /usr/local/sbin after
- pathmunge /usr/sbin after
- fi
-
- HOSTNAME=`/usr/bin/hostname 2>/dev/null`
- HISTSIZE=1000
- if [ "$HISTCONTROL" = "ignorespace" ] ; then
- export HISTCONTROL=ignoreboth
- else
- export HISTCONTROL=ignoredups
- fi
-
- export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL
-
- # By default, we want umask to get set. This sets it for login shell
- # Current threshold for system reserved uid/gids is 200
- # You could check uidgid reservation validity in
- # /usr/share/doc/setup-*/uidgid file
- if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then
- umask 002
- else
- umask 022
- fi
-
- for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do
- if [ -r "$i" ]; then
- if [ "${-#*i}" != "$-" ]; then
- . "$i"
- else
- . "$i" >/dev/null
- fi
- fi
- done
-
- unset i
- unset -f pathmunge
-
-
-
- # java environment
- export JAVA_HOME=/home/software/1-jdk/jdk1.8.0_341
- export CLASSPATH=.:$JAVA_HOME/lib
- # export PATH=.:$JAVA_HOME/bin:$JAVA_HOME/lib:$PATH
-
- # Erlang environment
- ERLANG_HOME=/usr/local/erlang/
- #export PATH=$PATH:$ERLANG_HOME/bin
- export ERLANG_HOME
-
- # RabbitMQ environment
- # export PATH=$PATH:$RABBITMQ_HOME/sbin/
- export RABBITMQ_HOME=/usr/local/rabbitmq/rabbitmq_server-3.10.7
-
- # zookeeper environment
- export ZOOKEEPER_HOME=/home/software/8-apache-zookeeper-3.8.0/zookeeper
- export PATH=.:$JAVA_HOME/bin:$JAVA_HOME/lib:$ZOOKEEPER_HOME/bin:$PATH:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin/
3.6 Edit the ZooKeeper configuration file:
3.6.1 Go to the configuration directory:
cd /home/software/8-apache-zookeeper-3.8.0/zookeeper/conf/
3.6.2 Rename zoo_sample.cfg to zoo.cfg:
mv zoo_sample.cfg zoo.cfg
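3.6.3 Change two settings, then save and exit (on a three-node cluster, add one server.N line per node: server.0, server.1 and server.2):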
vim /home/software/8-apache-zookeeper-3.8.0/zookeeper/conf/zoo.cfg
dataDir=/home/software/8-apache-zookeeper-3.8.0/zookeeper/data
server.0=centos130:2888:3888
3.6.4 Add the server ID (myid). This takes two steps: create the directory and the file, then write the ID into it:
mkdir /home/software/8-apache-zookeeper-3.8.0/zookeeper/data/
vim /home/software/8-apache-zookeeper-3.8.0/zookeeper/data/myid
Note that the content of myid is different on each server. Set it to 0, 1 and 2 respectively, matching server.0, server.1 and server.2 in zoo.cfg, then save and exit.
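Because myid and the server.N lines must agree, one way to avoid mistakes is to derive both from a single ordered host list. This is a minimal sketch with a hypothetical helper, `EnsembleConfig`. `centos131` and `centos132` are assumed names for the second and third nodes (only `centos130` appears in this guide). Port 2888 is the one followers use to talk to the leader, and 3888 is the leader-election port.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: derives the zoo.cfg server.N lines and each host's myid
// from one ordered host list, so the two can never disagree.
final class EnsembleConfig {

    // peerPort (2888): followers connect to the leader; electionPort (3888): leader election.
    static List<String> serverLines(List<String> hosts, int peerPort, int electionPort) {
        List<String> lines = new ArrayList<>();
        for (int id = 0; id < hosts.size(); id++) {
            lines.add("server." + id + "=" + hosts.get(id) + ":" + peerPort + ":" + electionPort);
        }
        return lines;
    }

    // The value to write into dataDir/myid on the given host.
    static String myidFor(List<String> hosts, String host) {
        int id = hosts.indexOf(host);
        if (id < 0) {
            throw new IllegalArgumentException("host not in ensemble: " + host);
        }
        return Integer.toString(id);
    }

    public static void main(String[] args) {
        // centos131 and centos132 are hypothetical second and third nodes.
        List<String> hosts = Arrays.asList("centos130", "centos131", "centos132");
        serverLines(hosts, 2888, 3888).forEach(System.out::println);
        System.out.println("myid on centos131: " + myidFor(hosts, "centos131"));
    }
}
```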
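3.7 The ZooKeeper cluster is now set up. I used only one VM; in production, follow these steps on several hosts. To start ZooKeeper:
Start path: /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/ (any directory also works, because the environment variable is configured)
Run: zkServer.sh start (start it on all three machines, then check the status.)
Check the status: zkServer.sh status (check the Mode on the three nodes: you should see one leader and two followers; a single-node setup reports standalone)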
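Why one leader plus two followers on three machines: ZooKeeper needs a majority (quorum) of servers to elect a leader and accept writes. The arithmetic below uses a hypothetical `Quorum` helper. It shows that 3 servers tolerate 1 failure, while a 4th server adds no extra fault tolerance, which is why odd ensemble sizes are the usual choice.

```java
// Majority-quorum arithmetic for a ZooKeeper ensemble of n servers (hypothetical helper).
final class Quorum {

    // Smallest number of servers that forms a majority.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // How many servers can fail while a majority can still be formed.
    static int toleratedFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: majority " + majority(n)
                    + ", tolerates " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```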
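Run zkCli.sh to enter the ZooKeeper client.
Then use commands such as:
List: ls / and ls /zookeeper
Create a node with a value: create /test zookeeper
Get: get /test
Set: set /test zookeeper1234
PS1: the same data can be read from any node, because the ensemble keeps it consistent.
PS2: there are two basic node types: ephemeral and persistent. create makes a persistent node by default; create -e makes an ephemeral node, which is deleted when the creating client's session ends.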
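The difference between the two node types comes down to lifetime. The toy model below is hypothetical (`ToyZnodes` is not the ZooKeeper client API) and only illustrates the rule. Persistent nodes survive until they are deleted. Ephemeral nodes belong to the session that created them and disappear when that session closes or expires.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical toy model of znode lifetimes (not the ZooKeeper client API):
// persistent nodes live until deleted, ephemeral nodes are tied to the session
// that created them and vanish when that session ends.
final class ToyZnodes {
    private final Map<String, String> data = new HashMap<>();
    // path -> owning session id; null means the node is persistent
    private final Map<String, Long> owners = new HashMap<>();

    void createPersistent(String path, String value) {
        create(path, value, null);
    }

    void createEphemeral(String path, String value, long sessionId) {
        create(path, value, sessionId);
    }

    private void create(String path, String value, Long owner) {
        if (data.containsKey(path)) {
            throw new IllegalStateException("node already exists: " + path);
        }
        data.put(path, value);
        owners.put(path, owner);
    }

    boolean exists(String path) {
        return data.containsKey(path);
    }

    // Closing (or expiring) a session removes every ephemeral node it owns.
    void closeSession(long sessionId) {
        Iterator<Map.Entry<String, Long>> it = owners.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            Long owner = entry.getValue();
            if (owner != null && owner == sessionId) {
                data.remove(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ToyZnodes zk = new ToyZnodes();
        zk.createPersistent("/test", "zookeeper");
        zk.createEphemeral("/worker-1", "up", 42L);
        zk.closeSession(42L);
        System.out.println(zk.exists("/test"));     // true
        System.out.println(zk.exists("/worker-1")); // false
    }
}
```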
3.8 Start ZooKeeper at boot
cd /etc/rc.d/init.d
touch zookeeper
chmod +x zookeeper
vim zookeeper
- #! /bin/bash
- #chkconfig:2345 20 90
- #description:zookeeper
- #processname:zookeeper
- export JAVA_HOME=/home/software/1-jdk/jdk1.8.0_341
- export PATH=$JAVA_HOME/bin:$PATH
- case $1 in
- start) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh start;;
- stop) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh stop;;
- status) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh status;;
- restart) /home/software/8-apache-zookeeper-3.8.0/zookeeper/bin/zkServer.sh restart;;
- *) echo "Usage: $0 {start|stop|status|restart}" ;;
- esac
Register the script as a service and enable it at boot:
chkconfig --add zookeeper
chkconfig zookeeper on
Verify:
chkconfig --list zookeeper
After restarting with reboot, run zkServer.sh status to check the ZooKeeper status.
ZooKeeper data browser: ZooInspector
Download: https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
Unzip the download; the compiled zookeeper-dev-ZooInspector.jar is in the build directory.
Double-click the jar to open the GUI.
Connect to ZooKeeper
4. Setting Up a Kafka Cluster
Download: https://kafka.apache.org/downloads
Preparing the Kafka environment:
4.1 Upload the Kafka package to /home/software/9-kafka/ on the server
4.2 Go to /home/software/9-kafka/ and extract the package into /usr/local
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
4.3 Go to /usr/local and rename the directory
mv kafka_2.13-3.2.1 kafka_2.13
4.4 Go to /usr/local/kafka_2.13/config and edit server.properties
vim /usr/local/kafka_2.13/config/server.properties
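## 4.4.1 Cluster setting: broker ID
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
## 4.4.2 Change log.dirs
log.dirs=/usr/local/kafka_2.13/kafka-logs
## 4.4.3 Change num.partitions (default partition count for newly created topics)
num.partitions=5
## 4.4.4 Legacy parameters: 192.168.110.130 is the server IP, 9092 the default port.
## host.name, advertised.host.name and port are deprecated in favor of listeners / advertised.listeners
## and are no longer recognized by Kafka 3.x, so they are left commented out here.
#host.name=192.168.110.130
#advertised.host.name=192.168.110.130
#port=9092
## 4.4.5 ZooKeeper address. The commented-out line is the three-node ZooKeeper ensemble I set up earlier.
##zookeeper.connect=192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181
zookeeper.connect=192.168.110.130:2181
## 4.4.6 Uncomment advertised.listeners and set the IP clients use to reach this broker (for example PLAINTEXT://192.168.110.130:9092)
advertised.listeners=PLAINTEXT://<server IP>:9092
## 4.4.7 Save and exit, then create the /usr/local/kafka_2.13/kafka-logs directory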
mkdir /usr/local/kafka_2.13/kafka-logs
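server.properties is a plain java.util.Properties file, so you can sanity-check it with a few lines of Java before starting the broker. This is a hypothetical helper, not part of Kafka. `BrokerConfigCheck` and its rules were invented for this example and reflect this guide's setup:

* an explicit `broker.id`
* ZooKeeper mode
* no legacy `host.name`/`port` keys
* a routable `advertised.listeners`

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sanity check for a server.properties file, which uses the
// java.util.Properties format ('#' lines are comments). The rules reflect
// this guide's setup; Kafka itself does not ship this helper.
final class BrokerConfigCheck {

    // Legacy keys superseded by listeners / advertised.listeners.
    private static final String[] LEGACY_KEYS = {"host.name", "port", "advertised.host.name", "advertised.port"};

    static List<String> check(String serverProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        String brokerId = props.getProperty("broker.id");
        if (brokerId == null) {
            problems.add("broker.id is not set (this guide assigns it explicitly)");
        } else {
            try {
                Integer.parseInt(brokerId.trim());
            } catch (NumberFormatException e) {
                problems.add("broker.id is not an integer: " + brokerId);
            }
        }
        if (props.getProperty("zookeeper.connect") == null) {
            problems.add("zookeeper.connect is not set");
        }
        for (String key : LEGACY_KEYS) {
            if (props.containsKey(key)) {
                problems.add(key + " is a legacy setting; use listeners / advertised.listeners");
            }
        }
        String advertised = props.getProperty("advertised.listeners");
        if (advertised != null && advertised.contains("0.0.0.0")) {
            problems.add("advertised.listeners must be an address clients can reach, not 0.0.0.0");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "broker.id=0",
                "log.dirs=/usr/local/kafka_2.13/kafka-logs",
                "zookeeper.connect=192.168.110.130:2181",
                "port=9092",
                "advertised.listeners=PLAINTEXT://192.168.110.130:9092");
        check(cfg).forEach(System.out::println); // prints the warning about port
    }
}
```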
4.4.8 Run the start script in the bin directory; startup is complete once the output shows [KafkaServer id=0] started (kafka.server.KafkaServer)
Start command:
/usr/local/kafka_2.13/bin/kafka-server-start.sh /usr/local/kafka_2.13/config/server.properties &
Installing a Kafka web console
The kafka-manager tool has been renamed CMAK. Download it from:
CMAK (kafka manager) package downloads | Wolfogre's Blog
4.5 Upload kafka-manager-2.0.0.2.zip to /home/software/10-kafka-manager/, then unzip it into /usr/local/
unzip kafka-manager-2.0.0.2.zip -d /usr/local/
4.6 In /usr/local/kafka-manager-2.0.0.2/conf, edit application.conf and set kafka-manager.zkhosts
#kafka-manager.zkhosts="192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181"
kafka-manager.zkhosts="192.168.110.130:2181"
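4.7 Start the console
/usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &
4.8 Open the console in a browser; the default port is 9000
http://192.168.110.130:9000/
4.9 Add the cluster in the web UI
# ZooKeeper hosts to fill in:
#192.168.33.111:2181,192.168.33.113:2181,192.168.33.114:2181
192.168.110.130:2181
4.10 Verify the cluster:
4.10.1 Create a topic (in the CMAK UI, or from /usr/local/kafka_2.13/bin with kafka-topics.sh --create --topic test --partitions 5 --replication-factor 1 --bootstrap-server 192.168.110.130:9092)
4.10.2 Verify sending and receiving messages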
cd /usr/local/kafka_2.13/bin/
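4.11 Start the producer script
kafka-console-producer.sh --bootstrap-server 192.168.110.130:9092 --topic test
Note: older guides use --broker-list here; newer Kafka versions deprecate it in favor of --bootstrap-server.
4.12 In another terminal window, start the consumer script
kafka-console-consumer.sh --bootstrap-server 192.168.110.130:9092 --topic test
# Once the consumer is running, type messages into the producer window; they appear in the consumer window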
5. Getting Started with Kafka in Code
Demo code:
- public interface Const {
-
- String TOPIC_QUICKSTART = "topic-quickstart";
-
- String TOPIC_NORMAL = "topic-normal";
-
- String TOPIC_INTERCEPTOR = "topic-interceptor";
-
- String TOPIC_SERIAL = "topic-serial";
-
- String TOPIC_PARTITION = "topic-partition";
-
- String TOPIC_MODULE = "topic-module";
-
- String TOPIC_CORE = "topic-core";
-
- String TOPIC_REBALANCE = "topic-rebalance";
-
- String TOPIC_MT1 = "topic-mt1";
-
- String TOPIC_MT2 = "topic-mt2";
-
- }
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public class User {
-
- private String id;
-
- private String name;
-
- }
- import com.alibaba.fastjson.JSON;
- import com.lvxiaosha.kafka.api.Const;
- import com.lvxiaosha.kafka.api.User;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class QuickStartProducer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- // 1. Configure the key properties the producer needs at startup
-
- // 1.1 BOOTSTRAP_SERVERS_CONFIG: the Kafka brokers to connect to; separate multiple addresses with commas
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
- // 1.2 CLIENT_ID_CONFIG: a logical ID for this Kafka client
- properties.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
- // 1.3 KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG
- // Q: Why do the Kafka key and value need to be serialized?
- // A: The broker receives and stores messages as raw bytes, so the key and value must be turned into byte[] first
- // String serializer class: org.apache.kafka.common.serialization.StringSerializer
- // KEY: Kafka uses it to compute which partition of the topic the message is delivered to
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- // VALUE: the actual message content
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 2. Create the Kafka producer, passing in the properties
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
- for(int i = 0; i <10; i ++) {
- // 3. Build the message
- User user = new User("00" + i, "张三");
- ProducerRecord<String, String> record =
- // arg1: topic, arg2: message value (no key is given, so the producer chooses the partition)
- new ProducerRecord<String, String>(Const.TOPIC_QUICKSTART,
- JSON.toJSONString(user));
-
- // 4. Send the message (send() is asynchronous)
- producer.send(record);
- }
-
-
- // 5. Close the producer; close() waits for previously sent messages to complete
- producer.close();
-
- }
- }
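The producer comments above say the key decides the partition. The sketch below re-implements the default partitioner's rule for non-null keys. It was written from memory of the Kafka client source: take the murmur2 hash of the serialized key, make it non-negative, and take it modulo the partition count. Treat `KeyPartitioning` as an illustration rather than the library code. `QuickStartProducer` sends records without a key, so this path does not apply to it; keyless records are spread across partitions instead.

```java
import java.nio.charset.StandardCharsets;

// Illustrative re-implementation (written from memory of the Kafka client source) of how
// the default partitioner maps a non-null key to a partition: murmur2 hash of the
// serialized key bytes, made non-negative, modulo the partition count.
// A sketch for understanding, not a replacement for the library class.
final class KeyPartitioning {

    // 32-bit murmur2 with the seed used by Kafka.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (the cases deliberately fall through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // StringSerializer encodes keys as UTF-8 by default.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition (for a fixed partition count).
        for (String key : new String[] {"000", "001", "001"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 5));
        }
    }
}
```

Because the result depends on the partition count, increasing a topic's partitions later changes where existing keys land.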
- import com.lvxiaosha.kafka.api.Const;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
-
- public class QuickStartConsumer {
-
- public static void main(String[] args) {
-
- // 1. Configure the properties
- Properties properties = new Properties();
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
-
- // org.apache.kafka.common.serialization.StringDeserializer
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
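- // Very important setting: the consumer group this consumer belongs to
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
- // Common setting: session timeout; if no heartbeat arrives within it, the consumer is considered dead and the group rebalances
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- // Offset commit: automatic or manual; automatic is the default
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);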
-
- // 2. Create the consumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- // 3. Subscribe to the topic of interest: Const.TOPIC_QUICKSTART
- consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));
-
- System.err.println("quickstart consumer started...");
-
- try {
- // 4. Consume by pulling messages
- while(true) {
- // Wait up to 1000 ms for messages in each poll
- // Each poll returns a batch of messages from the TOPIC_QUICKSTART partitions assigned to this consumer
- // topic and partition are one-to-many: a topic can have several partitions
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- // Messages are stored per partition, so iterate over the partitions present in this batch
- for(TopicPartition topicPartition : records.partitions()) {
- // Get the records for this TopicPartition, i.e. this partition's messages in the current batch
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- // Topic name of this TopicPartition
- String topic = topicPartition.topic();
- // Number of messages from this partition in the current batch
- int size = partitionRecords.size();
-
- System.err.println(String.format("--- topic: %s, partition: %s, message count: %s",
- topic,
- topicPartition.partition(),
- size));
-
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- // The actual message content
- String value = consumerRecord.value();
- // Offset of the current message
- long offset = consumerRecord.offset();
- // The offset to commit is that of the next message to read, i.e. offset + 1;
- // after a restart, the group resumes reading from the committed offset
- long commitOffset = offset + 1;
- System.err.println(String.format("value: %s, offset: %s, offset to commit: %s",
- value, offset, commitOffset));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
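With `enable.auto.commit=true`, the committed offset is the next offset to read (last processed offset + 1), and it is only written every `auto.commit.interval.ms`. Below is a toy model of what that means after a crash. `CommitSemantics` is a hypothetical class and no Kafka client is involved. Everything processed after the last commit is read again on restart, which is at-least-once delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of offset commits (no Kafka client involved).
// The committed offset is the *next* offset to read; on restart the group resumes
// from it, so anything processed after the last commit is delivered again.
final class CommitSemantics {

    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Offsets that are read a second time after a restart.
    static List<Long> redelivered(long committedOffset, long lastProcessedOffset) {
        List<Long> again = new ArrayList<>();
        for (long o = committedOffset; o <= lastProcessedOffset; o++) {
            again.add(o);
        }
        return again;
    }

    public static void main(String[] args) {
        long committed = offsetToCommit(4); // the last auto-commit covered offsets 0..4
        long lastProcessed = 7;             // offsets 5..7 were processed before the crash
        System.out.println("resume from " + committed + ", re-read " + redelivered(committed, lastProcessed));
        // resume from 5, re-read [5, 6, 7]
    }
}
```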
pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>Kafka</artifactId>
- <groupId>com.lvxiaosha</groupId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>learn-kafka</artifactId>
-
- <properties>
- <maven.compiler.source>17</maven.compiler.source>
- <maven.compiler.target>17</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- <!-- Exclude spring-boot-starter-logging (log4j2 is used instead) -->
- <exclusions>
- <exclusion>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <!-- log4j2 -->
- <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-log4j2</artifactId>
- <version>2.5.6</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.58</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>2.1.1</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <finalName>learn-kafka</finalName>
- <!-- Include .properties and .xml files when packaging -->
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <!-- Whether to replace property placeholders in the resources -->
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <mainClass>com.lvxiaosha.kafka.api.LearnKafkaApplication</mainClass>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-
- </project>
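6. Kafka Basic Configuration Parameters
zookeeper.connect: required in ZooKeeper mode; host:port of ZooKeeper, with multiple ZooKeeper nodes separated by commas.
listeners: the list of addresses the broker listens on for client connections, e.g. PLAINTEXT://0.0.0.0:9092. It is often left at its default in simple setups.
broker.id: important; it must be unique for every broker in the cluster.
log.dir and log.dirs: the directory (or comma-separated directories) where Kafka stores its log data; if both are set, log.dirs takes precedence.
message.max.bytes: the largest message (record batch) the broker accepts; the default is 1048588 bytes, about 1 MB.
group.initial.rebalance.delay.ms: defaults to 3 seconds. The sample server.properties shipped with Kafka overrides it to 0 for development and testing, so set the value you want in server.properties yourself. Its main effect is to make the group coordinator postpone the rebalance it would otherwise start immediately when an empty consumer group receives a join request. For example, if you expect all members of a consumer group to join within 10 s, set this parameter to 10000. In practice this parameter is simple to use.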
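Below is a simplified simulation of this rule. It assumes the behavior described in the comment of Kafka's sample server.properties. The first rebalance waits `group.initial.rebalance.delay.ms`, and each member that joins during the wait pushes it back by another delay. The total wait is capped at `max.poll.interval.ms`. `InitialRebalanceDelay` is a hypothetical class, and the coordinator's real implementation differs in detail.

```java
// Simplified, hypothetical simulation of group.initial.rebalance.delay.ms for a new (empty) group.
// Assumption: the first rebalance waits delayMs, each member joining during the wait pushes it
// back by another delayMs, and the total wait is capped at the rebalance timeout (max.poll.interval.ms).
final class InitialRebalanceDelay {

    // joinTimesMs must be sorted ascending; the first entry is the first member's join request.
    static long rebalanceStartMs(long[] joinTimesMs, long delayMs, long rebalanceTimeoutMs) {
        long first = joinTimesMs[0];
        long cap = first + rebalanceTimeoutMs;
        long deadline = Math.min(first + delayMs, cap);
        for (int i = 1; i < joinTimesMs.length; i++) {
            long joinedAt = joinTimesMs[i];
            if (joinedAt >= deadline) {
                break; // the delayed rebalance has already begun when this member arrives
            }
            deadline = Math.min(joinedAt + delayMs, cap);
        }
        return deadline;
    }

    public static void main(String[] args) {
        long[] joins = {0, 2_000, 4_000};
        System.out.println(rebalanceStartMs(joins, 3_000, 300_000)); // 7000
        System.out.println(rebalanceStartMs(joins, 0, 300_000));     // 0: no delay, rebalance right away
    }
}
```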