kafka最早诞生就是为了收集用户活跃数、用户页面访问等,并不是作为mq的一般需求而诞生的。
早期的ActiveMQ可以满足,但是经常出翔阻塞、服务不可用的状况。
为了更好满足需求,于是乎就诞生了kafka。
消息传递就是发送数据,作为TCP HTTP 或者RPC,实现异步、削峰、解耦。因为kafka的吞吐量更高,所以在超大量数据的情况下优势明显。
- 用户行为收集,监控、实时处理、报表
- 实现日志聚合,讲分布式的日志可以通过kafka配合其他组件实现日志监控 ELK
- 应用指标监控,监控业务数据或者运维相关指标(cpu、内存、磁盘)
数据集成是指把kafka的数据导入Hadpoop、HBase等离线数据仓库,实现数据分析。
流是指作为没有边界、源源不断产生的数据,流计算是指对stream做实时计算。
Broker作为kafka的一个组件,主要是用户存储和转发消息的,它做的事情就像是中介,kafka的一台服务器就是一个broker,对外开发发送、接收消息的端口默认为9092。生产者和消费者都需要和broker建立连接才能收发消息。
客户端之间传输的数据叫做消息,或者叫做记录(一个名称而已)。在客户端中,Record一般是一个kv键值对。
生产者封装消息类 ProducerRecord,消费者封装类是ConsumerRecord。(感觉message叫的习惯一点)
有数据传输就会涉及到传输格式问题,所以消息是可以被格式化的。kafka官方提供的java api是有提供对消息序列化和反序列的。
发送消息的一方叫做生产者。
消息发送可以是指定批次大小发送,也可以是在若干时间内批次发送。(当固定时间内并没有足够的批次消息)
一般来说,所有的组件都会面临者对数据获取方式的选择,一种是pull,一种是push,这里其实比较奇怪,两个动作,在同一时间内说出来,主语是改变了的。
push是说Broker主动去推送数据给消费者,主语是Broker服务器。
pull是说消费者主动去找服务器Broker拉取数据,主语是消费者。
kafka只有pull模式,具体原因相信多数人能找到答案。减少不利的模式选择,才是好的做法。
消费者可以自己控制一次获取多少消息,默认是500
生产者和消费者对应的生成消费数据的关系,通过topic来关联,这个概念其实多数开发者都是清除的,但是没有像rabbitMQ一样那么的复杂,kafka是全字符串匹配。消费者可以同时绑定消费多个topic,但是一般不建议。
kafka引入了一个分区概念,一个topic的消息可以放在不同的分区中存放。每个topic建议设置分区数量为boker的数量。
每个partiton有一个物理目录。在配置的数据目录下(日志就是数据):/tmp/kafka-logs/
> mytopic-0
> mytopic-1
partition数据只有一份的话,如果宕机或者网络故障,那么对应的分区数据就不可用。这个时候就需要对partition做副本。有副本就有选举,这个功能是借助zk完成的。
kafka的数据是放在后缀 .log的文件里面。如果一个partition只有一个log文件,消息不断的追加,这个log文件就会越来越大,这个时候要检索效率就很低。
于是就把partition切分开来,切分出来的单位就叫做段segment。
默认的存储路径/tmp/kafka-logs/
每个segment至少有1个数据文件和2个索引文件,这3个文件是成套出现的。
如果生产者产生的消息速度过快,就会造成broker的堆积,影响broker的性能。
consumer group消费族概念,在代码中通过group id来配置。消费同一个topic的消息不一定时同一个组,只有group id相同的消费者才是同一个消费组。
由于分区数量和消费者的数量可能存在差异:当分区数量大于消费者数据量时,部分消费者可能出现同时消费多个partition。但是如果消费者数量大于partiton的数据量,却不会出校多个消费者消费相同的partiton。在同一消费组中,是不能同时消费同一个partiton
由于partiton中的数据是顺序切消费不删除,所以在消费之断开链接后重新继续消费的时候,需要延续上次的消费位置来继续消费。
kafka对消息进行了编号,用来标志唯一消息。这个编号我们把它叫做offset偏移量。
offset记录者吓一跳将要发送给consumer的消息序号。
这个消费者跟partition之间的偏移量没有保存在zk,而是直接保存在服务器中的。
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifacId>
<version>2.6.0vsersion>
dependency>
public class SimpleConsumer {
public static void main(String[] args) {
Properties props= new Properties();
props.put("bootstrap.servers","192.168.8.147:9092");
props.put("group.id","gp-test-group"); // 是否自动提交偏移量,只有 commit 之后才更新消费组的
offset props.put("enable.auto.commit","true"); // 消费者自动提交的间隔
props.put("auto.commit.interval.ms","1000"); // 从最早的数据开始消费 earliest | latest | none
props.put("auto.offset.reset","earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props); // 订阅队列
consumer.subscribe(Arrays.asList("mytopic"));
try {
while (true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records){
System.out.printf("offset = %d ,key =%s, value= %s, partition= %s%n",record.offset(),record.key(),record.value(),record.partition());
}
}
}finally {
consumer.close();
}
}
}
public class SimpleProducer {
public static void main(String[] args) {
Properties pros=new Properties();
pros.put("bootstrap.servers","192.168.8.147:9092");
pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 0 发出去就确认 | 1 leader 落盘就确认| all 所有 Follower 同步完才确认
pros.put("acks","1");
// 异常自动重试次数
pros.put("retries",3);
// 多少条数据发送一次,默认 16K
pros.put("batch.size",16384);
// 批量发送的等待时间
pros.put("linger.ms",5);
// 客户端缓冲区大小,默认 32M,满了也会触发消息发送
pros.put("buffer.memory",33554432);
// 获取元数据时生产者的阻塞时间,超时后抛出异常
pros.put("max.block.ms",3000);
Producer<String,String> producer = new KafkaProducer<String,String>(pros);
for (int i =0 ;i<100;i++) {
producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));
// System.out.println("发送:"+i);
}
producer.close();
}
}
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
server.port=7271
spring.kafka.bootstrap-servers=192.168.8.147:9092
# producer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.properties.linger.ms=5
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@Component
public class ConsumerListener {
@KafkaListener(topics = "springboottopic",groupId = "springboottopic-group")
public void onMessage(String msg){
System.out.println("----收到消息:"+msg+"----");
}
}
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public String send(@RequestParam String msg){
kafkaTemplate.send("springboottopic", msg);
return "ok";
}
}
注入模板方法 KafkaTemplate 发送消息。 注意 send 方法有很多重载。异步回调 ListenableFuture。