• kafka


    1. kafka介绍

    kafka最早诞生就是为了收集用户活跃数、用户页面访问等,并不是作为mq的一般需求而诞生的。
    早期的ActiveMQ可以满足,但是经常出翔阻塞、服务不可用的状况。
    为了更好满足需求,于是乎就诞生了kafka。
    
    • 1
    • 2
    • 3

    1.1 应用场景

    • 大数据领域
    • 数据收集
    • 流计算集成

    1.1.1 消息传递

    消息传递就是发送数据,作为TCP HTTP 或者RPC,实现异步、削峰、解耦。因为kafka的吞吐量更高,所以在超大量数据的情况下优势明显。
    
    • 1
    1. 用户行为收集,监控、实时处理、报表
    2. 实现日志聚合,讲分布式的日志可以通过kafka配合其他组件实现日志监控 ELK
    3. 应用指标监控,监控业务数据或者运维相关指标(cpu、内存、磁盘)

    1.1.2 数据集成和流计算

    数据集成是指把kafka的数据导入Hadpoop、HBase等离线数据仓库,实现数据分析。
    流是指作为没有边界、源源不断产生的数据,流计算是指对stream做实时计算。
    
    • 1
    • 2

    2. kafka架构分析

    在这里插入图片描述

    2.1 Broker介绍

    Broker作为kafka的一个组件,主要是用户存储和转发消息的,它做的事情就像是中介,kafka的一台服务器就是一个broker,对外开发发送、接收消息的端口默认为9092。生产者和消费者都需要和broker建立连接才能收发消息。
    
    • 1

    2.2 Record(message)消息介绍

    客户端之间传输的数据叫做消息,或者叫做记录(一个名称而已)。在客户端中,Record一般是一个kv键值对。
    生产者封装消息类 ProducerRecord,消费者封装类是ConsumerRecord。(感觉message叫的习惯一点)
    有数据传输就会涉及到传输格式问题,所以消息是可以被格式化的。kafka官方提供的java api是有提供对消息序列化和反序列的。
    
    • 1
    • 2
    • 3

    2.3 生产者

    发送消息的一方叫做生产者。
    消息发送可以是指定批次大小发送,也可以是在若干时间内批次发送。(当固定时间内并没有足够的批次消息)
    
    • 1
    • 2

    2.4 消费者

    一般来说,所有的组件都会面临者对数据获取方式的选择,一种是pull,一种是push,这里其实比较奇怪,两个动作,在同一时间内说出来,主语是改变了的。
    push是说Broker主动去推送数据给消费者,主语是Broker服务器。
    pull是说消费者主动去找服务器Broker拉取数据,主语是消费者。
    kafka只有pull模式,具体原因相信多数人能找到答案。减少不利的模式选择,才是好的做法。
    消费者可以自己控制一次获取多少消息,默认是500

    2.5 topic

    生产者和消费者对应的生成消费数据的关系,通过topic来关联,这个概念其实多数开发者都是清除的,但是没有像rabbitMQ一样那么的复杂,kafka是全字符串匹配。消费者可以同时绑定消费多个topic,但是一般不建议。
    
    • 1

    2.6 Partition

    kafka引入了一个分区概念,一个topic的消息可以放在不同的分区中存放。每个topic建议设置分区数量为boker的数量。
    每个partiton有一个物理目录。在配置的数据目录下(日志就是数据):/tmp/kafka-logs/
    > mytopic-0
    > mytopic-1
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2.7 Partition副本Replica

    partition数据只有一份的话,如果宕机或者网络故障,那么对应的分区数据就不可用。这个时候就需要对partition做副本。有副本就有选举,这个功能是借助zk完成的。
    
    • 1

    2.8 Segment

    kafka的数据是放在后缀 .log的文件里面。如果一个partition只有一个log文件,消息不断的追加,这个log文件就会越来越大,这个时候要检索效率就很低。
    于是就把partition切分开来,切分出来的单位就叫做段segment。
    默认的存储路径/tmp/kafka-logs/
    每个segment至少有1个数据文件和2个索引文件,这3个文件是成套出现的。
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    2.9 Consumer Group

    如果生产者产生的消息速度过快,就会造成broker的堆积,影响broker的性能。
    consumer group消费族概念,在代码中通过group id来配置。消费同一个topic的消息不一定时同一个组,只有group id相同的消费者才是同一个消费组。
    
    • 1
    • 2

    由于分区数量和消费者的数量可能存在差异:当分区数量大于消费者数据量时,部分消费者可能出现同时消费多个partition。但是如果消费者数量大于partiton的数据量,却不会出校多个消费者消费相同的partiton。在同一消费组中,是不能同时消费同一个partiton

    2.10 consumer offset

    由于partiton中的数据是顺序切消费不删除,所以在消费之断开链接后重新继续消费的时候,需要延续上次的消费位置来继续消费。
    kafka对消息进行了编号,用来标志唯一消息。这个编号我们把它叫做offset偏移量。
    offset记录者吓一跳将要发送给consumer的消息序号。
    这个消费者跟partition之间的偏移量没有保存在zk,而是直接保存在服务器中的。
    
    • 1
    • 2
    • 3
    • 4

    3. kafka java 开发

    3.1 非springboot项目

    • 依赖一如
    <dependency>
    	<groupId>org.apache.kafkagroupId>
    	<artifactId>kafka-clientsartifacId>
    	<version>2.6.0vsersion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 消费者
    public class SimpleConsumer { 
    	public static void main(String[] args) { 
    		Properties props= new Properties();
    		props.put("bootstrap.servers","192.168.8.147:9092"); 
    		props.put("group.id","gp-test-group"); // 是否自动提交偏移量,只有 commit 之后才更新消费组的 
    		offset props.put("enable.auto.commit","true"); // 消费者自动提交的间隔 
    		props.put("auto.commit.interval.ms","1000"); // 从最早的数据开始消费 earliest | latest | none 
    		props.put("auto.offset.reset","earliest"); 
    		props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    		props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    		KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props); // 订阅队列
    		consumer.subscribe(Arrays.asList("mytopic")); 
    		try {
    			while (true){ 
    				ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
    			 	for (ConsumerRecord<String,String> record:records){ 
    				 	System.out.printf("offset = %d ,key =%s, value= %s, partition= %s%n",record.offset(),record.key(),record.value(),record.partition()); 
    				 	} 
    		 	} 
    		}finally { 
    			consumer.close(); 
    		} 
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 生产者
    public class SimpleProducer { 
    	public static void main(String[] args) { 
    		Properties pros=new Properties(); 
    		pros.put("bootstrap.servers","192.168.8.147:9092");
    	 	pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    	  	pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    	  	// 0 发出去就确认 | 1 leader 落盘就确认| all 所有 Follower 同步完才确认 
    	  	pros.put("acks","1"); 
    	  	// 异常自动重试次数 
    	  	pros.put("retries",3); 
    	  	// 多少条数据发送一次,默认 16K
    	   	pros.put("batch.size",16384); 
    	   	// 批量发送的等待时间 
    	  	pros.put("linger.ms",5); 
    		// 客户端缓冲区大小,默认 32M,满了也会触发消息发送 
    		pros.put("buffer.memory",33554432); 
    		// 获取元数据时生产者的阻塞时间,超时后抛出异常 
    		pros.put("max.block.ms",3000); 
    		Producer<String,String> producer = new KafkaProducer<String,String>(pros); 
    		for (int i =0 ;i<100;i++) { 
    			producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i))); 
    			// System.out.println("发送:"+i); 
    		}
    		producer.close(); 
    		} 
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    3.2 springboot项目

    • 依赖
    <dependency> 
    	<groupId>org.springframework.kafkagroupId> 
    	<artifactId>spring-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 配置
    server.port=7271 
    spring.kafka.bootstrap-servers=192.168.8.147:9092 
    
    # producer 
    spring.kafka.producer.retries=1 
    spring.kafka.producer.batch-size=16384 
    spring.kafka.producer.buffer-memory=33554432 
    spring.kafka.producer.acks=1
    spring.kafka.producer.properties.linger.ms=5 
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 
    
    # consumer 
    spring.kafka.consumer.auto-offset-reset=earliest 
    spring.kafka.consumer.enable-auto-commit=true 
    spring.kafka.consumer.auto-commit-interval=1000 
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 消费者
    @Component 
    public class ConsumerListener { 
    	@KafkaListener(topics = "springboottopic",groupId = "springboottopic-group") 
    	public void onMessage(String msg){ 
    		System.out.println("----收到消息:"+msg+"----");
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 生产者
    @Component 
    public class KafkaProducer { 
    	@Autowired 
    	private KafkaTemplate<String,Object> kafkaTemplate; 
    	public String send(@RequestParam String msg){ 
    		kafkaTemplate.send("springboottopic", msg); 
    		return "ok"; 
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注入模板方法 KafkaTemplate 发送消息。 注意 send 方法有很多重载。异步回调 ListenableFuture。

  • 相关阅读:
    【C++编程语言】之类和对象---类对象作为类成员
    我的UI自动化测试的感悟
    OpenMMLap之Hook机制详解
    Jmeter的学习
    【深度学习】6-卷积过程中数据的结构变化
    Redis之cluster集群
    基于javaweb+mysql的甜品冰淇淋奶茶店网上订餐系统(前台、后台)
    Springboot添加静态资源映射addResourceHandlers,可实现url访问
    保洁企业怎么实施智能软件增加客户的互动
    力扣(LeetCode)10. 正则表达式匹配(C++)
  • 原文地址:https://blog.csdn.net/weixin_43704834/article/details/126164227