<1> Create the main thread.
<2> Call the send() method.
<3> The record passes through the interceptors (rarely used in production).
<4> The record passes through the serializer (Kafka uses its own lightweight serializers, which keep the serialized data small).
<5> The record passes through the partitioner, which decides which partition it is sent to.
<6> The record is written to the buffer (RecordAccumulator, a set of double-ended queues), one queue per partition, entirely in memory; the total buffer size defaults to 32 MB and each batch defaults to 16 KB.
batch.size: the sender only ships a batch once it has accumulated batch.size bytes of data; the default is 16 KB.
linger.ms: if a batch never reaches batch.size, the sender ships it once linger.ms has elapsed; the unit is milliseconds and the default is 0 ms, meaning no delay. (Both parameters are tuned in the throughput example later in this section.)
<7> The sender thread reads batches from the buffer (RecordAccumulator) and sends them to the Kafka cluster. By default at most 5 in-flight requests are cached per broker node.
<8> The selector establishes the link over which the messages are sent.
<9> The broker receives the messages, replicates them to the followers, and then acknowledges. The acknowledgement levels are:
0: the producer does not wait for any acknowledgement that the data has been persisted.
1: the leader acknowledges as soon as it has received the data.
-1 (all): the leader acknowledges only after itself and every node in the ISR have received the data; -1 and all are equivalent.
<10> On a successful acknowledgement the request and the corresponding buffer entries are cleaned up; on failure the sender retries, and the default retry count is Integer.MAX_VALUE.
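As a hedged sketch of how the acknowledgement level and retry behaviour map onto producer configuration (the ProducerConfig constants are from the kafka-clients API; the broker address and the chosen values are illustrative assumptions):

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class AcksConfigSketch {
    public static Properties producerProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // acks: "0" = no wait, "1" = leader only, "-1"/"all" = leader plus the whole ISR
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // On a failed acknowledgement the sender retries; the default is Integer.MAX_VALUE
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        return properties;
    }
}

The examples below all build on the following Maven dependency: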
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafka</groupId>
    <artifactId>KafkaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
package com.kafkademo.case1.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author : hechaogao
 * @createTime : 2022/9/19 17:09
 *
 * Plain asynchronous send example
 */
public class CustomProducer {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3. Send the data
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, "test message no : " + i));
        }

        // 4. Release resources
        kafkaProducer.close();
    }
}
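Note that this send() is fire-and-forget from the caller's side: the call returns as soon as the record has been handed to the RecordAccumulator, and the sender thread delivers it in the background. The next example adds a callback to observe the result.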
package com.kafkademo.case1.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author : hechaogao
 * @createTime : 2022/9/19 21:22
 *
 * Asynchronous send with a callback
 */
public class CustomProducerCallBack {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3. Send the data; the callback runs when the broker acknowledges the record
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, "test message no : " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic : " + recordMetadata.topic() + ", partition : " + recordMetadata.partition());
                    }
                }
            });
        }

        // 4. Release resources
        kafkaProducer.close();
    }
}
Synchronous send: new data is written into the buffer (RecordAccumulator) only after all the data already in the buffer has been sent to the Kafka cluster.
package com.kafkademo.case1.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author : hechaogao
 * @createTime : 2022/9/19 17:09
 *
 * Synchronous send example
 */
public class CustomProducerSync {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3. Send the data; get() blocks until the broker acknowledges the record
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, "test message no : " + i)).get();
        }

        // 4. Release resources
        kafkaProducer.close();
    }
}
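The only difference from the asynchronous examples is the .get() call: it blocks the main thread until the broker has acknowledged the record, so each message is confirmed before the next send begins.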
The default partitioner is DefaultPartitioner: if the ProducerRecord specifies a partition, that partition is used as-is; if only a key is given, the partition is derived from a hash of the key; with neither, sticky partitioning picks a partition and reuses it until the current batch is full.
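A hedged sketch of the three cases from the record side (the ProducerRecord constructors are part of the kafka-clients API; the topic, key, and values are made-up examples):

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordSketch {
    public static void main(String[] args) {
        // Explicit partition: the record goes straight to partition 1 of the topic.
        ProducerRecord<String, String> withPartition = new ProducerRecord<>("test_topic", 1, "key_a", "v1");
        // Key but no partition: DefaultPartitioner hashes the key (murmur2) modulo the partition count.
        ProducerRecord<String, String> withKey = new ProducerRecord<>("test_topic", "key_a", "v2");
        // Neither key nor partition: sticky partitioning reuses one partition until the batch fills.
        ProducerRecord<String, String> plain = new ProducerRecord<>("test_topic", "v3");
    }
}

A custom partitioner replaces these rules by implementing the Partitioner interface: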
package com.kafkademo.case2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @author : hechaogao
 * @createTime : 2022/9/20 10:21
 *
 * Custom partitioner: values containing "test" go to partition 0, everything else to partition 1
 */
public class MyPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition;
        // Route on the record value
        String msgValues = value.toString();
        if (msgValues.contains("test")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
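The partitioner only takes effect once it is registered on the producer via the partitioner.class property, which the next example does through ProducerConfig.PARTITIONER_CLASS_CONFIG.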
package com.kafkademo.case2;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author : hechaogao
 * @createTime : 2022/9/19 21:22
 *
 * Asynchronous send with a callback and a custom partitioner
 */
public class CustomProducerMyPartition {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 1.3 Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafkademo.case2.MyPartition");

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3. Send the data; the callback prints which partition each record landed in
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, " message no : " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic : " + recordMetadata.topic() + ", partition : " + recordMetadata.partition());
                    }
                }
            });
        }

        // 4. Release resources
        kafkaProducer.close();
    }
}
package com.kafkademo.case3;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author : hechaogao
 * @createTime : 2022/9/20 10:52
 *
 * Tuning producer parameters to raise throughput
 */
public class CustomProducerParameters {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Buffer size: 32 MB (the default)
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Batch size: 16 KB (the default)
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms: raised from the default 0 ms to 1 ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Compression: snappy
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3. Send the data
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, "test message no : " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic : " + recordMetadata.topic() + ", partition : " + recordMetadata.partition());
                    }
                }
            });
        }

        // 4. Release resources
        kafkaProducer.close();
    }
}
The acknowledgement levels, revisited for the reliability analysis:
acks=0: the producer does not wait for any acknowledgement that the data has been persisted.
acks=1: the leader acknowledges as soon as it has received the data.
acks=-1 (all): the leader acknowledges only after itself and every node in the ISR have received the data; -1 and all are equivalent.
With acks=-1, the leader receives the data and all followers start syncing it, but one follower has failed and cannot catch up with the leader, so the message can never be acknowledged. How is this problem solved?
ISR: the leader maintains a dynamic in-sync replica set (ISR), the set of the leader plus the followers that are keeping up with it (e.g. leader: 0, isr: 0,1,2).
If a follower has not sent a communication request to the leader or synced data for too long, it is kicked out of the ISR. The threshold is set by the replica.lag.time.max.ms parameter, default 30 s. For example, if replica 2 times out, the ISR becomes 0,1.
This way the leader never has to wait for a node that has been unreachable for a long time or has already failed.
If the partition has only 1 replica, or min.insync.replicas (the minimum number of ISR replicas that must acknowledge, default 1) is set to 1, the behavior is the same as acks=1 and data can still be lost (leader: 0, isr: 0).
Data duplication: the leader receives the data and the followers finish syncing it, but the leader fails before the acknowledgement is sent. Since the message was never acknowledged, the cluster elects a new leader and the producer resends the same message to it, which replicates it to the followers again, so the data is stored twice.
Full reliability therefore requires: ACK level set to -1 + at least 2 partition replicas + min.insync.replicas in the ISR of at least 2.
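A minimal sketch of the client-side half of this condition (the ProducerConfig constants are from the kafka-clients API; the replica count and min.insync.replicas are broker/topic settings assumed to be configured separately):

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ReliableProducerSketch {
    public static Properties reliableProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // acks = -1/all: wait for the leader and every replica in the ISR
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Idempotence deduplicates the leader-failure retry described above
        // (it defaults to true in kafka-clients 3.x)
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return properties;
    }
}

Idempotence alone only deduplicates within a single producer session; the transaction API below extends the guarantee across sessions.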
package com.kafkademo.case4;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author : hechaogao
 * @createTime : 2022/9/19 17:09
 *
 * Kafka producer transaction example
 */
public class CustomProducerTransactions {

    private static final String HOST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static final String TOPIC = "test_topic";

    public static void main(String[] args) {
        // 1. Configuration
        Properties properties = new Properties();
        // 1.1 Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
        // 1.2 Specify the serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Set a unique transactional id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

        // 2. Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 3.1 Initialize the transaction
        kafkaProducer.initTransactions();
        // 3.2 Begin the transaction
        kafkaProducer.beginTransaction();

        // 4. Send the data
        try {
            for (int i = 0; i < 10; i++) {
                kafkaProducer.send(new ProducerRecord<>(TOPIC, "test message no : " + i));
            }
            // Uncomment to force an exception and exercise the abort path:
            //int i = 1 / 0;
            // 3.3 Commit the transaction
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 3.4 Abort the transaction
            kafkaProducer.abortTransaction();
        } finally {
            // 5. Release resources
            kafkaProducer.close();
        }
    }
}
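Two notes on the transaction API: transactional.id must be unique per producer instance, and setting it implicitly requires idempotence (and therefore acks=all); initTransactions() must be called once before the first beginTransaction().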
Per-partition ordering conditions:
Before Kafka 1.x: max.in.flight.requests.per.connection must be set to 1 (whether idempotence is enabled does not matter).
From Kafka 1.x on:
(1) Without idempotence, max.in.flight.requests.per.connection must still be set to 1.
(2) With idempotence, max.in.flight.requests.per.connection may be set to anything up to 5.
Reason: since Kafka 1.x, with idempotence enabled, the broker caches the metadata of the last 5 requests from each producer, so ordering is guaranteed as long as at most 5 requests are in flight.
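A hedged sketch of the corresponding settings for the post-1.x, idempotent case (the ProducerConfig constants are from the kafka-clients API; 5 is the largest in-flight value that still preserves ordering):

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class OrderedProducerSketch {
    public static Properties orderedProps() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // With idempotence on, the broker tracks the last 5 requests per producer,
        // so ordering holds for up to 5 in-flight requests per connection
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return properties;
    }
}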