1.kafka是一个高吞吐的分布式消息系统,是一个消息队列。
2.生产者负责生产数据 ,消费者负责消费数据
3.特点:
生存者消费者模型,FIFO
高性能:单节点支持上千个客户端,百MB/s吞吐
持久性:消息直接持久化在普通磁盘上且性能好
分布式:数据副本冗余、流量负载均衡、可扩展
很灵活:消息长时间持久化+Client维护消费状态
4.性能好的原因
kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写
kafka还用了sendFile的0拷贝技术,提高速度
而且还用到了批量读写,一批批往里写,64K为单位
- # 解压
- tar -zxvf kafka_2.11-1.0.0.tgz -C ../
- mv kafka_2.11-1.0.0 kafka-1.0.0
-
-
- # 配置环境变量
- vim /etc/profile
-
- export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
- export PATH=$PATH:$KAFKA_HOME/bin
-
- source /etc/profile
1.修改config目录下的server.properties文件
- broker.id=0 每一个节点broker.id 要不一样
- zookeeper.connect=master:2181,node1:2181,node2:2181/kafka
- log.dirs=/usr/local/soft/kafka-1.0.0/data 数据存放的位置
- # 同步kafka文件
- scp -r kafka-1.0.0/ node1:`pwd`
- scp -r kafka-1.0.0/ node2:`pwd`
-
- # 将master中的而环境变量同步到node1和node2中
- scp /etc/profile node1:/etc/
- scp /etc/profile node2:/etc/
-
- # 在ndoe1和node2中执行source
- source /etc/profile
- # node1
- broker.id=1
- # node2
- broker.id=2
1、需要启动zookeeper, kafka使用zk保存元数据
需要在每隔节点中执行启动的命令
zkServer.sh start
2.每个节点中都要启动(去中心化的架构)
# -daemon后台启动
/usr/local/soft/kafka-1.0.0/config/server.properties:配置文件的路径要写全
kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties
3.测试
任意一个节点输入
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic shujia
topic shujia:只是名字而已可以随便取
另一个节点输入
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic shujia
但是主要最后这个节点后面的取名也跟前面的一样
4.数据存放在data目录下 但是不知道是哪个节点下的
5.默认生产者产生的数据可以保存7天,消费者可以在这7天内使用这些数据,7天一过,数据自动销毁
1.producer:消息生存者
2.consumer:消息消费者
3.broker:kafka集群的server:负责处理消息读、写请求,存储消息
4.topic:消息队列/分类
5.broker就是代理,在kafka cluster这一层这里,其实里面是有很多个broker
6.topic就相当于queue
1.一个topic分成多个partition。一个topic可以看成一张表,被分成多个分区。一个partition可以看作一个并行度。
2.每个partition内部消息强有序,其中的每个消息都有一个序号叫offset。每个分区内部的数据有有序的,先进先出,但是不同的partition里面的数据出来的顺序是随机的。
3.一个partition只对应一个broker,一个broker可以管多个partition。
4.消息不经过内存缓冲,直接写入文件。
5.根据时间策略删除,而不是消费完就删除
6.producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略
7.consumer自己维护消费到哪个offset
8.每个consumer都有对应的group
9.group内是queue消费模型 :
各个consumer消费不同的partition
因此一个消息在group内只消费一次
10.group间是publish-subscribe消费模型
各个group各自独立消费,互不影响
因此一个消息在被每个group消费一次
1.输入命令
kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata
--replication-factor ---每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量
--partition --分区数, 根据数据量设置
--zookeeper zk的地址,将topic的元数据保存在zookeeper中
2.在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic
1.输入命令
kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181/kafka --topic bigdata
partition的数字是分区编号,leader后面的数字是节点,后面是存储的分区编号,但是不是匹配的,这里的leader0的分区编号是0与1,leader1的分区编号是1与2,leader2的分区是0与2.
kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka
2.__consumer_offsetsL kafka用于保存消费便宜量的topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata
- # 1、保存的文件
- /usr/local/soft/kafka_2.11-1.0.0/data
-
- # 2,每一个分区每一个副本对应一个目录
-
- # 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
- 00000000000000000000.log
- 00000000000000000001.log
- 00000000000000000002.log
-
- # 4、滚动生成文件的策略
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
-
- # 5、文件删除的策略,默认时7天,以文件为单位删除
- log.retention.hours=168
1.先在配置文件vim config/server.properties加一行代码delete.topic.enable=true
再执行命令:
kafka-topics.sh --delete --topic students_hash --zookeeper master:2181,node1:2181,node2:2181/kafka
1.先需要创建Properties对象,并设置broker列表以及kv的数据类型
- Properties properties = new Properties();
-
- //指定broker列表
- properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
-
- //指定key和value的数据格式
- properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
2.创建KafkaProducer对象,泛型是上面kv的数据类型,里面传入Properties的对象
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
3.生产数据:使用KafkaProducer对象中的send方法,里面传入ProducerRecord的对象。前面是topic,后面是数据
producer.send(new ProducerRecord<>("words","java"));
4.刷新并关闭
5.查看去命令行查看,相关命令去看使用Kafka
6.全部代码
- package com.shujia.kafka;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.util.Properties;
-
- public class Demo2StudentToKafka {
- public static void main(String[] args)throws Exception {
-
- Properties properties = new Properties();
-
- //指定broker列表
- properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
-
- //指定key和value的数据格式
- properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer
producer = new KafkaProducer(properties); -
- BufferedReader br = new BufferedReader(new FileReader("flink/data/students.csv"));
- String line =null;
- while ((line=br.readLine())!=null){
- producer.send(new ProducerRecord<>("students",line));
- producer.flush();
- }
-
- br.close();
- producer.close();
-
- }
- }
7.hash分区代码
这个分区是先在命令行中已经分好的,然后通过某一个字段的哈希值值分组
kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash
- package com.shujia.kafka;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.util.Properties;
-
- public class Demo4StudentToKafka {
- public static void main(String[] args)throws Exception {
-
- Properties properties = new Properties();
-
- //指定broker列表
- properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
-
- //指定key和value的数据格式
- properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<String, String>(properties);
-
- BufferedReader br = new BufferedReader(new FileReader("flink/data/students.csv"));
- String line =null;
- while ((line=br.readLine())!=null){
-
- //通过hash分区,hash分区就是将那个字段的hash值对分区数取余
- String clazz = line.split(",")[4];
- int partition = Math.abs(clazz.hashCode()) % 3;
- //kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash
-
-
- //ProducerRecord(String topic, Integer partition, K key, V value)
- producer.send(new ProducerRecord<>("students_hash",partition,null,line));
- producer.flush();
- }
-
- br.close();
- producer.close();
-
- }
- }
1. 先需要创建Properties对象,并设置broker列表,kv的数据类型,读取的方式,以及指定消费者组
其中的earliest是全局消费,latest是提交一次offset消费一次,如果不指定,默认是latest
- Properties properties = new Properties();
- //kafka 集群列表
- properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
-
- //读取数据的格式
- properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- /*
- * earliest
- * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- * latest 默认
- * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
- * none
- * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- *
- */
- properties.setProperty("auto.offset.reset", "earliest");
- //指定消费者组,一条数据在一个组内只消费一次
- properties.setProperty("group.id", "asdsada");
2.创建KafkaConsume对象,泛型是上面kv的数据类型,里面传入Properties的对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
3.订阅topic,封装成一个集合。再使用KafkaConsume对象的subscribe方法,里面传入的是集合
- //订阅topic
- ArrayList<String> topics = new ArrayList<>();
- topics.add("students");
- //subscribe方法里面传的是一个集合,所以要把topic封装成一个集合对象
- consumer.subscribe(topics);
4.设置消费的间隔时间,获取一个迭代器
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
5.遍历迭代器获取里面的元素,包括topic,partition,offset,value,timestamp等等
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- String topic = consumerRecord.topic();
- int partition = consumerRecord.partition();
- long offset = consumerRecord.offset();
- String value = consumerRecord.value();
- long timestamp = consumerRecord.timestamp();
- System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
-
- }
6.一般一组只能取500个,想获取完要么改分组名,要么改成死循环,让他一直消费,只要有数据产生,就能一直消费
7.一直取的代码
- package com.shujia.kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.util.ArrayList;
- import java.util.Properties;
-
- public class Demo3Consumer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- //kafka 集群列表
- properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
-
- //读取数据的格式
- properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- /*
- * earliest
- * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- * latest 默认
- * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
- * none
- * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
- *
- */
- properties.setProperty("auto.offset.reset", "earliest");
- //指定消费者组,一条数据在一个组内只消费一次
- properties.setProperty("group.id", "asdsada");
-
- //创建消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- //订阅topic
- ArrayList<String> topics = new ArrayList<>();
- topics.add("students");
- //subscribe方法里面传的是一个集合,所以要把topic封装成一个集合对象
- consumer.subscribe(topics);
-
-
- while (true){
- ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- String topic = consumerRecord.topic();
- int partition = consumerRecord.partition();
- long offset = consumerRecord.offset();
- String value = consumerRecord.value();
- long timestamp = consumerRecord.timestamp();
- System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
-
- }
- }
-
-
-
-
- }
- }
1.见代码
- package com.shujia.flink.kafka;
-
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.connector.kafka.source.KafkaSource;
- import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- public class Demo1KafkaSource {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //创建kafka source
- KafkaSource<String> source = KafkaSource.<String>builder()
- .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
- .setTopics("students")//指定消费的topic
- // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
- //.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
- // 从最早位点开始消费
- //.setStartingOffsets(OffsetsInitializer.earliest())
- // 从最末尾位点开始消费
- // .setStartingOffsets(OffsetsInitializer.latest())
- .setStartingOffsets(OffsetsInitializer.earliest())
- .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
- .build();
-
- //使用kafka source
- DataStreamSource<String> studentsDS = env
- .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
-
- studentsDS.print();
-
- env.execute();
- }
- }
- package com.shujia.flink.kafka;
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.connector.base.DeliveryGuarantee;
- import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
- import org.apache.flink.connector.kafka.sink.KafkaSink;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- public class Demo2KafkaSink {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<String> carsDS = env.readTextFile("flink/data/cars_sample.json");
-
- //创建kafka sink
- KafkaSink<String> sink = KafkaSink.<String>builder()
- .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
- .setRecordSerializer(KafkaRecordSerializationSchema.builder()
- .setTopic("cars")//指定topic
- .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
- .build()
- )
- //指定数据处理的语义
- .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .build();
-
- //使用kafka sink
- carsDS.sinkTo(sink);
-
- env.execute();
- }
- }
1.导入依赖,将flink-sql-connector-kafka-1.15.2.jar放到flink/lib目录下
2.执行命令随便选一个
flink run-application -t yarn-application -c com.shujia.flink.kafka.Demo3Cars flink-1.0.jar