ApacheKafka 是一个分布式的流处理平台。它具有以下特点:
Kafka 的基本数据单元被称为 message(消息),为减少网络开销,提高效率,多个消息会被放入同一批次 (Batch) 中后再写入。
Kafka 的消息通过 Topics(主题) 进行分类,一个主题可以被分为若干个 Partitions(分区),一个分区就是一个提交日志 (commit log)。消息以追加的方式写入分区,然后以陷入先出的顺序读取。Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上,这意味着一个 Topic 可以横跨多个服务器,以提供比单个服务器更强大的性能。
由于一个 Topic 包含多个分区,因此无法在整个 Topic 范围内保证消息的顺序性,但可以保证消息在单个分区内的顺序性。

1. 生产者
生产者负责创建消息。一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过自定义分区器来实现。
2. 消费者
消费者是消费者群组的一部分,消费者负责消费消息。消费者可以订阅一个或者多个主题,并按照消息生成的顺序来读取它们。消费者通过检查消息的偏移量 (offset) 来区分读取过的消息。偏移量是一个不断递增的数值,在创建消息时,Kafka 会把它添加到其中,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。

一个分区只能被同一个消费者群组里面的一个消费者读取,但可以被不同消费者群组中所组成的多个消费者共同读取。多个消费者群组中消费者共同读取同一个主题时,彼此之间互不影响。

一个独立的 Kafka 服务器被称为 Broker。Broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。Broker 为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘的消息。
Broker 是集群 (Cluster) 的组成部分。每一个集群都会选举出一个 Broker 作为集群控制器 (Controller),集群控制器负责管理工作,包括将分区分配给 Broker 和监控 Broker。
在集群中,一个分区 (Partition) 从属一个 Broker,该 Broker 被称为分区的首领 (Leader)。一个分区可以分配给多个 Brokers,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个 Broker 失效,其他 Broker 可以接管领导权。

官网下载
百度网盘分享
链接:https://pan.baidu.com/s/1is099wFQwP9WaLmLMWedXg
提取码:yyds
tar -zxvf kafka_2.12-3.0.0.tgz
配置文件server.properties修改以下几处
broker.id=2 (每一台的不能一样用来唯一标识一台主机)
存储数据的路径
log.dirs=/home/bigdata/module/kafka_2.12-3.0.0/data
配置zookeeper的存储数据路径
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
分发到集群的其他机器
./xsync ../module/kafka_2.12-3.0.0/
xsync可以看我的shell专栏
其他机器分别修改成
broker.id=3
broker.id=4
集群管理脚本
vi kafka.sh
- #!/bin/bash
-
- case $1 in
- "start"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------启动 $i Kafka-------"
- ssh $i "/home/bigdata/module/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /home/bigdata/module/kafka_2.12-3.0.0/config/server.properties "
- done
- };;
- "stop"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------停止 $i Kafka-------"
- ssh $i "/home/bigdata/module/kafka_2.12-3.0.0/bin/kafka-server-stop.sh stop"
- done
- };;
- esac
chmod 744 kafka.sh
启动之前先启动zookeeper
./kafka.sh start
查看zookeeper的节点数据如果可以看到以下数据说明集群启动成功(下面的工具在我的zookeeper专栏的百度网盘连接里面)

创建主题
bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 3 --partitions 1 --topic first
列出出题
bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
查看主题详细信息
bin/kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic first
修改主题的分区(只能增加不能减少)
bin/kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 3

./kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
./kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
首先介绍一下 Kafka 生产者发送消息的过程:

acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
buffer.memory
设置生产者内存缓冲区的大小,默认是33554432,32M。
compression.type
默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。
retries
发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误,默认是2147483647。
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,默认是16384,16k。
linger.ms
该参数制定了生产者在发送批次之前等待更多消息加入批次的时间,默认是0ms。
max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。
timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
max.request.size
该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。
receive.buffer.bytes & send.buffer.byte
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
先构建一个消费者
./kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
定义一个生产者CustomProducer
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 100; i++) {
- producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
- }
- producer.close();
- }
- }
带回调函数的生产者
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 100; i++) {
- producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.offset());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- producer.close();
- }
- }
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 100; i++) {
- producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
- }
- producer.close();
- }
- }
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,会使用粘性分区,开始会随机的选择一个分区,并尽可能的使用这个分区,待batch满了或者是已经完成,那么就会在随机一个分区(和上一次不同的分区)
指定分区测试
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<String, String>("first",1, Integer.toString(i), Integer.toString(i)), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.partition());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- producer.close();
- }
- }
指定key的情况测试
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 5; i++) {
- producer.send(new ProducerRecord<String, String>("first", "io","msg"+i), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.partition());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- producer.close();
- }
- }
粘性分区测试
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- props.put("acks", "all");
-
- //重试次数
- props.put("retries", 1);
-
- //批次大小
- props.put("batch.size", 16384);
-
- //等待时间
- props.put("linger.ms", 1);
-
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432);
-
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 500; i++) {
- producer.send(new ProducerRecord<String, String>("first", "msg"+i), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.partition());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- producer.close();
- }
- }
自定义分区器
- /**
- * 自定义分区器
- */
- public class CustomPartitioner implements Partitioner {
-
- private int batchSize;
-
- @Override
- public void configure(Map<String, ?> configs) {
- /*从生产者配置中获取分数线*/
- batchSize = (Integer) configs.get("batch.size");
- System.out.println(batchSize);
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value,
- byte[] valueBytes, Cluster cluster) {
- return 0;
- }
-
- @Override
- public void close() {
- System.out.println("分区器关闭");
- }
- }
使用
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.study.hadoop.kafka.CustomPartitioner");
- //批次大小
- props.put("batch.size", 16384);
- //等待时间
- props.put("linger.ms", 1);
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432*2);
- //消息压缩
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
消息的完全可靠

At Least Once
将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。
At Most Once
将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义
Exactly Once(精准一次)
At Least Once + 幂等性 = Exactly Once
幂等性的原理
开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
开始幂等性enable.idempotence设置为true(默认就是开启的)

注意:
PID重启就会变化,那么也就是说如果生产者重新启动,还是会出现重复的数据
如果要完全保证精准一次性,那么就还需要加入事务
开启事务必须先开启幂等性
下图是事务的原理

使用事务的例子代码
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- //批次大小
- props.put("batch.size", 16384);
- //等待时间
- props.put("linger.ms", 1);
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432 * 2);
- //消息压缩
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- //初始化事务
- producer.initTransactions();
- //开启事务
- producer.beginTransaction();
-
- try {
- for (int i = 0; i < 500; i++) {
- producer.send(new ProducerRecord<String, String>("first", "msg" + i), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.partition());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- // int intersection=9/0;
- //提交事务
- producer.commitTransaction();
- } catch (Exception e) {
- producer.abortTransaction();
- } finally {
- producer.close();
- }
- }
- }
分区内乱序的原因:
默认生产者会缓存每一个broker的5个请求
比如:
5 4 3 2 1
在缓存发送到broker的过程是并行执行的,也就是一块发送,如果发送3出现了异常,然后经过重试以后,得到的数据就是
3 5 4 2 1
那么就出现了乱序
乱序解决办法:
在1.x版本之前的时候可以设置max.in.flight.requests.per.connection=1(默认值是5),也就是每一个分区只缓存一个生产者的请求
在1.x版本之后:
如果没有开启幂等性,那么max.in.flight.requests.per.connection=1
如果开启的幂等性,只要max.in.flight.requests.per.connection小于等于5就可以,原因是幂等性会排序后在落盘


Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。

查看下主题的分区分配情况
bin/kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic first

对于要重分配的主题生成配置文件,下面以first主题为例
vi topics-to-move.json
- {
- "topics": [
- {"topic": "first"}
- ],
- "version": 1
- }
执行下面的操作,生成重分区计划(--broker-list "2,3,4" 表示要把分区分配到那些broker,后面的数字是brokerid)
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3,4" --generate
生成下面的执行计划(下面我把执行计划文件做了修改)
vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,4,3],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[4,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,2,4],"log_dirs":["any","any","any"]}]}
没有执行之前分区0 leader4 副本 4,3,2,我修改成了2,4,3

执行以后
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
校验执行是否成功
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

如图修改分区的位置成功
总结:
也就是根据主题生成配置文件以后,自己就可以根据机器的情况,修改配置文件的信息,比如把主题分配到指定的broker,指定的副本到哪个broker都可以定制化




cd /home/bigdata/module/kafka_2.12-3.0.0/data/first-0

查看日志文件内容
/home/bigdata/module/kafka_2.12-3.0.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index


上面的绝对Offset实际是没有的,只是为了便于理解
相关的配置参数
以 segment 中所有记录中的最大时间戳作为该文件时间戳。也就是002不会删除
2)compact 日志压缩


页缓存 + 零拷贝技术






注意:
在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组
消费者代码
- public class CustomConsumer {
- public static void main(String[] args) {
- // 1.创建消费者的配置对象
- Properties properties = new Properties();
- // 2.给消费者配置对象添加参数
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop102:9092");
- // 配置序列化 必须
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- // 配置消费者组(组名任意起名) 必须
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- // 创建消费者对象
- KafkaConsumer<String, String> kafkaConsumer = new
- KafkaConsumer<String, String>(properties);
- // 注册要消费的主题(可以消费多个主题)
- ArrayList<String> topics = new ArrayList<>();
- topics.add("first");
- kafkaConsumer.subscribe(topics);
- // 拉取数据打印
- while (true) {
- // 设置 1s 中消费一批数据
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 打印消费到的数据
- for (ConsumerRecord<String, String> consumerRecord :
- consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }
生产者使用shell
./kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
- public class CustomConsumer {
- public static void main(String[] args) {
- Properties properties = new Properties();
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
- // 配置序列化 必须
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- // 配置消费者组(必须),名字可以任意起
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
- KafkaConsumer<String, String> kafkaConsumer = new
- KafkaConsumer<>(properties);
- // 消费某个主题的某个分区数据
- ArrayList<TopicPartition> topicPartitions = new
- ArrayList<>();
- topicPartitions.add(new TopicPartition("first", 2));
- kafkaConsumer.assign(topicPartitions);
- while (true){
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String, String> consumerRecord :
- consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }

消费者组id相同那么他们就在一个消费者组
消费者代码
- public class CustomConsumer2 {
- public static void main(String[] args) {
- // 1.创建消费者的配置对象
- Properties properties = new Properties();
- // 2.给消费者配置对象添加参数
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop102:9092");
- // 配置序列化 必须
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- // 配置消费者组(组名任意起名) 必须
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- // 创建消费者对象
- KafkaConsumer<String, String> kafkaConsumer = new
- KafkaConsumer<String, String>(properties);
- // 注册要消费的主题(可以消费多个主题)
- ArrayList<String> topics = new ArrayList<>();
- topics.add("first");
- kafkaConsumer.subscribe(topics);
- // 拉取数据打印
- while (true) {
- // 设置 1s 中消费一批数据
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 打印消费到的数据
- for (ConsumerRecord<String, String> consumerRecord :
- consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }
生产者代码
- public class CustomProducer {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties props = new Properties();
-
- //kafka集群,broker-list
- props.put("bootstrap.servers", "hadoop102:9092");
-
- //批次大小
- props.put("batch.size", 16384);
- //等待时间
- props.put("linger.ms", 1);
- //RecordAccumulator缓冲区大小
- props.put("buffer.memory", 33554432 * 2);
- //消息压缩
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
- //初始化事务
- producer.initTransactions();
- //开启事务
- producer.beginTransaction();
-
- try {
- for (int i = 0; i < 500; i++) {
- producer.send(new ProducerRecord<String, String>("first", i+"","msg" + i), new Callback() {
-
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println("success->" + metadata.partition());
- } else {
- exception.printStackTrace();
- }
- }
- });
- }
- // int intersection=9/0;
- //提交事务
- producer.commitTransaction();
- Thread.sleep(1);
- } catch (Exception e) {
- producer.abortTransaction();
- } finally {
- producer.close();
- }
- }
- }
效果如图

配置参数
partition.assignment.strategy
默认策略就是Range

修改分区数
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7
总结:

总结:
尽量随机均匀分部在每一个broker(生产环境尽量用它)
- // 是否自动提交 offset
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
- // 提交 offset 的时间周期 1000ms,默认 5s
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
- // 是否自动提交 offset
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
- // 提交 offset 的时间周期 1000ms,默认 5s
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
同步手动提交
kafkaConsumer.commitSync();
异步手动提交
kafkaConsumer.commitAsync();
- public class CustomConsumerSeek {
- public static void main(String[] args) {
- // 0 配置信息
- Properties properties = new Properties();
- // 连接
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop102:9092");
- // key value 反序列化
-
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
- // 1 创建一个消费者
- KafkaConsumer<String, String> kafkaConsumer = new
- KafkaConsumer<>(properties);
- // 2 订阅一个主题
- ArrayList<String> topics = new ArrayList<>();
- topics.add("first");
- kafkaConsumer.subscribe(topics);
- Set<TopicPartition> assignment= new HashSet<>();
- while (assignment.size() == 0) {
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
- assignment = kafkaConsumer.assignment();
- }
- // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
- for (TopicPartition tp: assignment) {
- kafkaConsumer.seek(tp, 100);
- }
- // 3 消费该主题数据
- while (true) {
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String, String> consumerRecord :
- consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }
注意:
每一次启动都会在指定位置消费,与消费者组无关
- public class CustomConsumerForTime {
- public static void main(String[] args) {
- Properties properties = new Properties();
- // 连接
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "hadoop102:9092");
- // key value 反序列化
-
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
- // 1 创建一个消费者
- KafkaConsumer<String, String> kafkaConsumer = new
- KafkaConsumer<>(properties);
- // 2 订阅一个主题
- ArrayList<String> topics = new ArrayList<>();
- topics.add("first");
- kafkaConsumer.subscribe(topics);
- Set<TopicPartition> assignment = new HashSet<>();
- while (assignment.size() == 0) {
- kafkaConsumer.poll(Duration.ofSeconds(1));
- // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
- assignment = kafkaConsumer.assignment();
- }
- HashMap<TopicPartition, Long> timestampToSearch = new
- HashMap<>();
- // 封装集合存储,每个分区对应一天前的数据
- for (TopicPartition topicPartition : assignment) {
- timestampToSearch.put(topicPartition,
- System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
- }
- // 获取从 1 天前开始消费的每个分区的 offset
- Map<TopicPartition, OffsetAndTimestamp> offsets =
- kafkaConsumer.offsetsForTimes(timestampToSearch);
- // 遍历每个分区,对每个分区设置消费时间。
- for (TopicPartition topicPartition : assignment) {
- OffsetAndTimestamp offsetAndTimestamp =
- offsets.get(topicPartition);
- // 根据时间指定开始消费的位置
- if (offsetAndTimestamp != null){
- kafkaConsumer.seek(topicPartition,
- offsetAndTimestamp.offset());
- }
- }
- // 3 消费该主题数据
- while (true) {
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String, String> consumerRecord :
- consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }
为什么要事务?


相关参数
max.poll.records
一次 poll 拉取数据返回消息的最大条数,默认是 500 条
安装mysql
这里为了简便,我使用的是docker
- sudo yum install docker
- sudo systemctl start docker
- sudo systemctl enable docker
注意: 要保证时间和客户端的同步不然又证书问题,如果客户端的时间和mysql的时间不对会报错
- sudo yum install -y ntpdate
- sudo ntpdate 120.24.81.91
冲突时使用这个
sudo sudo systemctl stop ntp
镜像加速
- sudo mkdir -p /etc/docker
- sudo tee /etc/docker/daemon.json <<-'EOF'
- {
- "registry-mirrors": ["https://obnqc505.mirror.aliyuncs.com"]
- }
- EOF
- sudo systemctl daemon-reload
- sudo systemctl restart docker
启动一个mysql容器并且开机自启
sudo docker run -itd --name hive-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root --restart=always mysql:5.7
kafka
修改kafka的配置
vi kafka-server-start.sh
把
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- fi
修改成
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
- export JMX_PORT="9999"
- #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
- fi
分发配置文件
./xsync /home/bigdata/module/kafka_2.12-3.0.0/bin/kafka-server-start.sh
xsync分发脚本在我的shell专栏
重启kafka集群
tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
cd kafka-eagle-bin-2.0.8/
tar -zxvf efak-web-2.0.8-bin.tar.gz
cd efak-web-2.0.8/
vi /home/bigdata/module/kafka-eagle-bin-2.0.8/efak-web-2.0.8/conf/system-config.properties
修改的地方(后面的kafka是kafka存储在zookeeper的路径更具自己的情况进行更改)
- #有多个集群就配置多个就是一个就删除cluster2的配置
- efak.zk.cluster.alias=cluster1
- cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
- # offset 保存在 kafka
- cluster1.efak.offset.storage=kafka
-
- # 配置 mysql 连接
- efak.driver=com.mysql.jdbc.Driver
- efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
- efak.username=root
- efak.password=root
配置环境变量
sudo vi /etc/profile.d/my_env.sh
- # kafkaEFAK
- export KE_HOME=/home/bigdata/module/kafka-eagle-bin-2.0.8/efak-web-2.0.8
- export PATH=$PATH:$KE_HOME/bin
source /etc/profile.d/my_env.sh
启动监控应用
ke.sh start
访问

现在不太常用,以后常用了在研究
本专栏参考了尚硅谷kafka入门到精通3.0.0写